Walkthrough
This PR adds a public `/stats` endpoint.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Pre-merge checks: ✅ 6 passed
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@echo/server/dembrane/api/stats.py`:
- Around line 164-187: The lock functions must be ownership-aware: modify
_acquire_lock to generate a unique token (e.g., UUID), set STATS_CACHE_LOCK_KEY
to that token with nx and ex and return the token (or None/False on failure)
instead of a boolean; modify _release_lock to accept the token and perform an
atomic compare-and-delete using a small Lua script (EVAL) that checks if the
current value equals the token and only then deletes the key; update callers
(e.g., _compute_stats and its finally block) to store the token returned by
_acquire_lock and pass it into _release_lock so you only delete the lock you
own, and keep existing exception handling around get_redis_client and TTL
(STATS_LOCK_TTL_SECONDS).
- Around line 87-111: The _fetch_conversation_stats function currently passes
all project_ids into a single Directus `_in` filter which can exceed URL/query
limits for large lists; update it to avoid unbounded IN lists by either batching
project_ids into safe-size chunks and aggregating results from multiple
directus.get_items calls, or (preferred) change the filter to a relational
server-side filter on the conversation collection (e.g., use a relational
condition like project_id.<related_field>._nin or project_id.created_at._gte so
Directus performs the join) and remove the large `_in` array; locate the call to
directus.get_items in _fetch_conversation_stats and implement batching or
replace the filter with the relational filter approach.
- Around line 1-29: Remove the legacy List import and use Python 3.11+ built-in
generics: delete List from "from typing import Any, List" and replace all
occurrences of List[...] with list[...] (and any tuple[...] uses if present) in
this module—update the type annotations referenced in the stats API (e.g.,
function signatures, Pydantic models, and any variables that currently use List
such as the public stats response annotations) so they use the built-in generics
instead.
```python
from __future__ import annotations

import json
import asyncio
from typing import Any, List
from logging import getLogger

from fastapi import Request, APIRouter, HTTPException
from pydantic import BaseModel

from dembrane.directus import directus
from dembrane.redis_async import get_redis_client
from dembrane.async_helpers import run_in_thread_pool
from dembrane.api.rate_limit import create_rate_limiter

logger = getLogger("api.stats")

StatsRouter = APIRouter(tags=["stats"])

STATS_CACHE_KEY = "dembrane:stats:public"
STATS_CACHE_LOCK_KEY = "dembrane:stats:public:lock"
STATS_CACHE_TTL_SECONDS = 3600  # 1 hour
STATS_LOCK_TTL_SECONDS = 30  # Lock expires after 30s to prevent deadlocks

_stats_rate_limiter = create_rate_limiter(
    name="public_stats",
    capacity=10,
    window_seconds=60,  # 10 requests per IP per minute
)
```
🧹 Nitpick | 🔵 Trivial
Imports and constants look clean, ship it 🚀
One tiny nit: you're on Python 3.11+ per the guidelines — List from typing is legacy. Use the built-in list and tuple generics directly.
♻️ Use built-in generics (Python 3.11+)

```diff
-from typing import Any, List
+from typing import Any
```

Then update all `List[...]` annotations to `list[...]` throughout the file (lines 49, 65, 67, 87).
As per coding guidelines: echo/server/**/*.py: Follow Python 3.11+ with FastAPI and Pydantic patterns in backend code.
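As a tiny illustration of the guideline (a hypothetical helper, not code from this PR), the built-in generics replace the `typing` aliases directly in annotations:

```python
# Built-in generics (PEP 585): no typing.List / typing.Tuple imports needed.
def summarize_durations(durations: list[float]) -> tuple[int, float]:
    """Return (count, total_seconds) for a list of durations in seconds."""
    return len(durations), sum(durations)

print(summarize_durations([30.0, 90.0]))  # → (2, 120.0)
```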
```python
def _fetch_conversation_stats(project_ids: List[str]) -> tuple[int, float]:
    """
    Fetch conversation count and total duration for given project IDs.
    Returns (count, total_seconds).
    Sync — must be wrapped.
    """
    if not project_ids:
        return 0, 0.0

    conversations = directus.get_items(
        "conversation",
        {
            "query": {
                "filter": {"project_id": {"_in": project_ids}},
                "fields": ["duration"],
                "limit": -1,
            }
        },
    )
    if not conversations:
        return 0, 0.0

    count = len(conversations)
    total_seconds = sum(c.get("duration") or 0 for c in conversations)
    return count, total_seconds
```
🧹 Nitpick | 🔵 Trivial
Unbounded _in filter could hit query limits with many project IDs.
If the platform grows to thousands of projects, passing all IDs in a single _in filter may exceed Directus URL/query-string limits (especially if this is a GET under the hood). Consider batching, or better yet, doing a relational filter on the conversation table directly (e.g., filter conversations where project_id.directus_user_id._nin and project_id.created_at._gte), letting Directus handle the join server-side.
Not a blocker today but worth keeping on the radar.
```python
async def _acquire_lock() -> bool:
    """Try to acquire a Redis lock for cache computation. Returns True if acquired."""
    try:
        redis = await get_redis_client()
        # SET NX (only if not exists) with expiry to prevent deadlocks
        acquired = await redis.set(
            STATS_CACHE_LOCK_KEY,
            "1",
            nx=True,
            ex=STATS_LOCK_TTL_SECONDS,
        )
        return acquired is not None
    except Exception as e:
        logger.warning("Lock acquire error: %s", e)
        return False


async def _release_lock() -> None:
    """Release the Redis lock."""
    try:
        redis = await get_redis_client()
        await redis.delete(STATS_CACHE_LOCK_KEY)
    except Exception as e:
        logger.warning("Lock release error: %s", e)
```
Lock release doesn't verify ownership — classic distributed-lock footgun 🔫
If _compute_stats() takes longer than STATS_LOCK_TTL_SECONDS (30s), the lock auto-expires, another request acquires it, and then the finally block on line 219 deletes that request's lock. This cascades into a stampede.
Fix: store a unique token (e.g., UUID) when acquiring, and only delete if the value matches. A Lua script makes this atomic.
🔒 Proposed fix: ownership-aware lock

```diff
+import uuid
+
-async def _acquire_lock() -> bool:
-    """Try to acquire a Redis lock for cache computation. Returns True if acquired."""
+async def _acquire_lock() -> str | None:
+    """Try to acquire a Redis lock. Returns a unique token if acquired, else None."""
     try:
         redis = await get_redis_client()
-        acquired = await redis.set(
+        token = str(uuid.uuid4())
+        acquired = await redis.set(
             STATS_CACHE_LOCK_KEY,
-            "1",
+            token,
             nx=True,
             ex=STATS_LOCK_TTL_SECONDS,
         )
-        return acquired is not None
+        return token if acquired else None
     except Exception as e:
         logger.warning("Lock acquire error: %s", e)
-        return False
+        return None


+_RELEASE_LOCK_LUA = """
+if redis.call("get", KEYS[1]) == ARGV[1] then
+    return redis.call("del", KEYS[1])
+else
+    return 0
+end
+"""
+
-async def _release_lock() -> None:
-    """Release the Redis lock."""
+async def _release_lock(token: str) -> None:
+    """Release the Redis lock only if we still own it."""
     try:
         redis = await get_redis_client()
-        await redis.delete(STATS_CACHE_LOCK_KEY)
+        await redis.eval(_RELEASE_LOCK_LUA, 1, STATS_CACHE_LOCK_KEY, token)
     except Exception as e:
         logger.warning("Lock release error: %s", e)
```

And in the endpoint:

```diff
-    if await _acquire_lock():
+    token = await _acquire_lock()
+    if token:
         try:
             ...
         finally:
-            await _release_lock()
+            await _release_lock(token)
```

Also applies to: 207-219
Summary by CodeRabbit

New Features
- `/stats` endpoint returning aggregate metrics: project count, conversation count, and total hours recorded.

Style