Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions backend/app/agents/devrel/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from functools import partial
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.memory import InMemorySaver
from ..shared.base_agent import BaseAgent, AgentState
from ..shared.classification_router import MessageCategory
from .tools.search_tool import TavilySearchTool
Expand All @@ -14,6 +15,7 @@
from .nodes.handle_technical_support_node import handle_technical_support_node
from .nodes.handle_onboarding_node import handle_onboarding_node
from .nodes.generate_response_node import generate_response_node
from .nodes.summarization_node import check_summarization_needed, summarize_conversation_node, store_summary_to_database

logger = logging.getLogger(__name__)

Expand All @@ -29,6 +31,7 @@ def __init__(self, config: Dict[str, Any] = None):
)
self.search_tool = TavilySearchTool()
self.faq_tool = FAQTool()
self.checkpointer = InMemorySaver()
super().__init__("DevRelAgent", self.config)

def _build_graph(self):
Expand All @@ -43,6 +46,8 @@ def _build_graph(self):
workflow.add_node("handle_technical_support", handle_technical_support_node)
workflow.add_node("handle_onboarding", handle_onboarding_node)
workflow.add_node("generate_response", partial(generate_response_node, llm=self.llm))
workflow.add_node("check_summarization", check_summarization_needed)
workflow.add_node("summarize_conversation", partial(summarize_conversation_node, llm=self.llm))

# Add edges
workflow.add_conditional_edges(
Expand All @@ -65,12 +70,26 @@ def _build_graph(self):
for node in ["handle_faq", "handle_web_search", "handle_technical_support", "handle_onboarding"]:
workflow.add_edge(node, "generate_response")

workflow.add_edge("generate_response", END)
workflow.add_edge("generate_response", "check_summarization")

# Conditional edge for summarization
workflow.add_conditional_edges(
"check_summarization",
self._should_summarize,
{
"summarize": "summarize_conversation",
"end": END
}
)

# End after summarization
workflow.add_edge("summarize_conversation", END)

# Set entry point
workflow.set_entry_point("gather_context")

self.graph = workflow.compile()
# Compile with InMemorySaver checkpointer
self.graph = workflow.compile(checkpointer=self.checkpointer)

def _route_to_handler(self, state: AgentState) -> str:
"""Route to the appropriate handler based on intent"""
Expand Down Expand Up @@ -98,3 +117,54 @@ def _route_to_handler(self, state: AgentState) -> str:
# Later to be changed to handle anomalies
logger.info(f"Unknown intent '{intent}', routing to technical support")
return MessageCategory.TECHNICAL_SUPPORT

def _should_summarize(self, state: AgentState) -> str:
"""Determine if conversation should be summarized"""
if state.summarization_needed:
logger.info(f"Summarization needed for session {state.session_id}")
return "summarize"
return "end"

async def get_thread_state(self, thread_id: str) -> Dict[str, Any]:
"""Get the current state of a thread"""
try:
config = {"configurable": {"thread_id": thread_id}}
state = self.graph.get_state(config)
return state.values if state else {}
except Exception as e:
logger.error(f"Error getting thread state: {str(e)}")
return {}

async def clear_thread_memory(self, thread_id: str, force_clear: bool = False) -> bool:
"""Clear memory for a specific thread using memory_timeout_reached flag"""
try:
config = {"configurable": {"thread_id": thread_id}}
state = self.graph.get_state(config)

if state and state.values:
agent_state = AgentState(**state.values)

# Check the memory_timeout_reached flag
if agent_state.memory_timeout_reached or force_clear:
if agent_state.memory_timeout_reached:
logger.info(f"Thread {thread_id} timeout flag set, storing final summary and clearing memory")
else:
logger.info(f"Force clearing memory for thread {thread_id}")

# Store final summary to database before clearing
await store_summary_to_database(agent_state)

# Delete the thread from InMemorySaver
self.checkpointer.delete_thread(thread_id)
logger.info(f"Successfully cleared memory for thread {thread_id}")
return True
else:
logger.info(f"Thread {thread_id} has not timed out, memory preserved")
return False
else:
logger.info(f"No state found for thread {thread_id}, nothing to clear")
return True

except Exception as e:
logger.error(f"Error clearing thread memory: {str(e)}")
return False
Comment thread
smokeyScraper marked this conversation as resolved.
Empty file.
22 changes: 18 additions & 4 deletions backend/app/agents/devrel/nodes/gather_context_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime
from app.agents.shared.state import AgentState
from app.agents.shared.classification_router import MessageCategory

Expand All @@ -11,12 +12,25 @@ async def gather_context_node(state: AgentState) -> AgentState:
# TODO: Add context gathering from databases
# Currently, context is simple
# In production, query databases for user history, etc.

original_message = state.context.get("original_message", "")

new_message = {
"role": "user",
"content": original_message,
"timestamp": datetime.now().isoformat()
}

context_data = {
"user_profile": {"user_id": state.user_id, "platform": state.platform},
"conversation_context": len(state.messages),
"conversation_context": len(state.messages) + 1, # +1 for the new message
"session_info": {"session_id": state.session_id}
}

state.context.update(context_data)
state.current_task = "context_gathered"
return state
updated_context = {**state.context, **context_data}

return {
"messages": [new_message],
"context": updated_context,
"current_task": "context_gathered"
}
55 changes: 39 additions & 16 deletions backend/app/agents/devrel/nodes/generate_response_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,66 @@ async def _create_llm_response(state: AgentState, task_result: Dict[str, Any], l
elif state.context.get("original_message"):
latest_message = state.context["original_message"]

conversation_summary = state.conversation_summary or "This is the beginning of our conversation."

recent_messages_count = min(10, len(state.messages))
conversation_history_str = "\n".join([
f"{msg.get('type', 'unknown')}: {msg.get('content', '')}"
for msg in state.conversation_history[-5:]
f"{msg.get('role', 'user')}: {msg.get('content', '')}"
for msg in state.messages[-recent_messages_count:]
])
Comment on lines +39 to 43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fallback to legacy "type" field to avoid empty roles after migration

Older AgentState.messages objects still use the "type" key. Restricting to "role" risks emitting "user" for every historic message, degrading context.

-        f"{msg.get('role', 'user')}: {msg.get('content', '')}"
+        f"{msg.get('role', msg.get('type', 'user'))}: {msg.get('content', '')}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
recent_messages_count = min(10, len(state.messages))
conversation_history_str = "\n".join([
f"{msg.get('type', 'unknown')}: {msg.get('content', '')}"
for msg in state.conversation_history[-5:]
f"{msg.get('role', 'user')}: {msg.get('content', '')}"
for msg in state.messages[-recent_messages_count:]
])
recent_messages_count = min(10, len(state.messages))
conversation_history_str = "\n".join([
f"{msg.get('role', msg.get('type', 'user'))}: {msg.get('content', '')}"
for msg in state.messages[-recent_messages_count:]
])
🤖 Prompt for AI Agents
In backend/app/agents/devrel/nodes/generate_response_node.py around lines 39 to
43, the code currently uses only the "role" key to determine message roles,
which causes fallback to "user" for older messages that use the "type" key.
Update the code to first check for the "role" key and if missing, fallback to
the "type" key to correctly preserve roles from legacy messages and avoid
degrading context.

current_context_str = str(state.context)
task_type_str = str(task_result.get("type", "N/A"))
task_details_str = str(task_result)

total_messages = len(state.messages)
if total_messages > recent_messages_count:
conversation_history_str = f"[Showing last {recent_messages_count} of {total_messages} messages]\n" + \
conversation_history_str

context_parts = [
f"Platform: {state.platform}",
f"Total interactions: {state.interaction_count}",
f"Session duration: {(state.last_interaction_time - state.session_start_time).total_seconds() / 60:.1f} minutes"
]

if state.key_topics:
context_parts.append(f"Key topics discussed: {', '.join(state.key_topics)}")

if state.user_profile:
context_parts.append(f"User profile: {state.user_profile}")

current_context_str = "\n".join(context_parts)

try:
prompt = GENERAL_LLM_RESPONSE_PROMPT.format(
conversation_summary=conversation_summary,
latest_message=latest_message,
conversation_history=conversation_history_str,
current_context=current_context_str,
task_type=task_type_str,
task_details=task_details_str
task_type=task_result.get("type", "general"),
task_details=str(task_result)
)

logger.info(f"Prompt includes summary: {len(conversation_summary)} chars, "
f"recent history: {recent_messages_count} messages, "
f"total history: {total_messages} messages")
except KeyError as e:
logger.error(f"Missing key in GENERAL_LLM_RESPONSE_PROMPT: {e}")
return "Error: Response template formatting error."

response = await llm.ainvoke([HumanMessage(content=prompt)])
return response.content.strip()

async def generate_response_node(state: AgentState, llm) -> AgentState:
async def generate_response_node(state: AgentState, llm) -> dict:
"""Generate final response to user"""
logger.info(f"Generating response for session {state.session_id}")
task_result = state.task_result or {}

if task_result.get("type") == "faq":
state.final_response = task_result.get("response", "I don't have a specific answer for that question.")
final_response = task_result.get("response", "I don't have a specific answer for that question.")
elif task_result.get("type") == "web_search":
response = await _create_search_response(task_result)
state.final_response = response
final_response = await _create_search_response(task_result)
else:
# Pass the llm instance to _create_llm_response
response = await _create_llm_response(state, task_result, llm)
state.final_response = response
final_response = await _create_llm_response(state, task_result, llm)

state.current_task = "response_generated"
return state
return {
"final_response": final_response,
"current_task": "response_generated"
}
16 changes: 8 additions & 8 deletions backend/app/agents/devrel/nodes/handle_faq_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

logger = logging.getLogger(__name__)

async def handle_faq_node(state: AgentState, faq_tool) -> AgentState:
async def handle_faq_node(state: AgentState, faq_tool) -> dict:
"""Handle FAQ requests"""
logger.info(f"Handling FAQ for session {state.session_id}")

Expand All @@ -16,11 +16,11 @@ async def handle_faq_node(state: AgentState, faq_tool) -> AgentState:
# faq_tool will be passed from the agent, similar to llm for classify_intent
faq_response = await faq_tool.get_response(latest_message)

state.task_result = {
"type": "faq",
"response": faq_response,
"source": "faq_database"
return {
"task_result": {
"type": "faq",
"response": faq_response,
"source": "faq_database"
},
"current_task": "faq_handled"
}
Comment on lines +19 to 26
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling around external FAQ call

An exception inside faq_tool.get_response will bubble up and break the graph. Wrap it to surface a controlled failure:

-    faq_response = await faq_tool.get_response(latest_message)
+    try:
+        faq_response = await faq_tool.get_response(latest_message)
+    except Exception as e:
+        logger.exception("FAQ tool failed")
+        return {
+            "errors": [str(e)],
+            "current_task": "faq_failed"
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return {
"task_result": {
"type": "faq",
"response": faq_response,
"source": "faq_database"
},
"current_task": "faq_handled"
}
try:
faq_response = await faq_tool.get_response(latest_message)
except Exception as e:
logger.exception("FAQ tool failed")
return {
"errors": [str(e)],
"current_task": "faq_failed"
}
return {
"task_result": {
"type": "faq",
"response": faq_response,
"source": "faq_database"
},
"current_task": "faq_handled"
}
🤖 Prompt for AI Agents
In backend/app/agents/devrel/nodes/handle_faq_node.py around lines 19 to 26, the
call to faq_tool.get_response lacks error handling, so any exception will
propagate and break the graph. Wrap the call to faq_tool.get_response in a
try-except block, catch exceptions, and return a controlled failure response
indicating the error instead of letting the exception bubble up.


state.current_task = "faq_handled"
return state
14 changes: 7 additions & 7 deletions backend/app/agents/devrel/nodes/handle_onboarding_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ async def handle_onboarding_node(state: AgentState) -> AgentState:
"""Handle onboarding requests"""
logger.info(f"Handling onboarding for session {state.session_id}")

state.task_result = {
"type": "onboarding",
"action": "welcome_and_guide",
"next_steps": ["setup_environment", "first_contribution", "join_community"]
return {
"task_result": {
"type": "onboarding",
"action": "welcome_and_guide",
"next_steps": ["setup_environment", "first_contribution", "join_community"]
},
"current_task": "onboarding_handled"
}

state.current_task = "onboarding_handled"
return state
14 changes: 7 additions & 7 deletions backend/app/agents/devrel/nodes/handle_technical_support_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ async def handle_technical_support_node(state: AgentState) -> AgentState:
"""Handle technical support requests"""
logger.info(f"Handling technical support for session {state.session_id}")

state.task_result = {
"type": "technical_support",
"action": "provide_guidance",
"requires_human_review": False
return {
"task_result": {
"type": "technical_support",
"action": "provide_guidance",
"requires_human_review": False
},
"current_task": "technical_support_handled"
}

state.current_task = "technical_support_handled"
return state
19 changes: 10 additions & 9 deletions backend/app/agents/devrel/nodes/handle_web_search_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def _extract_search_query(message: str, llm) -> str:
logger.info(f"Extracted search query: {search_query}")
return search_query

async def handle_web_search_node(state: AgentState, search_tool, llm) -> AgentState:
async def handle_web_search_node(state: AgentState, search_tool, llm) -> dict:
"""Handle web search requests"""
logger.info(f"Handling web search for session {state.session_id}")

Expand All @@ -31,12 +31,13 @@ async def handle_web_search_node(state: AgentState, search_tool, llm) -> AgentSt
search_query = await _extract_search_query(latest_message, llm)
search_results = await search_tool.search(search_query)

state.task_result = {
"type": "web_search",
"query": search_query,
"results": search_results,
"source": "tavily_search"
return {
"task_result": {
"type": "web_search",
"query": search_query,
"results": search_results,
"source": "tavily_search"
},
"tools_used": ["tavily_search"],
"current_task": "web_search_handled"
}
state.tools_used.append("tavily_search")
state.current_task = "web_search_handled"
return state
Empty file.
Loading