From 56367fe6ad36a312dc0ee942624767944369f375 Mon Sep 17 00:00:00 2001 From: Roy Date: Tue, 22 Apr 2025 08:59:32 +0000 Subject: [PATCH 01/13] Enhance chat functionality with citation and conversation reference handling - Introduced new asynchronous functions `get_conversation_references` and `get_conversation_citations` in `chat_utils.py` to retrieve conversation details and citations based on the RAG prompt. - Updated `ProjectChatMessageModel` to include `conversation_references` and `citations` fields for structured data storage. - Refactored `post_chat` in `chat.py` to utilize the new citation and reference functions, improving the chat context management. - Added a new Jinja template for text structuring to facilitate citation mapping in responses. - Improved error handling and logging throughout the chat API for better traceability of citation generation processes. --- echo/server/dembrane/api/chat.py | 118 +++++------------- .../audio_lightrag/utils/lightrag_utils.py | 1 - echo/server/dembrane/chat_utils.py | 66 +++++++++- echo/server/dembrane/database.py | 9 +- .../text_structuring_model_message.en.jinja | 8 ++ 5 files changed, 109 insertions(+), 93 deletions(-) create mode 100644 echo/server/prompt_templates/text_structuring_model_message.en.jinja diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index feb6e4de..4c89936a 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -8,10 +8,7 @@ import litellm from fastapi import Query, APIRouter, HTTPException -from litellm import ( # type: ignore - # completion, - token_counter, -) +from litellm import token_counter # type: ignore from pydantic import BaseModel from fastapi.responses import StreamingResponse @@ -22,11 +19,6 @@ LIGHTRAG_LITELLM_INFERENCE_MODEL, LIGHTRAG_LITELLM_INFERENCE_API_KEY, ) - -# LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_MODEL, -# LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_KEY, -# LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_BASE, -# 
LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_VERSION, from dembrane.database import ( DatabaseSession, ProjectChatModel, @@ -39,6 +31,8 @@ from dembrane.chat_utils import ( MAX_CHAT_CONTEXT_LENGTH, get_project_chat_history, + get_conversation_citations, + get_conversation_references, get_lightrag_prompt_by_params, create_system_messages_for_chat, ) @@ -47,9 +41,6 @@ from dembrane.api.dependency_auth import DirectusSession, DependencyDirectusSession from dembrane.audio_lightrag.utils.lightrag_utils import ( get_project_id, - # get_conversation_name_from_id, - # run_segment_id_to_conversation_id, - get_conversation_details_for_rag_query, ) ChatRouter = APIRouter(tags=["chat"]) @@ -365,12 +356,6 @@ class ChatBodyMessageSchema(BaseModel): class ChatBodySchema(BaseModel): messages: List[ChatBodyMessageSchema] -class CitationSingleSchema(BaseModel): - segment_id: int - verbatim_reference_text_chunk: str - -class CitationsSchema(BaseModel): - citations: List[CitationSingleSchema] @ChatRouter.post("/{chat_id}") async def post_chat( @@ -463,25 +448,22 @@ async def post_chat( db.add(dembrane_dummy_message) db.commit() - try: - conversation_references = await get_conversation_details_for_rag_query(rag_prompt) - conversation_references = {'conversation_references': conversation_references} - except Exception as e: - logger.info(f"No references found. 
Error: {str(e)}") - conversation_references = {'conversation_references':{}} + conversation_references = await get_conversation_references(rag_prompt) - ## TODO: Enable when frontend can handle - # dembrane_prompt_conversations_message = ProjectChatMessageModel( - # id=generate_uuid(), - # date_created=get_utc_timestamp(), - # message_from="dembrane", - # text="prompt_conversations created", - # prompt_conversations=conversation_references, - # project_chat_id=chat_id, - # ) - # db.add(dembrane_prompt_conversations_message) - # db.commit() + dembrane_conversation_reference_message = ProjectChatMessageModel( + id=generate_uuid(), + date_created=get_utc_timestamp(), + message_from="dembrane", + text="conversation references created", + project_chat_id=chat_id, + conversation_references=conversation_references, + ) + db.add(dembrane_conversation_reference_message) + db.commit() async def stream_response_async() -> AsyncGenerator[str, None]: + conversation_references_yeild = f"2:{json.dumps(conversation_references)}\n" + yield conversation_references_yeild + accumulated_response = "" try: response = await litellm.acompletion( @@ -490,7 +472,7 @@ async def stream_response_async() -> AsyncGenerator[str, None]: stream=True, api_key=LIGHTRAG_LITELLM_INFERENCE_API_KEY ) - async for chunk in response: + async for chunk in response: #type: ignore if chunk.choices[0].delta.content: content = chunk.choices[0].delta.content accumulated_response += content @@ -511,56 +493,20 @@ async def stream_response_async() -> AsyncGenerator[str, None]: yield "Error: An error occurred while processing the chat response." return # Stop generation on error - ## TODO: Enable when frontend can handle - # # Move all this to utils - # text_structuring_model_message = f''' - # You are a helpful assistant that maps the correct references to the generated response. - # Your task is to map the references segment_id to the correct reference text. 
- # For every reference segment_id, you need to provide the most relevant reference text verbatim. - # Segment ID is always of the format: SEGMENT_ID_. - # Here is the generated response: - # {accumulated_response} - # Here are the rag prompt: - # {rag_prompt} - # ''' - # text_structuring_model_messages = [ - # {"role": "system", "content": text_structuring_model_message}, - # ] - # # Generate citations - - # text_structuring_model_generation = completion( - # model=f"{LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_MODEL}", - # messages=text_structuring_model_messages, - # api_base=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_BASE, - # api_version=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_VERSION, - # api_key=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_KEY, - # response_format=CitationsSchema) - # try: - # citations_dict = json.loads(text_structuring_model_generation.choices[0].message.content) - # citations_list = citations_dict["citations"]# List[Dict[str, str]] - # if len(citations_list) > 0: - # for idx, citation in enumerate(citations_list): - # conversation_id = await run_segment_id_to_conversation_id(citation['segment_id']) - # citations_list[idx]['conversation_id'] = conversation_id - # conversation_name = get_conversation_name_from_id(conversation_id) - # citations_list[idx]['conversation_name'] = conversation_name - # else: - # logger.warning("WARNING: No citations found") - # citations_list = json.dumps(citations_list) - # except Exception as e: - # logger.warning(f"WARNING: Error in citation extraction. 
Skipping citations: {str(e)}") - # citations_list = [] - # citations_count = len(citations_list) - # dembrane_citations_message = ProjectChatMessageModel( - # id=generate_uuid(), - # date_created=get_utc_timestamp(), - # message_from="dembrane", - # text=f"{citations_count} citations found.", - # project_chat_id=chat_id, - # citations=citations_list, - # ) - # db.add(dembrane_citations_message) - # db.commit() + citations_list = await get_conversation_citations(rag_prompt, accumulated_response) + citations_count = len(citations_list) + citations_yeild = f"2:{json.dumps([{'citations': citations_list}])}\n" + yield citations_yeild + dembrane_citations_message = ProjectChatMessageModel( + id=generate_uuid(), + date_created=get_utc_timestamp(), + message_from="dembrane", + text=f"{citations_count} citations found.", + project_chat_id=chat_id, + citations=citations_list, + ) + db.add(dembrane_citations_message) + db.commit() headers = {"Content-Type": "text/event-stream"} if protocol == "data": headers["x-vercel-ai-data-stream"] = "v1" diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index d6276c59..180a1759 100644 --- a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -43,7 +43,6 @@ def is_valid_uuid(uuid_str: str) -> bool: def get_conversation_name_from_id(conversation_id: str) -> str: query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name']}} - print(query) return directus.get_items("conversation", query)[0]['participant_name'] async def run_segment_id_to_conversation_id(segment_id: int) -> str: diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index dcbb9988..eefa82b3 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -1,15 +1,28 @@ +import json import logging from typing import Any, Dict, 
List, Optional +from litellm import completion from pydantic import BaseModel from sqlalchemy.orm import Session +from dembrane.config import ( + LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_MODEL, + LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_KEY, + LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_BASE, + LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_VERSION, +) from dembrane.prompts import render_prompt from dembrane.database import ConversationModel, ProjectChatMessageModel from dembrane.directus import directus from dembrane.api.stateless import GetLightragQueryRequest, get_lightrag_prompt from dembrane.api.conversation import get_conversation_transcript from dembrane.api.dependency_auth import DirectusSession +from dembrane.audio_lightrag.utils.lightrag_utils import ( + get_conversation_name_from_id, + run_segment_id_to_conversation_id, + get_conversation_details_for_rag_query, +) MAX_CHAT_CONTEXT_LENGTH = 100000 @@ -139,4 +152,55 @@ async def get_lightrag_prompt_by_params(top_k: int, ) session = DirectusSession(user_id="none", is_admin=True)#fake session rag_prompt = await get_lightrag_prompt(payload, session) - return rag_prompt \ No newline at end of file + return rag_prompt + + +async def get_conversation_references(rag_prompt: str) -> List[Dict[str, Any]]: + try: + conversation_references = await get_conversation_details_for_rag_query(rag_prompt) + conversation_references = {'references': conversation_references} + except Exception as e: + logger.info(f"No references found. 
Error: {str(e)}") + conversation_references = {'references':{}} + return [conversation_references] + +class CitationSingleSchema(BaseModel): + segment_id: int + verbatim_reference_text_chunk: str + +class CitationsSchema(BaseModel): + citations: List[CitationSingleSchema] + +async def get_conversation_citations(rag_prompt: str, accumulated_response: str, language: str = "en") -> List[Dict[int, Any]]: + text_structuring_model_message = render_prompt("text_structuring_model_message", language, + { + 'accumulated_response': accumulated_response, + 'rag_prompt':rag_prompt + } + ) + text_structuring_model_messages = [ + {"role": "system", "content": text_structuring_model_message}, + ] + text_structuring_model_generation = completion( + model=f"{LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_MODEL}", + messages=text_structuring_model_messages, + api_base=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_BASE, + api_version=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_VERSION, + api_key=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_KEY, + response_format=CitationsSchema) + try: + citations_dict = json.loads(text_structuring_model_generation.choices[0].message.content)#type: ignore + citations_list = citations_dict["citations"] + if len(citations_list) > 0: + for idx, citation in enumerate(citations_list): + conversation_id = await run_segment_id_to_conversation_id(citation['segment_id']) + citations_list[idx]['conversation_id'] = conversation_id + conversation_name = get_conversation_name_from_id(conversation_id) + citations_list[idx]['conversation_name'] = conversation_name + else: + logger.warning("WARNING: No citations found") + citations_list = json.dumps(citations_list) + except Exception as e: + logger.warning(f"WARNING: Error in citation extraction. 
Skipping citations: {str(e)}") + citations_list = [] + return citations_list \ No newline at end of file diff --git a/echo/server/dembrane/database.py b/echo/server/dembrane/database.py index ba4c8510..9e2c91f4 100644 --- a/echo/server/dembrane/database.py +++ b/echo/server/dembrane/database.py @@ -1,6 +1,6 @@ # this is not upto date. switched to directus for a better life from enum import Enum -from typing import Any, List, Optional, Annotated, Generator #, Dict +from typing import Any, Dict, List, Optional, Annotated, Generator from logging import getLogger from datetime import datetime, timezone @@ -28,7 +28,7 @@ declarative_base, ) from pgvector.sqlalchemy import Vector # type: ignore -from sqlalchemy.dialects.postgresql import UUID #, JSONB +from sqlalchemy.dialects.postgresql import UUID, JSONB from dembrane.config import DATABASE_URL from dembrane.embedding import EMBEDDING_DIM @@ -280,9 +280,8 @@ class ProjectChatMessageModel(Base): secondary=project_chat_message_conversation_association_1_table, ) tokens_count: Mapped[int] = mapped_column(Integer) - # # Enable when frontend can handle - # prompt_conversations: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) - # citations: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) + conversation_references: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) + citations: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) class ProjectChatModel(Base): diff --git a/echo/server/prompt_templates/text_structuring_model_message.en.jinja b/echo/server/prompt_templates/text_structuring_model_message.en.jinja new file mode 100644 index 00000000..70e0a4eb --- /dev/null +++ b/echo/server/prompt_templates/text_structuring_model_message.en.jinja @@ -0,0 +1,8 @@ +You are a helpful assistant that maps the correct references to the generated response. +Your task is to map the references segment_id to the correct reference text. 
+For every reference segment_id, you need to provide the most relevant reference text snippet. +Segment ID is always of the format: SEGMENT_ID_. +Here is the generated response: +{{accumulated_response}} +Here are the rag prompt: +{{rag_prompt}} \ No newline at end of file From adb8d913ed5c381f68849561c2f1afa07fe80c29 Mon Sep 17 00:00:00 2001 From: Roy Date: Tue, 22 Apr 2025 19:14:44 +0000 Subject: [PATCH 02/13] Refactor chat_utils.py and chat.py for improved citation handling and logging - Changed logging level from info to warning in get_conversation_references for better error visibility. - Updated get_conversation_citations to return structured citations by conversation, enhancing data organization. - Simplified citation extraction logic and improved error handling in post_chat, removing unnecessary database commits for conversation references and citations. - Enhanced clarity in variable naming and streamlined the return structure for better maintainability. --- echo/server/dembrane/api/chat.py | 29 ++++------------------------- echo/server/dembrane/chat_utils.py | 20 +++++++++----------- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 4c89936a..97476cca 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -156,7 +156,7 @@ async def get_chat_context( ) for conversation in used_conversations: - is_conversation_locked = conversation.id in locked_conversations + is_conversation_locked = conversation.id in locked_conversations # Verify with directus chat_context_resource = ChatContextConversationSchema( conversation_id=conversation.id, conversation_participant_name=conversation.participant_name, @@ -326,7 +326,7 @@ async def lock_conversations( .all() ) - dembrane_message = ProjectChatMessageModel( + dembrane_search_complete_message = ProjectChatMessageModel( id=generate_uuid(), date_created=get_utc_timestamp(), message_from="dembrane", @@ 
-335,7 +335,7 @@ async def lock_conversations( used_conversations=added_conversations, added_conversations=added_conversations, ) - db.add(dembrane_message) + db.add(dembrane_search_complete_message) db.commit() # Fetch ConversationModel objects for used_conversations @@ -450,16 +450,6 @@ async def post_chat( conversation_references = await get_conversation_references(rag_prompt) - dembrane_conversation_reference_message = ProjectChatMessageModel( - id=generate_uuid(), - date_created=get_utc_timestamp(), - message_from="dembrane", - text="conversation references created", - project_chat_id=chat_id, - conversation_references=conversation_references, - ) - db.add(dembrane_conversation_reference_message) - db.commit() async def stream_response_async() -> AsyncGenerator[str, None]: conversation_references_yeild = f"2:{json.dumps(conversation_references)}\n" yield conversation_references_yeild @@ -494,19 +484,8 @@ async def stream_response_async() -> AsyncGenerator[str, None]: return # Stop generation on error citations_list = await get_conversation_citations(rag_prompt, accumulated_response) - citations_count = len(citations_list) - citations_yeild = f"2:{json.dumps([{'citations': citations_list}])}\n" + citations_yeild = f"2:{json.dumps(citations_list)}\n" yield citations_yeild - dembrane_citations_message = ProjectChatMessageModel( - id=generate_uuid(), - date_created=get_utc_timestamp(), - message_from="dembrane", - text=f"{citations_count} citations found.", - project_chat_id=chat_id, - citations=citations_list, - ) - db.add(dembrane_citations_message) - db.commit() headers = {"Content-Type": "text/event-stream"} if protocol == "data": headers["x-vercel-ai-data-stream"] = "v1" diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index eefa82b3..c2b77e90 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -160,7 +160,7 @@ async def get_conversation_references(rag_prompt: str) -> List[Dict[str, 
Any]]: conversation_references = await get_conversation_details_for_rag_query(rag_prompt) conversation_references = {'references': conversation_references} except Exception as e: - logger.info(f"No references found. Error: {str(e)}") + logger.warning(f"No references found. Error: {str(e)}") conversation_references = {'references':{}} return [conversation_references] @@ -171,7 +171,7 @@ class CitationSingleSchema(BaseModel): class CitationsSchema(BaseModel): citations: List[CitationSingleSchema] -async def get_conversation_citations(rag_prompt: str, accumulated_response: str, language: str = "en") -> List[Dict[int, Any]]: +async def get_conversation_citations(rag_prompt: str, accumulated_response: str, language: str = "en") -> List[Dict[str, Any]]: text_structuring_model_message = render_prompt("text_structuring_model_message", language, { 'accumulated_response': accumulated_response, @@ -189,18 +189,16 @@ async def get_conversation_citations(rag_prompt: str, accumulated_response: str, api_key=LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_KEY, response_format=CitationsSchema) try: - citations_dict = json.loads(text_structuring_model_generation.choices[0].message.content)#type: ignore - citations_list = citations_dict["citations"] + citations_by_segment_dict = json.loads(text_structuring_model_generation.choices[0].message.content) #type: ignore + citations_list = citations_by_segment_dict["citations"] + citations_by_conversation_dict: Dict[str, List[Dict[str, Any]]] = {"citations": []} if len(citations_list) > 0: - for idx, citation in enumerate(citations_list): + for _, citation in enumerate(citations_list): conversation_id = await run_segment_id_to_conversation_id(citation['segment_id']) - citations_list[idx]['conversation_id'] = conversation_id - conversation_name = get_conversation_name_from_id(conversation_id) - citations_list[idx]['conversation_name'] = conversation_name + current_citation_dict = {"conversation_id": conversation_id, "reference_text": 
citation['verbatim_reference_text_chunk']} + citations_by_conversation_dict["citations"].append(current_citation_dict) else: logger.warning("WARNING: No citations found") - citations_list = json.dumps(citations_list) except Exception as e: logger.warning(f"WARNING: Error in citation extraction. Skipping citations: {str(e)}") - citations_list = [] - return citations_list \ No newline at end of file + return [citations_by_conversation_dict] \ No newline at end of file From 91c2757799ebbe97ff50d0792770a6504a9af2ab Mon Sep 17 00:00:00 2001 From: Roy Date: Thu, 24 Apr 2025 09:44:06 +0000 Subject: [PATCH 03/13] Orphan segment IDs fixed --- .../audio_lightrag/utils/lightrag_utils.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index 180a1759..f0631ca2 100644 --- a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -79,7 +79,7 @@ async def get_segment_from_conversation_chunk_ids(db: PostgreSQLDB, sql = SQL_TEMPLATES["GET_SEGMENT_IDS_FROM_CONVERSATION_CHUNK_IDS" ].format(conversation_ids=conversation_chunk_ids) result = await db.query(sql, multirows=True) - return [int(x['conversation_segment_id']) for x in result] + return [int(x['conversation_segment_id']) for x in result if x['conversation_segment_id'] is not None] async def get_segment_from_conversation_ids(db: PostgreSQLDB, conversation_ids: list[str]) -> list[int]: @@ -95,7 +95,7 @@ async def get_segment_from_conversation_ids(db: PostgreSQLDB, conversation_request["query"]["filter"] = {"id": {"_in": conversation_ids}} conversation_request_result = directus.get_items("conversation", conversation_request) conversation_chunk_ids = [[x['id'] for x in conversation_request_result_dict['chunks']] for conversation_request_result_dict in conversation_request_result] - 
flat_conversation_chunk_ids: list[str] = [item for sublist in conversation_chunk_ids for item in sublist] + flat_conversation_chunk_ids: list[str] = [item for sublist in conversation_chunk_ids for item in sublist if item is not None] return await get_segment_from_conversation_chunk_ids(db, flat_conversation_chunk_ids) async def get_segment_from_project_ids(db: PostgreSQLDB, @@ -106,7 +106,7 @@ async def get_segment_from_project_ids(db: PostgreSQLDB, project_request["query"]["filter"] = {"id": {"_in": project_ids}} project_request_result = directus.get_items("project", project_request) conversation_ids = [[x['id'] for x in project_request_result_dict['conversations']] for project_request_result_dict in project_request_result] - flat_conversation_ids: list[str] = [item for sublist in conversation_ids for item in sublist] + flat_conversation_ids: list[str] = [item for sublist in conversation_ids for item in sublist if item is not None] return await get_segment_from_conversation_ids(db, flat_conversation_ids) async def with_distributed_lock( @@ -247,10 +247,16 @@ async def get_ratio_abs(rag_prompt: str, segment2chunk = await run_segment_ids_to_conversation_chunk_ids(list(segment_ratios_abs.keys())) chunk_ratios_abs: Dict[str, float] = {} for segment,ratio in segment_ratios_abs.items(): - if segment2chunk[segment] not in chunk_ratios_abs.keys(): - chunk_ratios_abs[segment2chunk[segment]] = ratio - else: - chunk_ratios_abs[segment2chunk[segment]] += ratio + if segment in segment2chunk.keys(): + if segment2chunk[segment] not in chunk_ratios_abs.keys(): + chunk_ratios_abs[segment2chunk[segment]] = ratio + else: + chunk_ratios_abs[segment2chunk[segment]] += ratio + + #normalize chunk_ratios_abs + total_ratio = sum(chunk_ratios_abs.values()) + chunk_ratios_abs = {k:v/total_ratio for k,v in chunk_ratios_abs.items()} + if return_type == "chunk": return chunk_ratios_abs conversation_ratios_abs: Dict[str, float] = {} From a577f1fd2145de99149ab57433218fc7373add20 Mon Sep 17 
00:00:00 2001 From: Roy Date: Thu, 24 Apr 2025 09:50:45 +0000 Subject: [PATCH 04/13] Orphan segment IDs fix for citation --- echo/server/dembrane/chat_utils.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index c2b77e90..4adcd0f5 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -19,7 +19,6 @@ from dembrane.api.conversation import get_conversation_transcript from dembrane.api.dependency_auth import DirectusSession from dembrane.audio_lightrag.utils.lightrag_utils import ( - get_conversation_name_from_id, run_segment_id_to_conversation_id, get_conversation_details_for_rag_query, ) @@ -190,11 +189,17 @@ async def get_conversation_citations(rag_prompt: str, accumulated_response: str, response_format=CitationsSchema) try: citations_by_segment_dict = json.loads(text_structuring_model_generation.choices[0].message.content) #type: ignore + logger.debug(f"Citations by segment dict: {citations_by_segment_dict}") citations_list = citations_by_segment_dict["citations"] + logger.debug(f"Citations list: {citations_list}") citations_by_conversation_dict: Dict[str, List[Dict[str, Any]]] = {"citations": []} if len(citations_list) > 0: for _, citation in enumerate(citations_list): - conversation_id = await run_segment_id_to_conversation_id(citation['segment_id']) + try: + conversation_id = await run_segment_id_to_conversation_id(citation['segment_id']) + except Exception as e: + logger.warning(f"WARNING: Error in citation extraction for segment {citation['segment_id']}. 
Skipping citations: {str(e)}") + continue current_citation_dict = {"conversation_id": conversation_id, "reference_text": citation['verbatim_reference_text_chunk']} citations_by_conversation_dict["citations"].append(current_citation_dict) else: From 9b01904540bd62afd303908b7917b928561984c1 Mon Sep 17 00:00:00 2001 From: Roy Date: Thu, 24 Apr 2025 10:52:02 +0000 Subject: [PATCH 05/13] Enhance logging and error handling in chat_utils and audio ETL pipeline - Added debug print statements in `get_conversation_references` and `get_conversation_details_for_rag_query` for improved traceability of conversation data. - Updated `run_etl.py` to load conversation UUID from environment variables, enhancing configurability for ETL execution. - Implemented a check in `get_ratio_abs` to return an empty dictionary if no segment ratios are found, improving error handling. --- echo/server/dembrane/audio_lightrag/main/run_etl.py | 8 +++++++- .../dembrane/audio_lightrag/utils/lightrag_utils.py | 4 ++++ echo/server/dembrane/chat_utils.py | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/echo/server/dembrane/audio_lightrag/main/run_etl.py b/echo/server/dembrane/audio_lightrag/main/run_etl.py index 4d9adabf..f3566d66 100644 --- a/echo/server/dembrane/audio_lightrag/main/run_etl.py +++ b/echo/server/dembrane/audio_lightrag/main/run_etl.py @@ -77,5 +77,11 @@ def run_etl_pipeline(conv_id_list: list[str]) -> Optional[bool]: # Steps for manual run # cd server # python -m dembrane.audio_lightrag.main.run_etl - conv_id_list: list[str] = ['8f224582-5d1b-4d96-a450-a0bdb891dd28'] # Upload UUIDs + import os + + from dotenv import load_dotenv + load_dotenv() + + TEST_CONV_UUID = str(os.getenv("TEST_CONV_UUID")) + conv_id_list: list[str] = [TEST_CONV_UUID] run_etl_pipeline(conv_id_list) diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index f0631ca2..93cb0814 100644 --- 
a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -242,6 +242,8 @@ def fetch_segment_ratios(response_text: str) -> dict[int, float]: async def get_ratio_abs(rag_prompt: str, return_type: Literal["segment", "chunk", "conversation"]) -> Dict[str, float]: segment_ratios_abs = fetch_segment_ratios(str(rag_prompt)) + if segment_ratios_abs == {}: + return {} if return_type == "segment": return {str(k):v for k,v in segment_ratios_abs.items()} segment2chunk = await run_segment_ids_to_conversation_chunk_ids(list(segment_ratios_abs.keys())) @@ -275,11 +277,13 @@ def get_project_id(proj_chat_id: str) -> str: async def get_conversation_details_for_rag_query(rag_prompt: str) -> dict[str, dict[str, Any]]: ratio_abs = await get_ratio_abs(rag_prompt, "conversation") + logger.debug(f'*** ratio_abs: {ratio_abs}') conversation_details_dict = {} for conversation_id,ratio in ratio_abs.items(): query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name']}} conversation_title = directus.get_items("conversation", query)[0]['participant_name'] conversation_details_dict[conversation_id] = {'ratio': ratio, 'conversation_title': conversation_title} + logger.debug(f'*** conversation_details_dict: {conversation_details_dict}') return conversation_details_dict TABLES = { diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index 4adcd0f5..aae62d24 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -157,6 +157,7 @@ async def get_lightrag_prompt_by_params(top_k: int, async def get_conversation_references(rag_prompt: str) -> List[Dict[str, Any]]: try: conversation_references = await get_conversation_details_for_rag_query(rag_prompt) + print('***', conversation_references) conversation_references = {'references': conversation_references} except Exception as e: logger.warning(f"No references found. 
Error: {str(e)}") From ed7053161e6b4a785ef4f871f0b7399c30bcb6e2 Mon Sep 17 00:00:00 2001 From: Usama Date: Fri, 25 Apr 2025 10:18:53 +0000 Subject: [PATCH 06/13] Refactor citation handling and conversation reference structure in chat_utils.py and chat.py - Updated get_conversation_references to return an empty list instead of a dictionary when no references are found, improving data consistency. - Enhanced get_conversation_citations to use a more descriptive key for conversation IDs, changing "conversation_id" to "conversation" for clarity. - Modified post_chat function to yield conversation references and citations with updated formatting, ensuring better compatibility with client-side applications. --- echo/server/dembrane/api/chat.py | 8 +++---- .../audio_lightrag/utils/lightrag_utils.py | 24 ++++++++++++------- echo/server/dembrane/chat_utils.py | 4 ++-- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 97476cca..9d01f783 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -449,9 +449,8 @@ async def post_chat( db.commit() conversation_references = await get_conversation_references(rag_prompt) - async def stream_response_async() -> AsyncGenerator[str, None]: - conversation_references_yeild = f"2:{json.dumps(conversation_references)}\n" + conversation_references_yeild = f"h:{json.dumps(conversation_references)}\n" yield conversation_references_yeild accumulated_response = "" @@ -460,7 +459,8 @@ async def stream_response_async() -> AsyncGenerator[str, None]: model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages, stream=True, - api_key=LIGHTRAG_LITELLM_INFERENCE_API_KEY + api_key=LIGHTRAG_LITELLM_INFERENCE_API_KEY, + # mock_response="It's simple to use and easy to get started", ) async for chunk in response: #type: ignore if chunk.choices[0].delta.content: @@ -484,7 +484,7 @@ async def stream_response_async() -> 
AsyncGenerator[str, None]: return # Stop generation on error citations_list = await get_conversation_citations(rag_prompt, accumulated_response) - citations_yeild = f"2:{json.dumps(citations_list)}\n" + citations_yeild = f"h:{json.dumps(citations_list)}\n" yield citations_yeild headers = {"Content-Type": "text/event-stream"} if protocol == "data": diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index 93cb0814..f4dfcf99 100644 --- a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -275,16 +275,22 @@ def get_project_id(proj_chat_id: str) -> str: query = {'query': {'filter': {'id': {'_eq': proj_chat_id}},'fields': ['project_id']}} return directus.get_items("project_chat", query)[0]['project_id'] -async def get_conversation_details_for_rag_query(rag_prompt: str) -> dict[str, dict[str, Any]]: +async def get_conversation_details_for_rag_query(rag_prompt: str) -> list[dict[str, Any]]: ratio_abs = await get_ratio_abs(rag_prompt, "conversation") - logger.debug(f'*** ratio_abs: {ratio_abs}') - conversation_details_dict = {} - for conversation_id,ratio in ratio_abs.items(): - query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name']}} - conversation_title = directus.get_items("conversation", query)[0]['participant_name'] - conversation_details_dict[conversation_id] = {'ratio': ratio, 'conversation_title': conversation_title} - logger.debug(f'*** conversation_details_dict: {conversation_details_dict}') - return conversation_details_dict + # Take the first conversation since we want to flatten it + conversation_details = [] + if ratio_abs: + # conversation_id = next(iter(ratio_abs)) + for conversation_id, ratio in ratio_abs.items(): + # ratio = ratio_abs[conversation_id] + query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name']}} + 
conversation_title = directus.get_items("conversation", query)[0]['participant_name'] + conversation_details.append({ + 'conversation': conversation_id, + 'conversation_title': conversation_title, + 'ratio': ratio + }) + return conversation_details TABLES = { "LIGHTRAG_VDB_TRANSCRIPT": """ diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index aae62d24..1f9704cb 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -161,7 +161,7 @@ async def get_conversation_references(rag_prompt: str) -> List[Dict[str, Any]]: conversation_references = {'references': conversation_references} except Exception as e: logger.warning(f"No references found. Error: {str(e)}") - conversation_references = {'references':{}} + conversation_references = {'references':[]} return [conversation_references] class CitationSingleSchema(BaseModel): @@ -201,7 +201,7 @@ async def get_conversation_citations(rag_prompt: str, accumulated_response: str, except Exception as e: logger.warning(f"WARNING: Error in citation extraction for segment {citation['segment_id']}. Skipping citations: {str(e)}") continue - current_citation_dict = {"conversation_id": conversation_id, "reference_text": citation['verbatim_reference_text_chunk']} + current_citation_dict = {"conversation": conversation_id, "reference_text": citation['verbatim_reference_text_chunk']} citations_by_conversation_dict["citations"].append(current_citation_dict) else: logger.warning("WARNING: No citations found") From 01444632c9363188fc0b63df950ebdf98964157e Mon Sep 17 00:00:00 2001 From: Roy Date: Fri, 25 Apr 2025 12:22:42 +0000 Subject: [PATCH 07/13] Enhance conversation reference and citation handling in chat_utils and chat API - Updated get_conversation_references and get_conversation_citations to accept project_ids as parameters, improving filtering based on project context. 
- Introduced get_project_id_from_conversation_id function to retrieve project IDs associated with conversation IDs, enhancing data integrity. - Refactored post_chat function to utilize updated reference and citation methods, ensuring better compatibility with project-specific data handling. --- echo/server/dembrane/api/chat.py | 4 +-- .../audio_lightrag/utils/lightrag_utils.py | 31 ++++++++++--------- echo/server/dembrane/chat_utils.py | 16 +++++----- echo/server/dembrane/database.py | 4 +-- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 9d01f783..30992eaf 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -448,7 +448,7 @@ async def post_chat( db.add(dembrane_dummy_message) db.commit() - conversation_references = await get_conversation_references(rag_prompt) + conversation_references = await get_conversation_references(rag_prompt, [project_id]) async def stream_response_async() -> AsyncGenerator[str, None]: conversation_references_yeild = f"h:{json.dumps(conversation_references)}\n" yield conversation_references_yeild @@ -483,7 +483,7 @@ async def stream_response_async() -> AsyncGenerator[str, None]: yield "Error: An error occurred while processing the chat response." 
return # Stop generation on error - citations_list = await get_conversation_citations(rag_prompt, accumulated_response) + citations_list = await get_conversation_citations(rag_prompt, accumulated_response, [project_id]) citations_yeild = f"h:{json.dumps(citations_list)}\n" yield citations_yeild headers = {"Content-Type": "text/event-stream"} diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index f4dfcf99..25e20411 100644 --- a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -41,6 +41,10 @@ def is_valid_uuid(uuid_str: str) -> bool: db_manager = PostgresDBManager() +def get_project_id_from_conversation_id(conversation_id: str) -> str: + query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['project_id']}} + return directus.get_items("conversation", query)[0]['project_id'] + def get_conversation_name_from_id(conversation_id: str) -> str: query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name']}} return directus.get_items("conversation", query)[0]['participant_name'] @@ -51,7 +55,6 @@ async def run_segment_id_to_conversation_id(segment_id: int) -> str: query = {'query': {'filter': {'id': {'_in': conversation_chunk_ids}},'fields': ['conversation_id']}} return directus.get_items("conversation_chunk", query)[0]['conversation_id'] - async def run_segment_ids_to_conversation_chunk_ids(segment_ids: list[int]) -> dict[int, str]: db = await db_manager.get_initialized_db() return await get_conversation_chunk_ids_from_segment_ids(db, segment_ids) @@ -275,21 +278,21 @@ def get_project_id(proj_chat_id: str) -> str: query = {'query': {'filter': {'id': {'_eq': proj_chat_id}},'fields': ['project_id']}} return directus.get_items("project_chat", query)[0]['project_id'] -async def get_conversation_details_for_rag_query(rag_prompt: str) -> list[dict[str, Any]]: +async 
def get_conversation_details_for_rag_query(rag_prompt: str, project_ids: list[str]) -> list[dict[str, Any]]: ratio_abs = await get_ratio_abs(rag_prompt, "conversation") - # Take the first conversation since we want to flatten it conversation_details = [] if ratio_abs: - # conversation_id = next(iter(ratio_abs)) for conversation_id, ratio in ratio_abs.items(): - # ratio = ratio_abs[conversation_id] - query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name']}} - conversation_title = directus.get_items("conversation", query)[0]['participant_name'] - conversation_details.append({ - 'conversation': conversation_id, - 'conversation_title': conversation_title, - 'ratio': ratio - }) + query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name', 'project_id']}} + response = directus.get_items("conversation", query)[0] + conversation_title = response['participant_name'] + response_project_id = response['project_id'] + if response_project_id in project_ids: + conversation_details.append({ + 'conversation': conversation_id, + 'conversation_title': conversation_title, + 'ratio': ratio + }) return conversation_details TABLES = { @@ -335,12 +338,12 @@ async def get_conversation_details_for_rag_query(rag_prompt: str) -> list[dict[s """, "GET_SEGMENT_IDS_FROM_CONVERSATION_CHUNK_IDS": """ - SELECT conversation_segment_id FROM conversation_segment_conversation_chunk_1 + SELECT conversation_segment_id FROM conversation_segment_conversation_chunk WHERE conversation_chunk_id = ANY(ARRAY[{conversation_ids}]) """, "GET_CONVERSATION_CHUNK_IDS_FROM_SEGMENT_IDS": """ - SELECT conversation_chunk_id, conversation_segment_id FROM conversation_segment_conversation_chunk_1 + SELECT conversation_chunk_id, conversation_segment_id FROM conversation_segment_conversation_chunk WHERE conversation_segment_id = ANY(ARRAY[{segment_ids}]) """ } \ No newline at end of file diff --git a/echo/server/dembrane/chat_utils.py 
b/echo/server/dembrane/chat_utils.py index 1f9704cb..311b6a46 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -20,6 +20,7 @@ from dembrane.api.dependency_auth import DirectusSession from dembrane.audio_lightrag.utils.lightrag_utils import ( run_segment_id_to_conversation_id, + get_project_id_from_conversation_id, get_conversation_details_for_rag_query, ) @@ -154,11 +155,10 @@ async def get_lightrag_prompt_by_params(top_k: int, return rag_prompt -async def get_conversation_references(rag_prompt: str) -> List[Dict[str, Any]]: +async def get_conversation_references(rag_prompt: str, project_ids: List[str]) -> List[Dict[str, Any]]: try: - conversation_references = await get_conversation_details_for_rag_query(rag_prompt) - print('***', conversation_references) - conversation_references = {'references': conversation_references} + references = await get_conversation_details_for_rag_query(rag_prompt, project_ids) + conversation_references = {'references': references} except Exception as e: logger.warning(f"No references found. 
Error: {str(e)}") conversation_references = {'references':[]} @@ -171,7 +171,7 @@ class CitationSingleSchema(BaseModel): class CitationsSchema(BaseModel): citations: List[CitationSingleSchema] -async def get_conversation_citations(rag_prompt: str, accumulated_response: str, language: str = "en") -> List[Dict[str, Any]]: +async def get_conversation_citations(rag_prompt: str, accumulated_response: str, project_ids: List[str],language: str = "en", ) -> List[Dict[str, Any]]: text_structuring_model_message = render_prompt("text_structuring_model_message", language, { 'accumulated_response': accumulated_response, @@ -198,11 +198,13 @@ async def get_conversation_citations(rag_prompt: str, accumulated_response: str, for _, citation in enumerate(citations_list): try: conversation_id = await run_segment_id_to_conversation_id(citation['segment_id']) + citation_project_id = get_project_id_from_conversation_id(conversation_id) except Exception as e: logger.warning(f"WARNING: Error in citation extraction for segment {citation['segment_id']}. 
Skipping citations: {str(e)}") continue - current_citation_dict = {"conversation": conversation_id, "reference_text": citation['verbatim_reference_text_chunk']} - citations_by_conversation_dict["citations"].append(current_citation_dict) + if citation_project_id in project_ids: + current_citation_dict = {"conversation": conversation_id, "reference_text": citation['verbatim_reference_text_chunk']} + citations_by_conversation_dict["citations"].append(current_citation_dict) else: logger.warning("WARNING: No citations found") except Exception as e: diff --git a/echo/server/dembrane/database.py b/echo/server/dembrane/database.py index 9e2c91f4..8a012e5e 100644 --- a/echo/server/dembrane/database.py +++ b/echo/server/dembrane/database.py @@ -280,8 +280,8 @@ class ProjectChatMessageModel(Base): secondary=project_chat_message_conversation_association_1_table, ) tokens_count: Mapped[int] = mapped_column(Integer) - conversation_references: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) - citations: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) + # conversation_references: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) + # citations: Mapped[List[Dict[str, str]]] = mapped_column(JSONB, default=[]) class ProjectChatModel(Base): From e14f59ed0277b9ab13e2e8e8fa8f2358715c6e11 Mon Sep 17 00:00:00 2001 From: Roy Date: Sat, 26 Apr 2025 08:45:12 +0000 Subject: [PATCH 08/13] Fix table name in audio ETL pipeline mapping - Updated the table name in the AudioETLPipeline from "conversation_segment_conversation_chunk_1" to "conversation_segment_conversation_chunk" for consistency and accuracy in data mapping. 
--- .../dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py b/echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py index 045e7415..c43186fe 100644 --- a/echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py +++ b/echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py @@ -79,7 +79,7 @@ def transform(self) -> None: "conversation_segment_id": segment_id, "conversation_chunk_id": chunk_id } - directus.create_item("conversation_segment_conversation_chunk_1", mapping_data) + directus.create_item("conversation_segment_conversation_chunk", mapping_data) chunk_id_2_segment.extend(chunk_id_2_segment_temp) except Exception as e: From 3bbc2cda7cd791cd6ac6d79a8bb4b10760f9e073 Mon Sep 17 00:00:00 2001 From: Roy Date: Mon, 28 Apr 2025 08:17:47 +0000 Subject: [PATCH 09/13] Update lightrag-dembrane dependency to version 1.2.7.4 and enhance logging in get_lightrag_prompt function - Upgraded lightrag-dembrane from version 1.2.7.1 to 1.2.7.4 in pyproject.toml and lock files for improved functionality. - Added a debug logging statement in the get_lightrag_prompt function to log the response, enhancing traceability during execution. 
--- echo/server/dembrane/api/stateless.py | 1 + echo/server/dembrane/audio_lightrag/utils/prompts.py | 3 ++- echo/server/pyproject.toml | 2 +- echo/server/requirements-dev.lock | 2 +- echo/server/requirements.lock | 2 +- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/echo/server/dembrane/api/stateless.py b/echo/server/dembrane/api/stateless.py index 09176185..0a7f09af 100644 --- a/echo/server/dembrane/api/stateless.py +++ b/echo/server/dembrane/api/stateless.py @@ -222,6 +222,7 @@ async def get_lightrag_prompt(payload: GetLightragQueryRequest, ids= [str(id) for id in echo_segment_ids], top_k = payload.top_k) response = await rag.aquery(payload.query, param=param) + logger.debug(f"***Response: {response}") return response else: diff --git a/echo/server/dembrane/audio_lightrag/utils/prompts.py b/echo/server/dembrane/audio_lightrag/utils/prompts.py index 8aafceea..38d31f7b 100644 --- a/echo/server/dembrane/audio_lightrag/utils/prompts.py +++ b/echo/server/dembrane/audio_lightrag/utils/prompts.py @@ -12,7 +12,7 @@ def audio_model_system_prompt() -> str: # where every item is a different speaker's speech (speaker turn sperated list) # Task 2: CONTEXTUAL ANALYSIS -# - Analyze the conversation in excessive detail +# - Analyze the conversation in excessive detail, *providing a detailed analysis of the discussion in the conversation* # - Point out all the details and nuances of the conversation # - Break down in detail the different user's opinions throughout the conversation. # - Analyze in relation to: @@ -25,6 +25,7 @@ def audio_model_system_prompt() -> str: # • Named entity identification and explanation # • Acoustic details (background sounds, voice qualities, speaker's emotions) # • Conversational dynamics between masked speakers +# - At the bottom, add a section of topics, and add a list of keywords that are relevant to the conversation. 
# - Always provide the analysis in English (translate if source is non-English) # Output Format: diff --git a/echo/server/pyproject.toml b/echo/server/pyproject.toml index 6d517003..fbf6626e 100644 --- a/echo/server/pyproject.toml +++ b/echo/server/pyproject.toml @@ -47,7 +47,7 @@ dependencies = [ "types-python-jose>=3.3.4.20240106", "litellm==1.59.*", # Additional Dependencies - "lightrag-dembrane==1.2.7.1", + "lightrag-dembrane==1.2.7.4", # "lightrag-hku @ file:///workspaces/echo/LightRAG", "nest-asyncio==1.6.0", "pydantic==2.10.6", diff --git a/echo/server/requirements-dev.lock b/echo/server/requirements-dev.lock index e46230d4..3b5960ac 100644 --- a/echo/server/requirements-dev.lock +++ b/echo/server/requirements-dev.lock @@ -163,7 +163,7 @@ langsmith==0.1.59 # via langchain # via langchain-community # via langchain-core -lightrag-dembrane==1.2.7.1 +lightrag-dembrane==1.2.7.4 litellm==1.59.9 mako==1.3.5 # via alembic diff --git a/echo/server/requirements.lock b/echo/server/requirements.lock index e46230d4..3b5960ac 100644 --- a/echo/server/requirements.lock +++ b/echo/server/requirements.lock @@ -163,7 +163,7 @@ langsmith==0.1.59 # via langchain # via langchain-community # via langchain-core -lightrag-dembrane==1.2.7.1 +lightrag-dembrane==1.2.7.4 litellm==1.59.9 mako==1.3.5 # via alembic From d058290de9451ca3cd86d14a9999a8832efc59bb Mon Sep 17 00:00:00 2001 From: Roy Date: Mon, 28 Apr 2025 10:50:38 +0000 Subject: [PATCH 10/13] Update lightrag-dembrane dependency to version 1.2.7.6 in pyproject.toml and lock files - Upgraded lightrag-dembrane from version 1.2.7.4 to 1.2.7.6 for improved functionality and compatibility across the project. 
--- echo/server/pyproject.toml | 3 +-- echo/server/requirements-dev.lock | 2 +- echo/server/requirements.lock | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/echo/server/pyproject.toml b/echo/server/pyproject.toml index fbf6626e..d1216dc1 100644 --- a/echo/server/pyproject.toml +++ b/echo/server/pyproject.toml @@ -47,8 +47,7 @@ dependencies = [ "types-python-jose>=3.3.4.20240106", "litellm==1.59.*", # Additional Dependencies - "lightrag-dembrane==1.2.7.4", - # "lightrag-hku @ file:///workspaces/echo/LightRAG", + "lightrag-dembrane==1.2.7.6", "nest-asyncio==1.6.0", "pydantic==2.10.6", "pydub==0.25.1", diff --git a/echo/server/requirements-dev.lock b/echo/server/requirements-dev.lock index 3b5960ac..ae6fbb20 100644 --- a/echo/server/requirements-dev.lock +++ b/echo/server/requirements-dev.lock @@ -163,7 +163,7 @@ langsmith==0.1.59 # via langchain # via langchain-community # via langchain-core -lightrag-dembrane==1.2.7.4 +lightrag-dembrane==1.2.7.6 litellm==1.59.9 mako==1.3.5 # via alembic diff --git a/echo/server/requirements.lock b/echo/server/requirements.lock index 3b5960ac..ae6fbb20 100644 --- a/echo/server/requirements.lock +++ b/echo/server/requirements.lock @@ -163,7 +163,7 @@ langsmith==0.1.59 # via langchain # via langchain-community # via langchain-core -lightrag-dembrane==1.2.7.4 +lightrag-dembrane==1.2.7.6 litellm==1.59.9 mako==1.3.5 # via alembic From 44e09510d35cced913591d312c168a003d64c8d5 Mon Sep 17 00:00:00 2001 From: Roy Date: Mon, 28 Apr 2025 17:14:43 +0000 Subject: [PATCH 11/13] Add LiteLLM configuration documentation and refactor database management - Introduced a new documentation file for LiteLLM configurations, detailing required settings for various models including LLM, audio transcription, text structure, embedding, and inference models. - Refactored database management by implementing a singleton `PostgresDBManager` class to handle PostgreSQLDB initialization and access. 
- Updated the main application to utilize the new `PostgresDBManager` for database connections, enhancing modularity and maintainability. - Improved the RAGManager to ensure proper initialization of the LightRAG instance. - Enhanced audio processing utilities and prompts to support new configurations and improve functionality. --- echo/docs/litellm_config.md | 53 ++++++++++++++++ echo/server/dembrane/api/chat.py | 10 --- echo/server/dembrane/api/stateless.py | 4 +- .../contextual_chunk_etl_pipeline.py | 4 +- .../audio_lightrag/utils/lightrag_utils.py | 34 +++++++--- .../audio_lightrag/utils/litellm_utils.py | 3 +- .../dembrane/audio_lightrag/utils/prompts.py | 62 ++++++------------- echo/server/dembrane/config.py | 27 -------- echo/server/dembrane/main.py | 22 ++----- ...gresdbmanager.py => postgresdb_manager.py} | 0 .../dembrane/{rag.py => rag_manager.py} | 3 + .../audio_model_system_prompt.en.jinja | 40 ++++++++++++ ...t_structuring_model_system_prompt.en.jinja | 3 + 13 files changed, 155 insertions(+), 110 deletions(-) create mode 100644 echo/docs/litellm_config.md rename echo/server/dembrane/{postgresdbmanager.py => postgresdb_manager.py} (100%) rename echo/server/dembrane/{rag.py => rag_manager.py} (93%) create mode 100644 echo/server/prompt_templates/audio_model_system_prompt.en.jinja create mode 100644 echo/server/prompt_templates/text_structuring_model_system_prompt.en.jinja diff --git a/echo/docs/litellm_config.md b/echo/docs/litellm_config.md new file mode 100644 index 00000000..c7354ba2 --- /dev/null +++ b/echo/docs/litellm_config.md @@ -0,0 +1,53 @@ +# LiteLLM Configuration Documentation + +This document outlines all LiteLLM-related configurations and their explanations used in the system. 
+ +## Main LLM Model +**LIGHTRAG_LITELLM_MODEL**: Used by lightrag to perform Named Entity Recognition (NER) and create the knowledge graph +- Required Configurations: + - `LIGHTRAG_LITELLM_MODEL`: Model identifier (e.g., azure/gpt-4o-mini) + - `LIGHTRAG_LITELLM_API_KEY`: API key for authentication + - `LIGHTRAG_LITELLM_API_VERSION`: API version + - `LIGHTRAG_LITELLM_API_BASE`: Base URL for the API + +## Audio Transcription Model +**LIGHTRAG_LITELLM_AUDIOMODEL_MODEL**: Used by audio-lightrag to convert input to transcript and generate contextual transcript +- Required Configurations: + - `LIGHTRAG_LITELLM_AUDIOMODEL_MODEL`: Model identifier (e.g., azure/whisper-large-v3) + - `LIGHTRAG_LITELLM_AUDIOMODEL_API_BASE`: Base URL for the audio model API + - `LIGHTRAG_LITELLM_AUDIOMODEL_API_KEY`: API key for authentication + - `LIGHTRAG_LITELLM_AUDIOMODEL_API_VERSION`: API version + +## Text Structure Model +**LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_MODEL**: Used to structure the output of the audio model into desired format +- Required Configurations: + - `LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_MODEL`: Model identifier (e.g., azure/gpt-4o-mini) + - `LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_BASE`: Base URL for the text structure model API + - `LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_KEY`: API key for authentication + - `LIGHTRAG_LITELLM_TEXTSTRUCTUREMODEL_API_VERSION`: API version + +## Embedding Model +**LIGHTRAG_LITELLM_EMBEDDING_MODEL**: Used by lightrag to create embeddings for text +- Required Configurations: + - `LIGHTRAG_LITELLM_EMBEDDING_MODEL`: Model identifier (e.g., azure/text-embedding-ada-002) + - `LIGHTRAG_LITELLM_EMBEDDING_API_BASE`: Base URL for the embedding model API + - `LIGHTRAG_LITELLM_EMBEDDING_API_KEY`: API key for authentication + - `LIGHTRAG_LITELLM_EMBEDDING_API_VERSION`: API version + +## Inference Model +**LIGHTRAG_LITELLM_INFERENCE_MODEL**: Used for responding to queries with auto-select capability +- Required Configurations: + - 
`LIGHTRAG_LITELLM_INFERENCE_MODEL`: Model identifier (default: anthropic/claude-3-5-sonnet-20240620) + - `LIGHTRAG_LITELLM_INFERENCE_API_KEY`: API key for authentication + +## Additional Audio LightRAG Configurations + +### Audio Processing Settings +- `AUDIO_LIGHTRAG_CONVERSATION_HISTORY_NUM`: Number of conversation history items to maintain (default: 10) +- `AUDIO_LIGHTRAG_TIME_THRESHOLD_SECONDS`: Time threshold for audio processing in seconds (default: 60) +- `AUDIO_LIGHTRAG_MAX_AUDIO_FILE_SIZE_MB`: Maximum allowed audio file size in MB (default: 15) +- `AUDIO_LIGHTRAG_TOP_K_PROMPT`: Top K value for prompt processing (default: 100) + +### Feature Flags +- `ENABLE_AUDIO_LIGHTRAG_INPUT`: Enable/disable audio input processing (default: false) +- `AUTO_SELECT_ENABLED`: Enable/disable auto-select feature (default: false) \ No newline at end of file diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 30992eaf..1ff43bba 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -437,16 +437,6 @@ async def post_chat( messages=formatted_messages) if top_k <= 5: raise HTTPException(status_code=400, detail="Auto select is not possible with the current context length") - - dembrane_dummy_message = ProjectChatMessageModel( - id=generate_uuid(), - date_created=get_utc_timestamp(), - message_from="dembrane", - text="searched", - project_chat_id=chat_id, - ) - db.add(dembrane_dummy_message) - db.commit() conversation_references = await get_conversation_references(rag_prompt, [project_id]) async def stream_response_async() -> AsyncGenerator[str, None]: diff --git a/echo/server/dembrane/api/stateless.py b/echo/server/dembrane/api/stateless.py index 0a7f09af..41719cc7 100644 --- a/echo/server/dembrane/api/stateless.py +++ b/echo/server/dembrane/api/stateless.py @@ -7,9 +7,9 @@ from lightrag.lightrag import QueryParam from lightrag.kg.shared_storage import initialize_pipeline_status -from dembrane.rag import 
RAGManager, get_rag from dembrane.prompts import render_prompt -from dembrane.postgresdbmanager import PostgresDBManager +from dembrane.rag_manager import RAGManager, get_rag +from dembrane.postgresdb_manager import PostgresDBManager from dembrane.api.dependency_auth import DependencyDirectusSession from dembrane.audio_lightrag.utils.lightrag_utils import ( upsert_transcript, diff --git a/echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py b/echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py index 639e5043..2a8eb661 100644 --- a/echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py +++ b/echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py @@ -51,9 +51,7 @@ async def load(self) -> None: logger.exception(f"Error in getting contextual transcript : {e}") continue previous_contextual_transcript = '\n\n'.join(previous_contextual_transcript_li) - audio_model_prompt = Prompts.audio_model_system_prompt() - audio_model_prompt = audio_model_prompt.format(event_text = event_text, - previous_conversation_text = previous_contextual_transcript) + audio_model_prompt = Prompts.audio_model_system_prompt(event_text, previous_contextual_transcript) try: response = directus.get_item('conversation_segment', int(segment_id)) except Exception as e: diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index 25e20411..9a8cb94b 100644 --- a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -1,4 +1,8 @@ -# import os +# Hierachy: +# Chunk is the lowest level +# Conversation is a collection of chunks +# Project is a collection of conversations +# Segment is a many to many of chunks import os import re import uuid @@ -6,12 +10,13 @@ import hashlib import logging from typing import Any, Dict, Literal, TypeVar, Callable, Optional 
+from urllib.parse import urlparse import redis from lightrag.kg.postgres_impl import PostgreSQLDB from dembrane.directus import directus -from dembrane.postgresdbmanager import PostgresDBManager +from dembrane.postgresdb_manager import PostgresDBManager from dembrane.audio_lightrag.utils.litellm_utils import embedding_func logger = logging.getLogger('audio_lightrag_utils') @@ -33,14 +38,29 @@ def is_valid_uuid(uuid_str: str) -> bool: except ValueError: return False -# Hierachy: -# Chunk is the lowest level -# Conversation is a collection of chunks -# Project is a collection of conversations -# Segment is a many to many of chunks + db_manager = PostgresDBManager() +def _load_postgres_env_vars(database_url: str) -> bool: + """Parse a database URL into connection parameters.""" + result = urlparse(database_url) + path = result.path + if path.startswith("/"): + path = path[1:] + userinfo = result.netloc.split("@")[0] if "@" in result.netloc else "" + username = userinfo.split(":")[0] if ":" in userinfo else userinfo + password = userinfo.split(":")[1] if ":" in userinfo else "" + host_part = result.netloc.split("@")[-1] + host = host_part.split(":")[0] if ":" in host_part else host_part + port = host_part.split(":")[1] if ":" in host_part else "5432" + os.environ["POSTGRES_HOST"] = host + os.environ["POSTGRES_PORT"] = port + os.environ["POSTGRES_USER"] = username + os.environ["POSTGRES_PASSWORD"] = password + os.environ["POSTGRES_DATABASE"] = path + return True + def get_project_id_from_conversation_id(conversation_id: str) -> str: query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['project_id']}} return directus.get_items("conversation", query)[0]['project_id'] diff --git a/echo/server/dembrane/audio_lightrag/utils/litellm_utils.py b/echo/server/dembrane/audio_lightrag/utils/litellm_utils.py index 2945468e..5f41aeed 100644 --- a/echo/server/dembrane/audio_lightrag/utils/litellm_utils.py +++ 
b/echo/server/dembrane/audio_lightrag/utils/litellm_utils.py @@ -32,6 +32,7 @@ class Transcriptions(BaseModel): def get_json_dict_from_audio(wav_encoding: str, audio_model_prompt: str, + language: str = "en" ) -> dict: # type: ignore audio_model_messages=[ { @@ -72,7 +73,7 @@ def get_json_dict_from_audio(wav_encoding: str, "content": [ { "type": "text", - "text": Prompts.text_structuring_model_system_prompt(), + "text": Prompts.text_structuring_model_system_prompt(language), } ] }, diff --git a/echo/server/dembrane/audio_lightrag/utils/prompts.py b/echo/server/dembrane/audio_lightrag/utils/prompts.py index 38d31f7b..aa3cd45d 100644 --- a/echo/server/dembrane/audio_lightrag/utils/prompts.py +++ b/echo/server/dembrane/audio_lightrag/utils/prompts.py @@ -1,48 +1,22 @@ -class Prompts: - @staticmethod - def audio_model_system_prompt() -> str: - return '''You are an expert audio transcriber and conversation analyst. Your task is to process audio conversations with high accuracy and provide detailed analysis. - -# Task 1: TRANSCRIPTION -# - Produce a verbatim transcription of the audio -# - Do not modify, interpret, or rename the speaker IDs -# - Maintain 100% accuracy in word capture -# - Include all audible speech elements -# - The trascription should be a comma seperated list of the verbatim speech, -# where every item is a different speaker's speech (speaker turn sperated list) +from dembrane.prompts import render_prompt -# Task 2: CONTEXTUAL ANALYSIS -# - Analyze the conversation in excessive detail, *providing a detailed analysis of the discussion in the conversation* -# - Point out all the details and nuances of the conversation -# - Break down in detail the different user's opinions throughout the conversation. 
-# - Analyze in relation to: -# • Previous conversation history -# • Event context -# • Speaker dynamics -# - Focus on: -# • In depth analysis of the different user's opinions throughout the conversation -# • Tone and sentiment analysis per masked speaker -# • Named entity identification and explanation -# • Acoustic details (background sounds, voice qualities, speaker's emotions) -# • Conversational dynamics between masked speakers -# - At the bottom, add a section of topics, and add a list of keywords that are relevant to the conversation. -# - Always provide the analysis in English (translate if source is non-English) -# Output Format: -# {{ -# "TRANSCRIPTS": ["","", ...], -# "CONTEXTUAL_TRANSCRIPT": "" -# }} - -# Context Information: -# EVENT CONTEXT: -# {event_text} +class Prompts: + @staticmethod + def audio_model_system_prompt(event_text: str, previous_conversation_text: str, language: str = "en") -> str: + return render_prompt( + "audio_model_system_prompt", + language, + { + "event_text": event_text, + "previous_conversation_text": previous_conversation_text, + }, + ) -# CONVERSATION HISTORY: -# {previous_conversation_text} -# ''' @staticmethod - def text_structuring_model_system_prompt() -> str: - return '''You are a text structuring assistant. - Extract all relevant text verbatim into the appropriate fields. - *Always provide CONTEXTUAL_TRANSCRIPT in English. 
Translate if necessary.*''' \ No newline at end of file + def text_structuring_model_system_prompt(language: str = "en") -> str: + return render_prompt( + "text_structuring_model_system_prompt", + language, + {} + ) \ No newline at end of file diff --git a/echo/server/dembrane/config.py b/echo/server/dembrane/config.py index 60eb2b82..e0fca28e 100644 --- a/echo/server/dembrane/config.py +++ b/echo/server/dembrane/config.py @@ -117,13 +117,6 @@ NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "admin@dembrane") logger.debug("NEO4J_PASSWORD: set") -AZURE_OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY") -assert AZURE_OPENAI_API_KEY, "AZURE_OPENAI_API_KEY environment variable is not set" -logger.debug("AZURE_OPENAI_API_KEY: set") -AZURE_OPENAI_API_VERSION = os.environ.get("AZURE_OPENAI_API_VERSION") -assert AZURE_OPENAI_API_VERSION, "AZURE_OPENAI_API_VERSION environment variable is not set" -logger.debug("AZURE_OPENAI_API_VERSION: set") - STORAGE_S3_BUCKET = os.environ.get("STORAGE_S3_BUCKET") assert STORAGE_S3_BUCKET, "STORAGE_S3_BUCKET environment variable is not set" logger.debug("STORAGE_S3_BUCKET: set") @@ -231,26 +224,6 @@ LIGHTRAG_LITELLM_INFERENCE_API_KEY = os.environ.get("LIGHTRAG_LITELLM_INFERENCE_API_KEY") assert LIGHTRAG_LITELLM_INFERENCE_API_KEY, "LIGHTRAG_LITELLM_INFERENCE_API_KEY environment variable is not set" logger.debug("LIGHTRAG_LITELLM_INFERENCE_API_KEY: set") - -POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "postgres") -assert POSTGRES_HOST, "POSTGRES_HOST environment variable is not set" -logger.debug("POSTGRES_HOST: set") - -POSTGRES_PORT = os.environ.get("POSTGRES_PORT", 5432) -assert POSTGRES_PORT, "POSTGRES_PORT environment variable is not set" -logger.debug("POSTGRES_PORT: set") - -POSTGRES_USER = os.environ.get("POSTGRES_USER", "dembrane") -assert POSTGRES_USER, "POSTGRES_USER environment variable is not set" -logger.debug("POSTGRES_USER: set") - -POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "dembrane") -assert 
POSTGRES_PASSWORD, "POSTGRES_PASSWORD environment variable is not set" -logger.debug("POSTGRES_PASSWORD: set") - -POSTGRES_DATABASE = os.environ.get("POSTGRES_DATABASE", "dembrane") -assert POSTGRES_DATABASE, "POSTGRES_DATABASE environment variable is not set" -logger.debug("POSTGRES_DATABASE: set") #---------------/Secrets--------------- diff --git a/echo/server/dembrane/main.py b/echo/server/dembrane/main.py index db9f6ea3..e75d410f 100644 --- a/echo/server/dembrane/main.py +++ b/echo/server/dembrane/main.py @@ -14,29 +14,26 @@ from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.middleware import Middleware from fastapi.openapi.utils import get_openapi -from lightrag.kg.postgres_impl import PostgreSQLDB from starlette.middleware.cors import CORSMiddleware from lightrag.kg.shared_storage import initialize_pipeline_status from dembrane.config import ( REDIS_URL, + DATABASE_URL, DISABLE_CORS, - POSTGRES_HOST, - POSTGRES_PORT, - POSTGRES_USER, ADMIN_BASE_URL, SERVE_API_DOCS, - POSTGRES_DATABASE, - POSTGRES_PASSWORD, PARTICIPANT_BASE_URL, ) from dembrane.sentry import init_sentry from dembrane.api.api import api +from dembrane.postgresdb_manager import PostgresDBManager # from lightrag.llm.azure_openai import azure_openai_complete from dembrane.audio_lightrag.utils.litellm_utils import embedding_func, llm_model_func from dembrane.audio_lightrag.utils.lightrag_utils import ( with_distributed_lock, + _load_postgres_env_vars, check_audio_lightrag_tables, ) @@ -48,18 +45,11 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # startup logger.info("starting server") - init_sentry() + init_sentry() # Initialize PostgreSQL and LightRAG - postgres_config = { - "host": POSTGRES_HOST, - "port": POSTGRES_PORT, - "user": POSTGRES_USER, - "password": POSTGRES_PASSWORD, - "database": POSTGRES_DATABASE, - } - - postgres_db = PostgreSQLDB(config=postgres_config) + _load_postgres_env_vars(str(DATABASE_URL)) + postgres_db = await 
PostgresDBManager.get_initialized_db() # Define the critical initialization operation async def initialize_database() -> bool: diff --git a/echo/server/dembrane/postgresdbmanager.py b/echo/server/dembrane/postgresdb_manager.py similarity index 100% rename from echo/server/dembrane/postgresdbmanager.py rename to echo/server/dembrane/postgresdb_manager.py diff --git a/echo/server/dembrane/rag.py b/echo/server/dembrane/rag_manager.py similarity index 93% rename from echo/server/dembrane/rag.py rename to echo/server/dembrane/rag_manager.py index dc0c7a51..c1097315 100644 --- a/echo/server/dembrane/rag.py +++ b/echo/server/dembrane/rag_manager.py @@ -3,8 +3,11 @@ from lightrag import LightRAG +from dembrane.config import DATABASE_URL from dembrane.audio_lightrag.utils.litellm_utils import embedding_func, llm_model_func +from dembrane.audio_lightrag.utils.lightrag_utils import _load_postgres_env_vars +_load_postgres_env_vars(str(DATABASE_URL)) logger = getLogger(__name__) class RAGManager: diff --git a/echo/server/prompt_templates/audio_model_system_prompt.en.jinja b/echo/server/prompt_templates/audio_model_system_prompt.en.jinja new file mode 100644 index 00000000..cd7647c5 --- /dev/null +++ b/echo/server/prompt_templates/audio_model_system_prompt.en.jinja @@ -0,0 +1,40 @@ +You are an expert audio transcriber and conversation analyst. Your task is to process audio conversations with high accuracy and provide detailed analysis. 
+
+# Task 1: TRANSCRIPTION
+# - Produce a verbatim transcription of the audio
+# - Do not modify, interpret, or rename the speaker IDs
+# - Maintain 100% accuracy in word capture
+# - Include all audible speech elements
+# - The transcription should be a comma separated list of the verbatim speech,
+# where every item is a different speaker's speech (speaker turn separated list)
+
+# Task 2: CONTEXTUAL ANALYSIS
+# - *Provide a detailed summary of the discussion in the conversation, making sure to mention every detail in a report format*
+# - Analyze the conversation in excessive detail
+# - Point out all the details and nuances of the conversation
+# - Break down in detail the different user's opinions throughout the conversation.
+# - Analyze in relation to:
+# • Previous conversation history
+# • Event context
+# • Speaker dynamics
+# - Focus on:
+# • In depth analysis of the different user's opinions throughout the conversation
+# • Tone and sentiment analysis per masked speaker
+# • Named entity identification and explanation
+# • Acoustic details (background sounds, voice qualities, speaker's emotions)
+# • Conversational dynamics between masked speakers
+# - At the bottom, add a section of topics, and add a list of keywords that are relevant to the conversation.
+# - Always provide the analysis in English (translate if source is non-English)
+
+# Output Format:
{
  "TRANSCRIPTS": ["","", ...],
  "CONTEXTUAL_TRANSCRIPT": ""
}

+# Context Information:
+# EVENT CONTEXT:
+# {{ event_text }}

+# CONVERSATION HISTORY:
+# {{ previous_conversation_text }}
diff --git a/echo/server/prompt_templates/text_structuring_model_system_prompt.en.jinja b/echo/server/prompt_templates/text_structuring_model_system_prompt.en.jinja
new file mode 100644
index 00000000..51ffa686
--- /dev/null
+++ b/echo/server/prompt_templates/text_structuring_model_system_prompt.en.jinja
@@ -0,0 +1,3 @@
+You are a text structuring assistant. 
+Extract all relevant text verbatim into the appropriate fields. +*Always provide CONTEXTUAL_TRANSCRIPT in English. Translate if necessary.* From d863b73aaaf309b2a592b4d091abb36821455596 Mon Sep 17 00:00:00 2001 From: Roy Date: Tue, 29 Apr 2025 07:12:50 +0000 Subject: [PATCH 12/13] Refactor database.py to remove unused imports - Removed the unused JSONB import from sqlalchemy.dialects.postgresql, streamlining the code and improving clarity. - Updated comments to reflect the current state of the database management approach. --- echo/server/dembrane/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/echo/server/dembrane/database.py b/echo/server/dembrane/database.py index 8a012e5e..1b206989 100644 --- a/echo/server/dembrane/database.py +++ b/echo/server/dembrane/database.py @@ -1,6 +1,6 @@ # this is not upto date. switched to directus for a better life from enum import Enum -from typing import Any, Dict, List, Optional, Annotated, Generator +from typing import Any, List, Optional, Annotated, Generator from logging import getLogger from datetime import datetime, timezone @@ -28,7 +28,7 @@ declarative_base, ) from pgvector.sqlalchemy import Vector # type: ignore -from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.dialects.postgresql import UUID from dembrane.config import DATABASE_URL from dembrane.embedding import EMBEDDING_DIM From 2dd808fe2797f39fd21734d2a621038d00c6b100 Mon Sep 17 00:00:00 2001 From: Roy Date: Tue, 29 Apr 2025 07:57:32 +0000 Subject: [PATCH 13/13] Refactor lightrag_utils.py for improved functionality and clarity - Changed the return type of _load_postgres_env_vars from bool to None, reflecting its purpose of setting environment variables without returning a value. - Enhanced get_conversation_details_for_rag_query by bulk fetching conversation metadata, reducing redundant API calls and improving performance. 
- Added a check in get_ratio_abs to return an empty dictionary if no relevant chunks are found, enhancing error handling. --- .../audio_lightrag/utils/lightrag_utils.py | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py index 9a8cb94b..13990c66 100644 --- a/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py +++ b/echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py @@ -42,7 +42,7 @@ def is_valid_uuid(uuid_str: str) -> bool: db_manager = PostgresDBManager() -def _load_postgres_env_vars(database_url: str) -> bool: +def _load_postgres_env_vars(database_url: str) -> None: """Parse a database URL into connection parameters.""" result = urlparse(database_url) path = result.path @@ -59,7 +59,6 @@ def _load_postgres_env_vars(database_url: str) -> bool: os.environ["POSTGRES_USER"] = username os.environ["POSTGRES_PASSWORD"] = password os.environ["POSTGRES_DATABASE"] = path - return True def get_project_id_from_conversation_id(conversation_id: str) -> str: query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['project_id']}} @@ -280,6 +279,9 @@ async def get_ratio_abs(rag_prompt: str, #normalize chunk_ratios_abs total_ratio = sum(chunk_ratios_abs.values()) + if total_ratio == 0: + # 0 ratio means no relevant chunks were found + return {} chunk_ratios_abs = {k:v/total_ratio for k,v in chunk_ratios_abs.items()} if return_type == "chunk": @@ -302,17 +304,21 @@ async def get_conversation_details_for_rag_query(rag_prompt: str, project_ids: l ratio_abs = await get_ratio_abs(rag_prompt, "conversation") conversation_details = [] if ratio_abs: + # Bulk fetch conversation metadata + conv_meta = {c["id"]: c for c in directus.get_items( + "conversation", + {"query": {"filter": {"id": {"_in": list(ratio_abs.keys())}}, + "fields": ["id", "participant_name", "project_id"]}} + )} for conversation_id, 
ratio in ratio_abs.items(): - query = {'query': {'filter': {'id': {'_eq': conversation_id}},'fields': ['participant_name', 'project_id']}} - response = directus.get_items("conversation", query)[0] - conversation_title = response['participant_name'] - response_project_id = response['project_id'] - if response_project_id in project_ids: - conversation_details.append({ - 'conversation': conversation_id, - 'conversation_title': conversation_title, - 'ratio': ratio - }) + meta = conv_meta.get(conversation_id) + if not meta or meta["project_id"] not in project_ids: + continue + conversation_details.append({ + "conversation": conversation_id, + "conversation_title": meta["participant_name"], + "ratio": ratio + }) return conversation_details TABLES = {