Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion echo/frontend/src/icons/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ const Home = () => {
// Import is at top of file
return (
<HouseIcon
size="var(--app-home-icon-size, 30)"
size={30}
color="currentColor"
style={{
height: "var(--app-home-icon-size, 30px)",
Expand Down
2 changes: 2 additions & 0 deletions echo/server/dembrane/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
)

from dembrane.api.chat import ChatRouter
from dembrane.api.stats import StatsRouter
from dembrane.api.search import SearchRouter
from dembrane.api.verify import VerifyRouter
from dembrane.api.project import ProjectRouter
Expand Down Expand Up @@ -33,3 +34,4 @@ async def health() -> dict:
api.include_router(VerifyRouter, prefix="/verify")
api.include_router(SearchRouter)
api.include_router(UserSettingsRouter, prefix="/user-settings")
api.include_router(StatsRouter, prefix="/stats")
234 changes: 234 additions & 0 deletions echo/server/dembrane/api/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
from __future__ import annotations

import json
import uuid
import asyncio
from typing import Any, List
from logging import getLogger

from fastapi import Request, APIRouter, HTTPException
from pydantic import BaseModel

from dembrane.directus import directus
from dembrane.redis_async import get_redis_client
from dembrane.async_helpers import run_in_thread_pool
from dembrane.api.rate_limit import create_rate_limiter

logger = getLogger("api.stats")

StatsRouter = APIRouter(tags=["stats"])

# Redis keys and TTLs for the public stats cache and its
# stampede-protection lock.
STATS_CACHE_KEY = "dembrane:stats:public"
STATS_CACHE_LOCK_KEY = "dembrane:stats:public:lock"
STATS_CACHE_TTL_SECONDS = 3600  # 1 hour
STATS_LOCK_TTL_SECONDS = 30  # Lock expires after 30s to prevent deadlocks

# Per-client-IP limiter applied to the public endpoint.
_stats_rate_limiter = create_rate_limiter(
    name="public_stats",
    capacity=10,
    window_seconds=60,  # 10 requests per IP per minute
)
Comment on lines +1 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Imports and constants look clean, ship it 🚀

One tiny nit: you're on Python 3.11+ per the guidelines — List from typing is legacy. Use the built-in list and tuple generics directly.

♻️ Use built-in generics (Python 3.11+)
-from typing import Any, List
+from typing import Any

Then update all List[...] annotations to list[...] throughout the file (lines 49, 65, 67, 87).

As per coding guidelines: echo/server/**/*.py: Follow Python 3.11+ with FastAPI and Pydantic patterns in backend code.

🤖 Prompt for AI Agents
In `@echo/server/dembrane/api/stats.py` around lines 1 - 29, Remove the legacy
List import and use Python 3.11+ built-in generics: delete List from "from
typing import Any, List" and replace all occurrences of List[...] with list[...]
(and any tuple[...] uses if present) in this module—update the type annotations
referenced in the stats API (e.g., function signatures, Pydantic models, and any
variables that currently use List such as the public stats response annotations)
so they use the built-in generics instead.



class StatsResponse(BaseModel):
    """Aggregate public platform statistics returned by the stats endpoint."""

    # Number of non-admin projects created since 2024-01-01.
    projects_count: int
    # Number of conversations belonging to those projects.
    conversations_count: int
    # Total conversation duration, rounded to whole hours.
    hours_recorded: int


def _get_client_ip(request: Request) -> str:
"""Extract client IP, respecting X-Forwarded-For behind reverse proxies."""
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for:
# First IP in the chain is the original client
return forwarded_for.split(",")[0].strip()
if request.client:
return request.client.host
return "unknown"


def _fetch_admin_user_ids() -> list[str]:
    """Fetch IDs of users holding the Administrator role from Directus.

    Synchronous Directus call — must be wrapped (run_in_thread_pool) when
    invoked from async code.

    Returns:
        Admin user IDs; empty list when no admin users are found.
    """
    admin_users = directus.get_users(
        {
            "query": {
                "filter": {"role": {"name": {"_eq": "Administrator"}}},
                "fields": ["id"],
                "limit": -1,
            }
        }
    )
    if not admin_users:
        return []
    # Drop records with a missing/falsy "id" so callers get clean IDs only.
    return [u["id"] for u in admin_users if u.get("id")]


def _fetch_projects(admin_user_ids: list[str]) -> list[dict[str, Any]]:
    """Fetch non-admin projects created since 2024-01-01 (UTC) from Directus.

    Synchronous Directus call — must be wrapped (run_in_thread_pool) when
    invoked from async code.

    Args:
        admin_user_ids: User IDs whose projects should be excluded. When
            empty, no ownership filter is applied.

    Returns:
        Project records containing only the "id" field.
    """
    filter_conditions: list[dict[str, Any]] = [
        {"created_at": {"_gte": "2024-01-01T00:00:00Z"}},
    ]
    if admin_user_ids:
        filter_conditions.append(
            {"directus_user_id": {"_nin": admin_user_ids}},
        )

    return directus.get_items(
        "project",
        {
            "query": {
                "filter": {"_and": filter_conditions},
                "fields": ["id"],
                "limit": -1,
            }
        },
    )


def _fetch_conversation_stats(project_ids: list[str]) -> tuple[int, float]:
    """Fetch conversation count and total duration for the given projects.

    Synchronous Directus call — must be wrapped (run_in_thread_pool) when
    invoked from async code.

    Project IDs are queried in bounded batches so a very large platform
    cannot push a single ``_in`` filter past Directus URL/query-string
    limits; results are aggregated across batches.

    Args:
        project_ids: Projects whose conversations should be counted.

    Returns:
        Tuple of (conversation_count, total_duration_seconds).
    """
    if not project_ids:
        return 0, 0.0

    batch_size = 500  # keep each `_in` filter well under query-string limits
    count = 0
    total_seconds = 0.0
    for start in range(0, len(project_ids), batch_size):
        batch = project_ids[start : start + batch_size]
        conversations = directus.get_items(
            "conversation",
            {
                "query": {
                    "filter": {"project_id": {"_in": batch}},
                    "fields": ["duration"],
                    "limit": -1,
                }
            },
        )
        if not conversations:
            continue
        count += len(conversations)
        # Missing/None durations count as zero seconds.
        total_seconds += sum(c.get("duration") or 0 for c in conversations)
    return count, total_seconds
Comment on lines +87 to +111
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Unbounded _in filter could hit query limits with many project IDs.

If the platform grows to thousands of projects, passing all IDs in a single _in filter may exceed Directus URL/query-string limits (especially if this is a GET under the hood). Consider batching, or better yet, doing a relational filter on the conversation table directly (e.g., filter conversations where project_id.directus_user_id._nin and project_id.created_at._gte), letting Directus handle the join server-side.

Not a blocker today but worth keeping on the radar.

🤖 Prompt for AI Agents
In `@echo/server/dembrane/api/stats.py` around lines 87 - 111, The
_fetch_conversation_stats function currently passes all project_ids into a
single Directus `_in` filter which can exceed URL/query limits for large lists;
update it to avoid unbounded IN lists by either batching project_ids into
safe-size chunks and aggregating results from multiple directus.get_items calls,
or (preferred) change the filter to a relational server-side filter on the
conversation collection (e.g., use a relational condition like
project_id.<related_field>._nin or project_id.created_at._gte so Directus
performs the join) and remove the large `_in` array; locate the call to
directus.get_items in _fetch_conversation_stats and implement batching or
replace the filter with the relational filter approach.



async def _compute_stats() -> StatsResponse:
    """Compute fresh stats from Directus, off the event loop.

    Runs each synchronous Directus fetch in a thread pool, then assembles
    the public response model.
    """
    excluded_admin_ids = await run_in_thread_pool(_fetch_admin_user_ids)
    logger.debug("Admin user IDs to exclude: %s", excluded_admin_ids)

    project_rows = await run_in_thread_pool(_fetch_projects, excluded_admin_ids)
    project_ids = [row["id"] for row in project_rows] if project_rows else []
    logger.debug("Non-admin projects since 2024: %d", len(project_ids))

    conversation_count, duration_seconds = await run_in_thread_pool(
        _fetch_conversation_stats, project_ids
    )

    return StatsResponse(
        projects_count=len(project_ids),
        conversations_count=conversation_count,
        hours_recorded=round(duration_seconds / 3600),
    )


async def _get_cached_stats() -> StatsResponse | None:
    """Return cached stats from Redis, or None on a miss or any cache error.

    Cache failures are deliberately swallowed (logged at warning) so a
    Redis outage degrades to recomputation rather than a 500.
    """
    try:
        redis = await get_redis_client()
        payload = await redis.get(STATS_CACHE_KEY)
        if not payload:
            return None
        # Client uses decode_responses=False, so payload is bytes;
        # json.loads accepts bytes directly.
        parsed = json.loads(payload)
        logger.debug("Stats cache hit")
        return StatsResponse(**parsed)
    except Exception as e:
        logger.warning("Stats cache read error: %s", e)
        return None


async def _set_cached_stats(stats: StatsResponse) -> None:
    """Write stats to the Redis cache with a TTL.

    Best-effort: write failures are logged at warning and never raised,
    so callers can still return the freshly computed stats.
    """
    try:
        redis = await get_redis_client()
        payload = json.dumps(stats.model_dump())
        await redis.setex(STATS_CACHE_KEY, STATS_CACHE_TTL_SECONDS, payload)
        logger.debug("Stats cached with TTL %ds", STATS_CACHE_TTL_SECONDS)
    except Exception as e:
        logger.warning("Stats cache write error: %s", e)


async def _acquire_lock() -> str | None:
    """Try to acquire the stats-computation lock in Redis.

    Stores a unique ownership token as the lock value so the holder can
    later release only its own lock (see _release_lock) — a plain "1"
    value lets a slow holder delete a lock that has already expired and
    been re-acquired by someone else.

    Returns:
        The ownership token when acquired, None otherwise. Truthiness is
        preserved, so existing `if await _acquire_lock():` callers keep
        working unchanged.
    """
    try:
        redis = await get_redis_client()
        token = uuid.uuid4().hex
        # SET NX (only if not exists) with expiry to prevent deadlocks
        # if the holder crashes before releasing.
        acquired = await redis.set(
            STATS_CACHE_LOCK_KEY,
            token,
            nx=True,
            ex=STATS_LOCK_TTL_SECONDS,
        )
        return token if acquired else None
    except Exception as e:
        logger.warning("Lock acquire error: %s", e)
        return None


# Atomic compare-and-delete: remove the lock key only if it still holds
# the caller's token. Prevents deleting a lock that expired and was
# re-acquired by another worker.
_RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


async def _release_lock(token: str | None = None) -> None:
    """Release the stats-computation lock.

    Args:
        token: Ownership token returned by _acquire_lock. When provided,
            the key is deleted only if it still holds this token (atomic
            via a Lua script). When omitted, the key is deleted
            unconditionally — the legacy behavior, kept for backward
            compatibility with existing callers.
    """
    try:
        redis = await get_redis_client()
        if token is None:
            await redis.delete(STATS_CACHE_LOCK_KEY)
        else:
            await redis.eval(_RELEASE_LOCK_SCRIPT, 1, STATS_CACHE_LOCK_KEY, token)
    except Exception as e:
        logger.warning("Lock release error: %s", e)
Comment on lines +164 to +187
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Lock release doesn't verify ownership — classic distributed-lock footgun 🔫

If _compute_stats() takes longer than STATS_LOCK_TTL_SECONDS (30s), the lock auto-expires, another request acquires it, and then the finally block on line 219 deletes that request's lock. This cascades into a stampede.

Fix: store a unique token (e.g., UUID) when acquiring, and only delete if the value matches. A Lua script makes this atomic.

🔒 Proposed fix: ownership-aware lock
+import uuid
+
-async def _acquire_lock() -> bool:
-    """Try to acquire a Redis lock for cache computation. Returns True if acquired."""
+async def _acquire_lock() -> str | None:
+    """Try to acquire a Redis lock. Returns a unique token if acquired, else None."""
     try:
         redis = await get_redis_client()
-        acquired = await redis.set(
+        token = str(uuid.uuid4())
+        acquired = await redis.set(
             STATS_CACHE_LOCK_KEY,
-            "1",
+            token,
             nx=True,
             ex=STATS_LOCK_TTL_SECONDS,
         )
-        return acquired is not None
+        return token if acquired else None
     except Exception as e:
         logger.warning("Lock acquire error: %s", e)
-        return False
+        return None


-async def _release_lock() -> None:
-    """Release the Redis lock."""
+_RELEASE_LOCK_LUA = """
+if redis.call("get", KEYS[1]) == ARGV[1] then
+    return redis.call("del", KEYS[1])
+else
+    return 0
+end
+"""
+
+async def _release_lock(token: str) -> None:
+    """Release the Redis lock only if we still own it."""
     try:
         redis = await get_redis_client()
-        await redis.delete(STATS_CACHE_LOCK_KEY)
+        await redis.eval(_RELEASE_LOCK_LUA, 1, STATS_CACHE_LOCK_KEY, token)
     except Exception as e:
         logger.warning("Lock release error: %s", e)

And in the endpoint:

-    if await _acquire_lock():
+    token = await _acquire_lock()
+    if token:
         try:
             ...
         finally:
-            await _release_lock()
+            await _release_lock(token)

Also applies to: 207-219

🤖 Prompt for AI Agents
In `@echo/server/dembrane/api/stats.py` around lines 164 - 187, The lock functions
must be ownership-aware: modify _acquire_lock to generate a unique token (e.g.,
UUID), set STATS_CACHE_LOCK_KEY to that token with nx and ex and return the
token (or None/False on failure) instead of a boolean; modify _release_lock to
accept the token and perform an atomic compare-and-delete using a small Lua
script (EVAL) that checks if the current value equals the token and only then
deletes the key; update callers (e.g., _compute_stats and its finally block) to
store the token returned by _acquire_lock and pass it into _release_lock so you
only delete the lock you own, and keep existing exception handling around
get_redis_client and TTL (STATS_LOCK_TTL_SECONDS).



@StatsRouter.get("/", response_model=StatsResponse)
async def get_public_stats(request: Request) -> StatsResponse:
    """
    Public endpoint returning aggregate platform statistics.
    Rate-limited to 10 requests per IP per minute.
    Results are cached in Redis for 1 hour.
    Uses a lock to prevent cache stampede on expiry.
    """
    await _stats_rate_limiter.check(_get_client_ip(request))

    # Fast path: serve straight from cache.
    cached = await _get_cached_stats()
    if cached is not None:
        return cached

    # Cache miss. If someone else holds the computation lock, poll the
    # cache until they publish, then give up with a 503.
    if not await _acquire_lock():
        poll_attempts = STATS_LOCK_TTL_SECONDS * 2  # 0.5s apart -> lock TTL
        for _ in range(poll_attempts):
            await asyncio.sleep(0.5)
            refreshed = await _get_cached_stats()
            if refreshed is not None:
                return refreshed

        # Lock holder likely failed — refuse rather than stampede Directus.
        logger.warning("Stats computation timed out waiting for lock holder")
        raise HTTPException(
            status_code=503,
            detail="Stats temporarily unavailable. Try again shortly.",
            headers={"Retry-After": "5"},
        )

    # We hold the lock: double-check the cache (another request may have
    # populated it between our miss and the acquire), then compute.
    try:
        recheck = await _get_cached_stats()
        if recheck is not None:
            return recheck

        fresh = await _compute_stats()
        await _set_cached_stats(fresh)
        return fresh
    finally:
        await _release_lock()
Loading