Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions echo/.devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
- 9001:9001
entrypoint: >
/bin/sh -c "
/usr/bin/mc config host add myminio http://minio:9000 ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD};
/usr/bin/mc config host add myminio http://minio:9000 $$MINIO_ROOT_USER $$MINIO_ROOT_PASSWORD;
/usr/bin/mc mb --ignore-existing myminio/dembrane;
/usr/bin/mc policy set download myminio/dembrane;
minio server /mnt/data --console-address ":9001"
Expand All @@ -22,11 +22,9 @@ services:
restart: unless-stopped

redis:
image: 'bitnami/redis:6.2.14'
environment:
- ALLOW_EMPTY_PASSWORD=yes
image: valkey/valkey:8.0
volumes:
- ./redis_data:/bitnami/redis/data
- ./redis_data:/data

postgres:
image: pgvector/pgvector:0.6.2-pg16
Expand Down Expand Up @@ -141,5 +139,3 @@ services:
- postgres
- redis
- neo4j


4 changes: 2 additions & 2 deletions echo/server/dembrane/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def stream_anthropic_chat_response(
system: List[Dict[str, Any]], messages: List[Dict[str, Any]], protocol: str = "data"
) -> Generator[str, None, None]:
"""
Generates response from Anthropic
and returns openAI like stream response
Generates response from Anthropic
and returns openAI like stream response
"""
stream = anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
Expand Down
89 changes: 39 additions & 50 deletions echo/server/dembrane/api/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

from fastapi import Request, APIRouter
from pydantic import BaseModel
from litellm.utils import token_counter
from sqlalchemy.orm import noload, selectinload
from fastapi.responses import RedirectResponse, StreamingResponse
from fastapi.exceptions import HTTPException
from litellm.exceptions import ContentPolicyViolationError

from dembrane.s3 import get_signed_url
from dembrane.utils import CacheWithExpiration, generate_uuid, get_utc_timestamp
from dembrane.config import LIGHTRAG_LITELLM_INFERENCE_MODEL
from dembrane.database import (
ConversationModel,
ConversationChunkModel,
Expand All @@ -23,7 +25,6 @@
sanitize_filename_component,
merge_multiple_audio_files_and_save_to_s3,
)
from dembrane.quote_utils import count_tokens
from dembrane.reply_utils import generate_reply_for_conversation
from dembrane.api.stateless import (
DeleteConversationRequest,
Expand Down Expand Up @@ -230,10 +231,7 @@ async def get_conversation_counts(

from dembrane.service import conversation_service

counts = await run_in_thread_pool(
conversation_service.get_chunk_counts,
conversation_id
)
counts = await run_in_thread_pool(conversation_service.get_chunk_counts, conversation_id)

return counts

Expand Down Expand Up @@ -339,10 +337,7 @@ async def get_conversation_content(

duration = -1.0
try:
duration = await run_in_thread_pool(
get_duration_from_s3,
merged_path
)
duration = await run_in_thread_pool(get_duration_from_s3, merged_path)
except Exception as e:
logger.error(f"Error getting duration from s3: {str(e)}")

Expand Down Expand Up @@ -462,10 +457,11 @@ async def get_conversation_token_count(

# If not in cache, calculate the token count
transcript = await get_conversation_transcript(conversation_id, auth)

token_count = await run_in_thread_pool(
count_tokens,
transcript,
provider="anthropic"
token_counter,
model=LIGHTRAG_LITELLM_INFERENCE_MODEL,
messages=[{"role": "user", "content": transcript}],
)

# Store the result in the cache
Expand Down Expand Up @@ -535,7 +531,9 @@ async def summarize_conversation(

language = conversation_data["project_id"]["language"]

transcript_str = await get_conversation_transcript(conversation_id, auth, include_project_data=True)
transcript_str = await get_conversation_transcript(
conversation_id, auth, include_project_data=True
)

if transcript_str == "":
return {
Expand All @@ -544,9 +542,7 @@ async def summarize_conversation(
}
else:
summary = await run_in_thread_pool(
generate_summary,
transcript_str,
language if language else "en"
generate_summary, transcript_str, language if language else "en"
)

await run_in_thread_pool(
Expand Down Expand Up @@ -634,10 +630,7 @@ async def retranscribe_conversation(

duration = None
try:
duration = await run_in_thread_pool(
get_duration_from_s3,
merged_audio_path
)
duration = await run_in_thread_pool(get_duration_from_s3, merged_audio_path)
except Exception as e:
logger.error(f"Error getting duration from s3: {str(e)}")

Expand Down Expand Up @@ -669,15 +662,17 @@ async def retranscribe_conversation(

try:
logger.info(f"Creating links from {conversation_id} to {new_conversation_id}")
link_id = (await run_in_thread_pool(
directus.create_item,
"conversation_link",
item_data={
"source_conversation_id": conversation_id,
"target_conversation_id": new_conversation_id,
"link_type": "CLONE",
},
))["data"]["id"]
link_id = (
await run_in_thread_pool(
directus.create_item,
"conversation_link",
item_data={
"source_conversation_id": conversation_id,
"target_conversation_id": new_conversation_id,
"link_type": "CLONE",
},
)
)["data"]["id"]
logger.info(f"Link created: {link_id}")
except Exception as e:
logger.error(f"Error creating links: {str(e)}")
Expand All @@ -687,17 +682,19 @@ async def retranscribe_conversation(
chunk_id = generate_uuid()
timestamp = get_utc_timestamp().isoformat()

(await run_in_thread_pool(
directus.create_item,
"conversation_chunk",
item_data={
"id": chunk_id,
"conversation_id": new_conversation_id,
"timestamp": timestamp,
"path": merged_audio_path,
"source": "CLONE",
},
))["data"]
(
await run_in_thread_pool(
directus.create_item,
"conversation_chunk",
item_data={
"id": chunk_id,
"conversation_id": new_conversation_id,
"timestamp": timestamp,
"path": merged_audio_path,
"source": "CLONE",
},
)
)["data"]

logger.debug(f"Queuing transcription for chunk {chunk_id}")
# Import task locally to avoid circular imports
Expand All @@ -712,11 +709,7 @@ async def retranscribe_conversation(
}
except Exception as e:
# Clean up the partially created conversation
await run_in_thread_pool(
directus.delete_item,
"conversation",
new_conversation_id
)
await run_in_thread_pool(directus.delete_item, "conversation", new_conversation_id)
logger.error(f"Error during retranscription: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to process audio: {str(e)}") from e

Expand Down Expand Up @@ -763,11 +756,7 @@ async def delete_conversation(
session=auth,
)
# Run Directus deletion
await run_in_thread_pool(
directus.delete_item,
"conversation",
conversation_id
)
await run_in_thread_pool(directus.delete_item, "conversation", conversation_id)
return {"status": "success", "message": "Conversation deleted successfully"}
except Exception as e:
logger.exception(f"Error deleting conversation {conversation_id}: {e}")
Expand Down
25 changes: 10 additions & 15 deletions echo/server/dembrane/api/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def iterfile() -> Generator[bytes, None, None]:

async def get_latest_project_analysis_run(project_id: str) -> Optional[dict]:
try:
def _get_analysis_run():
def _get_analysis_run() -> Optional[list[dict]]:
with directus_client_context() as client:
return client.get_items(
"project_analysis_run",
Expand All @@ -215,7 +215,7 @@ def _get_analysis_run():
},
)

analysis_run = await run_in_thread_pool(_get_analysis_run)
analysis_run: Optional[list[dict]] = await run_in_thread_pool(_get_analysis_run)

if analysis_run is None:
return None
Expand Down Expand Up @@ -247,10 +247,7 @@ async def post_create_project_library(
from dembrane.service.project import ProjectNotFoundException

try:
project = await run_in_thread_pool(
project_service.get_by_id_or_raise,
project_id
)
project = await run_in_thread_pool(project_service.get_by_id_or_raise, project_id)
except ProjectNotFoundException as e:
raise HTTPException(status_code=404, detail="Project not found") from e

Expand Down Expand Up @@ -298,10 +295,7 @@ async def post_create_view(
from dembrane.service.project import ProjectNotFoundException

try:
project = await run_in_thread_pool(
project_service.get_by_id_or_raise,
project_id
)
project = await run_in_thread_pool(project_service.get_by_id_or_raise, project_id)
except ProjectNotFoundException as e:
raise HTTPException(status_code=404, detail="Project not found") from e

Expand All @@ -325,12 +319,13 @@ class CreateReportRequestBodySchema(BaseModel):


@ProjectRouter.post("/{project_id}/create-report")
async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> None:
async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> dict:
language = body.language or "en"
try:
report_content_response = await get_report_content_for_project(project_id, language)
Comment on lines +322 to 325
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Blocker: create_report lacks auth and ownership checks.

This endpoint currently allows anyone to create a report for any project. Mirror the access guard used elsewhere: return 404 when the project does not exist, and 403 when the caller is neither an admin nor the project's owner.

Apply:

-@ProjectRouter.post("/{project_id}/create-report")
-async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> dict:
+@ProjectRouter.post("/{project_id}/create-report")
+async def create_report(
+    project_id: str,
+    body: CreateReportRequestBodySchema,
+    auth: DependencyDirectusSession,
+) -> dict:
     language = body.language or "en"
+    # Ownership / access guard
+    from dembrane.service import project_service
+    from dembrane.service.project import ProjectNotFoundException
+    try:
+        project = await run_in_thread_pool(project_service.get_by_id_or_raise, project_id)
+    except ProjectNotFoundException as e:
+        raise HTTPException(status_code=404, detail="Project not found") from e
+    if not auth.is_admin and project.get("directus_user_id", "") != auth.user_id:
+        raise HTTPException(status_code=403, detail="User does not have access to this project")

Also note: the declared return type of create_report changed from None to dict — verify that API consumers handle the response body as expected.


🏁 Script executed:

#!/bin/bash
# Find server/client code that calls or expects the create-report response shape
rg -n -C2 'create-report' --type=py --type=ts --type=tsx
rg -n -C2 '\bcreate_report\(' --type=py

Length of output: 412



🏁 Script executed:

#!/bin/bash
# Get full implementation of create_report
sed -n '320,360p' echo/server/dembrane/api/project.py

# Find other protected endpoints to see auth pattern
rg -n 'auth.*DependencyDirectusSession' --type=py -A5 echo/server/dembrane/api/

# Check project_service module structure
find . -name 'project_service.py' -o -name 'project.py' | grep service

# Check if create_report is called anywhere
rg -n 'create_report' --type=py -g '!node_modules'

Length of output: 12093


🏁 Script executed:

#!/bin/bash
# Also check what ProjectNotFoundException actually is
rg -n 'class ProjectNotFoundException' --type=py
rg -n 'ProjectNotFoundException' --type=py -B2 -A2

Length of output: 7113



🏁 Script executed:

#!/bin/bash
# Check if create_report has tests
rg -n 'def test.*create_report' --type=py
rg -n 'create_report' echo/server/tests/ --type=py

# Check git to see if function is newly added (look at git diff context)
git log --oneline -n 20 -- echo/server/dembrane/api/project.py 2>/dev/null || echo "Git history unavailable"

# Verify if there are ownership checks in tests for similar endpoints
rg -n 'is_admin.*auth' --type=py -B2 -A2 echo/server/tests/

Length of output: 722


Blocker: create_report lacks auth and ownership checks.

This endpoint allows unauthenticated requests to create reports for any project. Apply the same access guard pattern used in create_library and other protected endpoints:

 @ProjectRouter.post("/{project_id}/create-report")
-async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> dict:
+async def create_report(
+    project_id: str,
+    body: CreateReportRequestBodySchema,
+    auth: DependencyDirectusSession,
+) -> dict:
     language = body.language or "en"
+    # Ownership / access guard
+    from dembrane.service import project_service
+    from dembrane.service.project import ProjectNotFoundException
+    try:
+        project = await run_in_thread_pool(project_service.get_by_id_or_raise, project_id)
+    except ProjectNotFoundException as e:
+        raise HTTPException(status_code=404, detail="Project not found") from e
+    if not auth.is_admin and project.get("directus_user_id", "") != auth.user_id:
+        raise HTTPException(status_code=403, detail="User does not have access to this project")
🤖 Prompt for AI Agents
In echo/server/dembrane/api/project.py around lines 322 to 325, the
create_report function currently performs no authentication or ownership checks;
update it to follow the same access-guard pattern used by create_library and
other protected endpoints by: 1) retrieving the current user (or auth context)
from the request/session/context at the start of the function, 2) returning 401
if unauthenticated, 3) loading the target project and verifying the user has
ownership or required permissions (or calling the existing guard helper used
elsewhere), returning 403 if the user is not allowed, and 4) only then calling
get_report_content_for_project and continuing; also import and reuse the same
guard/auth helper functions and error response types used by create_library to
keep behavior consistent.

except ContextTooLongException:
def _create_error_report():

def _create_error_report() -> dict:
with directus_client_context() as client:
return client.create_item(
"project_report",
Expand All @@ -342,13 +337,13 @@ def _create_error_report():
"error_code": "CONTEXT_TOO_LONG",
},
)["data"]

report = await run_in_thread_pool(_create_error_report)
return report
except Exception as e:
raise e

Comment on lines 343 to 345
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Remove redundant catch/rethrow; log and re-raise cleanly.

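Catching Exception only to re-raise it unchanged is noise: the handler adds no behavior. Either remove the handler and let the exception propagate, or log the failure and then re-raise with a bare raise.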

Apply:

-    except Exception as e:
-        raise e
+    except Exception:
+        logger.exception(f"create_report failed for project {project_id}")
+        raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
raise e
except Exception:
logger.exception(f"create_report failed for project {project_id}")
raise
🤖 Prompt for AI Agents
In echo/server/dembrane/api/project.py around lines 343-345, the except block
catches Exception only to re-raise it which is redundant; either remove the
try/except so the exception naturally bubbles up, or if you need to record it,
replace the current except block with a logging statement that logs the
exception and then re-raise using plain "raise" (not "raise e") to preserve the
original traceback.

def _create_report():
def _create_report() -> dict:
with directus_client_context() as client:
return client.create_item(
"project_report",
Expand All @@ -359,7 +354,7 @@ def _create_report():
"status": "archived",
},
)["data"]

report = await run_in_thread_pool(_create_report)
return report

Expand Down
13 changes: 11 additions & 2 deletions echo/server/dembrane/api/stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
from lightrag.lightrag import QueryParam
from lightrag.kg.shared_storage import initialize_pipeline_status

from dembrane.config import (
SMALL_LITELLM_MODEL,
SMALL_LITELLM_API_KEY,
SMALL_LITELLM_API_BASE,
SMALL_LITELLM_API_VERSION,
)
from dembrane.prompts import render_prompt
from dembrane.rag_manager import RAGManager, get_rag
from dembrane.postgresdb_manager import PostgresDBManager
Expand Down Expand Up @@ -50,13 +56,16 @@ def generate_summary(transcript: str, language: str | None) -> str:

# Call the model over the provided API endpoint
response = completion(
model="anthropic/claude-3-5-sonnet-20240620",
model=SMALL_LITELLM_MODEL,
messages=[
{
"content": prompt,
"role": "user",
}
],
api_key=SMALL_LITELLM_API_KEY,
api_base=SMALL_LITELLM_API_BASE,
api_version=SMALL_LITELLM_API_VERSION,
)

response_content = response["choices"][0]["message"]["content"]
Expand Down Expand Up @@ -262,7 +271,7 @@ async def get_lightrag_prompt(
ids=[str(id) for id in echo_segment_ids],
top_k=payload.top_k,
)

try:
response = await rag.aquery(payload.query, param=param)
logger.debug(f"Response: {response}")
Expand Down
Loading
Loading