From db776c6b33ac00b90524f7cfd126f7852f2f6009 Mon Sep 17 00:00:00 2001 From: Usama Date: Tue, 30 Sep 2025 08:48:51 +0000 Subject: [PATCH 01/13] - placeholder auto select --- .../conversation/ConversationAccordion.tsx | 1 - .../routes/project/chat/ProjectChatRoute.tsx | 3 + echo/server/dembrane/api/chat.py | 113 +++++++++++++----- echo/server/dembrane/chat_utils.py | 57 +++++++++ 4 files changed, 140 insertions(+), 34 deletions(-) diff --git a/echo/frontend/src/components/conversation/ConversationAccordion.tsx b/echo/frontend/src/components/conversation/ConversationAccordion.tsx index 1af7faad..9be9788d 100644 --- a/echo/frontend/src/components/conversation/ConversationAccordion.tsx +++ b/echo/frontend/src/components/conversation/ConversationAccordion.tsx @@ -498,7 +498,6 @@ const ConversationAccordionItem = ({ "!bg-primary-50": isLocked, })} rightSection={ - (!ENABLE_CHAT_AUTO_SELECT || !isAutoSelectEnabled) && inChatMode && ( { onResponse: async (_response) => { setShowProgress(false); setProgressValue(0); + if (ENABLE_CHAT_AUTO_SELECT && contextToBeAdded?.auto_select_bool) { + chatContextQuery.refetch(); + } }, onFinish: async (message) => { // this uses the response stream from the backend and makes a chat message IN THE FRONTEND diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 2f194837..5cf4a0a1 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -33,9 +33,7 @@ MAX_CHAT_CONTEXT_LENGTH, generate_title, get_project_chat_history, - get_conversation_citations, - get_conversation_references, - get_lightrag_prompt_by_params, + auto_select_conversations, create_system_messages_for_chat, ) from dembrane.quote_utils import count_tokens @@ -377,25 +375,20 @@ async def post_chat( ) -> StreamingResponse: # ignore: type """ Handle a chat interaction: persist the user's message, optionally generate a title, and stream an LLM-generated response. 
- This endpoint records the incoming user message into the chat, may asynchronously generate and persist a chat title if missing, and then produces a streaming response from the configured LLM. Two generation modes are supported: - Auto-select (when enabled for the chat): builds a RAG prompt, retrieves conversation references and citations, and streams the model output. - Manual-select: builds system messages from locked conversations and streams the model output. - Side effects: - Persists a new ProjectChatMessageModel for the user message. - May update the chat name and the message's template key. - On generation failure the in-flight user message is deleted. - Parameters: - chat_id: ID of the target chat (used to validate access and load context). - body: ChatBodySchema containing the messages (the last user message is used as the prompt) and optional template_key. - protocol: Response protocol; "data" (default) yields structured data frames, "text" yields raw text chunks. - language: Language code used for title generation and system message creation. - Returns: - StreamingResponse that yields streamed model content and, in auto-select mode, header payloads containing conversation references and citations. - Raises: - HTTPException: 404 if the chat (or required conversation data) is not found; 400 when auto-select cannot satisfy context-length constraints or request validation fails. 
""" @@ -465,38 +458,98 @@ async def post_chat( and filtered_messages[-2]["content"] == filtered_messages[-1]["content"] ): filtered_messages = filtered_messages[:-1] + top_k = AUDIO_LIGHTRAG_TOP_K_PROMPT prompt_len = float("inf") + query = filtered_messages[-1]["content"] + conversation_history = filtered_messages + + # Track conversations added by auto-select (defined outside loop) + all_conversations_added = [] + while MAX_CHAT_CONTEXT_LENGTH < prompt_len: formatted_messages = [] top_k = max(5, top_k - 10) - query = filtered_messages[-1]["content"] - conversation_history = filtered_messages - rag_prompt = await get_lightrag_prompt_by_params( - query=query, - conversation_history=conversation_history, - echo_conversation_ids=chat_context.conversation_id_list, - echo_project_ids=[project_id], - auto_select_bool=chat_context.auto_select_bool, - get_transcripts=True, - top_k=top_k, + + # Call dummy auto-select function (replaces get_lightrag_prompt_by_params) + user_query_inputs = [query] + + logger.info(f"Calling auto_select_conversations with query: {query}, top_k: {top_k}") + auto_select_result = await auto_select_conversations( + user_query_inputs=user_query_inputs, + project_id_list=[project_id], + db=db, + ) + + logger.info(f"Auto-select result: {auto_select_result}") + + # Extract selected conversation IDs + selected_conversation_ids = [] + if "results" in auto_select_result: + for proj_result in auto_select_result["results"].values(): + if "conversation_id_list" in proj_result: + selected_conversation_ids.extend(proj_result["conversation_id_list"]) + + # Add selected conversations to chat context + conversations_added = [] + for conversation_id in selected_conversation_ids: + conversation = db.get(ConversationModel, conversation_id) + if conversation and conversation not in chat.used_conversations: + chat.used_conversations.append(conversation) + conversations_added.append(conversation) + all_conversations_added.append(conversation) # Track all added + + if 
conversations_added: + db.commit() + logger.info( + f"Added {len(conversations_added)} conversations via auto-select (not locked)" + ) + + # Get updated chat context + updated_chat_context = await get_chat_context(chat_id, db, auth) + updated_conversation_id_list = updated_chat_context.conversation_id_list + + # Build system messages from the selected conversations + system_messages = await create_system_messages_for_chat( + updated_conversation_id_list, db, language, project_id ) - logger.info(f"rag_prompt: {rag_prompt}") - formatted_messages.append({"role": "system", "content": rag_prompt}) - formatted_messages.append({"role": "user", "content": filtered_messages[-1]["content"]}) + + # Build messages to send + if isinstance(system_messages, list): + for msg in system_messages: + formatted_messages.append({"role": "system", "content": msg["text"]}) + formatted_messages.extend(conversation_history) + else: + formatted_messages = [ + {"role": "system", "content": system_messages} + ] + conversation_history + + # Check context length prompt_len = token_counter( model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages ) + if top_k <= 5: raise HTTPException( status_code=400, detail="Auto select is not possible with the current context length", ) - conversation_references = await get_conversation_references(rag_prompt, [project_id]) + # Build references list from all conversations added during auto-select + conversation_references = {"references": []} + for conv in all_conversations_added: + conversation_references["references"].append( + { + "conversation": conv.id, + "conversation_title": conv.participant_name, + } + ) + + logger.info(f"Selected conversations for frontend: {conversation_references}") async def stream_response_async_autoselect() -> AsyncGenerator[str, None]: - conversation_references_yeild = f"h:{json.dumps(conversation_references)}\n" + # Send conversation references (selected conversations) + conversation_references_yeild = 
f"h:{json.dumps([conversation_references])}\n" yield conversation_references_yeild accumulated_response = "" @@ -531,12 +584,6 @@ async def stream_response_async_autoselect() -> AsyncGenerator[str, None]: yield "Error: An error occurred while processing the chat response." return # Stop generation on error - citations_list = await get_conversation_citations( - rag_prompt, accumulated_response, [project_id] - ) - citations_yeild = f"h:{json.dumps(citations_list)}\n" - yield citations_yeild - headers = {"Content-Type": "text/event-stream"} if protocol == "data": headers["x-vercel-ai-data-stream"] = "v1" @@ -550,17 +597,17 @@ async def stream_response_async_autoselect() -> AsyncGenerator[str, None]: async def stream_response_async_manualselect() -> AsyncGenerator[str, None]: """ Asynchronously stream a model-generated assistant response for the manual-selection chat path. - + Builds the outgoing message sequence by combining provided system messages (list or string) with recent user/assistant messages, removes a duplicated trailing user message if present, then calls the Litellm streaming completion API and yields text chunks as they arrive. - + Yields: - If protocol == "text": successive raw text fragments from the model. - If protocol == "data": framed data lines of the form `0:` for each fragment. - On generation error: a single error payload matching the active protocol (`"Error: ..." ` for text, or `3:"..."` for data). - + Side effects: - On an exception during generation, deletes the in-flight `user_message` from the database and commits the change. - + Notes: - Expects surrounding scope variables: `messages`, `system_messages`, `litellm`, model/API constants, `protocol`, `user_message`, and `logger`. - Returns when the stream completes. 
diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index d7a1d18a..dab60f87 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -242,6 +242,63 @@ async def generate_title( return response.choices[0].message.content +async def auto_select_conversations( + user_query_inputs: List[str], + project_id_list: List[str], + db: Session, +) -> Dict[str, Any]: + """ + Dummy function for auto-selecting conversations based on user queries. + + This is a placeholder implementation that randomly selects up to 3 conversations + from the given project. In the future, this will use RAG/semantic search to find + the most relevant conversations for the given queries. + + Args: + user_query_inputs: List of user query strings (currently up to 3) + project_id_list: List containing a single project ID + db: Database session + + Returns: + Dictionary with structure: + { + "results": { + "": { + "conversation_id_list": [] + } + } + } + """ + import random + + logger.info(f"Auto-select called with queries: {user_query_inputs}") + logger.info(f"Auto-select called for project(s): {project_id_list}") + + results = {} + + for project_id in project_id_list: + # Get all conversations for this project + conversations = ( + db.query(ConversationModel).filter(ConversationModel.project_id == project_id).all() + ) + + if not conversations: + logger.warning(f"No conversations found for project {project_id}") + results[project_id] = {"conversation_id_list": []} + continue + + # Randomly select up to 3 conversations + num_to_select = min(3, len(conversations)) + selected_conversations = random.sample(conversations, num_to_select) + conversation_ids = [conv.id for conv in selected_conversations] + + logger.info(f"Selected {len(conversation_ids)} conversations: {conversation_ids}") + + results[project_id] = {"conversation_id_list": conversation_ids} + + return {"results": results} + + async def get_conversation_citations( rag_prompt: 
str, accumulated_response: str, From 639ac45ddcd5e4598e9806cb59dd87a450631bb4 Mon Sep 17 00:00:00 2001 From: Usama Date: Tue, 30 Sep 2025 09:48:39 +0000 Subject: [PATCH 02/13] - logic update --- echo/frontend/src/components/chat/Sources.tsx | 2 +- echo/server/dembrane/api/chat.py | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/echo/frontend/src/components/chat/Sources.tsx b/echo/frontend/src/components/chat/Sources.tsx index 327ddacc..55fe1811 100644 --- a/echo/frontend/src/components/chat/Sources.tsx +++ b/echo/frontend/src/components/chat/Sources.tsx @@ -18,7 +18,7 @@ export const Sources = ({ - Citing the following sources + The following conversations were automatically added to the context diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 5cf4a0a1..95022912 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -499,11 +499,19 @@ async def post_chat( conversations_added.append(conversation) all_conversations_added.append(conversation) # Track all added + # Create a message to lock the auto-selected conversations if conversations_added: - db.commit() - logger.info( - f"Added {len(conversations_added)} conversations via auto-select (not locked)" + auto_select_message = ProjectChatMessageModel( + id=generate_uuid(), + date_created=get_utc_timestamp(), + message_from="dembrane", + text=f"Auto-selected and added {len(conversations_added)} conversations as context to the chat.", + project_chat_id=chat_id, + used_conversations=conversations_added, ) + db.add(auto_select_message) + db.commit() + logger.info(f"Added {len(conversations_added)} conversations via auto-select") # Get updated chat context updated_chat_context = await get_chat_context(chat_id, db, auth) From 4ae0c7a469346f87b6f3a922aa2a8d3ff956f3fd Mon Sep 17 00:00:00 2001 From: Usama Date: Tue, 30 Sep 2025 13:57:38 +0000 Subject: [PATCH 03/13] - remove processing message from conversations --- 
.../conversation/AutoSelectConversations.tsx | 25 ------------------- .../conversation/ConversationAccordion.tsx | 22 ---------------- 2 files changed, 47 deletions(-) diff --git a/echo/frontend/src/components/conversation/AutoSelectConversations.tsx b/echo/frontend/src/components/conversation/AutoSelectConversations.tsx index edb9b6c2..251c457d 100644 --- a/echo/frontend/src/components/conversation/AutoSelectConversations.tsx +++ b/echo/frontend/src/components/conversation/AutoSelectConversations.tsx @@ -58,15 +58,6 @@ export const AutoSelectConversations = () => { (conversation) => conversation.is_audio_processing_finished, ); - console.log(hasProcessedConversations, conversations); - - // Show warning if feature is available but no conversations are processed - const showProcessingWarning = - !isDisabled && - conversations && - conversations.length > 0 && - !hasProcessedConversations; - const handleCheckboxChange = (checked: boolean) => { if (isDisabled) { return; @@ -177,22 +168,6 @@ export const AutoSelectConversations = () => { /> - {showProcessingWarning && ( - } - title={Audio Processing In Progress} - className="border-t border-yellow-200 bg-yellow-50 p-3" - > - - - Some conversations are still being processed. Auto-select will - work optimally once audio processing is complete. 
- - - - )} - {isDisabled && ( diff --git a/echo/frontend/src/components/conversation/ConversationAccordion.tsx b/echo/frontend/src/components/conversation/ConversationAccordion.tsx index 9be9788d..7236cc44 100644 --- a/echo/frontend/src/components/conversation/ConversationAccordion.tsx +++ b/echo/frontend/src/components/conversation/ConversationAccordion.tsx @@ -400,28 +400,6 @@ export const ConversationStatusIndicators = ({ )} - {!!project?.is_enhanced_audio_processing_enabled && - // if processing still - // don't show this if both is_finished and is_audio_processing_finished are true - // but if project.is_enhanced_audio_processing_enabled is true, just see the is_finished - !( - conversation.is_finished && conversation.is_audio_processing_finished - ) && ( - - - - Processing - - - - - )} - {hasOnlyTextContent && ( Text From b8f80c1578e12125e5cec863866f4d5d3942ee3a Mon Sep 17 00:00:00 2001 From: Usama Date: Wed, 1 Oct 2025 13:52:10 +0000 Subject: [PATCH 04/13] - first level implementation for 'smart auto select' with batching --- echo/server/dembrane/api/chat.py | 3 +- echo/server/dembrane/chat_utils.py | 155 +++++++++++++++--- .../auto_select_conversations.de.jinja | 34 ++++ .../auto_select_conversations.en.jinja | 33 ++++ .../auto_select_conversations.es.jinja | 34 ++++ .../auto_select_conversations.fr.jinja | 34 ++++ .../auto_select_conversations.nl.jinja | 34 ++++ 7 files changed, 306 insertions(+), 21 deletions(-) create mode 100644 echo/server/prompt_templates/auto_select_conversations.de.jinja create mode 100644 echo/server/prompt_templates/auto_select_conversations.en.jinja create mode 100644 echo/server/prompt_templates/auto_select_conversations.es.jinja create mode 100644 echo/server/prompt_templates/auto_select_conversations.fr.jinja create mode 100644 echo/server/prompt_templates/auto_select_conversations.nl.jinja diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 95022912..e0823d26 100644 --- 
a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -479,6 +479,7 @@ async def post_chat( user_query_inputs=user_query_inputs, project_id_list=[project_id], db=db, + language=language, ) logger.info(f"Auto-select result: {auto_select_result}") @@ -544,7 +545,7 @@ async def post_chat( ) # Build references list from all conversations added during auto-select - conversation_references = {"references": []} + conversation_references: dict[str, list[dict[str, str]]] = {"references": []} for conv in all_conversations_added: conversation_references["references"].append( { diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index dab60f87..ca85f8b1 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -1,10 +1,14 @@ import json +import math import logging from typing import Any, Dict, List, Optional from litellm import completion, acompletion from pydantic import BaseModel -from sqlalchemy.orm import Session +from sqlalchemy.orm import ( + Session, + selectinload, +) from dembrane.config import ( SMALL_LITELLM_MODEL, @@ -205,13 +209,13 @@ async def generate_title( ) -> str | None: """ Generate a short chat title from a user's query using a small LLM. - + If title generation is disabled via configuration or the trimmed query is shorter than 2 characters, the function returns None. The function builds a prompt (using the English prompt template) and asynchronously calls a configured small LLM; it returns the generated title string or None if the model returns no content. - + Parameters: user_query (str): The user's chat message or query to generate a title from. language (str): Target language for the generated title (affects prompt content; the prompt template used is English). - + Returns: str | None: The generated title, or None if generation is disabled, the query is too short, or the model produced no content. 
""" @@ -246,18 +250,20 @@ async def auto_select_conversations( user_query_inputs: List[str], project_id_list: List[str], db: Session, + language: str = "en", ) -> Dict[str, Any]: """ - Dummy function for auto-selecting conversations based on user queries. + Auto-select conversations based on user queries using LLM-based relevance assessment. - This is a placeholder implementation that randomly selects up to 3 conversations - from the given project. In the future, this will use RAG/semantic search to find - the most relevant conversations for the given queries. + This function fetches conversation summaries from the database and uses an LLM + to select the most relevant conversations for the given queries. It handles + batching to stay within LLM context limits. Args: user_query_inputs: List of user query strings (currently up to 3) project_id_list: List containing a single project ID db: Database session + language: Language code for the prompt template (default: "en") Returns: Dictionary with structure: @@ -269,17 +275,21 @@ async def auto_select_conversations( } } """ - import random - logger.info(f"Auto-select called with queries: {user_query_inputs}") logger.info(f"Auto-select called for project(s): {project_id_list}") - results = {} + results: Dict[str, Any] = {} + # Batch size: number of conversations to process in each LLM call + # Adjust based on context limits and average summary length + BATCH_SIZE = 20 for project_id in project_id_list: # Get all conversations for this project conversations = ( - db.query(ConversationModel).filter(ConversationModel.project_id == project_id).all() + db.query(ConversationModel) + .filter(ConversationModel.project_id == project_id) + .options(selectinload(ConversationModel.tags)) + .all() ) if not conversations: @@ -287,14 +297,119 @@ async def auto_select_conversations( results[project_id] = {"conversation_id_list": []} continue - # Randomly select up to 3 conversations - num_to_select = min(3, len(conversations)) - 
selected_conversations = random.sample(conversations, num_to_select) - conversation_ids = [conv.id for conv in selected_conversations] + logger.info(f"To remove this line::: {conversations} ::: for project {project_id}") + logger.info(f"Found {len(conversations)} total conversations for project {project_id}") - logger.info(f"Selected {len(conversation_ids)} conversations: {conversation_ids}") + # Calculate expected number of LLM calls for observability + expected_llm_calls = math.ceil(len(conversations) / BATCH_SIZE) + logger.info( + f"Auto-select will make up to {expected_llm_calls} LLM call(s) " + f"for {len(conversations)} conversations (batch size: {BATCH_SIZE})" + ) - results[project_id] = {"conversation_id_list": conversation_ids} + # Batch conversations and process them + all_selected_ids = [] + llm_calls_made = 0 + for i in range(0, len(conversations), BATCH_SIZE): + batch = conversations[i : i + BATCH_SIZE] + logger.info( + f"Processing batch {i // BATCH_SIZE + 1} " + f"({len(batch)} conversations, indices {i} to {i + len(batch) - 1})" + ) + + # Prepare conversation data for the prompt + conversation_data = [] + for conv in batch: + # Get summary or fallback to transcript excerpt + summary_text = None + if conv.summary and conv.summary.strip(): + summary_text = conv.summary + else: + # Use transcript as fallback + try: + transcript = get_conversation_transcript( + conv.id, + DirectusSession(user_id="none", is_admin=True), + ) + # Limit transcript to first 500 characters for context + if transcript and len(transcript) > 500: + summary_text = transcript[:500] + "..." 
+ elif transcript: + summary_text = transcript + except Exception as e: + logger.warning(f"Could not get transcript for conversation {conv.id}: {e}") + + # Skip conversations with no content at all + if not summary_text: + logger.debug(f"Skipping conversation {conv.id} - no summary or transcript") + continue + + conv_data = { + "id": conv.id, + "participant_name": conv.participant_name or "Unknown", + "summary": summary_text, + } + if conv.tags: + conv_data["tags"] = ", ".join([tag.text for tag in conv.tags]) + if conv.created_at: + conv_data["created_at"] = conv.created_at.isoformat() + conversation_data.append(conv_data) + + # Skip batch if no valid conversations + if not conversation_data: + logger.warning( + f"Batch {i // BATCH_SIZE + 1} has no valid conversations with content. Skipping." + ) + continue + + # Render the prompt + prompt = render_prompt( + "auto_select_conversations", + language, + { + "user_queries": user_query_inputs, + "conversations": conversation_data, + }, + ) + + # Call the LLM + try: + response = await acompletion( + model=SMALL_LITELLM_MODEL, + messages=[{"role": "user", "content": prompt}], + api_base=SMALL_LITELLM_API_BASE, + api_key=SMALL_LITELLM_API_KEY, + response_format={"type": "json_object"}, + ) + llm_calls_made += 1 + + if response.choices[0].message.content: + result = json.loads(response.choices[0].message.content) + batch_selected_ids = result.get("selected_conversation_ids", []) + all_selected_ids.extend(batch_selected_ids) + logger.info( + f"Batch {i // BATCH_SIZE + 1} selected {len(batch_selected_ids)} " + f"conversations: {batch_selected_ids}" + ) + else: + logger.warning(f"No response from LLM for batch {i // BATCH_SIZE + 1}") + + except Exception as e: + logger.error( + f"Error processing batch {i // BATCH_SIZE + 1}: {str(e)}. Skipping batch." 
+ ) + continue + + # Remove duplicates while preserving order + unique_selected_ids = list(dict.fromkeys(all_selected_ids)) + + logger.info( + f"Auto-select completed: Made {llm_calls_made} LLM call(s), " + f"selected {len(unique_selected_ids)} unique conversations " + f"for project {project_id}: {unique_selected_ids}" + ) + + results[project_id] = {"conversation_id_list": unique_selected_ids} return {"results": results} @@ -307,7 +422,7 @@ async def get_conversation_citations( ) -> List[Dict[str, Any]]: """ Extract structured conversation citations from an accumulated assistant response using a text-structuring model, map those citations to conversations, and return only citations that belong to the given project IDs. - + This function: - Renders a text-structuring prompt using `rag_prompt` and `accumulated_response` and sends it to the configured text-structure LLM. - Parses the model's JSON response (expected to follow `CitationsSchema`) to obtain citation entries that include `segment_id` and `verbatim_reference_text_chunk`. @@ -317,7 +432,7 @@ async def get_conversation_citations( - "conversation": conversation id (str) - "reference_text": verbatim reference text chunk (str) - "conversation_title": conversation name/title (str) - + If the model output cannot be parsed or a segment-to-conversation mapping fails for an individual citation, that citation is skipped; parsing errors do not raise but are logged and result in an empty citations list in the returned structure. """ text_structuring_model_message = render_prompt( diff --git a/echo/server/prompt_templates/auto_select_conversations.de.jinja b/echo/server/prompt_templates/auto_select_conversations.de.jinja new file mode 100644 index 00000000..e8e68d19 --- /dev/null +++ b/echo/server/prompt_templates/auto_select_conversations.de.jinja @@ -0,0 +1,34 @@ +Sie sind ein Assistent für die Auswahl von Gesprächen. 
Ihre Aufgabe ist es zu identifizieren, welche Gespräche für die Anfragen des Benutzers am relevantesten sind. + +Benutzeranfragen: +{% for query in user_queries %} +- {{ query }} +{% endfor %} + +Verfügbare Gespräche: +{% for conv in conversations %} +--- +Gesprächs-ID: {{ conv.id }} +Teilnehmer: {{ conv.participant_name }} +Zusammenfassung: {{ conv.summary or "Keine Zusammenfassung verfügbar" }} +{% if conv.tags %}Tags: {{ conv.tags }}{% endif %} +{% if conv.created_at %}Erstellt: {{ conv.created_at }}{% endif %} +{% endfor %} +--- + +Anweisungen: +- Analysieren Sie jede Gesprächszusammenfassung sorgfältig +- Wählen Sie Gespräche aus, die für die Anfragen des Benutzers relevant oder potenziell relevant sind +- Beziehen Sie Gespräche mit direkter Relevanz sowie solche mit teilweiser oder indirekter Relevanz ein +- Berücksichtigen Sie Synonyme, verwandte Themen und kontextuelle Verbindungen +- Wenn ein Gespräch Informationen enthalten könnte, die für die Beantwortung der Anfragen nützlich sind, nehmen Sie es auf +- Schließen Sie nur Gespräche aus, die eindeutig nicht mit allen Anfragen zusammenhängen +- Wenn KEINE Gespräche relevant sind, geben Sie eine leere Liste zurück + +Antworten Sie mit einem JSON-Objekt, das nur die Liste der relevanten Gesprächs-IDs enthält: +{ + "selected_conversation_ids": ["id1", "id2", ...] +} + +Geben Sie NUR das JSON-Objekt zurück, keinen anderen Text. + diff --git a/echo/server/prompt_templates/auto_select_conversations.en.jinja b/echo/server/prompt_templates/auto_select_conversations.en.jinja new file mode 100644 index 00000000..38d6274d --- /dev/null +++ b/echo/server/prompt_templates/auto_select_conversations.en.jinja @@ -0,0 +1,33 @@ +You are a conversation selection assistant. Your task is to identify which conversations are most relevant to the user's queries. 
+ +User Queries: +{% for query in user_queries %} +- {{ query }} +{% endfor %} + +Available Conversations: +{% for conv in conversations %} +--- +Conversation ID: {{ conv.id }} +Participant: {{ conv.participant_name }} +Summary: {{ conv.summary or "No summary available" }} +{% if conv.tags %}Tags: {{ conv.tags }}{% endif %} +{% if conv.created_at %}Created: {{ conv.created_at }}{% endif %} +{% endfor %} +--- + +Instructions: +- Analyze each conversation summary carefully +- Select conversations that are relevant or potentially relevant to the user's queries +- Include conversations with direct relevance as well as those with partial or indirect relevance +- Consider synonyms, related topics, and contextual connections +- If a conversation might contain information useful for answering the queries, include it +- Only exclude conversations that are clearly unrelated to all queries +- If NO conversations are relevant, return an empty list + +Respond with a JSON object containing only the list of relevant conversation IDs: +{ + "selected_conversation_ids": ["id1", "id2", ...] +} + +Return ONLY the JSON object, no other text. diff --git a/echo/server/prompt_templates/auto_select_conversations.es.jinja b/echo/server/prompt_templates/auto_select_conversations.es.jinja new file mode 100644 index 00000000..3a63c3e5 --- /dev/null +++ b/echo/server/prompt_templates/auto_select_conversations.es.jinja @@ -0,0 +1,34 @@ +Eres un asistente de selección de conversaciones. Tu tarea es identificar qué conversaciones son más relevantes para las consultas del usuario. 
+ +Consultas del usuario: +{% for query in user_queries %} +- {{ query }} +{% endfor %} + +Conversaciones disponibles: +{% for conv in conversations %} +--- +ID de conversación: {{ conv.id }} +Participante: {{ conv.participant_name }} +Resumen: {{ conv.summary or "No hay resumen disponible" }} +{% if conv.tags %}Etiquetas: {{ conv.tags }}{% endif %} +{% if conv.created_at %}Creado: {{ conv.created_at }}{% endif %} +{% endfor %} +--- + +Instrucciones: +- Analiza cuidadosamente cada resumen de conversación +- Selecciona conversaciones que sean relevantes o potencialmente relevantes para las consultas del usuario +- Incluye conversaciones con relevancia directa así como aquellas con relevancia parcial o indirecta +- Considera sinónimos, temas relacionados y conexiones contextuales +- Si una conversación podría contener información útil para responder las consultas, inclúyela +- Solo excluye conversaciones que claramente no estén relacionadas con ninguna consulta +- Si NINGUNA conversación es relevante, devuelve una lista vacía + +Responde con un objeto JSON que contenga solo la lista de IDs de conversaciones relevantes: +{ + "selected_conversation_ids": ["id1", "id2", ...] +} + +Devuelve SOLO el objeto JSON, ningún otro texto. + diff --git a/echo/server/prompt_templates/auto_select_conversations.fr.jinja b/echo/server/prompt_templates/auto_select_conversations.fr.jinja new file mode 100644 index 00000000..a76c5c83 --- /dev/null +++ b/echo/server/prompt_templates/auto_select_conversations.fr.jinja @@ -0,0 +1,34 @@ +Vous êtes un assistant de sélection de conversations. Votre tâche consiste à identifier quelles conversations sont les plus pertinentes pour les requêtes de l'utilisateur. 
+ +Requêtes de l'utilisateur : +{% for query in user_queries %} +- {{ query }} +{% endfor %} + +Conversations disponibles : +{% for conv in conversations %} +--- +ID de conversation : {{ conv.id }} +Participant : {{ conv.participant_name }} +Résumé : {{ conv.summary or "Aucun résumé disponible" }} +{% if conv.tags %}Tags : {{ conv.tags }}{% endif %} +{% if conv.created_at %}Créé : {{ conv.created_at }}{% endif %} +{% endfor %} +--- + +Instructions : +- Analysez attentivement chaque résumé de conversation +- Sélectionnez les conversations qui sont pertinentes ou potentiellement pertinentes pour les requêtes de l'utilisateur +- Incluez les conversations avec une pertinence directe ainsi que celles avec une pertinence partielle ou indirecte +- Considérez les synonymes, les sujets connexes et les liens contextuels +- Si une conversation peut contenir des informations utiles pour répondre aux requêtes, incluez-la +- N'excluez que les conversations qui ne sont clairement pas liées à toutes les requêtes +- Si AUCUNE conversation n'est pertinente, renvoyez une liste vide + +Répondez avec un objet JSON contenant uniquement la liste des IDs de conversations pertinentes : +{ + "selected_conversation_ids": ["id1", "id2", ...] +} + +Renvoyez UNIQUEMENT l'objet JSON, aucun autre texte. + diff --git a/echo/server/prompt_templates/auto_select_conversations.nl.jinja b/echo/server/prompt_templates/auto_select_conversations.nl.jinja new file mode 100644 index 00000000..a990f9e5 --- /dev/null +++ b/echo/server/prompt_templates/auto_select_conversations.nl.jinja @@ -0,0 +1,34 @@ +U bent een assistent voor het selecteren van gesprekken. Uw taak is om te identificeren welke gesprekken het meest relevant zijn voor de vragen van de gebruiker. 
+
+Gebruikersvragen:
+{% for query in user_queries %}
+- {{ query }}
+{% endfor %}
+
+Beschikbare gesprekken:
+{% for conv in conversations %}
+---
+Gesprek-ID: {{ conv.id }}
+Deelnemer: {{ conv.participant_name }}
+Samenvatting: {{ conv.summary or "Geen samenvatting beschikbaar" }}
+{% if conv.tags %}Tags: {{ conv.tags }}{% endif %}
+{% if conv.created_at %}Gemaakt: {{ conv.created_at }}{% endif %}
+{% endfor %}
+---
+
+Instructies:
+- Analyseer elke gespreksamenvatting zorgvuldig
+- Selecteer gesprekken die relevant of potentieel relevant zijn voor de vragen van de gebruiker
+- Neem gesprekken op met directe relevantie evenals die met gedeeltelijke of indirecte relevantie
+- Overweeg synoniemen, gerelateerde onderwerpen en contextuele verbindingen
+- Als een gesprek informatie kan bevatten die nuttig is voor het beantwoorden van de vragen, neem het dan op
+- Sluit alleen gesprekken uit die duidelijk aan geen van de vragen gerelateerd zijn
+- Als GEEN gesprekken relevant zijn, retourneer dan een lege lijst
+
+Reageer met een JSON-object dat alleen de lijst met relevante gesprek-ID's bevat:
+{
+  "selected_conversation_ids": ["id1", "id2", ...]
+}
+
+Retourneer ALLEEN het JSON-object, geen andere tekst.
+ From 585d631bd2ea4df6b87af47e263f148525d80113 Mon Sep 17 00:00:00 2001 From: Usama Date: Wed, 1 Oct 2025 14:27:14 +0000 Subject: [PATCH 05/13] - add parallel batching for processing conversations --- echo/server/dembrane/chat_utils.py | 232 ++++++++++++++++++----------- 1 file changed, 144 insertions(+), 88 deletions(-) diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index ca85f8b1..7b2f1a7e 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -1,5 +1,6 @@ import json import math +import asyncio import logging from typing import Any, Dict, List, Optional @@ -257,7 +258,8 @@ async def auto_select_conversations( This function fetches conversation summaries from the database and uses an LLM to select the most relevant conversations for the given queries. It handles - batching to stay within LLM context limits. + batching to stay within LLM context limits and processes batches in parallel + for optimal performance. 
Args: user_query_inputs: List of user query strings (currently up to 3) @@ -297,115 +299,66 @@ async def auto_select_conversations( results[project_id] = {"conversation_id_list": []} continue - logger.info(f"To remove this line::: {conversations} ::: for project {project_id}") logger.info(f"Found {len(conversations)} total conversations for project {project_id}") # Calculate expected number of LLM calls for observability expected_llm_calls = math.ceil(len(conversations) / BATCH_SIZE) logger.info( - f"Auto-select will make up to {expected_llm_calls} LLM call(s) " + f"Auto-select will make {expected_llm_calls} parallel LLM call(s) " f"for {len(conversations)} conversations (batch size: {BATCH_SIZE})" ) - # Batch conversations and process them - all_selected_ids = [] - llm_calls_made = 0 + # Create batches and prepare parallel tasks + tasks = [] for i in range(0, len(conversations), BATCH_SIZE): batch = conversations[i : i + BATCH_SIZE] - logger.info( - f"Processing batch {i // BATCH_SIZE + 1} " - f"({len(batch)} conversations, indices {i} to {i + len(batch) - 1})" + batch_num = i // BATCH_SIZE + 1 + tasks.append( + _process_single_batch( + batch=batch, + batch_num=batch_num, + user_query_inputs=user_query_inputs, + language=language, + ) ) - # Prepare conversation data for the prompt - conversation_data = [] - for conv in batch: - # Get summary or fallback to transcript excerpt - summary_text = None - if conv.summary and conv.summary.strip(): - summary_text = conv.summary - else: - # Use transcript as fallback - try: - transcript = get_conversation_transcript( - conv.id, - DirectusSession(user_id="none", is_admin=True), - ) - # Limit transcript to first 500 characters for context - if transcript and len(transcript) > 500: - summary_text = transcript[:500] + "..." 
- elif transcript: - summary_text = transcript - except Exception as e: - logger.warning(f"Could not get transcript for conversation {conv.id}: {e}") - - # Skip conversations with no content at all - if not summary_text: - logger.debug(f"Skipping conversation {conv.id} - no summary or transcript") - continue + # Execute all batches in parallel + logger.info(f"Executing {len(tasks)} batches in parallel...") + batch_results = await asyncio.gather(*tasks, return_exceptions=True) - conv_data = { - "id": conv.id, - "participant_name": conv.participant_name or "Unknown", - "summary": summary_text, - } - if conv.tags: - conv_data["tags"] = ", ".join([tag.text for tag in conv.tags]) - if conv.created_at: - conv_data["created_at"] = conv.created_at.isoformat() - conversation_data.append(conv_data) - - # Skip batch if no valid conversations - if not conversation_data: - logger.warning( - f"Batch {i // BATCH_SIZE + 1} has no valid conversations with content. Skipping." - ) + # Aggregate results from all batches + all_selected_ids = [] + successful_batches = 0 + failed_batches = 0 + + for i, batch_result in enumerate(batch_results): + # Handle exceptions from gather + if isinstance(batch_result, Exception): + logger.error(f"Batch {i + 1} failed with exception: {str(batch_result)}") + failed_batches += 1 continue - # Render the prompt - prompt = render_prompt( - "auto_select_conversations", - language, - { - "user_queries": user_query_inputs, - "conversations": conversation_data, - }, - ) + # Type check: ensure batch_result is a dict, not an exception + if not isinstance(batch_result, dict): + logger.error(f"Batch {i + 1} returned unexpected type: {type(batch_result)}") + failed_batches += 1 + continue - # Call the LLM - try: - response = await acompletion( - model=SMALL_LITELLM_MODEL, - messages=[{"role": "user", "content": prompt}], - api_base=SMALL_LITELLM_API_BASE, - api_key=SMALL_LITELLM_API_KEY, - response_format={"type": "json_object"}, - ) - llm_calls_made += 1 - - if 
response.choices[0].message.content: - result = json.loads(response.choices[0].message.content) - batch_selected_ids = result.get("selected_conversation_ids", []) - all_selected_ids.extend(batch_selected_ids) - logger.info( - f"Batch {i // BATCH_SIZE + 1} selected {len(batch_selected_ids)} " - f"conversations: {batch_selected_ids}" - ) - else: - logger.warning(f"No response from LLM for batch {i // BATCH_SIZE + 1}") + # Handle batch results + if "error" in batch_result: + failed_batches += 1 + else: + successful_batches += 1 - except Exception as e: - logger.error( - f"Error processing batch {i // BATCH_SIZE + 1}: {str(e)}. Skipping batch." - ) - continue + selected_ids = batch_result.get("selected_ids", []) + all_selected_ids.extend(selected_ids) # Remove duplicates while preserving order unique_selected_ids = list(dict.fromkeys(all_selected_ids)) logger.info( - f"Auto-select completed: Made {llm_calls_made} LLM call(s), " - f"selected {len(unique_selected_ids)} unique conversations " + f"Auto-select completed: {successful_batches}/{len(tasks)} batches successful " + f"({failed_batches} failed), selected {len(unique_selected_ids)} unique conversations " f"for project {project_id}: {unique_selected_ids}" ) @@ -414,6 +367,109 @@ async def auto_select_conversations( return {"results": results} +async def _process_single_batch( + batch: List[ConversationModel], + batch_num: int, + user_query_inputs: List[str], + language: str, +) -> Dict[str, Any]: + """ + Process a single batch of conversations and return selected IDs. 
+ + Args: + batch: List of ConversationModel instances to process + batch_num: Batch number for logging + user_query_inputs: User queries to match against + language: Language code for the prompt template + + Returns: + Dictionary with: + - "selected_ids": List of selected conversation IDs + - "batch_num": The batch number + - "error": Error message if processing failed (optional) + """ + logger.info(f"Processing batch {batch_num} ({len(batch)} conversations, parallel execution)") + + # Prepare conversation data for the prompt + conversation_data = [] + for conv in batch: + # Get summary or fallback to transcript excerpt + summary_text = None + if conv.summary and conv.summary.strip(): + summary_text = conv.summary + else: + # Use transcript as fallback + try: + transcript = get_conversation_transcript( + conv.id, + DirectusSession(user_id="none", is_admin=True), + ) + # Limit transcript to first 500 characters for context + if transcript and len(transcript) > 500: + summary_text = transcript[:500] + "..." + elif transcript: + summary_text = transcript + except Exception as e: + logger.warning(f"Could not get transcript for conversation {conv.id}: {e}") + + # Skip conversations with no content at all + if not summary_text: + logger.debug(f"Skipping conversation {conv.id} - no summary or transcript") + continue + + conv_data = { + "id": conv.id, + "participant_name": conv.participant_name or "Unknown", + "summary": summary_text, + } + if conv.tags: + conv_data["tags"] = ", ".join([tag.text for tag in conv.tags]) + if conv.created_at: + conv_data["created_at"] = conv.created_at.isoformat() + conversation_data.append(conv_data) + + # Skip batch if no valid conversations + if not conversation_data: + logger.warning(f"Batch {batch_num} has no valid conversations with content. 
Skipping.") + return {"selected_ids": [], "batch_num": batch_num} + + # Render the prompt + prompt = render_prompt( + "auto_select_conversations", + language, + { + "user_queries": user_query_inputs, + "conversations": conversation_data, + }, + ) + + # Call the LLM + try: + response = await acompletion( + model=SMALL_LITELLM_MODEL, + messages=[{"role": "user", "content": prompt}], + api_base=SMALL_LITELLM_API_BASE, + api_key=SMALL_LITELLM_API_KEY, + response_format={"type": "json_object"}, + ) + + if response.choices[0].message.content: + result = json.loads(response.choices[0].message.content) + batch_selected_ids = result.get("selected_conversation_ids", []) + logger.info( + f"Batch {batch_num} selected {len(batch_selected_ids)} " + f"conversations: {batch_selected_ids}" + ) + return {"selected_ids": batch_selected_ids, "batch_num": batch_num} + else: + logger.warning(f"No response from LLM for batch {batch_num}") + return {"selected_ids": [], "batch_num": batch_num} + + except Exception as e: + logger.error(f"Error processing batch {batch_num}: {str(e)}. 
Skipping batch.") + return {"selected_ids": [], "batch_num": batch_num, "error": str(e)} + + async def get_conversation_citations( rag_prompt: str, accumulated_response: str, From 53d8e61b0ab90e2a6f883359d87843f529d3650c Mon Sep 17 00:00:00 2001 From: Usama Date: Wed, 1 Oct 2025 14:46:03 +0000 Subject: [PATCH 06/13] - add LLM validation for output conversation_ids against valid ids --- echo/server/dembrane/chat_utils.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index 7b2f1a7e..7f65b2a6 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -455,7 +455,24 @@ async def _process_single_batch( if response.choices[0].message.content: result = json.loads(response.choices[0].message.content) - batch_selected_ids = result.get("selected_conversation_ids", []) + raw_selected_ids = result.get("selected_conversation_ids", []) + + # Validate LLM response: ensure all returned IDs are from this batch + valid_ids = {conv.id for conv in batch} + batch_selected_ids = [ + id for id in raw_selected_ids if isinstance(id, (int, str)) and id in valid_ids + ] + + # Log warning if LLM returned invalid IDs + if len(batch_selected_ids) != len(raw_selected_ids): + filtered_count = len(raw_selected_ids) - len(batch_selected_ids) + invalid_ids = [id for id in raw_selected_ids if id not in valid_ids] + logger.warning( + f"Batch {batch_num}: LLM returned {filtered_count} invalid ID(s), " + f"filtered from {len(raw_selected_ids)} to {len(batch_selected_ids)}. 
" + f"Invalid IDs: {invalid_ids}" + ) + logger.info( f"Batch {batch_num} selected {len(batch_selected_ids)} " f"conversations: {batch_selected_ids}" From 2508d4de0896866f27082fadad1a98d3f8125734 Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 08:44:43 +0000 Subject: [PATCH 07/13] - include manually added and auto selected conversations both --- echo/server/dembrane/api/chat.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index e0823d26..f0de6b50 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -544,9 +544,10 @@ async def post_chat( detail="Auto select is not possible with the current context length", ) - # Build references list from all conversations added during auto-select + # Build references list from ALL conversations in context (both manually selected and auto-selected) conversation_references: dict[str, list[dict[str, str]]] = {"references": []} - for conv in all_conversations_added: + # Use chat.used_conversations directly - it already has all conversation objects + for conv in chat.used_conversations: conversation_references["references"].append( { "conversation": conv.id, @@ -554,7 +555,9 @@ async def post_chat( } ) - logger.info(f"Selected conversations for frontend: {conversation_references}") + logger.info( + f"Selected conversations for frontend (manually selected + auto-selected): {conversation_references}" + ) async def stream_response_async_autoselect() -> AsyncGenerator[str, None]: # Send conversation references (selected conversations) From 98c58df08924f873cac3dabe39e8ff1d64fa7d88 Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 11:07:03 +0000 Subject: [PATCH 08/13] - add follow up question check function and logic for auto-select to save cost --- echo/server/dembrane/api/chat.py | 212 ++++++++++++------ .../is_followup_question.de.jinja | 20 ++ .../is_followup_question.en.jinja | 19 ++ 
.../is_followup_question.es.jinja | 20 ++ .../is_followup_question.fr.jinja | 20 ++ .../is_followup_question.nl.jinja | 20 ++ 6 files changed, 246 insertions(+), 65 deletions(-) create mode 100644 echo/server/prompt_templates/is_followup_question.de.jinja create mode 100644 echo/server/prompt_templates/is_followup_question.en.jinja create mode 100644 echo/server/prompt_templates/is_followup_question.es.jinja create mode 100644 echo/server/prompt_templates/is_followup_question.fr.jinja create mode 100644 echo/server/prompt_templates/is_followup_question.nl.jinja diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index f0de6b50..7d2fac51 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -21,6 +21,7 @@ LIGHTRAG_LITELLM_INFERENCE_API_BASE, LIGHTRAG_LITELLM_INFERENCE_API_VERSION, ) +from dembrane.prompts import render_prompt from dembrane.database import ( DatabaseSession, ProjectChatModel, @@ -39,15 +40,66 @@ from dembrane.quote_utils import count_tokens from dembrane.api.conversation import get_conversation_token_count from dembrane.api.dependency_auth import DirectusSession, DependencyDirectusSession -from dembrane.audio_lightrag.utils.lightrag_utils import ( - get_project_id, -) +from dembrane.audio_lightrag.utils.lightrag_utils import get_project_id ChatRouter = APIRouter(tags=["chat"]) logger = logging.getLogger("dembrane.chat") +async def is_followup_question( + conversation_history: List[Dict[str, str]], language: str = "en" +) -> bool: + """ + Determine if the current question is a follow-up to previous messages. + Uses a small LLM call to check semantic relationship. 
+ + Returns: + True if it's a follow-up question, False if it's a new independent question + """ + if len(conversation_history) < 2: + # No previous context, can't be a follow-up + return False + + # Take last 4 messages for context (2 exchanges) + recent_messages = conversation_history[-4:] + + # Format messages for the prompt + previous_messages = [ + {"role": msg["role"], "content": msg["content"]} for msg in recent_messages[:-1] + ] + current_question = recent_messages[-1]["content"] + + prompt = render_prompt( + "is_followup_question", + language, + { + "previous_messages": previous_messages, + "current_question": current_question, + }, + ) + + try: + response = await litellm.acompletion( + model=LIGHTRAG_LITELLM_INFERENCE_MODEL, + api_key=LIGHTRAG_LITELLM_INFERENCE_API_KEY, + api_version=LIGHTRAG_LITELLM_INFERENCE_API_VERSION, + api_base=LIGHTRAG_LITELLM_INFERENCE_API_BASE, + messages=[{"role": "user", "content": prompt}], + temperature=0, # Deterministic + ) + + result_text = response.choices[0].message.content.strip() + result = json.loads(result_text) + is_followup = result.get("is_followup", False) + + logger.info(f"Follow-up detection: {is_followup} for query: {current_question[:50]}...") + return is_followup + except Exception as e: + logger.warning(f"Follow-up detection failed: {e}. 
Defaulting to False (run auto-select)") + return False + + class ChatContextConversationSchema(BaseModel): conversation_id: str conversation_participant_name: str @@ -464,66 +516,25 @@ async def post_chat( query = filtered_messages[-1]["content"] conversation_history = filtered_messages - # Track conversations added by auto-select (defined outside loop) - all_conversations_added = [] - - while MAX_CHAT_CONTEXT_LENGTH < prompt_len: - formatted_messages = [] - top_k = max(5, top_k - 10) - - # Call dummy auto-select function (replaces get_lightrag_prompt_by_params) - user_query_inputs = [query] - - logger.info(f"Calling auto_select_conversations with query: {query}, top_k: {top_k}") - auto_select_result = await auto_select_conversations( - user_query_inputs=user_query_inputs, - project_id_list=[project_id], - db=db, - language=language, - ) - - logger.info(f"Auto-select result: {auto_select_result}") - - # Extract selected conversation IDs - selected_conversation_ids = [] - if "results" in auto_select_result: - for proj_result in auto_select_result["results"].values(): - if "conversation_id_list" in proj_result: - selected_conversation_ids.extend(proj_result["conversation_id_list"]) - - # Add selected conversations to chat context - conversations_added = [] - for conversation_id in selected_conversation_ids: - conversation = db.get(ConversationModel, conversation_id) - if conversation and conversation not in chat.used_conversations: - chat.used_conversations.append(conversation) - conversations_added.append(conversation) - all_conversations_added.append(conversation) # Track all added - - # Create a message to lock the auto-selected conversations - if conversations_added: - auto_select_message = ProjectChatMessageModel( - id=generate_uuid(), - date_created=get_utc_timestamp(), - message_from="dembrane", - text=f"Auto-selected and added {len(conversations_added)} conversations as context to the chat.", - project_chat_id=chat_id, - 
used_conversations=conversations_added, - ) - db.add(auto_select_message) - db.commit() - logger.info(f"Added {len(conversations_added)} conversations via auto-select") + # Check if this is a follow-up question (only if we have locked conversations) + should_reuse_locked = False + if locked_conversation_id_list: + is_followup = await is_followup_question(conversation_history, language) + if is_followup: + logger.info("Detected follow-up question - reusing locked conversations") + should_reuse_locked = True + else: + logger.info("New independent question - running auto-select") - # Get updated chat context - updated_chat_context = await get_chat_context(chat_id, db, auth) - updated_conversation_id_list = updated_chat_context.conversation_id_list + if should_reuse_locked: + # Reuse existing locked conversations for follow-up questions + updated_conversation_id_list = locked_conversation_id_list - # Build system messages from the selected conversations system_messages = await create_system_messages_for_chat( updated_conversation_id_list, db, language, project_id ) - # Build messages to send + formatted_messages = [] if isinstance(system_messages, list): for msg in system_messages: formatted_messages.append({"role": "system", "content": msg["text"]}) @@ -532,18 +543,89 @@ async def post_chat( formatted_messages = [ {"role": "system", "content": system_messages} ] + conversation_history + else: + # Run auto-select for first query or new independent questions + all_conversations_added = [] - # Check context length - prompt_len = token_counter( - model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages - ) + while MAX_CHAT_CONTEXT_LENGTH < prompt_len: + formatted_messages = [] + top_k = max(5, top_k - 10) + + # Call auto-select function (replaces the get_lightrag_prompt_by_params function) + user_query_inputs = [query] + + logger.info( + f"Calling auto_select_conversations with query: {query}, top_k: {top_k}" + ) + auto_select_result = await 
auto_select_conversations( + user_query_inputs=user_query_inputs, + project_id_list=[project_id], + db=db, + language=language, + ) - if top_k <= 5: - raise HTTPException( - status_code=400, - detail="Auto select is not possible with the current context length", + logger.info(f"Auto-select result: {auto_select_result}") + + # Extract selected conversation IDs + selected_conversation_ids = [] + if "results" in auto_select_result: + for proj_result in auto_select_result["results"].values(): + if "conversation_id_list" in proj_result: + selected_conversation_ids.extend(proj_result["conversation_id_list"]) + + # Add selected conversations to chat context + conversations_added = [] + for conversation_id in selected_conversation_ids: + conversation = db.get(ConversationModel, conversation_id) + if conversation and conversation not in chat.used_conversations: + chat.used_conversations.append(conversation) + conversations_added.append(conversation) + all_conversations_added.append(conversation) # Track all added + + # Create a message to lock the auto-selected conversations + if conversations_added: + auto_select_message = ProjectChatMessageModel( + id=generate_uuid(), + date_created=get_utc_timestamp(), + message_from="dembrane", + text=f"Auto-selected and added {len(conversations_added)} conversations as context to the chat.", + project_chat_id=chat_id, + used_conversations=conversations_added, + ) + db.add(auto_select_message) + db.commit() + logger.info(f"Added {len(conversations_added)} conversations via auto-select") + + # Get updated chat context + updated_chat_context = await get_chat_context(chat_id, db, auth) + updated_conversation_id_list = updated_chat_context.conversation_id_list + + # Build system messages from the selected conversations + system_messages = await create_system_messages_for_chat( + updated_conversation_id_list, db, language, project_id + ) + + # Build messages to send + if isinstance(system_messages, list): + for msg in system_messages: + 
formatted_messages.append({"role": "system", "content": msg["text"]}) + formatted_messages.extend(conversation_history) + else: + formatted_messages = [ + {"role": "system", "content": system_messages} + ] + conversation_history + + # Check context length + prompt_len = token_counter( + model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages ) + if top_k <= 5: + raise HTTPException( + status_code=400, + detail="Auto select is not possible with the current context length", + ) + # Build references list from ALL conversations in context (both manually selected and auto-selected) conversation_references: dict[str, list[dict[str, str]]] = {"references": []} # Use chat.used_conversations directly - it already has all conversation objects diff --git a/echo/server/prompt_templates/is_followup_question.de.jinja b/echo/server/prompt_templates/is_followup_question.de.jinja new file mode 100644 index 00000000..375312db --- /dev/null +++ b/echo/server/prompt_templates/is_followup_question.de.jinja @@ -0,0 +1,20 @@ +Sie analysieren, ob die Frage eines Benutzers eine Folgefrage zu vorherigen Konversationsnachrichten ist. + +Vorherige Nachrichten: +{% for msg in previous_messages %} +{{ msg.role }}: {{ msg.content }} +{% endfor %} + +Aktuelle Frage: {{ current_question }} + +Ist die aktuelle Frage eine Folgefrage oder Klärung im Zusammenhang mit den vorherigen Nachrichten? +Antworten Sie NUR mit einem JSON-Objekt: +{"is_followup": true} wenn es eine Folgefrage/Klärung ist +{"is_followup": false} wenn es eine neue unabhängige Frage ist + +Beispiele für Folgefragen: "was noch?", "können Sie das näher erläutern?", "erzählen Sie mir mehr", "und was ist mit X in diesem Kontext?" +Beispiele für neue Fragen: "wie sieht es mit der Bereitstellung aus?", "erzählen Sie mir über das Testen", "welche Konversationen besprechen Y?" + +Geben Sie NUR das JSON-Objekt zurück, keinen anderen Text. 
+ + diff --git a/echo/server/prompt_templates/is_followup_question.en.jinja b/echo/server/prompt_templates/is_followup_question.en.jinja new file mode 100644 index 00000000..4ce9e5c7 --- /dev/null +++ b/echo/server/prompt_templates/is_followup_question.en.jinja @@ -0,0 +1,19 @@ +You are analyzing if a user's question is a follow-up to previous conversation messages. + +Previous messages: +{% for msg in previous_messages %} +{{ msg.role }}: {{ msg.content }} +{% endfor %} + +Current question: {{ current_question }} + +Is the current question a follow-up or clarification related to the previous messages? +Answer ONLY with a JSON object: +{"is_followup": true} if it's a follow-up/clarification +{"is_followup": false} if it's a new independent question + +Examples of follow-ups: "what else?", "can you elaborate?", "tell me more", "and what about X in that context?" +Examples of new questions: "what about deployment?", "tell me about testing", "which conversations discuss Y?" + +Return ONLY the JSON object, no other text. + diff --git a/echo/server/prompt_templates/is_followup_question.es.jinja b/echo/server/prompt_templates/is_followup_question.es.jinja new file mode 100644 index 00000000..6a89c018 --- /dev/null +++ b/echo/server/prompt_templates/is_followup_question.es.jinja @@ -0,0 +1,20 @@ +Estás analizando si la pregunta de un usuario es un seguimiento de mensajes de conversación anteriores. + +Mensajes anteriores: +{% for msg in previous_messages %} +{{ msg.role }}: {{ msg.content }} +{% endfor %} + +Pregunta actual: {{ current_question }} + +¿Es la pregunta actual un seguimiento o aclaración relacionada con los mensajes anteriores? +Responde SOLO con un objeto JSON: +{"is_followup": true} si es un seguimiento/aclaración +{"is_followup": false} si es una pregunta nueva e independiente + +Ejemplos de seguimientos: "¿qué más?", "¿puedes elaborar?", "cuéntame más", "¿y qué hay de X en ese contexto?" 
+Ejemplos de preguntas nuevas: "¿qué hay del despliegue?", "cuéntame sobre las pruebas", "¿qué conversaciones discuten Y?" + +Devuelve SOLO el objeto JSON, ningún otro texto. + + diff --git a/echo/server/prompt_templates/is_followup_question.fr.jinja b/echo/server/prompt_templates/is_followup_question.fr.jinja new file mode 100644 index 00000000..cf96c81d --- /dev/null +++ b/echo/server/prompt_templates/is_followup_question.fr.jinja @@ -0,0 +1,20 @@ +Vous analysez si la question d'un utilisateur fait suite à des messages de conversation précédents. + +Messages précédents : +{% for msg in previous_messages %} +{{ msg.role }}: {{ msg.content }} +{% endfor %} + +Question actuelle : {{ current_question }} + +La question actuelle est-elle une suite ou une clarification liée aux messages précédents ? +Répondez UNIQUEMENT avec un objet JSON : +{"is_followup": true} si c'est une suite/clarification +{"is_followup": false} si c'est une nouvelle question indépendante + +Exemples de suites : "quoi d'autre ?", "pouvez-vous développer ?", "dites-m'en plus", "et qu'en est-il de X dans ce contexte ?" +Exemples de nouvelles questions : "qu'en est-il du déploiement ?", "parlez-moi des tests", "quelles conversations discutent de Y ?" + +Retournez UNIQUEMENT l'objet JSON, aucun autre texte. + + diff --git a/echo/server/prompt_templates/is_followup_question.nl.jinja b/echo/server/prompt_templates/is_followup_question.nl.jinja new file mode 100644 index 00000000..4b8e6dc0 --- /dev/null +++ b/echo/server/prompt_templates/is_followup_question.nl.jinja @@ -0,0 +1,20 @@ +U analyseert of de vraag van een gebruiker een vervolgvraag is op eerdere gespreksberichten. + +Eerdere berichten: +{% for msg in previous_messages %} +{{ msg.role }}: {{ msg.content }} +{% endfor %} + +Huidige vraag: {{ current_question }} + +Is de huidige vraag een vervolgvraag of verduidelijking gerelateerd aan de eerdere berichten? 
+Antwoord ALLEEN met een JSON-object: +{"is_followup": true} als het een vervolgvraag/verduidelijking is +{"is_followup": false} als het een nieuwe onafhankelijke vraag is + +Voorbeelden van vervolgvragen: "wat nog meer?", "kun je dat uitwerken?", "vertel me meer", "en hoe zit het met X in die context?" +Voorbeelden van nieuwe vragen: "hoe zit het met deployment?", "vertel me over testen", "welke gesprekken bespreken Y?" + +Geef ALLEEN het JSON-object terug, geen andere tekst. + + From 53267c4f3340bf847fd886a1035ff1451f2066ef Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 11:17:33 +0000 Subject: [PATCH 09/13] - remove redundant code and add checks for context length --- echo/server/dembrane/api/chat.py | 138 ++++++++++++++----------------- 1 file changed, 63 insertions(+), 75 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 7d2fac51..daa8dda5 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -15,7 +15,6 @@ from dembrane.utils import generate_uuid, get_utc_timestamp from dembrane.config import ( ENABLE_CHAT_AUTO_SELECT, - AUDIO_LIGHTRAG_TOP_K_PROMPT, LIGHTRAG_LITELLM_INFERENCE_MODEL, LIGHTRAG_LITELLM_INFERENCE_API_KEY, LIGHTRAG_LITELLM_INFERENCE_API_BASE, @@ -511,8 +510,6 @@ async def post_chat( ): filtered_messages = filtered_messages[:-1] - top_k = AUDIO_LIGHTRAG_TOP_K_PROMPT - prompt_len = float("inf") query = filtered_messages[-1]["content"] conversation_history = filtered_messages @@ -545,87 +542,78 @@ async def post_chat( ] + conversation_history else: # Run auto-select for first query or new independent questions - all_conversations_added = [] - - while MAX_CHAT_CONTEXT_LENGTH < prompt_len: - formatted_messages = [] - top_k = max(5, top_k - 10) - - # Call auto-select function (replaces the get_lightrag_prompt_by_params function) - user_query_inputs = [query] + user_query_inputs = [query] + + logger.info(f"Calling auto_select_conversations with query: {query}") 
+ auto_select_result = await auto_select_conversations( + user_query_inputs=user_query_inputs, + project_id_list=[project_id], + db=db, + language=language, + ) - logger.info( - f"Calling auto_select_conversations with query: {query}, top_k: {top_k}" - ) - auto_select_result = await auto_select_conversations( - user_query_inputs=user_query_inputs, - project_id_list=[project_id], - db=db, - language=language, + logger.info(f"Auto-select result: {auto_select_result}") + + # Extract selected conversation IDs + selected_conversation_ids = [] + if "results" in auto_select_result: + for proj_result in auto_select_result["results"].values(): + if "conversation_id_list" in proj_result: + selected_conversation_ids.extend(proj_result["conversation_id_list"]) + + # Add selected conversations to chat context + conversations_added = [] + for conversation_id in selected_conversation_ids: + conversation = db.get(ConversationModel, conversation_id) + if conversation and conversation not in chat.used_conversations: + chat.used_conversations.append(conversation) + conversations_added.append(conversation) + + # Create a message to lock the auto-selected conversations + if conversations_added: + auto_select_message = ProjectChatMessageModel( + id=generate_uuid(), + date_created=get_utc_timestamp(), + message_from="dembrane", + text=f"Auto-selected and added {len(conversations_added)} conversations as context to the chat.", + project_chat_id=chat_id, + used_conversations=conversations_added, ) + db.add(auto_select_message) + db.commit() + logger.info(f"Added {len(conversations_added)} conversations via auto-select") - logger.info(f"Auto-select result: {auto_select_result}") - - # Extract selected conversation IDs - selected_conversation_ids = [] - if "results" in auto_select_result: - for proj_result in auto_select_result["results"].values(): - if "conversation_id_list" in proj_result: - selected_conversation_ids.extend(proj_result["conversation_id_list"]) - - # Add selected 
conversations to chat context - conversations_added = [] - for conversation_id in selected_conversation_ids: - conversation = db.get(ConversationModel, conversation_id) - if conversation and conversation not in chat.used_conversations: - chat.used_conversations.append(conversation) - conversations_added.append(conversation) - all_conversations_added.append(conversation) # Track all added - - # Create a message to lock the auto-selected conversations - if conversations_added: - auto_select_message = ProjectChatMessageModel( - id=generate_uuid(), - date_created=get_utc_timestamp(), - message_from="dembrane", - text=f"Auto-selected and added {len(conversations_added)} conversations as context to the chat.", - project_chat_id=chat_id, - used_conversations=conversations_added, - ) - db.add(auto_select_message) - db.commit() - logger.info(f"Added {len(conversations_added)} conversations via auto-select") + # Get updated chat context + updated_chat_context = await get_chat_context(chat_id, db, auth) + updated_conversation_id_list = updated_chat_context.conversation_id_list - # Get updated chat context - updated_chat_context = await get_chat_context(chat_id, db, auth) - updated_conversation_id_list = updated_chat_context.conversation_id_list + # Build system messages from the selected conversations + system_messages = await create_system_messages_for_chat( + updated_conversation_id_list, db, language, project_id + ) - # Build system messages from the selected conversations - system_messages = await create_system_messages_for_chat( - updated_conversation_id_list, db, language, project_id - ) + # Build messages to send + formatted_messages = [] + if isinstance(system_messages, list): + for msg in system_messages: + formatted_messages.append({"role": "system", "content": msg["text"]}) + formatted_messages.extend(conversation_history) + else: + formatted_messages = [ + {"role": "system", "content": system_messages} + ] + conversation_history - # Build messages to send - if 
isinstance(system_messages, list): - for msg in system_messages: - formatted_messages.append({"role": "system", "content": msg["text"]}) - formatted_messages.extend(conversation_history) - else: - formatted_messages = [ - {"role": "system", "content": system_messages} - ] + conversation_history + # Check context length + prompt_len = token_counter( + model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages + ) - # Check context length - prompt_len = token_counter( - model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages + if prompt_len > MAX_CHAT_CONTEXT_LENGTH: + raise HTTPException( + status_code=400, + detail="Auto select returned too many conversations. The selected conversations exceed the maximum context length.", ) - if top_k <= 5: - raise HTTPException( - status_code=400, - detail="Auto select is not possible with the current context length", - ) - # Build references list from ALL conversations in context (both manually selected and auto-selected) conversation_references: dict[str, list[dict[str, str]]] = {"references": []} # Use chat.used_conversations directly - it already has all conversation objects From df058eec51e7d9560b45eaed542fa6e5260d8a0e Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 11:28:31 +0000 Subject: [PATCH 10/13] - add optional batch size field --- echo/server/dembrane/chat_utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index 7f65b2a6..c2b531dd 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -252,6 +252,7 @@ async def auto_select_conversations( project_id_list: List[str], db: Session, language: str = "en", + batch_size: int = 20, ) -> Dict[str, Any]: """ Auto-select conversations based on user queries using LLM-based relevance assessment. 
@@ -266,6 +267,7 @@ async def auto_select_conversations( project_id_list: List containing a single project ID db: Database session language: Language code for the prompt template (default: "en") + batch_size: Number of conversations to process in each LLM call (default: 20) Returns: Dictionary with structure: @@ -282,8 +284,8 @@ async def auto_select_conversations( results: Dict[str, Any] = {} # Batch size: number of conversations to process in each LLM call - # Adjust based on context limits and average summary length - BATCH_SIZE = 20 + # Can be adjusted per-chat via the auto_select_batch_size field + BATCH_SIZE = batch_size for project_id in project_id_list: # Get all conversations for this project From a7d613026da8ccc1027c16b76d949734a0811153 Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 13:46:19 +0000 Subject: [PATCH 11/13] - add context length exhaustion handling logic - add retry logic upto 3 times for LLM calls - add exception handling for LLM calls --- echo/server/dembrane/api/chat.py | 15 ++-- echo/server/dembrane/chat_utils.py | 106 +++++++++++++++++++++++++---- 2 files changed, 105 insertions(+), 16 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index daa8dda5..5124a770 100644 --- a/echo/server/dembrane/api/chat.py +++ b/echo/server/dembrane/api/chat.py @@ -14,6 +14,9 @@ from dembrane.utils import generate_uuid, get_utc_timestamp from dembrane.config import ( + SMALL_LITELLM_MODEL, + SMALL_LITELLM_API_KEY, + SMALL_LITELLM_API_BASE, ENABLE_CHAT_AUTO_SELECT, LIGHTRAG_LITELLM_INFERENCE_MODEL, LIGHTRAG_LITELLM_INFERENCE_API_KEY, @@ -80,12 +83,12 @@ async def is_followup_question( try: response = await litellm.acompletion( - model=LIGHTRAG_LITELLM_INFERENCE_MODEL, - api_key=LIGHTRAG_LITELLM_INFERENCE_API_KEY, - api_version=LIGHTRAG_LITELLM_INFERENCE_API_VERSION, - api_base=LIGHTRAG_LITELLM_INFERENCE_API_BASE, + model=SMALL_LITELLM_MODEL, + api_key=SMALL_LITELLM_API_KEY, + 
api_base=SMALL_LITELLM_API_BASE, messages=[{"role": "user", "content": prompt}], temperature=0, # Deterministic + timeout=60, # 1 minute timeout for quick decision ) result_text = response.choices[0].message.content.strip() @@ -643,6 +646,8 @@ async def stream_response_async_autoselect() -> AsyncGenerator[str, None]: api_base=LIGHTRAG_LITELLM_INFERENCE_API_BASE, messages=formatted_messages, stream=True, + timeout=300, # 5 minute timeout for response + stream_timeout=180, # 3 minute timeout for streaming # mock_response="It's simple to use and easy to get started", ) async for chunk in response: @@ -733,6 +738,8 @@ async def stream_response_async_manualselect() -> AsyncGenerator[str, None]: api_base=LIGHTRAG_LITELLM_INFERENCE_API_BASE, messages=messages_to_send, stream=True, + timeout=300, # 5 minute timeout for response + stream_timeout=180, # 3 minute timeout for streaming ) async for chunk in response: if chunk.choices[0].delta.content: diff --git a/echo/server/dembrane/chat_utils.py b/echo/server/dembrane/chat_utils.py index c2b531dd..0c73bd8b 100644 --- a/echo/server/dembrane/chat_utils.py +++ b/echo/server/dembrane/chat_utils.py @@ -4,11 +4,17 @@ import logging from typing import Any, Dict, List, Optional +import backoff from litellm import completion, acompletion from pydantic import BaseModel -from sqlalchemy.orm import ( - Session, - selectinload, +from litellm.utils import token_counter +from sqlalchemy.orm import Session, selectinload +from litellm.exceptions import ( + Timeout, + APIError, + RateLimitError, + BadRequestError, + ContextWindowExceededError, ) from dembrane.config import ( @@ -369,6 +375,25 @@ async def auto_select_conversations( return {"results": results} +@backoff.on_exception( + backoff.expo, + (RateLimitError, Timeout, APIError), + max_tries=3, + max_time=5 * 60, # 5 minutes +) +async def _call_llm_with_backoff(prompt: str, batch_num: int) -> Any: + """Call LLM with automatic retry for transient errors.""" + logger.debug(f"Calling LLM 
for batch {batch_num}") + return await acompletion( + model=SMALL_LITELLM_MODEL, + messages=[{"role": "user", "content": prompt}], + api_base=SMALL_LITELLM_API_BASE, + api_key=SMALL_LITELLM_API_KEY, + response_format={"type": "json_object"}, + timeout=5 * 60, # 5 minutes + ) + + async def _process_single_batch( batch: List[ConversationModel], batch_num: int, @@ -445,14 +470,55 @@ async def _process_single_batch( }, ) - # Call the LLM + # Validate prompt size before sending try: - response = await acompletion( - model=SMALL_LITELLM_MODEL, - messages=[{"role": "user", "content": prompt}], - api_base=SMALL_LITELLM_API_BASE, - api_key=SMALL_LITELLM_API_KEY, - response_format={"type": "json_object"}, + prompt_tokens = token_counter(model=SMALL_LITELLM_MODEL, text=prompt) + MAX_BATCH_CONTEXT = 100000 # Leave headroom for response + + if prompt_tokens > MAX_BATCH_CONTEXT: + # If batch has only 1 conversation, we can't split further + if len(batch) == 1: + logger.error( + f"Batch {batch_num} single conversation exceeds context limit: " + f"{prompt_tokens} tokens. Skipping conversation {batch[0].id}." + ) + return { + "selected_ids": [], + "batch_num": batch_num, + "error": "single_conversation_too_large", + } + + # Split batch in half and process recursively + mid = len(batch) // 2 + batch_1 = batch[:mid] + batch_2 = batch[mid:] + + logger.warning( + f"Batch {batch_num} prompt too large ({prompt_tokens} tokens). " + f"Splitting into 2 sub-batches: {len(batch_1)} and {len(batch_2)} conversations." + ) + + # Process both halves recursively + result_1 = await _process_single_batch(batch_1, batch_num, user_query_inputs, language) + result_2 = await _process_single_batch(batch_2, batch_num, user_query_inputs, language) + + # Combine results from both sub-batches + combined_ids = result_1.get("selected_ids", []) + result_2.get("selected_ids", []) + + logger.info( + f"Batch {batch_num} split processing complete: " + f"{len(combined_ids)} conversations selected from sub-batches." 
+ ) + + return {"selected_ids": combined_ids, "batch_num": batch_num} + except Exception as e: + logger.warning(f"Could not count tokens for batch {batch_num}: {e}") + + # Call the LLM with retry logic for transient errors + try: + response = await _call_llm_with_backoff( + prompt=prompt, + batch_num=batch_num, ) if response.choices[0].message.content: @@ -484,10 +550,26 @@ async def _process_single_batch( logger.warning(f"No response from LLM for batch {batch_num}") return {"selected_ids": [], "batch_num": batch_num} - except Exception as e: - logger.error(f"Error processing batch {batch_num}: {str(e)}. Skipping batch.") + except ContextWindowExceededError as e: + logger.error( + f"Batch {batch_num} exceeded context window ({len(batch)} conversations). " + f"Error: {str(e)}" + ) + return {"selected_ids": [], "batch_num": batch_num, "error": "context_exceeded"} + + except (RateLimitError, Timeout) as e: + # These are already retried by backoff, so if we get here, all retries failed + logger.error(f"Batch {batch_num} failed after retries: {type(e).__name__}") return {"selected_ids": [], "batch_num": batch_num, "error": str(e)} + except (APIError, BadRequestError) as e: + logger.error(f"Batch {batch_num} API error: {str(e)}") + return {"selected_ids": [], "batch_num": batch_num, "error": "api_error"} + + except Exception as e: + logger.error(f"Batch {batch_num} unexpected error: {str(e)}") + return {"selected_ids": [], "batch_num": batch_num, "error": "unknown"} + async def get_conversation_citations( rag_prompt: str, From 75002020cb722e12d394efd8e2238018d6b5f98f Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 14:20:17 +0000 Subject: [PATCH 12/13] - donot show references if no new conversations were added --- echo/server/dembrane/api/chat.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 5124a770..6c3c400a 100644 --- a/echo/server/dembrane/api/chat.py 
+++ b/echo/server/dembrane/api/chat.py @@ -516,6 +516,9 @@ async def post_chat( query = filtered_messages[-1]["content"] conversation_history = filtered_messages + # Track newly added conversations for displaying in the frontend + conversations_added: list[ConversationModel] = [] + # Check if this is a follow-up question (only if we have locked conversations) should_reuse_locked = False if locked_conversation_id_list: @@ -617,10 +620,10 @@ async def post_chat( detail="Auto select returned too many conversations. The selected conversations exceed the maximum context length.", ) - # Build references list from ALL conversations in context (both manually selected and auto-selected) + # Build references list from ONLY newly added conversations (not all conversations) conversation_references: dict[str, list[dict[str, str]]] = {"references": []} - # Use chat.used_conversations directly - it already has all conversation objects - for conv in chat.used_conversations: + # Only include conversations that were just added via auto-select + for conv in conversations_added: conversation_references["references"].append( { "conversation": conv.id, @@ -628,9 +631,7 @@ async def post_chat( } ) - logger.info( - f"Selected conversations for frontend (manually selected + auto-selected): {conversation_references}" - ) + logger.info(f"Newly added conversations for frontend: {conversation_references}") async def stream_response_async_autoselect() -> AsyncGenerator[str, None]: # Send conversation references (selected conversations) From cd98d279a0ca9690732711b4408eb28ec5d936cc Mon Sep 17 00:00:00 2001 From: Usama Date: Thu, 2 Oct 2025 14:31:06 +0000 Subject: [PATCH 13/13] - add context length check in reuse locked conversations --- echo/server/dembrane/api/chat.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/echo/server/dembrane/api/chat.py b/echo/server/dembrane/api/chat.py index 6c3c400a..86cd1737 100644 --- a/echo/server/dembrane/api/chat.py +++ 
b/echo/server/dembrane/api/chat.py @@ -546,6 +546,17 @@ async def post_chat( formatted_messages = [ {"role": "system", "content": system_messages} ] + conversation_history + + # Check context length + prompt_len = token_counter( + model=LIGHTRAG_LITELLM_INFERENCE_MODEL, messages=formatted_messages + ) + + if prompt_len > MAX_CHAT_CONTEXT_LENGTH: + raise HTTPException( + status_code=400, + detail="The conversation context with the new message exceeds the maximum context length.", + ) else: # Run auto-select for first query or new independent questions user_query_inputs = [query]