diff --git a/.ai/active/SPRINT_PACKET.md b/.ai/active/SPRINT_PACKET.md
index af3a5eb..6043c2b 100644
--- a/.ai/active/SPRINT_PACKET.md
+++ b/.ai/active/SPRINT_PACKET.md
@@ -2,7 +2,7 @@
## Sprint Title
-Phase 10 Sprint 4 (P10-S4): Daily Brief + Notifications + Scheduled Open-Loop Review
+Phase 10 Sprint 5 (P10-S5): Beta Hardening + Launch Readiness
Historical baseline marker: Phase 10 Sprint 1 (P10-S1): Identity + Workspace Bootstrap.
@@ -12,21 +12,21 @@ feature
## Sprint Reason
-`P10-S1` shipped hosted identity, workspace bootstrap, device management, and preferences. `P10-S2` shipped Telegram transport, channel linking, normalized inbound messages, routing, and delivery receipts. `P10-S3` shipped chat-native continuity and approval handling. `P10-S4` now adds the scheduled habit loop: daily brief generation, notification policy enforcement, quiet-hours-respecting delivery, and scheduled prompts for waiting-for and stale open-loop review.
+`P10-S1` shipped hosted identity, workspace bootstrap, device management, and preferences. `P10-S2` shipped Telegram transport, channel linking, normalized inbound messages, routing, and delivery receipts. `P10-S3` shipped chat-native continuity and approval handling. `P10-S4` shipped daily brief delivery, notification policy, quiet hours, and scheduled prompts. `P10-S5` is the launch-gate sprint: beta onboarding hardening, support/admin visibility, analytics and observability for hosted chat flows, rate limiting and abuse controls, rollout flags, and launch-facing product clarity.
-Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Telegram Transport + Message Normalization, and `P10-S3` Chat-Native Continuity + Approvals.
+Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Telegram Transport + Message Normalization, `P10-S3` Chat-Native Continuity + Approvals, and `P10-S4` Daily Brief + Notifications + Scheduled Open-Loop Review.
## Sprint Intent
-- daily brief generation and Telegram delivery
-- notification policy enforcement including quiet hours and user preferences
-- scheduled waiting-for and stale-item prompts
-- scheduled open-loop review nudges that reuse shipped `P10-S3` review actions
-- deterministic job and delivery evidence without widening into launch tooling
+- beta onboarding funnel hardening
+- support and admin visibility for hosted Telegram operations
+- analytics and observability for hosted chat and scheduled delivery flows
+- rate limiting, abuse controls, and rollout-flag enforcement
+- launch assets and hosted-vs-OSS product clarity without widening feature scope
## Git Instructions
-- Branch Name: `codex/phase10-sprint-4-daily-brief-notifications`
+- Branch Name: `codex/phase10-sprint-5-beta-hardening-launch`
- Base Branch: `main`
- PR Strategy: one sprint branch, one PR
- Merge Policy: squash merge only after review `PASS` and explicit approval
@@ -42,37 +42,36 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Te
- `P10-S1` hosted auth, workspace bootstrap, device management, preferences, beta cohorts, and feature flags
- `P10-S2` Telegram transport, link/unlink, normalization, routing, and delivery receipts
- `P10-S3` Telegram chat-native continuity, open-loop review, and approval handling
+ - `P10-S4` Telegram daily briefs, notification policy, quiet hours, and scheduled prompts
- Phase 9 shipped scope is baseline truth, not sprint work
- Required now:
- - daily brief compilation from durable continuity state
- - scheduler execution and due-job selection
- - notification policy enforcement using timezone, quiet hours, and brief preferences
- - scheduled waiting-for and stale open-loop prompts delivered through shipped Telegram transport
-- Explicitly out of `P10-S4`:
+ - beta onboarding readiness and failure-state hardening
+ - support/admin surfaces for hosted operational inspection
+ - analytics, observability, rate limiting, and abuse controls
+ - rollout-flag enforcement and launch-facing documentation clarity
+- Explicitly out of `P10-S5`:
- new hosted auth, session, or workspace bootstrap flows
- Telegram transport or link/unlink contract redesign
- generic chat-native capture/recall/resume/correction behavior already shipped in `P10-S3`
- - support/admin dashboards
+ - daily brief and notification feature redesign already shipped in `P10-S4`
- broad channel expansion beyond Telegram
- - launch hardening
+ - new product-surface scope beyond the Phase 10 plan
## Exact APIs In Scope
-- `GET /v1/channels/telegram/daily-brief`
-- `POST /v1/channels/telegram/daily-brief/deliver`
-- `GET /v1/channels/telegram/notification-preferences`
-- `PATCH /v1/channels/telegram/notification-preferences`
-- `GET /v1/channels/telegram/open-loop-prompts`
-- `POST /v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver`
-- `GET /v1/channels/telegram/delivery-receipts`
-- `GET /v1/channels/telegram/scheduler/jobs`
+- `GET /v1/admin/hosted/overview`
+- `GET /v1/admin/hosted/workspaces`
+- `GET /v1/admin/hosted/delivery-receipts`
+- `GET /v1/admin/hosted/incidents`
+- `GET /v1/admin/hosted/rollout-flags`
+- `PATCH /v1/admin/hosted/rollout-flags`
+- `GET /v1/admin/hosted/analytics`
+- `GET /v1/admin/hosted/rate-limits`
## Exact Data Additions In Scope
-- `continuity_briefs`
-- `daily_brief_jobs`
-- `notification_subscriptions`
-- additive scheduled-delivery fields required on `channel_delivery_receipts` and related scheduler/job records
+- `chat_telemetry`
+- additive rollout, support, incident, and rate-limit evidence fields required on hosted delivery / job / workspace records
## Exact Files And Modules In Scope
@@ -80,51 +79,50 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Te
- `apps/api/src/alicebot_api/contracts.py`
- `apps/api/src/alicebot_api/store.py`
- `apps/api/src/alicebot_api/telegram_channels.py`
-- `apps/api/src/alicebot_api/chief_of_staff.py`
-- `apps/api/src/alicebot_api/continuity_open_loops.py`
-- new daily-brief / notification scheduling helpers under `apps/api/src/alicebot_api/`
+- `apps/api/src/alicebot_api/telegram_notifications.py`
+- new hosted admin / telemetry / rollout helpers under `apps/api/src/alicebot_api/`
- API migrations under `apps/api/alembic/versions/`
-- hosted brief-preference / notification-status pages or components under `apps/web/app/` and `apps/web/components/`
+- hosted admin / support / onboarding-status pages or components under `apps/web/app/` and `apps/web/components/`
- sprint-owned unit, integration, and web tests under `tests/` and `apps/web/app/**/*.test.tsx`
-- sprint-owned documentation updates required to keep active control truth aligned
+- sprint-owned launch-facing docs and product-clarity updates required to keep control truth aligned
## Implementation Workstreams
### API And Persistence
-- add brief/job/subscription contracts and persistence for scheduled delivery
-- add due-job selection and delivery bookkeeping that reuses shipped Telegram delivery seams
-- persist scheduled prompt and brief outcomes without creating a parallel chat history model
+- add telemetry, rollout, incident, and rate-limit contracts needed for hosted beta operations
+- add support/admin query surfaces that summarize hosted workspace, delivery, and failure posture without mutating shipped product behavior
+- persist launch-gate evidence without creating a second truth source for the underlying continuity flows
-### Delivery Behavior
+### Hardening And Launch
-- compile useful daily brief payloads from durable continuity and chief-of-staff state
-- enforce timezone, quiet hours, and notification preferences before delivery
-- deliver scheduled waiting-for and stale-item prompts that point back into shipped `P10-S3` open-loop review handling
+- harden onboarding and hosted flow failure handling around the already shipped Phase 10 surfaces
+- enforce rollout flags and abuse/rate-limit protections on hosted chat and scheduled delivery paths (see the flag-resolution sketch after this list)
+- make hosted-vs-OSS product boundaries explicit in launch-facing docs and surfaces
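+
+A minimal flag-resolution sketch, assuming flags resolve per `(flag_key, cohort_key)` pairs as seeded by the sprint migration; the shipped `hosted_rollout.py` helpers may differ:
+
+```python
+def resolve_flag(
+    flags: dict[tuple[str, str], bool], flag_key: str, cohorts: list[str]
+) -> bool:
+    """A flag is treated as enabled when any caller cohort enables it (assumed)."""
+    return any(flags.get((flag_key, cohort), False) for cohort in cohorts)
+
+
+# Example against the migration seeds:
+seeds = {("hosted_admin_read", "p10-ops"): True, ("hosted_admin_operator", "p10-ops"): True}
+assert resolve_flag(seeds, "hosted_admin_operator", ["p10-beta", "p10-ops"])
+```
+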
### Verification
-- add unit coverage for brief compilation, quiet-hours gating, and due-job selection
-- add integration coverage for all `P10-S4` endpoints, including quiet-hours suppression, disabled notifications, repeated-job idempotency, and stale-item prompt delivery
-- add web tests for brief-preference / notification-status UX if sprint-owned UI changes are introduced
+- add unit coverage for rollout gating, rate limiting, and telemetry aggregation helpers
+- add integration coverage for admin/support endpoints, rollout-flag enforcement, abuse/rate-limit behavior, and hosted failure-state visibility
+- add web tests for sprint-owned admin/support or onboarding-status UX
- keep control-doc truth checks passing after packet and current-state updates
## Required Deliverables
-- daily brief compiler and Telegram delivery path
-- notification preference and quiet-hours enforcement
-- scheduled waiting-for and stale-item prompts
-- persisted brief/job/subscription evidence
-- status surface for brief and notification posture
+- beta onboarding hardening
+- hosted admin/support visibility
+- analytics and observability for hosted chat and scheduled delivery flows
+- rate limiting, abuse controls, and rollout flags
+- launch-facing docs and OSS-versus-hosted product clarity
## Acceptance Criteria
-- a linked Telegram user with notifications enabled can receive a useful daily brief generated from durable stored state
-- quiet hours and notification preference settings suppress or defer delivery deterministically
-- waiting-for and stale open-loop prompts are generated and delivered without reopening generic open-loop review semantics already shipped in `P10-S3`
-- delivery jobs and receipts are persisted with deterministic status evidence
-- `P10-S1`, `P10-S2`, and `P10-S3` semantics remain baseline truth and are not reopened as sprint work
-- no `P10-S4` endpoint or screen claims that beta admin/support tooling or launch hardening is already active
+- hosted beta operators can inspect workspace, delivery, incident, and rollout posture without touching the database directly
+- hosted chat and scheduled-delivery paths enforce rollout and abuse/rate-limit controls deterministically
+- onboarding and hosted failure states are visible enough to support a beta user without reopening shipped feature seams
+- launch-facing docs and surfaces clearly distinguish OSS Alice Core from hosted Alice Connect beta scope
+- `P10-S1` through `P10-S4` semantics remain baseline truth and are not reopened as sprint work
+- no `P10-S5` work widens scope beyond beta hardening and launch readiness
## Required Verification Commands
@@ -135,7 +133,7 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Te
## Review Evidence Requirements
- `BUILD_REPORT.md` must list the exact sprint-owned files changed and the exact command results above
-- `REVIEW_REPORT.md` must grade against `P10-S4` specifically, not generic Phase 10 planning
+- `REVIEW_REPORT.md` must grade against `P10-S5` specifically, not generic Phase 10 planning
- if local archive paths remain dirty, they must be called out explicitly as excluded from sprint merge scope
## Implementation Constraints
@@ -143,11 +141,11 @@ Reference baseline markers: `P10-S1` Identity + Workspace Bootstrap, `P10-S2` Te
- do not fork continuity semantics between hosted surfaces and Alice Core
- keep OSS versus product boundaries explicit in docs and API naming
- preserve existing approval, provenance, and correction discipline
-- do not widen `P10-S4` into beta admin tooling or launch work
-- reuse the shipped `P10-S1`, `P10-S2`, and `P10-S3` identity/workspace/channel/chat foundations instead of duplicating control-plane state
-- do not re-implement generic open-loop review actions that already shipped in `P10-S3`; this sprint only adds scheduled prompting and brief delivery
+- do not widen `P10-S5` into new feature-surface scope beyond hardening and launch readiness
+- reuse the shipped `P10-S1` through `P10-S4` identity/workspace/channel/chat/scheduling foundations instead of duplicating control-plane state
+- do not re-implement generic continuity, approval, or notification behavior that already shipped in earlier sprints
- prefer additive hosted-control-plane seams over invasive rewrites of shipped Phase 9 paths
## Exit Condition
-`P10-S4` is complete when a linked Telegram user can receive a daily brief and scheduled waiting-for/stale prompts under deterministic quiet-hours and notification-policy enforcement, with persisted job and delivery evidence and no reopening of hosted identity, transport, or generic chat-continuity scope.
+`P10-S5` is complete when hosted beta operations have enough onboarding hardening, support/admin visibility, telemetry, rollout control, abuse protection, and launch-facing documentation clarity to treat Phase 10 as a launch-ready beta without reopening shipped identity, transport, chat-continuity, or notification scope.
diff --git a/.ai/handoff/CURRENT_STATE.md b/.ai/handoff/CURRENT_STATE.md
index 0aa8563..76480e8 100644
--- a/.ai/handoff/CURRENT_STATE.md
+++ b/.ai/handoff/CURRENT_STATE.md
@@ -8,8 +8,9 @@
- `P10-S1` (Identity + Workspace Bootstrap) is shipped.
- `P10-S2` (Telegram Transport + Message Normalization) is shipped.
- `P10-S3` (Chat-Native Continuity + Approvals) is shipped.
-- `P10-S4` (Daily Brief + Notifications + Scheduled Open-Loop Review) is the active execution sprint packet.
-- No scheduled daily-brief and notification loop is shipped yet.
+- `P10-S4` (Daily Brief + Notifications + Scheduled Open-Loop Review) is shipped.
+- `P10-S5` (Beta Hardening + Launch Readiness) is the active execution sprint packet.
+- No launch-ready beta hardening surface is shipped yet.
## Canonical Baseline
@@ -31,8 +32,8 @@
- `P10-S1` shipped the hosted account/session foundations, workspace bootstrap, device management, preferences, and beta controls.
- `P10-S2` shipped Telegram transport, link/unlink flow, message normalization, routing, and delivery receipts.
- `P10-S3` shipped chat-native continuity behavior and approval handling on top of the shipped Telegram transport.
-- `P10-S4` covers daily brief delivery, notification policy, quiet hours, and scheduled waiting-for / stale-item prompts.
-- Launch hardening is the later Phase 10 milestone.
+- `P10-S4` shipped daily brief delivery, notification policy, quiet hours, and scheduled waiting-for / stale-item prompts.
+- `P10-S5` covers beta onboarding hardening, support/admin visibility, analytics/observability, rollout/rate-limit controls, and launch-facing product clarity.
- Phase 9 shipped scope is baseline truth and must not be reopened as sprint work.
## Active Constraints
diff --git a/BUILD_REPORT.md b/BUILD_REPORT.md
index 73a0a8f..36b0318 100644
--- a/BUILD_REPORT.md
+++ b/BUILD_REPORT.md
@@ -1,101 +1,110 @@
-# BUILD_REPORT.md
+# BUILD_REPORT
-## Sprint Objective
-Implement `P10-S4` Daily Brief + Notifications + Scheduled Open-Loop Review for hosted Telegram by adding:
-- daily brief compile + delivery path
-- notification preferences + quiet-hours policy enforcement
-- scheduled waiting-for/stale prompt generation + delivery
-- persisted scheduler/job/receipt evidence
-- hosted settings status surface for brief/notification posture
+## sprint objective
+Implement `P10-S5` (Beta Hardening + Launch Readiness) from `.ai/active/SPRINT_PACKET.md`: hosted beta onboarding hardening, hosted admin/support visibility, hosted chat/scheduler telemetry and observability, rollout/rate-limit/abuse controls, and launch-facing hosted-vs-OSS clarity without reopening `P10-S1` through `P10-S4` feature seams.
-## Completed Work
-- Updated the active control/docs layer to reflect an active `P10-S4` execution sprint:
+## completed work
+- Updated the active control/docs layer to reflect an active `P10-S5` execution sprint:
- `.ai/active/SPRINT_PACKET.md`
- `.ai/handoff/CURRENT_STATE.md`
- `README.md`
-- Added additive migration `20260408_0046_phase10_daily_brief_notifications.py` with:
- - `notification_subscriptions`
- - `continuity_briefs`
- - `daily_brief_jobs`
- - scheduled-delivery metadata columns on `channel_delivery_receipts`
- - receipt status extension to include `suppressed`
-- Added new API helper module `apps/api/src/alicebot_api/telegram_notifications.py` implementing:
- - preference ensure/patch/read
- - quiet-hours + window + enablement policy gating
- - daily brief preview composition (continuity + chief-of-staff summary)
- - daily brief delivery with idempotent job handling
- - open-loop prompt listing/delivery (waiting-for + stale)
- - scheduler due-job materialization + listing
- - workspace-scoped internal idempotency derivation and lookup for custom delivery keys
-- Extended Telegram delivery seam in `telegram_channels.py` with workspace-level scheduled dispatch:
- - `dispatch_telegram_workspace_message(...)`
- - scheduled receipt metadata persistence
- - receipt serialization/query updates
-- Added `P10-S4` endpoints in `main.py`:
- - `GET /v1/channels/telegram/daily-brief`
- - `POST /v1/channels/telegram/daily-brief/deliver`
- - `GET /v1/channels/telegram/notification-preferences`
- - `PATCH /v1/channels/telegram/notification-preferences`
- - `GET /v1/channels/telegram/open-loop-prompts`
- - `POST /v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver`
- - `GET /v1/channels/telegram/scheduler/jobs`
-- Updated contract/store typing for new scheduler/subscription/receipt fields.
-- Updated hosted settings UI to surface `P10-S4` notification posture, daily brief preview/delivery, open-loop prompts, and scheduler jobs.
-- Updated control-truth wording in hosted web surfaces to avoid claims about admin/support/launch hardening.
-- Added/updated tests for migration, unit policy logic, new integration API flows, and web settings UI.
-- Updated `.ai/handoff/CURRENT_STATE.md` marker required by control-doc truth check.
+- Added additive migration `20260409_0047` with:
+ - `chat_telemetry` table.
+ - additive support/rollout/rate-limit/incident evidence fields on `workspaces`, `channel_delivery_receipts`, and `daily_brief_jobs`.
+ - hosted rollout/admin/rate-limit feature-flag seeds.
+- Added hosted helper modules:
+ - `hosted_rollout.py` (flag resolution/list/patch helpers).
+ - `hosted_rate_limits.py` (deterministic hosted flow limit + abuse decisions).
+ - `hosted_telemetry.py` (telemetry write/list/aggregate helpers).
+ - `hosted_admin.py` (overview/workspace/delivery/incident/rate-limit admin queries).
+- Extended API/config/contracts/store wiring for P10-S5 data and control-plane fields.
+- Implemented exact in-scope admin APIs in `main.py`:
+ - `GET /v1/admin/hosted/overview`
+ - `GET /v1/admin/hosted/workspaces`
+ - `GET /v1/admin/hosted/delivery-receipts`
+ - `GET /v1/admin/hosted/incidents`
+ - `GET /v1/admin/hosted/rollout-flags`
+ - `PATCH /v1/admin/hosted/rollout-flags`
+ - `GET /v1/admin/hosted/analytics`
+ - `GET /v1/admin/hosted/rate-limits`
+- Added rollout/rate-limit/abuse gating + telemetry recording on hosted chat/scheduler paths:
+ - `/v1/channels/telegram/daily-brief/deliver`
+ - `/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver`
+ - `/v1/channels/telegram/messages/{message_id}/handle`
+- Added onboarding failure-state hardening:
+ - bootstrap conflict/not-found failures now persist workspace support/onboarding incident evidence.
+- Added sprint-owned web surfaces/tests:
+ - hosted admin page/panel + tests.
+ - onboarding failure visibility copy + onboarding page test coverage.
+ - shell/home hosted-admin navigation and launch-clarity updates.
+- Added sprint-owned test coverage:
+ - migration unit tests.
+ - helper unit tests for rollout/rate-limit/telemetry aggregation behavior.
+ - integration coverage for admin endpoints, rollout blocking, rate-limit/abuse blocking, and onboarding failure visibility.
+- Fixed implementation issues discovered by tests (see the sketch after this list):
+ - escaped `%` in psycopg SQL `LIKE` clauses (`hosted_%%`).
+ - resolved JSONB parameter typing in onboarding-failure persistence (`%s::text`).
+ - adjusted hosted admin rollout patch behavior in web UI to include loaded cohort scope when toggling flags.
+- Applied reviewer-driven hardening fixes:
+ - restricted hosted admin endpoints to explicit operator authorization (`hosted_admin_read` + `hosted_admin_operator`).
+ - added operator cohort (`p10-ops`) feature-flag seeding for deterministic admin access control.
+ - prevented bootstrap not-found paths from mutating workspace support/incident evidence unless workspace membership was resolved.
+ - constrained hosted rollout patching to hosted-prefixed flags and caller cohort scope.
+ - added integration coverage for non-operator admin denial, hosted-only rollout patch validation, and cross-tenant onboarding-evidence safety.
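+
+The psycopg placeholder fix above, as a minimal sketch (table and column names are taken from the migration seeds; this is not the exact shipped query):
+
+```python
+# cur: an open psycopg cursor. Once parameters are bound, psycopg treats '%'
+# as a placeholder marker, so a literal LIKE wildcard must be written '%%'.
+cur.execute(
+    "SELECT flag_key FROM feature_flags "
+    "WHERE flag_key LIKE 'hosted_%%' AND cohort_key = %s",
+    ("p10-beta",),
+)
+# With no parameters bound, a single '%' is left untouched:
+cur.execute("SELECT flag_key FROM feature_flags WHERE flag_key LIKE 'hosted_%'")
+```
+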
-## Incomplete Work
-- None identified within `P10-S4` sprint packet scope.
+## incomplete work
+- None identified against the sprint packet acceptance criteria.
-## Files Changed
-- `/Users/samirusani/Desktop/Codex/AliceBot/.ai/active/SPRINT_PACKET.md`
-- `/Users/samirusani/Desktop/Codex/AliceBot/.ai/handoff/CURRENT_STATE.md`
-- `/Users/samirusani/Desktop/Codex/AliceBot/README.md`
-- `/Users/samirusani/Desktop/Codex/AliceBot/BUILD_REPORT.md`
-- `/Users/samirusani/Desktop/Codex/AliceBot/REVIEW_REPORT.md`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/alembic/versions/20260408_0046_phase10_daily_brief_notifications.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/telegram_notifications.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/main.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/telegram_channels.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/contracts.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/api/src/alicebot_api/store.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/tests/integration/test_phase10_daily_brief_notifications_api.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_telegram_notifications.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_20260408_0046_phase10_daily_brief_notifications.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/tests/unit/test_main.py`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/components/hosted-settings-panel.tsx`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/components/hosted-settings-panel.test.tsx`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/app/settings/page.tsx`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/app/settings/page.test.tsx`
-- `/Users/samirusani/Desktop/Codex/AliceBot/apps/web/app/page.tsx`
+## files changed
+- `.ai/active/SPRINT_PACKET.md`
+- `.ai/handoff/CURRENT_STATE.md`
+- `README.md`
+- `BUILD_REPORT.md`
+- `REVIEW_REPORT.md`
+- `apps/api/alembic/versions/20260409_0047_phase10_beta_hardening_launch.py`
+- `apps/api/src/alicebot_api/config.py`
+- `apps/api/src/alicebot_api/contracts.py`
+- `apps/api/src/alicebot_api/hosted_admin.py`
+- `apps/api/src/alicebot_api/hosted_rate_limits.py`
+- `apps/api/src/alicebot_api/hosted_rollout.py`
+- `apps/api/src/alicebot_api/hosted_telemetry.py`
+- `apps/api/src/alicebot_api/main.py`
+- `apps/api/src/alicebot_api/store.py`
+- `apps/api/src/alicebot_api/telegram_channels.py`
+- `apps/api/src/alicebot_api/telegram_notifications.py`
+- `apps/web/app/admin/page.test.tsx`
+- `apps/web/app/admin/page.tsx`
+- `apps/web/app/onboarding/page.test.tsx`
+- `apps/web/app/page.tsx`
+- `apps/web/components/app-shell.tsx`
+- `apps/web/components/hosted-admin-panel.test.tsx`
+- `apps/web/components/hosted-admin-panel.tsx`
+- `apps/web/components/hosted-onboarding-panel.tsx`
+- `tests/integration/test_phase10_beta_hardening_launch_api.py`
+- `tests/unit/test_20260409_0047_phase10_beta_hardening_launch.py`
+- `tests/unit/test_config.py`
+- `tests/unit/test_phase10_beta_hardening_helpers.py`
-## Tests Run
+## tests run
1. `python3 scripts/check_control_doc_truth.py`
- Result: PASS
-- Output:
- - `Control-doc truth check: PASS`
- - verified: `README.md`
- - verified: `ROADMAP.md`
- - verified: `.ai/active/SPRINT_PACKET.md`
- - verified: `RULES.md`
- - verified: `.ai/handoff/CURRENT_STATE.md`
- - verified: `docs/archive/planning/2026-04-08-context-compaction/README.md`
+- Output summary: verified `README.md`, `ROADMAP.md`, `.ai/active/SPRINT_PACKET.md`, `RULES.md`, `.ai/handoff/CURRENT_STATE.md`, and `docs/archive/planning/2026-04-08-context-compaction/README.md`.
2. `./.venv/bin/python -m pytest tests/unit tests/integration -q`
- Result: PASS
-- Output summary: `1025 passed in 139.39s (0:02:19)`
+- Output summary: `1045 passed in 134.02s (0:02:14)`.
3. `pnpm --dir apps/web test`
- Result: PASS
-- Output summary:
- - `Test Files 60 passed (60)`
- - `Tests 196 passed (196)`
+- Output summary: `Test Files 62 passed (62)`, `Tests 199 passed (199)`.
-## Blockers/Issues
-- Initial control-doc truth check failed due a required marker missing in `.ai/handoff/CURRENT_STATE.md`; resolved by adding the required marker.
-- Fixed during review: custom idempotency keys are now tenant/workspace scoped to prevent cross-workspace collision/replay.
+(Additional focused preflight runs were executed during development and passed after fixes.)
+
+## blockers/issues
- No remaining blockers.
+- Resolved during implementation:
+ - psycopg placeholder parsing conflict with SQL `LIKE 'hosted_%'`.
+ - PostgreSQL parameter type ambiguity in onboarding failure JSONB update statement.
-## Recommended Next Step
-Seek explicit Control Tower merge approval for `P10-S4`, using this branch head and the verification evidence above.
+## recommended next step
+Seek explicit Control Tower merge approval for `P10-S5`, using this branch head and the verification evidence above.
diff --git a/README.md b/README.md
index d188b07..e8d5d01 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
Alice is a local-first memory and continuity engine for AI agents.
-Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` through `P10-S3` are shipped, and `P10-S4` is the active execution sprint.
+Phase 9 is complete. Alice Connect is the planned Phase 10 product layer on top of that shipped core, `P10-S1` through `P10-S4` are shipped, and `P10-S5` is the active execution sprint.
## What v0.1 Ships
diff --git a/REVIEW_REPORT.md b/REVIEW_REPORT.md
index e296310..4bd6a0f 100644
--- a/REVIEW_REPORT.md
+++ b/REVIEW_REPORT.md
@@ -4,58 +4,57 @@
PASS
## criteria met
-- `P10-S4` API scope is implemented and routed in `apps/api/src/alicebot_api/main.py`:
- - `GET /v1/channels/telegram/daily-brief`
- - `POST /v1/channels/telegram/daily-brief/deliver`
- - `GET /v1/channels/telegram/notification-preferences`
- - `PATCH /v1/channels/telegram/notification-preferences`
- - `GET /v1/channels/telegram/open-loop-prompts`
- - `POST /v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver`
- - `GET /v1/channels/telegram/scheduler/jobs`
-- In-scope persistence additions are present:
- - `notification_subscriptions`
- - `continuity_briefs`
- - `daily_brief_jobs`
- - additive scheduler metadata on `channel_delivery_receipts`
-- Daily brief generation is built from durable continuity/chief-of-staff state and delivered through Telegram delivery seams.
-- Quiet-hours and notification preference gates deterministically suppress or allow delivery (`suppressed_disabled`, `suppressed_quiet_hours`, `suppressed_outside_window`, etc.).
-- Waiting-for and stale open-loop prompts are generated and delivered as scheduled nudges without reimplementing `P10-S3` generic review semantics.
-- Job + receipt evidence is persisted with deterministic status and metadata.
-- Blocking idempotency isolation defect identified in prior review is fixed:
- - internal idempotency keys are workspace-scoped for client-supplied values
- - job lookup/upsert is workspace/channel scoped
- - fallback outbound-message idempotency reads are workspace scoped
- - migration enforces workspace/channel/idempotency uniqueness for `daily_brief_jobs`
-- Regression coverage added for cross-workspace reuse of the same custom idempotency key (`tests/integration/test_phase10_daily_brief_notifications_api.py`).
-- Control docs are aligned to an active `P10-S4` execution sprint and baseline-shipped `P10-S1` through `P10-S3` state:
+- Hosted admin/support visibility for `P10-S5` is implemented with all in-scope endpoints in `apps/api/src/alicebot_api/main.py`:
+ - `GET /v1/admin/hosted/overview`
+ - `GET /v1/admin/hosted/workspaces`
+ - `GET /v1/admin/hosted/delivery-receipts`
+ - `GET /v1/admin/hosted/incidents`
+ - `GET /v1/admin/hosted/rollout-flags`
+ - `PATCH /v1/admin/hosted/rollout-flags`
+ - `GET /v1/admin/hosted/analytics`
+ - `GET /v1/admin/hosted/rate-limits`
+- In-scope data additions are present in `apps/api/alembic/versions/20260409_0047_phase10_beta_hardening_launch.py`, including `chat_telemetry` plus additive rollout/support/rate-limit/incident evidence fields.
+- Hosted chat and scheduled-delivery paths enforce deterministic rollout and abuse/rate-limit controls with telemetry evidence (a minimal rate-limit sketch follows this section's command list).
+- Onboarding failure-state visibility is hardened and now avoids cross-tenant side effects:
+ - admin access requires explicit operator authorization (`hosted_admin_read` + `hosted_admin_operator`) in `apps/api/src/alicebot_api/main.py:1557`.
+ - bootstrap failure recording only occurs after a resolved member workspace (`resolved_workspace_id` no longer trusts request input) in `apps/api/src/alicebot_api/main.py:5069`.
+- Rollout patch scope is constrained to hosted flags and caller cohort:
+ - hosted-only key guard in `apps/api/src/alicebot_api/hosted_rollout.py:55`.
+ - caller-cohort enforcement in `apps/api/src/alicebot_api/hosted_rollout.py:221` and `apps/api/src/alicebot_api/main.py:5568`.
+- Launch-facing OSS-vs-hosted clarity updates are present in sprint-owned web/docs surfaces (`README`, admin/onboarding/home shell copy).
+- `P10-S1` through `P10-S4` behavior remains baseline truth; `P10-S5` changes are additive hardening/control-plane seams.
+- Control docs are aligned to an active `P10-S5` execution sprint and baseline-shipped `P10-S1` through `P10-S4` state:
- `.ai/active/SPRINT_PACKET.md`
- `.ai/handoff/CURRENT_STATE.md`
- `README.md`
- Required verification commands were rerun and pass:
- `python3 scripts/check_control_doc_truth.py` -> PASS
- - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> `1025 passed`
- - `pnpm --dir apps/web test` -> `60 passed files`, `196 passed tests`
+ - `./.venv/bin/python -m pytest tests/unit tests/integration -q` -> `1045 passed`
+ - `pnpm --dir apps/web test` -> `62 passed files`, `199 passed tests`
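+
+A minimal sketch of the deterministic windowed rate-limit decision referenced above (helper name and shape are assumptions; the shipped `hosted_rate_limits.py` logic may differ):
+
+```python
+from datetime import datetime, timedelta
+
+
+def rate_limit_decision(
+    events: list[datetime], now: datetime, window_seconds: int, max_requests: int
+) -> tuple[bool, int]:
+    """Return (allowed, retry_after_seconds) over a sliding window ending at now."""
+    window_start = now - timedelta(seconds=window_seconds)
+    recent = [t for t in events if t > window_start]
+    if len(recent) < max_requests:
+        return True, 0
+    # Deterministic retry hint: when the oldest in-window event ages out.
+    retry_at = min(recent) + timedelta(seconds=window_seconds)
+    return False, max(0, int((retry_at - now).total_seconds()))
+
+
+now = datetime(2026, 4, 9, 12, 0, 0)
+allowed, retry = rate_limit_decision([now - timedelta(seconds=10)] * 20, now, 60, 20)
+assert (allowed, retry) == (False, 50)
+```
+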
## criteria missed
- None.
## quality issues
-- No blocking quality issues found in sprint-owned implementation after the idempotency scoping fix.
+- No blocking quality issues found in current sprint-owned implementation.
+- Previously identified admin/tenancy/scope issues are fixed and covered by tests.
## regression risks
- Low.
-- Residual risk is mainly operational (scheduler volume/throughput behavior under larger datasets), not correctness of the `P10-S4` contracts.
+- Residual operational risk remains around production-scale telemetry/admin query volume, not correctness of `P10-S5` contracts.
## docs issues
-- No blocking docs issues.
-- `BUILD_REPORT.md` now reflects the idempotency fix and latest verification totals.
+- None blocking.
+- `BUILD_REPORT.md` has been aligned with the reviewer-driven fixes and latest verification totals.
## should anything be added to RULES.md?
-- Optional: add an explicit durable rule that hosted/channel idempotency must be tenant/workspace scoped for lookup and uniqueness.
+- Yes (recommended): codify that hosted admin/control-plane routes must require explicit operator authorization beyond cohort membership.
+- Yes (recommended): codify that request-supplied resource IDs that fail auth/membership checks must not drive side-effect writes.
+- Yes (recommended): codify that hosted rollout patch APIs must be constrained to hosted-prefixed keys and authorized cohort scope.
## should anything update ARCHITECTURE.md?
-- Optional: add a short security invariant under hosted control-plane boundaries that dedupe/idempotency keys are tenant-scoped.
+- Yes (recommended): add a short hosted control-plane security invariant section covering operator authorization boundaries and tenant-safe failure evidence rules.
## recommended next action
1. Ready for Control Tower merge approval under policy.
-2. After merge, open `P10-S5` only for beta hardening and launch-readiness work on top of these scheduled-delivery seams.
+2. After merge, Phase 10 execution scope is complete and follow-on work should start from the launch/beta baseline rather than reopening shipped Phase 10 seams.
diff --git a/apps/api/alembic/versions/20260409_0047_phase10_beta_hardening_launch.py b/apps/api/alembic/versions/20260409_0047_phase10_beta_hardening_launch.py
new file mode 100644
index 0000000..c67d780
--- /dev/null
+++ b/apps/api/alembic/versions/20260409_0047_phase10_beta_hardening_launch.py
@@ -0,0 +1,207 @@
+"""Add Phase 10 Sprint 5 beta hardening telemetry and evidence fields."""
+
+from __future__ import annotations
+
+from alembic import op
+
+
+revision = "20260409_0047"
+down_revision = "20260408_0046"
+branch_labels = None
+depends_on = None
+
+
+_UPGRADE_STATEMENTS = (
+ """
+ CREATE TABLE chat_telemetry (
+ id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+ user_account_id uuid NOT NULL REFERENCES user_accounts(id) ON DELETE CASCADE,
+ workspace_id uuid NULL REFERENCES workspaces(id) ON DELETE SET NULL,
+ channel_message_id uuid NULL REFERENCES channel_messages(id) ON DELETE SET NULL,
+ daily_brief_job_id uuid NULL REFERENCES daily_brief_jobs(id) ON DELETE SET NULL,
+ delivery_receipt_id uuid NULL REFERENCES channel_delivery_receipts(id) ON DELETE SET NULL,
+ flow_kind text NOT NULL,
+ event_kind text NOT NULL,
+ status text NOT NULL,
+ route_path text NOT NULL,
+ rollout_flag_key text NULL,
+ rollout_flag_state text NULL,
+ rate_limit_key text NULL,
+ rate_limit_window_seconds integer NULL,
+ rate_limit_max_requests integer NULL,
+ retry_after_seconds integer NULL,
+ abuse_signal text NULL,
+ evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ created_at timestamptz NOT NULL DEFAULT now(),
+ CONSTRAINT chat_telemetry_flow_kind_check
+ CHECK (flow_kind IN ('chat_handle', 'scheduler_daily_brief', 'scheduler_open_loop_prompt')),
+ CONSTRAINT chat_telemetry_event_kind_check
+ CHECK (event_kind IN ('attempt', 'result', 'rollout_block', 'rate_limited', 'abuse_block', 'incident')),
+ CONSTRAINT chat_telemetry_status_check
+ CHECK (
+ status IN (
+ 'ok',
+ 'failed',
+ 'blocked_rollout',
+ 'rate_limited',
+ 'abuse_blocked',
+ 'suppressed',
+ 'simulated',
+ 'delivered'
+ )
+ ),
+ CONSTRAINT chat_telemetry_route_path_length_check
+ CHECK (char_length(route_path) >= 1 AND char_length(route_path) <= 200),
+ CONSTRAINT chat_telemetry_rate_limit_window_positive_check
+ CHECK (rate_limit_window_seconds IS NULL OR rate_limit_window_seconds > 0),
+ CONSTRAINT chat_telemetry_rate_limit_max_positive_check
+ CHECK (rate_limit_max_requests IS NULL OR rate_limit_max_requests > 0),
+ CONSTRAINT chat_telemetry_retry_after_non_negative_check
+ CHECK (retry_after_seconds IS NULL OR retry_after_seconds >= 0)
+ )
+ """,
+ (
+ "CREATE INDEX chat_telemetry_workspace_created_idx "
+ "ON chat_telemetry (workspace_id, created_at DESC, id DESC)"
+ ),
+ (
+ "CREATE INDEX chat_telemetry_flow_status_created_idx "
+ "ON chat_telemetry (flow_kind, status, created_at DESC, id DESC)"
+ ),
+ """
+ ALTER TABLE workspaces
+ ADD COLUMN support_status text NOT NULL DEFAULT 'healthy',
+ ADD COLUMN support_notes jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN onboarding_last_error_code text NULL,
+ ADD COLUMN onboarding_last_error_detail text NULL,
+ ADD COLUMN onboarding_last_error_at timestamptz NULL,
+ ADD COLUMN onboarding_error_count integer NOT NULL DEFAULT 0,
+ ADD COLUMN rollout_evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN rate_limit_evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN incident_evidence jsonb NOT NULL DEFAULT '{}'::jsonb
+ """,
+ """
+ ALTER TABLE workspaces
+ ADD CONSTRAINT workspaces_support_status_check
+ CHECK (support_status IN ('healthy', 'needs_attention', 'blocked'))
+ """,
+ """
+ ALTER TABLE workspaces
+ ADD CONSTRAINT workspaces_onboarding_error_count_non_negative_check
+ CHECK (onboarding_error_count >= 0)
+ """,
+ (
+ "CREATE INDEX workspaces_support_status_updated_idx "
+ "ON workspaces (support_status, updated_at DESC, id DESC)"
+ ),
+ """
+ ALTER TABLE channel_delivery_receipts
+ ADD COLUMN rollout_flag_state text NOT NULL DEFAULT 'enabled',
+ ADD COLUMN support_evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN rate_limit_evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN incident_evidence jsonb NOT NULL DEFAULT '{}'::jsonb
+ """,
+ """
+ ALTER TABLE channel_delivery_receipts
+ ADD CONSTRAINT channel_delivery_receipts_rollout_flag_state_check
+ CHECK (rollout_flag_state IN ('enabled', 'blocked'))
+ """,
+ (
+ "CREATE INDEX channel_delivery_receipts_rollout_recorded_idx "
+ "ON channel_delivery_receipts (rollout_flag_state, recorded_at DESC, id DESC)"
+ ),
+ """
+ ALTER TABLE daily_brief_jobs
+ ADD COLUMN rollout_flag_state text NOT NULL DEFAULT 'enabled',
+ ADD COLUMN support_evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN rate_limit_evidence jsonb NOT NULL DEFAULT '{}'::jsonb,
+ ADD COLUMN incident_evidence jsonb NOT NULL DEFAULT '{}'::jsonb
+ """,
+ """
+ ALTER TABLE daily_brief_jobs
+ ADD CONSTRAINT daily_brief_jobs_rollout_flag_state_check
+ CHECK (rollout_flag_state IN ('enabled', 'blocked'))
+ """,
+ (
+ "CREATE INDEX daily_brief_jobs_rollout_due_idx "
+ "ON daily_brief_jobs (rollout_flag_state, due_at DESC, id DESC)"
+ ),
+ """
+ INSERT INTO beta_cohorts (cohort_key, description)
+ VALUES ('p10-ops', 'Phase 10 hosted beta operator cohort')
+ ON CONFLICT (cohort_key) DO NOTHING
+ """,
+ """
+ INSERT INTO feature_flags (flag_key, cohort_key, enabled, description)
+ VALUES
+ ('hosted_admin_read', 'p10-beta', true, 'Hosted admin visibility for beta operations'),
+ ('hosted_chat_handle_enabled', 'p10-beta', true, 'Rollout gate for hosted telegram chat handling'),
+ ('hosted_scheduler_delivery_enabled', 'p10-beta', true, 'Rollout gate for hosted scheduler-driven deliveries'),
+ ('hosted_abuse_controls_enabled', 'p10-beta', true, 'Enable hosted abuse controls for chat and scheduler paths'),
+ ('hosted_rate_limits_enabled', 'p10-beta', true, 'Enable hosted rate limiting controls'),
+ ('hosted_admin_read', 'p10-ops', true, 'Hosted admin visibility for beta operators'),
+ ('hosted_admin_operator', 'p10-ops', true, 'Hosted admin operator authorization'),
+ ('hosted_chat_handle_enabled', 'p10-ops', true, 'Rollout gate for hosted telegram chat handling'),
+ ('hosted_scheduler_delivery_enabled', 'p10-ops', true, 'Rollout gate for hosted scheduler-driven deliveries'),
+ ('hosted_abuse_controls_enabled', 'p10-ops', true, 'Enable hosted abuse controls for chat and scheduler paths'),
+ ('hosted_rate_limits_enabled', 'p10-ops', true, 'Enable hosted rate limiting controls')
+ ON CONFLICT DO NOTHING
+ """,
+)
+
+_UPGRADE_GRANT_STATEMENTS = (
+ "GRANT SELECT, INSERT, UPDATE, DELETE ON chat_telemetry TO alicebot_app",
+)
+
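+# Note: the `p10-ops` cohort and feature-flag seed rows above are not removed
+# on downgrade (assumed intentional); the ON CONFLICT ... DO NOTHING guards
+# keep a re-run of upgrade() idempotent either way.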
+_DOWNGRADE_STATEMENTS = (
+ "DROP INDEX IF EXISTS daily_brief_jobs_rollout_due_idx",
+ "ALTER TABLE daily_brief_jobs DROP CONSTRAINT IF EXISTS daily_brief_jobs_rollout_flag_state_check",
+ """
+ ALTER TABLE daily_brief_jobs
+ DROP COLUMN IF EXISTS incident_evidence,
+ DROP COLUMN IF EXISTS rate_limit_evidence,
+ DROP COLUMN IF EXISTS support_evidence,
+ DROP COLUMN IF EXISTS rollout_flag_state
+ """,
+ "DROP INDEX IF EXISTS channel_delivery_receipts_rollout_recorded_idx",
+ "ALTER TABLE channel_delivery_receipts DROP CONSTRAINT IF EXISTS channel_delivery_receipts_rollout_flag_state_check",
+ """
+ ALTER TABLE channel_delivery_receipts
+ DROP COLUMN IF EXISTS incident_evidence,
+ DROP COLUMN IF EXISTS rate_limit_evidence,
+ DROP COLUMN IF EXISTS support_evidence,
+ DROP COLUMN IF EXISTS rollout_flag_state
+ """,
+ "DROP INDEX IF EXISTS workspaces_support_status_updated_idx",
+ "ALTER TABLE workspaces DROP CONSTRAINT IF EXISTS workspaces_onboarding_error_count_non_negative_check",
+ "ALTER TABLE workspaces DROP CONSTRAINT IF EXISTS workspaces_support_status_check",
+ """
+ ALTER TABLE workspaces
+ DROP COLUMN IF EXISTS incident_evidence,
+ DROP COLUMN IF EXISTS rate_limit_evidence,
+ DROP COLUMN IF EXISTS rollout_evidence,
+ DROP COLUMN IF EXISTS onboarding_error_count,
+ DROP COLUMN IF EXISTS onboarding_last_error_at,
+ DROP COLUMN IF EXISTS onboarding_last_error_detail,
+ DROP COLUMN IF EXISTS onboarding_last_error_code,
+ DROP COLUMN IF EXISTS support_notes,
+ DROP COLUMN IF EXISTS support_status
+ """,
+ "DROP INDEX IF EXISTS chat_telemetry_flow_status_created_idx",
+ "DROP INDEX IF EXISTS chat_telemetry_workspace_created_idx",
+ "DROP TABLE IF EXISTS chat_telemetry",
+)
+
+
+def _execute_statements(statements: tuple[str, ...]) -> None:
+ for statement in statements:
+ op.execute(statement)
+
+
+def upgrade() -> None:
+ _execute_statements(_UPGRADE_STATEMENTS)
+ _execute_statements(_UPGRADE_GRANT_STATEMENTS)
+
+
+def downgrade() -> None:
+ _execute_statements(_DOWNGRADE_STATEMENTS)
diff --git a/apps/api/src/alicebot_api/config.py b/apps/api/src/alicebot_api/config.py
index ae07b93..aaa5aa9 100644
--- a/apps/api/src/alicebot_api/config.py
+++ b/apps/api/src/alicebot_api/config.py
@@ -44,6 +44,14 @@
DEFAULT_TELEGRAM_BOT_USERNAME = "alicebot"
DEFAULT_TELEGRAM_WEBHOOK_SECRET = ""
DEFAULT_TELEGRAM_BOT_TOKEN = ""
+DEFAULT_HOSTED_CHAT_RATE_LIMIT_WINDOW_SECONDS = 60
+DEFAULT_HOSTED_CHAT_RATE_LIMIT_MAX_REQUESTS = 20
+DEFAULT_HOSTED_SCHEDULER_RATE_LIMIT_WINDOW_SECONDS = 300
+DEFAULT_HOSTED_SCHEDULER_RATE_LIMIT_MAX_REQUESTS = 20
+DEFAULT_HOSTED_ABUSE_WINDOW_SECONDS = 600
+DEFAULT_HOSTED_ABUSE_BLOCK_THRESHOLD = 5
+DEFAULT_HOSTED_RATE_LIMITS_ENABLED_BY_DEFAULT = True
+DEFAULT_HOSTED_ABUSE_CONTROLS_ENABLED_BY_DEFAULT = True
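+# The *_ENABLED_BY_DEFAULT toggles are parsed case-insensitively in from_env();
+# only "1", "true", "yes", or "on" enable them, and any other value disables.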
Environment = Mapping[str, str]
@@ -94,6 +102,14 @@ class Settings:
telegram_bot_username: str = DEFAULT_TELEGRAM_BOT_USERNAME
telegram_webhook_secret: str = DEFAULT_TELEGRAM_WEBHOOK_SECRET
telegram_bot_token: str = DEFAULT_TELEGRAM_BOT_TOKEN
+ hosted_chat_rate_limit_window_seconds: int = DEFAULT_HOSTED_CHAT_RATE_LIMIT_WINDOW_SECONDS
+ hosted_chat_rate_limit_max_requests: int = DEFAULT_HOSTED_CHAT_RATE_LIMIT_MAX_REQUESTS
+ hosted_scheduler_rate_limit_window_seconds: int = DEFAULT_HOSTED_SCHEDULER_RATE_LIMIT_WINDOW_SECONDS
+ hosted_scheduler_rate_limit_max_requests: int = DEFAULT_HOSTED_SCHEDULER_RATE_LIMIT_MAX_REQUESTS
+ hosted_abuse_window_seconds: int = DEFAULT_HOSTED_ABUSE_WINDOW_SECONDS
+ hosted_abuse_block_threshold: int = DEFAULT_HOSTED_ABUSE_BLOCK_THRESHOLD
+ hosted_rate_limits_enabled_by_default: bool = DEFAULT_HOSTED_RATE_LIMITS_ENABLED_BY_DEFAULT
+ hosted_abuse_controls_enabled_by_default: bool = DEFAULT_HOSTED_ABUSE_CONTROLS_ENABLED_BY_DEFAULT
@classmethod
def from_env(cls, env: Environment | None = None) -> "Settings":
@@ -192,6 +208,48 @@ def from_env(cls, env: Environment | None = None) -> "Settings":
"TELEGRAM_BOT_TOKEN",
cls.telegram_bot_token,
).strip(),
+ hosted_chat_rate_limit_window_seconds=_get_env_int(
+ current_env,
+ "HOSTED_CHAT_RATE_LIMIT_WINDOW_SECONDS",
+ cls.hosted_chat_rate_limit_window_seconds,
+ ),
+ hosted_chat_rate_limit_max_requests=_get_env_int(
+ current_env,
+ "HOSTED_CHAT_RATE_LIMIT_MAX_REQUESTS",
+ cls.hosted_chat_rate_limit_max_requests,
+ ),
+ hosted_scheduler_rate_limit_window_seconds=_get_env_int(
+ current_env,
+ "HOSTED_SCHEDULER_RATE_LIMIT_WINDOW_SECONDS",
+ cls.hosted_scheduler_rate_limit_window_seconds,
+ ),
+ hosted_scheduler_rate_limit_max_requests=_get_env_int(
+ current_env,
+ "HOSTED_SCHEDULER_RATE_LIMIT_MAX_REQUESTS",
+ cls.hosted_scheduler_rate_limit_max_requests,
+ ),
+ hosted_abuse_window_seconds=_get_env_int(
+ current_env,
+ "HOSTED_ABUSE_WINDOW_SECONDS",
+ cls.hosted_abuse_window_seconds,
+ ),
+ hosted_abuse_block_threshold=_get_env_int(
+ current_env,
+ "HOSTED_ABUSE_BLOCK_THRESHOLD",
+ cls.hosted_abuse_block_threshold,
+ ),
+ hosted_rate_limits_enabled_by_default=_get_env_value(
+ current_env,
+ "HOSTED_RATE_LIMITS_ENABLED_BY_DEFAULT",
+ "true" if cls.hosted_rate_limits_enabled_by_default else "false",
+ ).strip().lower()
+ in {"1", "true", "yes", "on"},
+ hosted_abuse_controls_enabled_by_default=_get_env_value(
+ current_env,
+ "HOSTED_ABUSE_CONTROLS_ENABLED_BY_DEFAULT",
+ "true" if cls.hosted_abuse_controls_enabled_by_default else "false",
+ ).strip().lower()
+ in {"1", "true", "yes", "on"},
)
return _validate_settings(settings)
@@ -217,6 +275,18 @@ def _validate_settings(settings: Settings) -> Settings:
raise ValueError("TELEGRAM_LINK_TTL_SECONDS must be a positive integer")
if settings.telegram_bot_username == "":
raise ValueError("TELEGRAM_BOT_USERNAME must be provided")
+ if settings.hosted_chat_rate_limit_window_seconds <= 0:
+ raise ValueError("HOSTED_CHAT_RATE_LIMIT_WINDOW_SECONDS must be a positive integer")
+ if settings.hosted_chat_rate_limit_max_requests <= 0:
+ raise ValueError("HOSTED_CHAT_RATE_LIMIT_MAX_REQUESTS must be a positive integer")
+ if settings.hosted_scheduler_rate_limit_window_seconds <= 0:
+ raise ValueError("HOSTED_SCHEDULER_RATE_LIMIT_WINDOW_SECONDS must be a positive integer")
+ if settings.hosted_scheduler_rate_limit_max_requests <= 0:
+ raise ValueError("HOSTED_SCHEDULER_RATE_LIMIT_MAX_REQUESTS must be a positive integer")
+ if settings.hosted_abuse_window_seconds <= 0:
+ raise ValueError("HOSTED_ABUSE_WINDOW_SECONDS must be a positive integer")
+ if settings.hosted_abuse_block_threshold <= 0:
+ raise ValueError("HOSTED_ABUSE_BLOCK_THRESHOLD must be a positive integer")
if settings.app_env not in {"development", "test"}:
if settings.auth_user_id == "":
diff --git a/apps/api/src/alicebot_api/contracts.py b/apps/api/src/alicebot_api/contracts.py
index bb1f48a..b2179fc 100644
--- a/apps/api/src/alicebot_api/contracts.py
+++ b/apps/api/src/alicebot_api/contracts.py
@@ -5148,6 +5148,15 @@ class HostedWorkspaceRecord(TypedDict):
name: str
bootstrap_status: HostedWorkspaceBootstrapStatus
bootstrapped_at: str | None
+ support_status: Literal["healthy", "needs_attention", "blocked"]
+ support_notes: JsonObject
+ onboarding_last_error_code: str | None
+ onboarding_last_error_detail: str | None
+ onboarding_last_error_at: str | None
+ onboarding_error_count: int
+ rollout_evidence: JsonObject
+ rate_limit_evidence: JsonObject
+ incident_evidence: JsonObject
created_at: str
updated_at: str
@@ -5302,6 +5311,10 @@ class ChannelDeliveryReceiptRecord(TypedDict):
scheduled_for: str | None
schedule_slot: str | None
notification_policy: JsonObject
+ rollout_flag_state: Literal["enabled", "blocked"]
+ support_evidence: JsonObject
+ rate_limit_evidence: JsonObject
+ incident_evidence: JsonObject
recorded_at: str
created_at: str
@@ -5339,12 +5352,47 @@ class TelegramDailyBriefJobRecord(TypedDict):
delivery_receipt_id: str | None
payload: JsonObject
result_payload: JsonObject
+ rollout_flag_state: Literal["enabled", "blocked"]
+ support_evidence: JsonObject
+ rate_limit_evidence: JsonObject
+ incident_evidence: JsonObject
attempted_at: str | None
completed_at: str | None
created_at: str
updated_at: str
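+
+
+# Records gating decisions (rollout, rate-limit, abuse) and delivery outcomes
+# on hosted chat/scheduler paths; `evidence` carries free-form JSON context.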
+class ChatTelemetryRecord(TypedDict):
+ id: str
+ user_account_id: str
+ workspace_id: str | None
+ channel_message_id: str | None
+ daily_brief_job_id: str | None
+ delivery_receipt_id: str | None
+ flow_kind: Literal["chat_handle", "scheduler_daily_brief", "scheduler_open_loop_prompt"]
+ event_kind: Literal["attempt", "result", "rollout_block", "rate_limited", "abuse_block", "incident"]
+ status: Literal[
+ "ok",
+ "failed",
+ "blocked_rollout",
+ "rate_limited",
+ "abuse_blocked",
+ "suppressed",
+ "simulated",
+ "delivered",
+ ]
+ route_path: str
+ rollout_flag_key: str | None
+ rollout_flag_state: str | None
+ rate_limit_key: str | None
+ rate_limit_window_seconds: int | None
+ rate_limit_max_requests: int | None
+ retry_after_seconds: int | None
+ abuse_signal: str | None
+ evidence: JsonObject
+ created_at: str
+
+
class ApprovalChallengeRecord(TypedDict):
id: str
workspace_id: str
diff --git a/apps/api/src/alicebot_api/hosted_admin.py b/apps/api/src/alicebot_api/hosted_admin.py
new file mode 100644
index 0000000..d893814
--- /dev/null
+++ b/apps/api/src/alicebot_api/hosted_admin.py
@@ -0,0 +1,597 @@
+from __future__ import annotations
+
+from datetime import UTC, datetime, timedelta
+from typing import Any, Literal
+from uuid import UUID
+
+from alicebot_api.hosted_telemetry import serialize_chat_telemetry
+
+
+def utc_now() -> datetime:
+ return datetime.now(UTC)
+
+
+def list_hosted_workspaces_for_admin(
+ conn,
+ *,
+ limit: int,
+) -> list[dict[str, object]]:
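+ """Summarize hosted workspaces with member/link counts and last-activity timestamps."""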
+ bounded_limit = max(1, min(limit, 200))
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT w.id,
+ w.owner_user_account_id,
+ w.slug,
+ w.name,
+ w.bootstrap_status,
+ w.bootstrapped_at,
+ w.support_status,
+ w.support_notes,
+ w.onboarding_last_error_code,
+ w.onboarding_last_error_detail,
+ w.onboarding_last_error_at,
+ w.onboarding_error_count,
+ w.rollout_evidence,
+ w.rate_limit_evidence,
+ w.incident_evidence,
+ w.created_at,
+ w.updated_at,
+ count(DISTINCT wm.user_account_id) AS member_count,
+ count(DISTINCT CASE WHEN ci.status = 'linked' THEN ci.id END) AS linked_identity_count,
+ max(cm.created_at) AS last_message_at,
+ max(dr.recorded_at) AS last_delivery_receipt_at
+ FROM workspaces AS w
+ LEFT JOIN workspace_members AS wm
+ ON wm.workspace_id = w.id
+ LEFT JOIN channel_identities AS ci
+ ON ci.workspace_id = w.id
+ AND ci.channel_type = 'telegram'
+ LEFT JOIN channel_messages AS cm
+ ON cm.workspace_id = w.id
+ AND cm.channel_type = 'telegram'
+ LEFT JOIN channel_delivery_receipts AS dr
+ ON dr.workspace_id = w.id
+ AND dr.channel_type = 'telegram'
+ GROUP BY w.id
+ ORDER BY w.updated_at DESC, w.id DESC
+ LIMIT %s
+ """,
+ (bounded_limit,),
+ )
+ rows = cur.fetchall()
+
+ payload: list[dict[str, object]] = []
+ for row in rows:
+ payload.append(
+ {
+ "id": str(row["id"]),
+ "owner_user_account_id": str(row["owner_user_account_id"]),
+ "slug": row["slug"],
+ "name": row["name"],
+ "bootstrap_status": row["bootstrap_status"],
+ "bootstrapped_at": None
+ if row["bootstrapped_at"] is None
+ else row["bootstrapped_at"].isoformat(),
+ "support_status": row["support_status"],
+ "support_notes": row["support_notes"],
+ "onboarding_last_error_code": row["onboarding_last_error_code"],
+ "onboarding_last_error_detail": row["onboarding_last_error_detail"],
+ "onboarding_last_error_at": None
+ if row["onboarding_last_error_at"] is None
+ else row["onboarding_last_error_at"].isoformat(),
+ "onboarding_error_count": row["onboarding_error_count"],
+ "rollout_evidence": row["rollout_evidence"],
+ "rate_limit_evidence": row["rate_limit_evidence"],
+ "incident_evidence": row["incident_evidence"],
+ "member_count": int(row["member_count"]),
+ "linked_identity_count": int(row["linked_identity_count"]),
+ "last_message_at": None
+ if row["last_message_at"] is None
+ else row["last_message_at"].isoformat(),
+ "last_delivery_receipt_at": None
+ if row["last_delivery_receipt_at"] is None
+ else row["last_delivery_receipt_at"].isoformat(),
+ "created_at": row["created_at"].isoformat(),
+ "updated_at": row["updated_at"].isoformat(),
+ }
+ )
+
+ return payload
+
+
+def list_hosted_delivery_receipts_for_admin(
+ conn,
+ *,
+ limit: int,
+ workspace_id: UUID | None = None,
+) -> list[dict[str, object]]:
+ bounded_limit = max(1, min(limit, 400))
+
+ with conn.cursor() as cur:
+ if workspace_id is None:
+ cur.execute(
+ """
+ SELECT r.id,
+ r.workspace_id,
+ r.channel_message_id,
+ r.channel_type,
+ r.status,
+ r.provider_receipt_id,
+ r.failure_code,
+ r.failure_detail,
+ r.scheduled_job_id,
+ r.scheduler_job_kind,
+ r.scheduled_for,
+ r.schedule_slot,
+ r.notification_policy,
+ r.rollout_flag_state,
+ r.support_evidence,
+ r.rate_limit_evidence,
+ r.incident_evidence,
+ r.recorded_at,
+ r.created_at,
+ w.slug AS workspace_slug,
+ w.name AS workspace_name,
+ m.direction AS message_direction
+ FROM channel_delivery_receipts AS r
+ JOIN workspaces AS w
+ ON w.id = r.workspace_id
+ LEFT JOIN channel_messages AS m
+ ON m.id = r.channel_message_id
+ WHERE r.channel_type = 'telegram'
+ ORDER BY r.recorded_at DESC, r.id DESC
+ LIMIT %s
+ """,
+ (bounded_limit,),
+ )
+ else:
+ cur.execute(
+ """
+ SELECT r.id,
+ r.workspace_id,
+ r.channel_message_id,
+ r.channel_type,
+ r.status,
+ r.provider_receipt_id,
+ r.failure_code,
+ r.failure_detail,
+ r.scheduled_job_id,
+ r.scheduler_job_kind,
+ r.scheduled_for,
+ r.schedule_slot,
+ r.notification_policy,
+ r.rollout_flag_state,
+ r.support_evidence,
+ r.rate_limit_evidence,
+ r.incident_evidence,
+ r.recorded_at,
+ r.created_at,
+ w.slug AS workspace_slug,
+ w.name AS workspace_name,
+ m.direction AS message_direction
+ FROM channel_delivery_receipts AS r
+ JOIN workspaces AS w
+ ON w.id = r.workspace_id
+ LEFT JOIN channel_messages AS m
+ ON m.id = r.channel_message_id
+ WHERE r.channel_type = 'telegram'
+ AND r.workspace_id = %s
+ ORDER BY r.recorded_at DESC, r.id DESC
+ LIMIT %s
+ """,
+ (workspace_id, bounded_limit),
+ )
+ rows = cur.fetchall()
+
+ payload: list[dict[str, object]] = []
+ for row in rows:
+ payload.append(
+ {
+ "id": str(row["id"]),
+ "workspace_id": str(row["workspace_id"]),
+ "workspace_slug": row["workspace_slug"],
+ "workspace_name": row["workspace_name"],
+ "channel_message_id": str(row["channel_message_id"]),
+ "message_direction": row["message_direction"],
+ "channel_type": row["channel_type"],
+ "status": row["status"],
+ "provider_receipt_id": row["provider_receipt_id"],
+ "failure_code": row["failure_code"],
+ "failure_detail": row["failure_detail"],
+ "scheduled_job_id": None
+ if row["scheduled_job_id"] is None
+ else str(row["scheduled_job_id"]),
+ "scheduler_job_kind": row["scheduler_job_kind"],
+ "scheduled_for": None if row["scheduled_for"] is None else row["scheduled_for"].isoformat(),
+ "schedule_slot": row["schedule_slot"],
+ "notification_policy": row["notification_policy"],
+ "rollout_flag_state": row["rollout_flag_state"],
+ "support_evidence": row["support_evidence"],
+ "rate_limit_evidence": row["rate_limit_evidence"],
+ "incident_evidence": row["incident_evidence"],
+ "recorded_at": row["recorded_at"].isoformat(),
+ "created_at": row["created_at"].isoformat(),
+ }
+ )
+
+ return payload
+
+
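+# Incidents are synthesized on read from three evidence sources (workspace
+# onboarding errors, delivery receipts, chat telemetry); there is no separate
+# incidents table.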
+def _workspace_onboarding_incidents(conn) -> list[dict[str, object]]:
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT id,
+ slug,
+ name,
+ support_status,
+ onboarding_last_error_code,
+ onboarding_last_error_detail,
+ onboarding_last_error_at,
+ onboarding_error_count,
+ incident_evidence,
+ updated_at
+ FROM workspaces
+ WHERE onboarding_error_count > 0
+ AND onboarding_last_error_at IS NOT NULL
+ ORDER BY onboarding_last_error_at DESC, id DESC
+ """,
+ )
+ rows = cur.fetchall()
+
+ incidents: list[dict[str, object]] = []
+ for row in rows:
+ evidence = row["incident_evidence"] or {}
+ resolved = bool(evidence.get("resolved", False))
+ incidents.append(
+ {
+ "incident_id": f"workspace-onboarding:{row['id']}",
+ "workspace_id": str(row["id"]),
+ "workspace_slug": row["slug"],
+ "workspace_name": row["name"],
+ "source": "workspace_onboarding",
+ "severity": "critical" if row["support_status"] == "blocked" else "warning",
+ "status": "resolved" if resolved else "open",
+ "code": row["onboarding_last_error_code"] or "onboarding_error",
+ "detail": row["onboarding_last_error_detail"]
+ or "workspace onboarding encountered an error",
+ "evidence": {
+ "onboarding_error_count": int(row["onboarding_error_count"]),
+ "support_status": row["support_status"],
+ **evidence,
+ },
+ "occurred_at": row["onboarding_last_error_at"].isoformat(),
+ "updated_at": row["updated_at"].isoformat(),
+ }
+ )
+
+ return incidents
+
+
+def _delivery_incidents(conn) -> list[dict[str, object]]:
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT r.id,
+ r.workspace_id,
+ w.slug,
+ w.name,
+ r.status,
+ r.failure_code,
+ r.failure_detail,
+ r.scheduler_job_kind,
+ r.incident_evidence,
+ r.recorded_at
+ FROM channel_delivery_receipts AS r
+ JOIN workspaces AS w
+ ON w.id = r.workspace_id
+ WHERE r.channel_type = 'telegram'
+ AND (
+ r.status IN ('failed', 'suppressed')
+ OR r.incident_evidence <> '{}'::jsonb
+ )
+ ORDER BY r.recorded_at DESC, r.id DESC
+ LIMIT 400
+ """,
+ )
+ rows = cur.fetchall()
+
+ incidents: list[dict[str, object]] = []
+ for row in rows:
+ evidence = row["incident_evidence"] or {}
+ resolved = bool(evidence.get("resolved", False))
+ incidents.append(
+ {
+ "incident_id": f"delivery-receipt:{row['id']}",
+ "workspace_id": str(row["workspace_id"]),
+ "workspace_slug": row["slug"],
+ "workspace_name": row["name"],
+ "source": "delivery_receipt",
+ "severity": "critical" if row["status"] == "failed" else "warning",
+ "status": "resolved" if resolved else "open",
+ "code": row["failure_code"] or f"delivery_{row['status']}",
+ "detail": row["failure_detail"] or "delivery receipt indicates a non-delivered status",
+ "evidence": {
+ "scheduler_job_kind": row["scheduler_job_kind"],
+ **evidence,
+ },
+ "occurred_at": row["recorded_at"].isoformat(),
+ "updated_at": row["recorded_at"].isoformat(),
+ }
+ )
+
+ return incidents
+
+
+def _telemetry_incidents(conn) -> list[dict[str, object]]:
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT t.id,
+ t.workspace_id,
+ w.slug,
+ w.name,
+ t.flow_kind,
+ t.event_kind,
+ t.status,
+ t.route_path,
+ t.evidence,
+ t.created_at
+ FROM chat_telemetry AS t
+ LEFT JOIN workspaces AS w
+ ON w.id = t.workspace_id
+ WHERE t.status IN ('failed', 'blocked_rollout', 'rate_limited', 'abuse_blocked')
+ OR t.event_kind = 'incident'
+ ORDER BY t.created_at DESC, t.id DESC
+ LIMIT 400
+ """,
+ )
+ rows = cur.fetchall()
+
+ incidents: list[dict[str, object]] = []
+ for row in rows:
+ evidence = row["evidence"] or {}
+ resolved = bool(evidence.get("resolved", False))
+ incidents.append(
+ {
+ "incident_id": f"chat-telemetry:{row['id']}",
+ "workspace_id": None if row["workspace_id"] is None else str(row["workspace_id"]),
+ "workspace_slug": row["slug"],
+ "workspace_name": row["name"],
+ "source": "chat_telemetry",
+ "severity": "critical"
+ if row["status"] in {"failed", "abuse_blocked"}
+ else "warning",
+ "status": "resolved" if resolved else "open",
+ "code": str(row["status"]),
+ "detail": f"{row['flow_kind']} {row['event_kind']} via {row['route_path']}",
+ "evidence": {
+ "flow_kind": row["flow_kind"],
+ "event_kind": row["event_kind"],
+ "route_path": row["route_path"],
+ **evidence,
+ },
+ "occurred_at": row["created_at"].isoformat(),
+ "updated_at": row["created_at"].isoformat(),
+ }
+ )
+
+ return incidents
+
+
+def list_hosted_incidents_for_admin(
+ conn,
+ *,
+ limit: int,
+ status_filter: Literal["open", "resolved", "all"] = "open",
+ workspace_id: UUID | None = None,
+) -> list[dict[str, object]]:
+ bounded_limit = max(1, min(limit, 500))
+
+ incidents = [
+ *_workspace_onboarding_incidents(conn),
+ *_delivery_incidents(conn),
+ *_telemetry_incidents(conn),
+ ]
+
+ filtered: list[dict[str, object]] = []
+ for incident in incidents:
+ if workspace_id is not None and incident["workspace_id"] != str(workspace_id):
+ continue
+ if status_filter != "all" and incident["status"] != status_filter:
+ continue
+ filtered.append(incident)
+
+ filtered.sort(key=lambda item: str(item["occurred_at"]), reverse=True)
+ return filtered[:bounded_limit]
+
+
+def get_hosted_overview_for_admin(
+ conn,
+ *,
+ window_hours: int,
+) -> dict[str, object]:
+ bounded_hours = max(1, min(window_hours, 168))
+ window_start = utc_now() - timedelta(hours=bounded_hours)
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT count(*) AS total_count,
+ count(*) FILTER (WHERE bootstrap_status = 'ready') AS ready_count,
+ count(*) FILTER (WHERE bootstrap_status = 'pending') AS pending_count,
+ count(*) FILTER (WHERE support_status = 'blocked') AS blocked_support_count,
+ count(*) FILTER (WHERE support_status = 'needs_attention') AS attention_support_count
+ FROM workspaces
+ """,
+ )
+ workspace_counts = cur.fetchone()
+
+ cur.execute(
+ """
+ SELECT count(DISTINCT workspace_id) AS linked_workspace_count
+ FROM channel_identities
+ WHERE channel_type = 'telegram'
+ AND status = 'linked'
+ """,
+ )
+ linked_counts = cur.fetchone()
+
+ cur.execute(
+ """
+ SELECT count(*) AS total_count,
+ count(*) FILTER (WHERE status = 'failed') AS failed_count,
+ count(*) FILTER (WHERE status = 'suppressed') AS suppressed_count,
+ count(*) FILTER (WHERE status IN ('simulated', 'delivered')) AS delivered_or_simulated_count
+ FROM channel_delivery_receipts
+ WHERE channel_type = 'telegram'
+ AND recorded_at >= %s
+ """,
+ (window_start,),
+ )
+ delivery_counts = cur.fetchone()
+
+ cur.execute(
+ """
+ SELECT count(*) AS total_count,
+ count(*) FILTER (WHERE status = 'ok') AS ok_count,
+ count(*) FILTER (WHERE status = 'failed') AS failed_count,
+ count(*) FILTER (WHERE status = 'blocked_rollout') AS rollout_blocked_count,
+ count(*) FILTER (WHERE status = 'rate_limited') AS rate_limited_count,
+ count(*) FILTER (WHERE status = 'abuse_blocked') AS abuse_blocked_count
+ FROM chat_telemetry
+ WHERE created_at >= %s
+ """,
+ (window_start,),
+ )
+ telemetry_counts = cur.fetchone()
+
+ cur.execute(
+ """
+ SELECT count(*) AS total_count,
+ count(*) FILTER (WHERE enabled = true) AS enabled_count,
+ count(*) FILTER (WHERE enabled = false) AS disabled_count
+ FROM feature_flags
+ WHERE flag_key LIKE 'hosted_%%'
+ """,
+ )
+ rollout_counts = cur.fetchone()
+
+ incident_count = len(
+ list_hosted_incidents_for_admin(
+ conn,
+ limit=500,
+ status_filter="open",
+ workspace_id=None,
+ )
+ )
+
+ return {
+ "window_hours": bounded_hours,
+ "window_start": window_start.isoformat(),
+ "workspaces": {
+ "total_count": int(workspace_counts["total_count"]),
+ "ready_count": int(workspace_counts["ready_count"]),
+ "pending_count": int(workspace_counts["pending_count"]),
+ "blocked_support_count": int(workspace_counts["blocked_support_count"]),
+ "attention_support_count": int(workspace_counts["attention_support_count"]),
+ "linked_telegram_workspace_count": int(linked_counts["linked_workspace_count"]),
+ },
+ "delivery_receipts": {
+ "total_count": int(delivery_counts["total_count"]),
+ "failed_count": int(delivery_counts["failed_count"]),
+ "suppressed_count": int(delivery_counts["suppressed_count"]),
+ "delivered_or_simulated_count": int(delivery_counts["delivered_or_simulated_count"]),
+ },
+ "chat_telemetry": {
+ "total_count": int(telemetry_counts["total_count"]),
+ "ok_count": int(telemetry_counts["ok_count"]),
+ "failed_count": int(telemetry_counts["failed_count"]),
+ "rollout_blocked_count": int(telemetry_counts["rollout_blocked_count"]),
+ "rate_limited_count": int(telemetry_counts["rate_limited_count"]),
+ "abuse_blocked_count": int(telemetry_counts["abuse_blocked_count"]),
+ },
+ "rollout_flags": {
+ "total_count": int(rollout_counts["total_count"]),
+ "enabled_count": int(rollout_counts["enabled_count"]),
+ "disabled_count": int(rollout_counts["disabled_count"]),
+ },
+ "incidents": {
+ "open_count": incident_count,
+ },
+ }
+
+
+def get_hosted_rate_limits_for_admin(
+ conn,
+ *,
+ window_hours: int,
+ limit: int,
+) -> dict[str, object]:
+ bounded_hours = max(1, min(window_hours, 168))
+ bounded_limit = max(1, min(limit, 200))
+ window_start = utc_now() - timedelta(hours=bounded_hours)
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT flow_kind,
+ status,
+ count(*) AS total_count
+ FROM chat_telemetry
+ WHERE created_at >= %s
+ AND status IN ('rate_limited', 'abuse_blocked')
+ GROUP BY flow_kind, status
+ ORDER BY flow_kind ASC, status ASC
+ """,
+ (window_start,),
+ )
+ grouped_rows = cur.fetchall()
+
+ summary: dict[str, dict[str, int]] = {}
+ for row in grouped_rows:
+ flow_kind = str(row["flow_kind"])
+ status = str(row["status"])
+ bucket = summary.setdefault(flow_kind, {})
+ bucket[status] = int(row["total_count"])
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT id,
+ user_account_id,
+ workspace_id,
+ channel_message_id,
+ daily_brief_job_id,
+ delivery_receipt_id,
+ flow_kind,
+ event_kind,
+ status,
+ route_path,
+ rollout_flag_key,
+ rollout_flag_state,
+ rate_limit_key,
+ rate_limit_window_seconds,
+ rate_limit_max_requests,
+ retry_after_seconds,
+ abuse_signal,
+ evidence,
+ created_at
+ FROM chat_telemetry
+ WHERE created_at >= %s
+ AND status IN ('rate_limited', 'abuse_blocked')
+ ORDER BY created_at DESC, id DESC
+ LIMIT %s
+ """,
+ (window_start, bounded_limit),
+ )
+ recent_rows = cur.fetchall()
+
+ return {
+ "window_hours": bounded_hours,
+ "window_start": window_start.isoformat(),
+ "summary": summary,
+ "items": [serialize_chat_telemetry(row) for row in recent_rows],
+ }
diff --git a/apps/api/src/alicebot_api/hosted_rate_limits.py b/apps/api/src/alicebot_api/hosted_rate_limits.py
new file mode 100644
index 0000000..2511a1c
--- /dev/null
+++ b/apps/api/src/alicebot_api/hosted_rate_limits.py
@@ -0,0 +1,171 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import UTC, datetime, timedelta
+from typing import Literal, TypedDict
+from uuid import UUID
+
+from alicebot_api.config import Settings
+
+
+HostedFlowKind = Literal["chat_handle", "scheduler_daily_brief", "scheduler_open_loop_prompt"]
+
+
+@dataclass(frozen=True)
+class RateLimitPolicy:
+ key: str
+ window_seconds: int
+ max_requests: int
+
+
+class RateLimitDecision(TypedDict):
+ allowed: bool
+ code: str | None
+ message: str
+ retry_after_seconds: int
+ rate_limit_key: str
+ window_seconds: int
+ max_requests: int
+ observed_requests: int
+ abuse_signal: str | None
+
+
+def utc_now() -> datetime:
+ return datetime.now(UTC)
+
+
+def _policy_for_flow(settings: Settings, *, flow_kind: HostedFlowKind) -> RateLimitPolicy:
+ if flow_kind == "chat_handle":
+ return RateLimitPolicy(
+ key="hosted_chat_handle",
+ window_seconds=settings.hosted_chat_rate_limit_window_seconds,
+ max_requests=settings.hosted_chat_rate_limit_max_requests,
+ )
+
+ return RateLimitPolicy(
+ key="hosted_scheduler_delivery",
+ window_seconds=settings.hosted_scheduler_rate_limit_window_seconds,
+ max_requests=settings.hosted_scheduler_rate_limit_max_requests,
+ )
+
+
+def evaluate_hosted_flow_limits(
+ conn,
+ *,
+ settings: Settings,
+ user_account_id: UUID,
+ workspace_id: UUID,
+ flow_kind: HostedFlowKind,
+ now: datetime | None = None,
+) -> RateLimitDecision:
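+ # Limits are currently scoped per workspace only; the user account id is
+ # accepted by the signature but intentionally unused.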
+ del user_account_id
+ timestamp = utc_now() if now is None else now
+ policy = _policy_for_flow(settings, flow_kind=flow_kind)
+
+ if not settings.hosted_rate_limits_enabled_by_default:
+ return {
+ "allowed": True,
+ "code": None,
+ "message": "hosted rate limits are disabled by configuration",
+ "retry_after_seconds": 0,
+ "rate_limit_key": policy.key,
+ "window_seconds": policy.window_seconds,
+ "max_requests": policy.max_requests,
+ "observed_requests": 0,
+ "abuse_signal": None,
+ }
+
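+ # Sliding window: count prior 'attempt' telemetry rows for this workspace
+ # and flow; the oldest row in the window anchors the Retry-After hint.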
+ window_start = timestamp - timedelta(seconds=policy.window_seconds)
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT count(*) AS total_count,
+ min(created_at) AS oldest_created_at
+ FROM chat_telemetry
+ WHERE workspace_id = %s
+ AND flow_kind = %s
+ AND event_kind = 'attempt'
+ AND created_at >= %s
+ """,
+ (workspace_id, flow_kind, window_start),
+ )
+ attempts_row = cur.fetchone()
+
+ observed_requests = int(attempts_row["total_count"]) if attempts_row is not None else 0
+
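+ # Abuse escalation: repeated rate-limited or abuse-blocked outcomes inside
+ # the abuse window escalate to a longer block with an explicit abuse signal.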
+ abuse_window_start = timestamp - timedelta(seconds=settings.hosted_abuse_window_seconds)
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT count(*) AS blocked_count,
+ min(created_at) AS oldest_created_at
+ FROM chat_telemetry
+ WHERE workspace_id = %s
+ AND flow_kind = %s
+ AND status IN ('rate_limited', 'abuse_blocked')
+ AND created_at >= %s
+ """,
+ (workspace_id, flow_kind, abuse_window_start),
+ )
+ blocked_row = cur.fetchone()
+
+ blocked_count = int(blocked_row["blocked_count"]) if blocked_row is not None else 0
+ if settings.hosted_abuse_controls_enabled_by_default and blocked_count >= settings.hosted_abuse_block_threshold:
+ oldest = blocked_row["oldest_created_at"]
+ retry_after = settings.hosted_abuse_window_seconds
+ if oldest is not None:
+ elapsed = int((timestamp - oldest).total_seconds())
+ retry_after = max(1, settings.hosted_abuse_window_seconds - elapsed)
+ return {
+ "allowed": False,
+ "code": "hosted_abuse_limit_exceeded",
+ "message": (
+ "hosted abuse controls blocked this flow after repeated rate-limit violations"
+ ),
+ "retry_after_seconds": retry_after,
+ "rate_limit_key": policy.key,
+ "window_seconds": settings.hosted_abuse_window_seconds,
+ "max_requests": settings.hosted_abuse_block_threshold,
+ "observed_requests": blocked_count,
+ "abuse_signal": "repeated_rate_limit_violations",
+ }
+
+ if observed_requests >= policy.max_requests:
+ oldest = attempts_row["oldest_created_at"] if attempts_row is not None else None
+ retry_after = policy.window_seconds
+ if oldest is not None:
+ elapsed = int((timestamp - oldest).total_seconds())
+ retry_after = max(1, policy.window_seconds - elapsed)
+
+ return {
+ "allowed": False,
+ "code": "hosted_rate_limit_exceeded",
+ "message": (
+ "hosted flow rate limit exceeded; "
+ f"max {policy.max_requests} requests per {policy.window_seconds} seconds"
+ ),
+ "retry_after_seconds": retry_after,
+ "rate_limit_key": policy.key,
+ "window_seconds": policy.window_seconds,
+ "max_requests": policy.max_requests,
+ "observed_requests": observed_requests,
+ "abuse_signal": None,
+ }
+
+ return {
+ "allowed": True,
+ "code": None,
+ "message": "within hosted flow rate limits",
+ "retry_after_seconds": 0,
+ "rate_limit_key": policy.key,
+ "window_seconds": policy.window_seconds,
+ "max_requests": policy.max_requests,
+ "observed_requests": observed_requests,
+ "abuse_signal": None,
+ }
diff --git a/apps/api/src/alicebot_api/hosted_rollout.py b/apps/api/src/alicebot_api/hosted_rollout.py
new file mode 100644
index 0000000..d24975c
--- /dev/null
+++ b/apps/api/src/alicebot_api/hosted_rollout.py
@@ -0,0 +1,295 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import TypedDict
+from uuid import UUID
+
+
+class RolloutFlagBlockedError(RuntimeError):
+ """Raised when a hosted rollout flag blocks the requested operation."""
+
+
+class FeatureFlagRow(TypedDict):
+ id: UUID
+ flag_key: str
+ cohort_key: str | None
+ enabled: bool
+ description: str | None
+ created_at: datetime
+ updated_at: datetime
+
+
+class RolloutFlagResolution(TypedDict):
+ flag_key: str
+ enabled: bool
+ source_scope: str
+ source_cohort_key: str | None
+ description: str | None
+ updated_at: str
+
+
+class RolloutFlagPatch(TypedDict):
+ flag_key: str
+ enabled: bool
+ cohort_key: str | None
+ description: str | None
+
+
+def _get_user_cohort(conn, *, user_account_id: UUID) -> str | None:
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT beta_cohort_key
+ FROM user_accounts
+ WHERE id = %s
+ LIMIT 1
+ """,
+ (user_account_id,),
+ )
+ row = cur.fetchone()
+ if row is None:
+ return None
+ return row["beta_cohort_key"]
+
+
+def _normalize_flag_key(flag_key: str, *, hosted_only: bool = False) -> str:
+ normalized = flag_key.strip()
+ if normalized == "":
+ raise ValueError("rollout flag key is required")
+ if len(normalized) > 120:
+ raise ValueError("rollout flag key must be 120 characters or less")
+ if hosted_only and not normalized.startswith("hosted_"):
+ raise ValueError("rollout flag key must start with 'hosted_'")
+ return normalized
+
+
+def _normalize_cohort_key(cohort_key: str | None) -> str | None:
+ if cohort_key is None:
+ return None
+ normalized = cohort_key.strip()
+ if normalized == "":
+ return None
+ if len(normalized) > 120:
+ raise ValueError("cohort key must be 120 characters or less")
+ return normalized
+
+
+def resolve_rollout_flag(
+ conn,
+ *,
+ user_account_id: UUID,
+ flag_key: str,
+) -> RolloutFlagResolution:
+ normalized_flag_key = _normalize_flag_key(flag_key)
+ cohort_key = _get_user_cohort(conn, user_account_id=user_account_id)
+
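+ # Prefer a cohort-scoped row over a global one: the CASE expression sorts
+ # an exact cohort match first, then the newest remaining row wins.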
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT id,
+ flag_key,
+ cohort_key,
+ enabled,
+ description,
+ created_at,
+ updated_at
+ FROM feature_flags
+ WHERE flag_key = %s
+ AND (cohort_key IS NULL OR cohort_key = %s)
+ ORDER BY CASE WHEN cohort_key = %s THEN 0 ELSE 1 END,
+ updated_at DESC,
+ id DESC
+ LIMIT 1
+ """,
+ (normalized_flag_key, cohort_key, cohort_key),
+ )
+ row = cur.fetchone()
+
+ if row is None:
+ return {
+ "flag_key": normalized_flag_key,
+ "enabled": False,
+ "source_scope": "missing",
+ "source_cohort_key": None,
+ "description": None,
+ "updated_at": "",
+ }
+
+ source_scope = "cohort" if row["cohort_key"] is not None else "global"
+ return {
+ "flag_key": row["flag_key"],
+ "enabled": bool(row["enabled"]),
+ "source_scope": source_scope,
+ "source_cohort_key": row["cohort_key"],
+ "description": row["description"],
+ "updated_at": row["updated_at"].isoformat(),
+ }
+
+
+def ensure_rollout_flag_enabled(
+ conn,
+ *,
+ user_account_id: UUID,
+ flag_key: str,
+) -> RolloutFlagResolution:
+ resolution = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key=flag_key,
+ )
+ if not resolution["enabled"]:
+ raise RolloutFlagBlockedError(
+ f"rollout flag '{resolution['flag_key']}' is disabled for this account"
+ )
+ return resolution
+
+
+def list_rollout_flags_for_admin(
+ conn,
+ *,
+ user_account_id: UUID,
+ include_non_hosted_flags: bool = False,
+) -> list[RolloutFlagResolution]:
+ cohort_key = _get_user_cohort(conn, user_account_id=user_account_id)
+
+ with conn.cursor() as cur:
+ if include_non_hosted_flags:
+ cur.execute(
+ """
+ SELECT id,
+ flag_key,
+ cohort_key,
+ enabled,
+ description,
+ created_at,
+ updated_at
+ FROM feature_flags
+ WHERE cohort_key IS NULL OR cohort_key = %s
+ ORDER BY flag_key ASC,
+ CASE WHEN cohort_key = %s THEN 0 ELSE 1 END,
+ updated_at DESC,
+ id DESC
+ """,
+ (cohort_key, cohort_key),
+ )
+ else:
+ cur.execute(
+ """
+ SELECT id,
+ flag_key,
+ cohort_key,
+ enabled,
+ description,
+ created_at,
+ updated_at
+ FROM feature_flags
+ WHERE flag_key LIKE 'hosted_%%'
+ AND (cohort_key IS NULL OR cohort_key = %s)
+ ORDER BY flag_key ASC,
+ CASE WHEN cohort_key = %s THEN 0 ELSE 1 END,
+ updated_at DESC,
+ id DESC
+ """,
+ (cohort_key, cohort_key),
+ )
+ rows = cur.fetchall()
+
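+ # Rows arrive ordered by flag_key with the best match first, so the first
+ # row seen per key is the effective resolution.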
+ selected: dict[str, FeatureFlagRow] = {}
+ for row in rows:
+ key = str(row["flag_key"])
+ if key in selected:
+ continue
+ selected[key] = row
+
+ payload: list[RolloutFlagResolution] = []
+ for key in sorted(selected):
+ row = selected[key]
+ payload.append(
+ {
+ "flag_key": row["flag_key"],
+ "enabled": bool(row["enabled"]),
+ "source_scope": "cohort" if row["cohort_key"] is not None else "global",
+ "source_cohort_key": row["cohort_key"],
+ "description": row["description"],
+ "updated_at": row["updated_at"].isoformat(),
+ }
+ )
+
+ return payload
+
+
+def patch_rollout_flags(
+ conn,
+ *,
+ patches: list[RolloutFlagPatch],
+ allowed_cohort_key: str | None = None,
+) -> list[RolloutFlagResolution]:
+ updated: list[RolloutFlagResolution] = []
+ with conn.cursor() as cur:
+ for patch in patches:
+ flag_key = _normalize_flag_key(patch["flag_key"], hosted_only=True)
+ cohort_key = _normalize_cohort_key(patch.get("cohort_key"))
+ description = patch.get("description")
+ enabled = bool(patch["enabled"])
+ if cohort_key != allowed_cohort_key:
+ raise ValueError("rollout flag cohort must match caller cohort")
+
+ if cohort_key is not None:
+ cur.execute(
+ """
+ SELECT 1
+ FROM beta_cohorts
+ WHERE cohort_key = %s
+ LIMIT 1
+ """,
+ (cohort_key,),
+ )
+ if cur.fetchone() is None:
+ raise ValueError(f"cohort {cohort_key!r} was not found")
+
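+ # Upsert via UPDATE-then-INSERT rather than ON CONFLICT, since
+ # (flag_key, cohort_key) may not carry a unique constraint.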
+ cur.execute(
+ """
+ UPDATE feature_flags
+ SET enabled = %s,
+ description = COALESCE(%s, description),
+ updated_at = clock_timestamp()
+ WHERE flag_key = %s
+ AND cohort_key IS NOT DISTINCT FROM %s
+ RETURNING flag_key, cohort_key, enabled, description, updated_at
+ """,
+ (enabled, description, flag_key, cohort_key),
+ )
+ row = cur.fetchone()
+
+ if row is None:
+ cur.execute(
+ """
+ INSERT INTO feature_flags (flag_key, cohort_key, enabled, description)
+ VALUES (%s, %s, %s, %s)
+ RETURNING flag_key, cohort_key, enabled, description, updated_at
+ """,
+ (flag_key, cohort_key, enabled, description),
+ )
+ row = cur.fetchone()
+
+ if row is None:
+ raise RuntimeError("failed to patch rollout flag")
+
+ updated.append(
+ {
+ "flag_key": row["flag_key"],
+ "enabled": bool(row["enabled"]),
+ "source_scope": "cohort" if row["cohort_key"] is not None else "global",
+ "source_cohort_key": row["cohort_key"],
+ "description": row["description"],
+ "updated_at": row["updated_at"].isoformat(),
+ }
+ )
+
+ return updated
diff --git a/apps/api/src/alicebot_api/hosted_telemetry.py b/apps/api/src/alicebot_api/hosted_telemetry.py
new file mode 100644
index 0000000..86fc9b5
--- /dev/null
+++ b/apps/api/src/alicebot_api/hosted_telemetry.py
@@ -0,0 +1,338 @@
+from __future__ import annotations
+
+from datetime import UTC, datetime, timedelta
+from typing import Any, Literal, TypedDict
+from uuid import UUID
+
+from psycopg.types.json import Jsonb
+
+
+HostedFlowKind = Literal["chat_handle", "scheduler_daily_brief", "scheduler_open_loop_prompt"]
+HostedTelemetryEventKind = Literal[
+ "attempt",
+ "result",
+ "rollout_block",
+ "rate_limited",
+ "abuse_block",
+ "incident",
+]
+HostedTelemetryStatus = Literal[
+ "ok",
+ "failed",
+ "blocked_rollout",
+ "rate_limited",
+ "abuse_blocked",
+ "suppressed",
+ "simulated",
+ "delivered",
+]
+
+
+class ChatTelemetryRow(TypedDict):
+ id: UUID
+ user_account_id: UUID
+ workspace_id: UUID | None
+ channel_message_id: UUID | None
+ daily_brief_job_id: UUID | None
+ delivery_receipt_id: UUID | None
+ flow_kind: HostedFlowKind
+ event_kind: HostedTelemetryEventKind
+ status: HostedTelemetryStatus
+ route_path: str
+ rollout_flag_key: str | None
+ rollout_flag_state: str | None
+ rate_limit_key: str | None
+ rate_limit_window_seconds: int | None
+ rate_limit_max_requests: int | None
+ retry_after_seconds: int | None
+ abuse_signal: str | None
+ evidence: dict[str, Any]
+ created_at: datetime
+
+
+def utc_now() -> datetime:
+ return datetime.now(UTC)
+
+
+def record_chat_telemetry(
+ conn,
+ *,
+ user_account_id: UUID,
+ workspace_id: UUID | None,
+ flow_kind: HostedFlowKind,
+ event_kind: HostedTelemetryEventKind,
+ status: HostedTelemetryStatus,
+ route_path: str,
+ channel_message_id: UUID | None = None,
+ daily_brief_job_id: UUID | None = None,
+ delivery_receipt_id: UUID | None = None,
+ rollout_flag_key: str | None = None,
+ rollout_flag_state: str | None = None,
+ rate_limit_key: str | None = None,
+ rate_limit_window_seconds: int | None = None,
+ rate_limit_max_requests: int | None = None,
+ retry_after_seconds: int | None = None,
+ abuse_signal: str | None = None,
+ evidence: dict[str, Any] | None = None,
+ created_at: datetime | None = None,
+) -> ChatTelemetryRow:
+ normalized_route_path = route_path.strip()
+ if normalized_route_path == "":
+ raise ValueError("route_path is required for chat telemetry")
+
+ timestamp = utc_now() if created_at is None else created_at
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ INSERT INTO chat_telemetry (
+ user_account_id,
+ workspace_id,
+ channel_message_id,
+ daily_brief_job_id,
+ delivery_receipt_id,
+ flow_kind,
+ event_kind,
+ status,
+ route_path,
+ rollout_flag_key,
+ rollout_flag_state,
+ rate_limit_key,
+ rate_limit_window_seconds,
+ rate_limit_max_requests,
+ retry_after_seconds,
+ abuse_signal,
+ evidence,
+ created_at
+ )
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ RETURNING id,
+ user_account_id,
+ workspace_id,
+ channel_message_id,
+ daily_brief_job_id,
+ delivery_receipt_id,
+ flow_kind,
+ event_kind,
+ status,
+ route_path,
+ rollout_flag_key,
+ rollout_flag_state,
+ rate_limit_key,
+ rate_limit_window_seconds,
+ rate_limit_max_requests,
+ retry_after_seconds,
+ abuse_signal,
+ evidence,
+ created_at
+ """,
+ (
+ user_account_id,
+ workspace_id,
+ channel_message_id,
+ daily_brief_job_id,
+ delivery_receipt_id,
+ flow_kind,
+ event_kind,
+ status,
+ normalized_route_path,
+ rollout_flag_key,
+ rollout_flag_state,
+ rate_limit_key,
+ rate_limit_window_seconds,
+ rate_limit_max_requests,
+ retry_after_seconds,
+ abuse_signal,
+ Jsonb(evidence or {}),
+ timestamp,
+ ),
+ )
+ row = cur.fetchone()
+
+ if row is None:
+ raise RuntimeError("failed to persist hosted chat telemetry")
+ return row
+
+
+def serialize_chat_telemetry(row: ChatTelemetryRow) -> dict[str, object]:
+ return {
+ "id": str(row["id"]),
+ "user_account_id": str(row["user_account_id"]),
+ "workspace_id": None if row["workspace_id"] is None else str(row["workspace_id"]),
+ "channel_message_id": None if row["channel_message_id"] is None else str(row["channel_message_id"]),
+ "daily_brief_job_id": None if row["daily_brief_job_id"] is None else str(row["daily_brief_job_id"]),
+ "delivery_receipt_id": None
+ if row["delivery_receipt_id"] is None
+ else str(row["delivery_receipt_id"]),
+ "flow_kind": row["flow_kind"],
+ "event_kind": row["event_kind"],
+ "status": row["status"],
+ "route_path": row["route_path"],
+ "rollout_flag_key": row["rollout_flag_key"],
+ "rollout_flag_state": row["rollout_flag_state"],
+ "rate_limit_key": row["rate_limit_key"],
+ "rate_limit_window_seconds": row["rate_limit_window_seconds"],
+ "rate_limit_max_requests": row["rate_limit_max_requests"],
+ "retry_after_seconds": row["retry_after_seconds"],
+ "abuse_signal": row["abuse_signal"],
+ "evidence": row["evidence"],
+ "created_at": row["created_at"].isoformat(),
+ }
+
+
+def list_recent_chat_telemetry(
+ conn,
+ *,
+ limit: int,
+ workspace_id: UUID | None = None,
+) -> list[ChatTelemetryRow]:
+ bounded_limit = max(1, min(limit, 200))
+
+ with conn.cursor() as cur:
+ if workspace_id is None:
+ cur.execute(
+ """
+ SELECT id,
+ user_account_id,
+ workspace_id,
+ channel_message_id,
+ daily_brief_job_id,
+ delivery_receipt_id,
+ flow_kind,
+ event_kind,
+ status,
+ route_path,
+ rollout_flag_key,
+ rollout_flag_state,
+ rate_limit_key,
+ rate_limit_window_seconds,
+ rate_limit_max_requests,
+ retry_after_seconds,
+ abuse_signal,
+ evidence,
+ created_at
+ FROM chat_telemetry
+ ORDER BY created_at DESC, id DESC
+ LIMIT %s
+ """,
+ (bounded_limit,),
+ )
+ else:
+ cur.execute(
+ """
+ SELECT id,
+ user_account_id,
+ workspace_id,
+ channel_message_id,
+ daily_brief_job_id,
+ delivery_receipt_id,
+ flow_kind,
+ event_kind,
+ status,
+ route_path,
+ rollout_flag_key,
+ rollout_flag_state,
+ rate_limit_key,
+ rate_limit_window_seconds,
+ rate_limit_max_requests,
+ retry_after_seconds,
+ abuse_signal,
+ evidence,
+ created_at
+ FROM chat_telemetry
+ WHERE workspace_id = %s
+ ORDER BY created_at DESC, id DESC
+ LIMIT %s
+ """,
+ (workspace_id, bounded_limit),
+ )
+ rows = cur.fetchall()
+
+ return rows
+
+
+def aggregate_chat_telemetry(
+ conn,
+ *,
+ window_hours: int,
+) -> dict[str, object]:
+ bounded_hours = max(1, min(window_hours, 168))
+ start_at = utc_now() - timedelta(hours=bounded_hours)
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT flow_kind,
+ status,
+ count(*) AS total_count
+ FROM chat_telemetry
+ WHERE created_at >= %s
+ GROUP BY flow_kind, status
+ ORDER BY flow_kind ASC, status ASC
+ """,
+ (start_at,),
+ )
+ rows = cur.fetchall()
+
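+ # Fold the grouped rows into per-flow totals, per-status totals, and a
+ # flow-by-status matrix in a single pass.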
+ flow_counts: dict[str, int] = {}
+ status_counts: dict[str, int] = {}
+ matrix: dict[str, dict[str, int]] = {}
+
+ for row in rows:
+ flow_kind = str(row["flow_kind"])
+ status = str(row["status"])
+ count = int(row["total_count"])
+
+ flow_counts[flow_kind] = flow_counts.get(flow_kind, 0) + count
+ status_counts[status] = status_counts.get(status, 0) + count
+
+ bucket = matrix.setdefault(flow_kind, {})
+ bucket[status] = count
+
+ total_events = sum(status_counts.values())
+
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ SELECT date_trunc('hour', created_at) AS hour_bucket,
+ count(*) AS total_count,
+ count(*) FILTER (WHERE status = 'ok') AS ok_count,
+ count(*) FILTER (WHERE status = 'failed') AS failed_count,
+ count(*) FILTER (WHERE status = 'blocked_rollout') AS blocked_rollout_count,
+ count(*) FILTER (WHERE status = 'rate_limited') AS rate_limited_count,
+ count(*) FILTER (WHERE status = 'abuse_blocked') AS abuse_blocked_count
+ FROM chat_telemetry
+ WHERE created_at >= %s
+ GROUP BY hour_bucket
+ ORDER BY hour_bucket DESC
+ LIMIT 48
+ """,
+ (start_at,),
+ )
+ hourly_rows = cur.fetchall()
+
+ hourly: list[dict[str, object]] = []
+ for row in hourly_rows:
+ hourly.append(
+ {
+ "hour": row["hour_bucket"].isoformat(),
+ "total_count": int(row["total_count"]),
+ "ok_count": int(row["ok_count"]),
+ "failed_count": int(row["failed_count"]),
+ "blocked_rollout_count": int(row["blocked_rollout_count"]),
+ "rate_limited_count": int(row["rate_limited_count"]),
+ "abuse_blocked_count": int(row["abuse_blocked_count"]),
+ }
+ )
+
+ return {
+ "window_hours": bounded_hours,
+ "window_start": start_at.isoformat(),
+ "total_events": total_events,
+ "flow_counts": flow_counts,
+ "status_counts": status_counts,
+ "flow_status_matrix": matrix,
+ "hourly": hourly,
+ }
diff --git a/apps/api/src/alicebot_api/main.py b/apps/api/src/alicebot_api/main.py
index f40d277..0c79cad 100644
--- a/apps/api/src/alicebot_api/main.py
+++ b/apps/api/src/alicebot_api/main.py
@@ -387,6 +387,23 @@
serialize_workspace,
set_session_workspace,
)
+from alicebot_api.hosted_rollout import (
+ list_rollout_flags_for_admin,
+ patch_rollout_flags,
+ resolve_rollout_flag,
+)
+from alicebot_api.hosted_telemetry import (
+ aggregate_chat_telemetry,
+ record_chat_telemetry,
+)
+from alicebot_api.hosted_rate_limits import evaluate_hosted_flow_limits
+from alicebot_api.hosted_admin import (
+ get_hosted_overview_for_admin,
+ get_hosted_rate_limits_for_admin,
+ list_hosted_delivery_receipts_for_admin,
+ list_hosted_incidents_for_admin,
+ list_hosted_workspaces_for_admin,
+)
from alicebot_api.telegram_channels import (
TelegramIdentityNotFoundError,
TelegramLinkPendingError,
@@ -1451,6 +1468,21 @@ class TelegramScheduledDeliveryRequest(BaseModel):
idempotency_key: str | None = Field(default=None, min_length=8, max_length=200)
+class HostedRolloutFlagPatchItemRequest(BaseModel):
+ model_config = ConfigDict(extra="forbid")
+
+ flag_key: str = Field(min_length=1, max_length=120)
+ enabled: bool
+ cohort_key: str | None = Field(default=None, min_length=1, max_length=120)
+ description: str | None = Field(default=None, min_length=1, max_length=500)
+
+
+class HostedRolloutFlagsPatchRequest(BaseModel):
+ model_config = ConfigDict(extra="forbid")
+
+ updates: list[HostedRolloutFlagPatchItemRequest] = Field(min_length=1, max_length=100)
+
+
def _extract_bearer_token(request: Request) -> str:
raw_authorization = request.headers.get("authorization", "").strip()
if raw_authorization == "":
@@ -1522,6 +1554,101 @@ def _resolve_workspace_for_hosted_channel_request(
return workspace
+def _ensure_hosted_admin_access(conn, *, user_account_id: UUID) -> None:
+ enabled_flags = set(list_feature_flags_for_user(conn, user_account_id=user_account_id))
+ required_flags = {"hosted_admin_read", "hosted_admin_operator"}
+ missing_flags = sorted(required_flags - enabled_flags)
+ if missing_flags:
+ raise PermissionError(
+ "hosted admin access requires hosted_admin_read and hosted_admin_operator flags"
+ )
+
+
+def _record_workspace_onboarding_failure(
+ conn,
+ *,
+ workspace_id: UUID,
+ error_code: str,
+ error_detail: str,
+) -> None:
+ with conn.cursor() as cur:
+ cur.execute(
+ """
+ UPDATE workspaces
+ SET support_status = CASE WHEN support_status = 'blocked' THEN support_status ELSE 'needs_attention' END,
+ onboarding_last_error_code = %s,
+ onboarding_last_error_detail = %s,
+ onboarding_last_error_at = clock_timestamp(),
+ onboarding_error_count = onboarding_error_count + 1,
+ support_notes = COALESCE(support_notes, '{}'::jsonb) || jsonb_build_object(
+ 'last_onboarding_error_code', %s::text,
+ 'last_onboarding_error_detail', %s::text,
+ 'last_onboarding_error_at', clock_timestamp()
+ ),
+ incident_evidence = COALESCE(incident_evidence, '{}'::jsonb) || jsonb_build_object(
+ 'last_onboarding_error_code', %s::text,
+ 'last_onboarding_error_detail', %s::text
+ )
+ WHERE id = %s
+ """,
+ (
+ error_code,
+ error_detail,
+ error_code,
+ error_detail,
+ error_code,
+ error_detail,
+ workspace_id,
+ ),
+ )
+
+
+def _hosted_rollout_block_error(
+ *,
+ flag_key: str,
+) -> JSONResponse:
+ return JSONResponse(
+ status_code=403,
+ content={
+ "detail": {
+ "code": "hosted_rollout_blocked",
+ "message": f"hosted flow is blocked by rollout flag {flag_key}",
+ "flag_key": flag_key,
+ }
+ },
+ )
+
+
+def _hosted_rate_limit_error(
+ *,
+ detail_code: str,
+ message: str,
+ retry_after_seconds: int,
+ rate_limit_key: str,
+ window_seconds: int,
+ max_requests: int,
+ observed_requests: int,
+ abuse_signal: str | None,
+) -> JSONResponse:
+ payload: dict[str, object] = {
+ "code": detail_code,
+ "message": message,
+ "retry_after_seconds": retry_after_seconds,
+ "rate_limit_key": rate_limit_key,
+ "window_seconds": window_seconds,
+ "max_requests": max_requests,
+ "observed_requests": observed_requests,
+ }
+ if abuse_signal is not None:
+ payload["abuse_signal"] = abuse_signal
+
+ return JSONResponse(
+ status_code=429,
+ headers={"Retry-After": str(retry_after_seconds)},
+ content={"detail": payload},
+ )
+
+
@app.get("/healthz")
def healthcheck() -> JSONResponse:
settings = get_settings()
@@ -4939,6 +5066,7 @@ def bootstrap_v1_workspace(
body: HostedWorkspaceBootstrapRequest,
) -> JSONResponse:
settings = get_settings()
+ resolved_workspace_id: UUID | None = None
try:
session_token = _extract_bearer_token(request)
@@ -4955,6 +5083,7 @@ def bootstrap_v1_workspace(
)
if workspace is None:
raise HostedWorkspaceNotFoundError(f"workspace {body.workspace_id} was not found")
+ resolved_workspace_id = workspace["id"]
set_session_workspace(
conn,
session_id=resolution["session"]["id"],
@@ -4967,6 +5096,8 @@ def bootstrap_v1_workspace(
user_account_id=user_account_id,
preferred_workspace_id=resolution["session"]["workspace_id"],
)
+ if workspace is not None:
+ resolved_workspace_id = workspace["id"]
if workspace is None:
return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"})
@@ -4985,8 +5116,26 @@ def bootstrap_v1_workspace(
except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
return JSONResponse(status_code=401, content={"detail": str(exc)})
except HostedWorkspaceNotFoundError as exc:
+ if resolved_workspace_id is not None:
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ _record_workspace_onboarding_failure(
+ conn,
+ workspace_id=resolved_workspace_id,
+ error_code="workspace_not_found",
+ error_detail=str(exc),
+ )
return JSONResponse(status_code=404, content={"detail": str(exc)})
except HostedWorkspaceBootstrapConflictError as exc:
+ if resolved_workspace_id is not None:
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ _record_workspace_onboarding_failure(
+ conn,
+ workspace_id=resolved_workspace_id,
+ error_code="bootstrap_conflict",
+ error_detail=str(exc),
+ )
return JSONResponse(status_code=409, content={"detail": str(exc)})
return JSONResponse(
@@ -5225,6 +5374,285 @@ def patch_v1_preferences(
)
+@app.get("/v1/admin/hosted/overview")
+def get_v1_admin_hosted_overview(
+ request: Request,
+ window_hours: int = Query(default=24, ge=1, le=168),
+) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ payload = get_hosted_overview_for_admin(conn, window_hours=window_hours)
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(status_code=200, content=jsonable_encoder(payload))
+
+
+@app.get("/v1/admin/hosted/workspaces")
+def get_v1_admin_hosted_workspaces(
+ request: Request,
+ limit: int = Query(default=50, ge=1, le=200),
+) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ items = list_hosted_workspaces_for_admin(conn, limit=limit)
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(
+ status_code=200,
+ content=jsonable_encoder(
+ {
+ "items": items,
+ "summary": {
+ "total_count": len(items),
+ "returned_count": len(items),
+ "order": ["updated_at_desc", "id_desc"],
+ },
+ }
+ ),
+ )
+
+
+@app.get("/v1/admin/hosted/delivery-receipts")
+def get_v1_admin_hosted_delivery_receipts(
+ request: Request,
+ limit: int = Query(default=100, ge=1, le=400),
+ workspace_id: UUID | None = None,
+) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ items = list_hosted_delivery_receipts_for_admin(
+ conn,
+ limit=limit,
+ workspace_id=workspace_id,
+ )
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(
+ status_code=200,
+ content=jsonable_encoder(
+ {
+ "items": items,
+ "summary": {
+ "total_count": len(items),
+ "returned_count": len(items),
+ "order": ["recorded_at_desc", "id_desc"],
+ },
+ }
+ ),
+ )
+
+
+@app.get("/v1/admin/hosted/incidents")
+def get_v1_admin_hosted_incidents(
+ request: Request,
+ status: str = Query(default="open", min_length=1, max_length=20),
+ limit: int = Query(default=100, ge=1, le=500),
+ workspace_id: UUID | None = None,
+) -> JSONResponse:
+ settings = get_settings()
+ normalized_status = status.strip().casefold()
+ if normalized_status not in {"open", "resolved", "all"}:
+ return JSONResponse(status_code=400, content={"detail": "status must be one of: open, resolved, all"})
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ items = list_hosted_incidents_for_admin(
+ conn,
+ limit=limit,
+ status_filter=normalized_status, # type: ignore[arg-type]
+ workspace_id=workspace_id,
+ )
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(
+ status_code=200,
+ content=jsonable_encoder(
+ {
+ "items": items,
+ "summary": {
+ "total_count": len(items),
+ "returned_count": len(items),
+ "status_filter": normalized_status,
+ "order": ["occurred_at_desc", "incident_id_desc"],
+ },
+ }
+ ),
+ )
+
+
+@app.get("/v1/admin/hosted/rollout-flags")
+def get_v1_admin_hosted_rollout_flags(request: Request) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ flags = list_rollout_flags_for_admin(conn, user_account_id=user_account_id)
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(
+ status_code=200,
+ content=jsonable_encoder(
+ {
+ "items": flags,
+ "summary": {
+ "total_count": len(flags),
+ "enabled_count": sum(1 for flag in flags if flag["enabled"]),
+ "disabled_count": sum(1 for flag in flags if not flag["enabled"]),
+ "order": ["flag_key_asc"],
+ },
+ }
+ ),
+ )
+
+
+@app.patch("/v1/admin/hosted/rollout-flags")
+def patch_v1_admin_hosted_rollout_flags(
+ request: Request,
+ body: HostedRolloutFlagsPatchRequest,
+) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ updated_flags = patch_rollout_flags(
+ conn,
+ patches=[
+ {
+ "flag_key": item.flag_key,
+ "enabled": item.enabled,
+ "cohort_key": item.cohort_key,
+ "description": item.description,
+ }
+ for item in body.updates
+ ],
+ allowed_cohort_key=resolution["user_account"]["beta_cohort_key"],
+ )
+ flags = list_rollout_flags_for_admin(conn, user_account_id=user_account_id)
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+ except ValueError as exc:
+ return JSONResponse(status_code=400, content={"detail": str(exc)})
+
+ return JSONResponse(
+ status_code=200,
+ content=jsonable_encoder(
+ {
+ "updated": updated_flags,
+ "items": flags,
+ "summary": {
+ "total_count": len(flags),
+ "enabled_count": sum(1 for flag in flags if flag["enabled"]),
+ "disabled_count": sum(1 for flag in flags if not flag["enabled"]),
+ },
+ }
+ ),
+ )
+
+
+@app.get("/v1/admin/hosted/analytics")
+def get_v1_admin_hosted_analytics(
+ request: Request,
+ window_hours: int = Query(default=24, ge=1, le=168),
+) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ telemetry = aggregate_chat_telemetry(conn, window_hours=window_hours)
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(status_code=200, content=jsonable_encoder({"analytics": telemetry}))
+
+
+@app.get("/v1/admin/hosted/rate-limits")
+def get_v1_admin_hosted_rate_limits(
+ request: Request,
+ window_hours: int = Query(default=24, ge=1, le=168),
+ limit: int = Query(default=100, ge=1, le=200),
+) -> JSONResponse:
+ settings = get_settings()
+
+ try:
+ session_token = _extract_bearer_token(request)
+ with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
+ with conn.transaction():
+ resolution = resolve_auth_session(conn, session_token=session_token)
+ user_account_id = resolution["user_account"]["id"]
+ _ensure_hosted_admin_access(conn, user_account_id=user_account_id)
+ payload = get_hosted_rate_limits_for_admin(
+ conn,
+ window_hours=window_hours,
+ limit=limit,
+ )
+ except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
+ return JSONResponse(status_code=401, content={"detail": str(exc)})
+ except PermissionError as exc:
+ return JSONResponse(status_code=403, content={"detail": str(exc)})
+
+ return JSONResponse(status_code=200, content=jsonable_encoder(payload))
+
+
@app.post("/v1/channels/telegram/link/start")
def start_v1_telegram_link(request: Request, body: TelegramLinkStartRequest) -> JSONResponse:
settings = get_settings()
@@ -5741,6 +6169,107 @@ def post_v1_telegram_daily_brief_deliver(
if workspace is None:
return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"})
+ rollout_resolution = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_scheduler_delivery_enabled",
+ )
+ if not rollout_resolution["enabled"]:
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_daily_brief",
+ event_kind="rollout_block",
+ status="blocked_rollout",
+ route_path="/v1/channels/telegram/daily-brief/deliver",
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="blocked",
+ evidence={
+ "force": body.force,
+ "idempotency_key": body.idempotency_key,
+ },
+ )
+ return _hosted_rollout_block_error(flag_key=rollout_resolution["flag_key"])
+
+ rate_limit_rollout = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_rate_limits_enabled",
+ )
+ abuse_rollout = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_abuse_controls_enabled",
+ )
+ if rate_limit_rollout["enabled"]:
+ decision = evaluate_hosted_flow_limits(
+ conn,
+ settings=settings,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_daily_brief",
+ )
+ if decision["code"] == "hosted_abuse_limit_exceeded" and not abuse_rollout["enabled"]:
+ decision = {
+ **decision,
+ "allowed": True,
+ "code": None,
+ "message": "abuse controls disabled by rollout",
+ "retry_after_seconds": 0,
+ "abuse_signal": None,
+ }
+
+ if not decision["allowed"]:
+ blocked_status = "abuse_blocked" if decision["code"] == "hosted_abuse_limit_exceeded" else "rate_limited"
+ blocked_event = "abuse_block" if blocked_status == "abuse_blocked" else "rate_limited"
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_daily_brief",
+ event_kind=blocked_event, # type: ignore[arg-type]
+ status=blocked_status, # type: ignore[arg-type]
+ route_path="/v1/channels/telegram/daily-brief/deliver",
+ rollout_flag_key=rate_limit_rollout["flag_key"],
+ rollout_flag_state="enabled",
+ rate_limit_key=decision["rate_limit_key"],
+ rate_limit_window_seconds=decision["window_seconds"],
+ rate_limit_max_requests=decision["max_requests"],
+ retry_after_seconds=decision["retry_after_seconds"],
+ abuse_signal=decision["abuse_signal"],
+ evidence={
+ "force": body.force,
+ "idempotency_key": body.idempotency_key,
+ },
+ )
+ return _hosted_rate_limit_error(
+ detail_code=decision["code"] or "hosted_rate_limit_exceeded",
+ message=decision["message"],
+ retry_after_seconds=decision["retry_after_seconds"],
+ rate_limit_key=decision["rate_limit_key"],
+ window_seconds=decision["window_seconds"],
+ max_requests=decision["max_requests"],
+ observed_requests=decision["observed_requests"],
+ abuse_signal=decision["abuse_signal"],
+ )
+
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_daily_brief",
+ event_kind="attempt",
+ status="ok",
+ route_path="/v1/channels/telegram/daily-brief/deliver",
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="enabled",
+ evidence={
+ "force": body.force,
+ "idempotency_key": body.idempotency_key,
+ },
+ )
+
payload = deliver_workspace_daily_brief(
conn,
user_account_id=user_account_id,
@@ -5749,6 +6278,36 @@ def post_v1_telegram_daily_brief_deliver(
force=body.force,
idempotency_key=body.idempotency_key,
)
+ delivery_receipt = payload.get("delivery_receipt")
+ delivery_receipt_id: UUID | None = None
+ if isinstance(delivery_receipt, dict) and isinstance(delivery_receipt.get("id"), str):
+ delivery_receipt_id = UUID(delivery_receipt["id"])
+
+ status_value: str = "ok"
+ if isinstance(payload.get("job"), dict):
+ job_status = str(payload["job"].get("status", "ok"))
+ if job_status == "failed":
+ status_value = "failed"
+ elif job_status.startswith("suppressed"):
+ status_value = "suppressed"
+ elif job_status in {"simulated", "delivered"}:
+ status_value = job_status
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_daily_brief",
+ event_kind="result",
+ status=status_value, # type: ignore[arg-type]
+ route_path="/v1/channels/telegram/daily-brief/deliver",
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="enabled",
+ delivery_receipt_id=delivery_receipt_id,
+ evidence={
+ "idempotent_replay": bool(payload.get("idempotent_replay")),
+ "force": body.force,
+ },
+ )
except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
return JSONResponse(status_code=401, content={"detail": str(exc)})
except TelegramIdentityNotFoundError as exc:
@@ -5823,6 +6382,110 @@ def post_v1_telegram_open_loop_prompt_deliver(
if workspace is None:
return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"})
+ rollout_resolution = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_scheduler_delivery_enabled",
+ )
+ if not rollout_resolution["enabled"]:
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_open_loop_prompt",
+ event_kind="rollout_block",
+ status="blocked_rollout",
+ route_path=f"/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver",
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="blocked",
+ evidence={
+ "prompt_id": prompt_id,
+ "force": body.force,
+ "idempotency_key": body.idempotency_key,
+ },
+ )
+ return _hosted_rollout_block_error(flag_key=rollout_resolution["flag_key"])
+
+ rate_limit_rollout = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_rate_limits_enabled",
+ )
+ abuse_rollout = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_abuse_controls_enabled",
+ )
+ if rate_limit_rollout["enabled"]:
+ decision = evaluate_hosted_flow_limits(
+ conn,
+ settings=settings,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_open_loop_prompt",
+ )
+ if decision["code"] == "hosted_abuse_limit_exceeded" and not abuse_rollout["enabled"]:
+ decision = {
+ **decision,
+ "allowed": True,
+ "code": None,
+ "message": "abuse controls disabled by rollout",
+ "retry_after_seconds": 0,
+ "abuse_signal": None,
+ }
+
+ if not decision["allowed"]:
+ blocked_status = "abuse_blocked" if decision["code"] == "hosted_abuse_limit_exceeded" else "rate_limited"
+ blocked_event = "abuse_block" if blocked_status == "abuse_blocked" else "rate_limited"
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_open_loop_prompt",
+ event_kind=blocked_event, # type: ignore[arg-type]
+ status=blocked_status, # type: ignore[arg-type]
+ route_path=f"/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver",
+ rollout_flag_key=rate_limit_rollout["flag_key"],
+ rollout_flag_state="enabled",
+ rate_limit_key=decision["rate_limit_key"],
+ rate_limit_window_seconds=decision["window_seconds"],
+ rate_limit_max_requests=decision["max_requests"],
+ retry_after_seconds=decision["retry_after_seconds"],
+ abuse_signal=decision["abuse_signal"],
+ evidence={
+ "prompt_id": prompt_id,
+ "force": body.force,
+ "idempotency_key": body.idempotency_key,
+ },
+ )
+ return _hosted_rate_limit_error(
+ detail_code=decision["code"] or "hosted_rate_limit_exceeded",
+ message=decision["message"],
+ retry_after_seconds=decision["retry_after_seconds"],
+ rate_limit_key=decision["rate_limit_key"],
+ window_seconds=decision["window_seconds"],
+ max_requests=decision["max_requests"],
+ observed_requests=decision["observed_requests"],
+ abuse_signal=decision["abuse_signal"],
+ )
+
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_open_loop_prompt",
+ event_kind="attempt",
+ status="ok",
+ route_path=f"/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver",
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="enabled",
+ evidence={
+ "prompt_id": prompt_id,
+ "force": body.force,
+ "idempotency_key": body.idempotency_key,
+ },
+ )
+
payload = deliver_workspace_open_loop_prompt(
conn,
user_account_id=user_account_id,
@@ -5832,6 +6495,37 @@ def post_v1_telegram_open_loop_prompt_deliver(
force=body.force,
idempotency_key=body.idempotency_key,
)
+ delivery_receipt = payload.get("delivery_receipt")
+ delivery_receipt_id: UUID | None = None
+ if isinstance(delivery_receipt, dict) and isinstance(delivery_receipt.get("id"), str):
+ delivery_receipt_id = UUID(delivery_receipt["id"])
+
+ status_value: str = "ok"
+ if isinstance(payload.get("job"), dict):
+ job_status = str(payload["job"].get("status", "ok"))
+ if job_status == "failed":
+ status_value = "failed"
+ elif job_status.startswith("suppressed"):
+ status_value = "suppressed"
+ elif job_status in {"simulated", "delivered"}:
+ status_value = job_status
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="scheduler_open_loop_prompt",
+ event_kind="result",
+ status=status_value, # type: ignore[arg-type]
+ route_path=f"/v1/channels/telegram/open-loop-prompts/{prompt_id}/deliver",
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="enabled",
+ delivery_receipt_id=delivery_receipt_id,
+ evidence={
+ "idempotent_replay": bool(payload.get("idempotent_replay")),
+ "prompt_id": prompt_id,
+ "force": body.force,
+ },
+ )
except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
return JSONResponse(status_code=401, content={"detail": str(exc)})
except TelegramIdentityNotFoundError as exc:
@@ -5908,6 +6602,101 @@ def handle_v1_telegram_message(
if workspace is None:
return JSONResponse(status_code=404, content={"detail": "no workspace is currently selected"})
+ rollout_resolution = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_chat_handle_enabled",
+ )
+ if not rollout_resolution["enabled"]:
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="chat_handle",
+ event_kind="rollout_block",
+ status="blocked_rollout",
+ route_path="/v1/channels/telegram/messages/{message_id}/handle",
+ channel_message_id=message_id,
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="blocked",
+ evidence={"intent_hint": body.intent_hint},
+ )
+ return _hosted_rollout_block_error(flag_key=rollout_resolution["flag_key"])
+
+ rate_limit_rollout = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_rate_limits_enabled",
+ )
+ abuse_rollout = resolve_rollout_flag(
+ conn,
+ user_account_id=user_account_id,
+ flag_key="hosted_abuse_controls_enabled",
+ )
+ if rate_limit_rollout["enabled"]:
+ decision = evaluate_hosted_flow_limits(
+ conn,
+ settings=settings,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="chat_handle",
+ )
+ if decision["code"] == "hosted_abuse_limit_exceeded" and not abuse_rollout["enabled"]:
+ decision = {
+ **decision,
+ "allowed": True,
+ "code": None,
+ "message": "abuse controls disabled by rollout",
+ "retry_after_seconds": 0,
+ "abuse_signal": None,
+ }
+
+ if not decision["allowed"]:
+ blocked_status = "abuse_blocked" if decision["code"] == "hosted_abuse_limit_exceeded" else "rate_limited"
+ blocked_event = "abuse_block" if blocked_status == "abuse_blocked" else "rate_limited"
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="chat_handle",
+ event_kind=blocked_event, # type: ignore[arg-type]
+ status=blocked_status, # type: ignore[arg-type]
+ route_path="/v1/channels/telegram/messages/{message_id}/handle",
+ channel_message_id=message_id,
+ rollout_flag_key=rate_limit_rollout["flag_key"],
+ rollout_flag_state="enabled",
+ rate_limit_key=decision["rate_limit_key"],
+ rate_limit_window_seconds=decision["window_seconds"],
+ rate_limit_max_requests=decision["max_requests"],
+ retry_after_seconds=decision["retry_after_seconds"],
+ abuse_signal=decision["abuse_signal"],
+ evidence={"intent_hint": body.intent_hint},
+ )
+ return _hosted_rate_limit_error(
+ detail_code=decision["code"] or "hosted_rate_limit_exceeded",
+ message=decision["message"],
+ retry_after_seconds=decision["retry_after_seconds"],
+ rate_limit_key=decision["rate_limit_key"],
+ window_seconds=decision["window_seconds"],
+ max_requests=decision["max_requests"],
+ observed_requests=decision["observed_requests"],
+ abuse_signal=decision["abuse_signal"],
+ )
+
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="chat_handle",
+ event_kind="attempt",
+ status="ok",
+ route_path="/v1/channels/telegram/messages/{message_id}/handle",
+ channel_message_id=message_id,
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="enabled",
+ evidence={"intent_hint": body.intent_hint},
+ )
+
prepare_telegram_continuity_context(conn, user_account_id=user_account_id)
payload = handle_telegram_message(
conn,
@@ -5917,6 +6706,29 @@ def handle_v1_telegram_message(
bot_token=settings.telegram_bot_token,
intent_hint=body.intent_hint,
)
+ intent_status = str(payload["intent"].get("status", "handled"))
+ telemetry_status = "ok" if intent_status == "handled" else "failed"
+ delivery_receipt = payload.get("delivery_receipt")
+ delivery_receipt_id: UUID | None = None
+ if isinstance(delivery_receipt, dict) and isinstance(delivery_receipt.get("id"), str):
+ delivery_receipt_id = UUID(delivery_receipt["id"])
+ record_chat_telemetry(
+ conn,
+ user_account_id=user_account_id,
+ workspace_id=workspace["id"],
+ flow_kind="chat_handle",
+ event_kind="result",
+ status=telemetry_status, # type: ignore[arg-type]
+ route_path="/v1/channels/telegram/messages/{message_id}/handle",
+ channel_message_id=message_id,
+ delivery_receipt_id=delivery_receipt_id,
+ rollout_flag_key=rollout_resolution["flag_key"],
+ rollout_flag_state="enabled",
+ evidence={
+ "intent_status": intent_status,
+ "intent_kind": payload["intent"].get("intent_kind"),
+ },
+ )
except (AuthSessionInvalidError, AuthSessionExpiredError, AuthSessionRevokedDeviceError) as exc:
return JSONResponse(status_code=401, content={"detail": str(exc)})
except HostedUserAccountNotFoundError as exc:
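
Reviewer note: the handler hunks above settle a fixed gate order, rollout flag first, then rate limits, then abuse controls, with telemetry recorded at every branch. The sketch below restates the decision shape and the abuse-rollout downgrade as a standalone helper so the ordering is easy to audit. `HostedFlowDecision` and `downgrade_abuse_block` are illustrative names only; the shipped code inlines this dict-merge logic, and the key names mirror the diff.

```python
# Illustrative sketch only. HostedFlowDecision and downgrade_abuse_block are
# hypothetical names; the shipped handler inlines this dict-merge logic.
from typing import TypedDict


class HostedFlowDecision(TypedDict):
    allowed: bool
    code: str | None
    message: str
    retry_after_seconds: int
    rate_limit_key: str
    window_seconds: int
    max_requests: int
    observed_requests: int
    abuse_signal: str | None


def downgrade_abuse_block(
    decision: HostedFlowDecision, *, abuse_rollout_enabled: bool
) -> HostedFlowDecision:
    """Permit the request when only the abuse flag blocked it and that flag is rolled off."""
    if decision["code"] == "hosted_abuse_limit_exceeded" and not abuse_rollout_enabled:
        return {
            **decision,
            "allowed": True,
            "code": None,
            "message": "abuse controls disabled by rollout",
            "retry_after_seconds": 0,
            "abuse_signal": None,
        }
    return decision
```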
diff --git a/apps/api/src/alicebot_api/store.py b/apps/api/src/alicebot_api/store.py
index bde596a..6ecb768 100644
--- a/apps/api/src/alicebot_api/store.py
+++ b/apps/api/src/alicebot_api/store.py
@@ -490,6 +490,10 @@ class ChannelDeliveryReceiptRow(TypedDict):
scheduled_for: datetime | None
schedule_slot: str | None
notification_policy: JsonObject
+ rollout_flag_state: str
+ support_evidence: JsonObject
+ rate_limit_evidence: JsonObject
+ incident_evidence: JsonObject
recorded_at: datetime
created_at: datetime
@@ -570,12 +574,38 @@ class DailyBriefJobRow(TypedDict):
delivery_receipt_id: UUID | None
payload: JsonObject
result_payload: JsonObject
+ rollout_flag_state: str
+ support_evidence: JsonObject
+ rate_limit_evidence: JsonObject
+ incident_evidence: JsonObject
attempted_at: datetime | None
completed_at: datetime | None
created_at: datetime
updated_at: datetime

+class ChatTelemetryRow(TypedDict):
+ id: UUID
+ user_account_id: UUID
+ workspace_id: UUID | None
+ channel_message_id: UUID | None
+ daily_brief_job_id: UUID | None
+ delivery_receipt_id: UUID | None
+ flow_kind: str
+ event_kind: str
+ status: str
+ route_path: str
+ rollout_flag_key: str | None
+ rollout_flag_state: str | None
+ rate_limit_key: str | None
+ rate_limit_window_seconds: int | None
+ rate_limit_max_requests: int | None
+ retry_after_seconds: int | None
+ abuse_signal: str | None
+ evidence: JsonObject
+ created_at: datetime
+
+
class TaskArtifactRow(TypedDict):
id: UUID
user_id: UUID
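
For anyone tracing hosted telemetry, the literal below shows the shape a `ChatTelemetryRow` takes for a rollout block on the chat-handle route, using the exact field names and string constants from the diffs above. The concrete UUIDs and timestamp are placeholders, and `record_chat_telemetry` is assumed to assign `id` and `created_at` at insert time.

```python
# Placeholder values throughout; field names and string constants come from
# the handler and TypedDict diffs above.
from datetime import datetime, timezone
from uuid import uuid4

rollout_block_row = {
    "id": uuid4(),
    "user_account_id": uuid4(),
    "workspace_id": uuid4(),
    "channel_message_id": uuid4(),
    "daily_brief_job_id": None,
    "delivery_receipt_id": None,
    "flow_kind": "chat_handle",
    "event_kind": "rollout_block",
    "status": "blocked_rollout",
    "route_path": "/v1/channels/telegram/messages/{message_id}/handle",
    "rollout_flag_key": "hosted_chat_handle_enabled",
    "rollout_flag_state": "blocked",
    "rate_limit_key": None,
    "rate_limit_window_seconds": None,
    "rate_limit_max_requests": None,
    "retry_after_seconds": None,
    "abuse_signal": None,
    "evidence": {"intent_hint": None},
    "created_at": datetime.now(timezone.utc),
}
```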
diff --git a/apps/api/src/alicebot_api/telegram_channels.py b/apps/api/src/alicebot_api/telegram_channels.py
index 753c9da..7cb1c2a 100644
--- a/apps/api/src/alicebot_api/telegram_channels.py
+++ b/apps/api/src/alicebot_api/telegram_channels.py
@@ -128,6 +128,10 @@ class TelegramDeliveryReceiptRow(TypedDict):
scheduled_for: datetime | None
schedule_slot: str | None
notification_policy: dict[str, Any]
+ rollout_flag_state: str
+ support_evidence: dict[str, Any]
+ rate_limit_evidence: dict[str, Any]
+ incident_evidence: dict[str, Any]
recorded_at: datetime
created_at: datetime
@@ -1104,6 +1108,10 @@ def dispatch_telegram_message(
text: str,
dispatch_idempotency_key: str | None,
bot_token: str,
+ rollout_flag_state: str = "enabled",
+ support_evidence: dict[str, Any] | None = None,
+ rate_limit_evidence: dict[str, Any] | None = None,
+ incident_evidence: dict[str, Any] | None = None,
) -> tuple[TelegramChannelMessageRow, TelegramDeliveryReceiptRow]:
normalized_text = text.strip()
if normalized_text == "":
@@ -1252,19 +1260,28 @@ def dispatch_telegram_message(
provider_receipt_id,
failure_code,
failure_detail,
+ rollout_flag_state,
+ support_evidence,
+ rate_limit_evidence,
+ incident_evidence,
recorded_at
)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (channel_message_id) DO UPDATE
SET status = EXCLUDED.status,
provider_receipt_id = EXCLUDED.provider_receipt_id,
failure_code = EXCLUDED.failure_code,
failure_detail = EXCLUDED.failure_detail,
+ rollout_flag_state = EXCLUDED.rollout_flag_state,
+ support_evidence = EXCLUDED.support_evidence,
+ rate_limit_evidence = EXCLUDED.rate_limit_evidence,
+ incident_evidence = EXCLUDED.incident_evidence,
recorded_at = EXCLUDED.recorded_at
RETURNING id, workspace_id, channel_message_id, channel_type,
status, provider_receipt_id, failure_code, failure_detail,
scheduled_job_id, scheduler_job_kind, scheduled_for, schedule_slot,
- notification_policy, recorded_at, created_at
+ notification_policy, rollout_flag_state, support_evidence,
+ rate_limit_evidence, incident_evidence, recorded_at, created_at
""",
(
workspace_id,
@@ -1274,6 +1291,10 @@ def dispatch_telegram_message(
provider_receipt_id,
failure_code,
failure_detail,
+ rollout_flag_state,
+ Jsonb(support_evidence or {}),
+ Jsonb(rate_limit_evidence or {}),
+ Jsonb(incident_evidence or {}),
utc_now(),
),
)
@@ -1302,6 +1323,10 @@ def dispatch_telegram_workspace_message(
scheduled_for: datetime | None = None,
schedule_slot: str | None = None,
notification_policy: dict[str, Any] | None = None,
+ rollout_flag_state: str = "enabled",
+ support_evidence: dict[str, Any] | None = None,
+ rate_limit_evidence: dict[str, Any] | None = None,
+ incident_evidence: dict[str, Any] | None = None,
) -> tuple[TelegramChannelMessageRow, TelegramDeliveryReceiptRow]:
normalized_text = text.strip()
if normalized_text == "":
@@ -1463,9 +1488,13 @@ def dispatch_telegram_workspace_message(
scheduled_for,
schedule_slot,
notification_policy,
+ rollout_flag_state,
+ support_evidence,
+ rate_limit_evidence,
+ incident_evidence,
recorded_at
)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (channel_message_id) DO UPDATE
SET status = EXCLUDED.status,
provider_receipt_id = EXCLUDED.provider_receipt_id,
@@ -1476,11 +1505,16 @@ def dispatch_telegram_workspace_message(
scheduled_for = EXCLUDED.scheduled_for,
schedule_slot = EXCLUDED.schedule_slot,
notification_policy = EXCLUDED.notification_policy,
+ rollout_flag_state = EXCLUDED.rollout_flag_state,
+ support_evidence = EXCLUDED.support_evidence,
+ rate_limit_evidence = EXCLUDED.rate_limit_evidence,
+ incident_evidence = EXCLUDED.incident_evidence,
recorded_at = EXCLUDED.recorded_at
RETURNING id, workspace_id, channel_message_id, channel_type,
status, provider_receipt_id, failure_code, failure_detail,
scheduled_job_id, scheduler_job_kind, scheduled_for, schedule_slot,
- notification_policy, recorded_at, created_at
+ notification_policy, rollout_flag_state, support_evidence,
+ rate_limit_evidence, incident_evidence, recorded_at, created_at
""",
(
workspace_id,
@@ -1495,6 +1529,10 @@ def dispatch_telegram_workspace_message(
scheduled_for,
schedule_slot,
Jsonb(notification_policy or {}),
+ rollout_flag_state,
+ Jsonb(support_evidence or {}),
+ Jsonb(rate_limit_evidence or {}),
+ Jsonb(incident_evidence or {}),
utc_now(),
),
)
@@ -1529,6 +1567,10 @@ def list_workspace_telegram_delivery_receipts(
r.scheduled_for,
r.schedule_slot,
r.notification_policy,
+ r.rollout_flag_state,
+ r.support_evidence,
+ r.rate_limit_evidence,
+ r.incident_evidence,
r.recorded_at,
r.created_at
FROM channel_delivery_receipts AS r
@@ -1652,6 +1694,10 @@ def serialize_delivery_receipt(receipt: TelegramDeliveryReceiptRow) -> dict[str,
"scheduled_for": None if receipt["scheduled_for"] is None else receipt["scheduled_for"].isoformat(),
"schedule_slot": receipt["schedule_slot"],
"notification_policy": receipt["notification_policy"],
+ "rollout_flag_state": receipt["rollout_flag_state"],
+ "support_evidence": receipt["support_evidence"],
+ "rate_limit_evidence": receipt["rate_limit_evidence"],
+ "incident_evidence": receipt["incident_evidence"],
"recorded_at": receipt["recorded_at"].isoformat(),
"created_at": receipt["created_at"].isoformat(),
}
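
Both dispatch paths above normalize optional evidence with `Jsonb(... or {})`, so a caller passing `None` still persists an empty JSON object rather than SQL NULL in the receipt columns. A minimal sketch of that convention as a helper; the function name is hypothetical and the shipped code inlines the expressions:

```python
# Hypothetical helper; dispatch_telegram_message and
# dispatch_telegram_workspace_message inline these expressions today.
from typing import Any

from psycopg.types.json import Jsonb


def evidence_params(
    support_evidence: dict[str, Any] | None,
    rate_limit_evidence: dict[str, Any] | None,
    incident_evidence: dict[str, Any] | None,
) -> tuple[Jsonb, Jsonb, Jsonb]:
    """Default absent evidence to {} so the JSONB receipt columns stay non-null."""
    return (
        Jsonb(support_evidence or {}),
        Jsonb(rate_limit_evidence or {}),
        Jsonb(incident_evidence or {}),
    )
```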
diff --git a/apps/api/src/alicebot_api/telegram_notifications.py b/apps/api/src/alicebot_api/telegram_notifications.py
index 567d584..e172403 100644
--- a/apps/api/src/alicebot_api/telegram_notifications.py
+++ b/apps/api/src/alicebot_api/telegram_notifications.py
@@ -116,6 +116,10 @@ class DailyBriefJobRow(TypedDict):
delivery_receipt_id: UUID | None
payload: dict[str, Any]
result_payload: dict[str, Any]
+ rollout_flag_state: str
+ support_evidence: dict[str, Any]
+ rate_limit_evidence: dict[str, Any]
+ incident_evidence: dict[str, Any]
attempted_at: datetime | None
completed_at: datetime | None
created_at: datetime
@@ -237,8 +241,9 @@ def _job_columns_sql() -> str:
return (
"id, workspace_id, channel_type, channel_identity_id, job_kind, prompt_kind, prompt_id, "
"continuity_object_id, continuity_brief_id, schedule_slot, idempotency_key, due_at, status, "
- "suppression_reason, attempt_count, delivery_receipt_id, payload, result_payload, attempted_at, "
- "completed_at, created_at, updated_at"
+ "suppression_reason, attempt_count, delivery_receipt_id, payload, result_payload, "
+ "rollout_flag_state, support_evidence, rate_limit_evidence, incident_evidence, "
+ "attempted_at, completed_at, created_at, updated_at"
)
@@ -246,7 +251,8 @@ def _receipt_columns_sql() -> str:
return (
"id, workspace_id, channel_message_id, channel_type, status, provider_receipt_id, failure_code, "
"failure_detail, scheduled_job_id, scheduler_job_kind, scheduled_for, schedule_slot, "
- "notification_policy, recorded_at, created_at"
+ "notification_policy, rollout_flag_state, support_evidence, rate_limit_evidence, "
+ "incident_evidence, recorded_at, created_at"
)
@@ -722,6 +728,10 @@ def _serialize_job(
else str(row["delivery_receipt_id"]),
"payload": row["payload"],
"result_payload": row["result_payload"],
+ "rollout_flag_state": row["rollout_flag_state"],
+ "support_evidence": row["support_evidence"],
+ "rate_limit_evidence": row["rate_limit_evidence"],
+ "incident_evidence": row["incident_evidence"],
"attempted_at": None if row["attempted_at"] is None else row["attempted_at"].isoformat(),
"completed_at": None if row["completed_at"] is None else row["completed_at"].isoformat(),
"created_at": row["created_at"].isoformat(),
diff --git a/apps/web/app/admin/page.test.tsx b/apps/web/app/admin/page.test.tsx
new file mode 100644
index 0000000..de5add7
--- /dev/null
+++ b/apps/web/app/admin/page.test.tsx
@@ -0,0 +1,20 @@
+import React from "react";
+import { cleanup, render, screen } from "@testing-library/react";
+import { afterEach, describe, expect, it } from "vitest";
+
+import HostedAdminPage from "./page";
+
+describe("HostedAdminPage", () => {
+ afterEach(() => {
+ cleanup();
+ });
+
+ it("renders hosted admin launch-readiness controls", () => {
+ render(
   Calm, governed views for hosted onboarding/settings plus requests, approvals, tasks,
-  artifacts, Gmail, Calendar, memories, chief-of-staff priorities, entities, and
-  explainability.
+  hosted admin operations, artifacts, Gmail, Calendar, memories, chief-of-staff
+  priorities, entities, and explainability.
diff --git a/apps/web/components/hosted-admin-panel.test.tsx b/apps/web/components/hosted-admin-panel.test.tsx
new file mode 100644
index 0000000..5eb55cb
--- /dev/null
+++ b/apps/web/components/hosted-admin-panel.test.tsx
@@ -0,0 +1,56 @@
+import React from "react";
+import { cleanup, fireEvent, render, screen, waitFor } from "@testing-library/react";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+
+import { HostedAdminPanel } from "./hosted-admin-panel";
+
+describe("HostedAdminPanel", () => {
+  const fetchMock = vi.fn();
+
+  beforeEach(() => {
+    vi.stubGlobal("fetch", fetchMock);
+    fetchMock.mockReset();
+  });
+
+  afterEach(() => {
+    cleanup();
+    vi.unstubAllGlobals();
+  });
+
+  it("renders hosted admin scope and OSS-versus-hosted boundary copy", () => {
+    render(
+            This panel is for Alice Connect hosted beta operations. Alice Core OSS runtime and
+            deterministic CLI/MCP semantics remain unchanged.
+
+
+      {statusText}
+
+
+      When onboarding fails, operators should inspect hosted admin incidents and workspace support
+      posture before retrying. This keeps support workflows explicit and deterministic.
+
+