Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 177 additions & 1 deletion dimos/agents/agent_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
"""Agent-specific types for message passing."""

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Union
import threading
import time
import json


@dataclass
Expand Down Expand Up @@ -67,3 +69,177 @@ def __repr__(self) -> str:
content_preview = self.content[:50] + "..." if len(self.content) > 50 else self.content
tool_info = f", tools={len(self.tool_calls)}" if self.tool_calls else ""
return f"AgentResponse(role='{self.role}', content='{content_preview}'{tool_info})"


@dataclass
class ConversationMessage:
    """Single message in conversation history.

    Represents a message in the conversation that can be converted to
    different formats (OpenAI, TensorZero, etc).
    """

    role: str  # "system", "user", "assistant", "tool"
    content: Union[str, List[Dict[str, Any]]]  # Text or content blocks (multimodal)
    tool_calls: Optional[List[ToolCall]] = None  # Calls made by an assistant turn
    tool_call_id: Optional[str] = None  # For tool responses
    timestamp: float = field(default_factory=time.time)  # Creation time, epoch seconds

    def to_openai_format(self) -> Dict[str, Any]:
        """Convert to OpenAI API format.

        Returns:
            Message dict with "role" and "content", plus "tool_calls"
            and/or "tool_call_id" when those fields are set.
        """
        # Plain text and content-block lists are both accepted verbatim by
        # the OpenAI API, so content passes through unchanged either way.
        msg: Dict[str, Any] = {"role": self.role, "content": self.content}

        # Add tool calls if present; the API expects "arguments" to be a
        # JSON-encoded string, not a dict.
        if self.tool_calls:
            msg["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {"name": tc.name, "arguments": json.dumps(tc.arguments)},
                }
                for tc in self.tool_calls
            ]

        # Add tool_call_id for tool responses
        if self.tool_call_id:
            msg["tool_call_id"] = self.tool_call_id

        return msg

    def __repr__(self) -> str:
        # Stringify once; content may be a list of content blocks.
        text = str(self.content)
        content_preview = text[:50] + "..." if len(text) > 50 else text
        return f"ConversationMessage(role='{self.role}', content='{content_preview}')"


class ConversationHistory:
    """Thread-safe conversation history manager.

    Manages conversation history with proper formatting for different
    LLM providers and automatic trimming to at most ``max_size`` messages.
    """

    def __init__(self, max_size: int = 20):
        """Initialize conversation history.

        Args:
            max_size: Maximum number of messages to keep
        """
        self._messages: List[ConversationMessage] = []
        self._lock = threading.Lock()  # Guards all access to self._messages
        self.max_size = max_size

    def add_user_message(self, content: Union[str, List[Dict[str, Any]]]) -> None:
        """Add user message to history.

        Args:
            content: Text string or list of content blocks (for multimodal)
        """
        with self._lock:
            self._messages.append(ConversationMessage(role="user", content=content))
            self._trim()

    def add_assistant_message(
        self, content: str, tool_calls: Optional[List[ToolCall]] = None
    ) -> None:
        """Add assistant response to history.

        Args:
            content: Response text
            tool_calls: Optional list of tool calls made
        """
        with self._lock:
            self._messages.append(
                ConversationMessage(role="assistant", content=content, tool_calls=tool_calls)
            )
            self._trim()

    def add_tool_result(self, tool_call_id: str, content: str) -> None:
        """Add tool execution result to history.

        Args:
            tool_call_id: ID of the tool call this is responding to
            content: Result of the tool execution
        """
        with self._lock:
            self._messages.append(
                ConversationMessage(role="tool", content=content, tool_call_id=tool_call_id)
            )
            self._trim()

    def add_raw_message(self, message: Dict[str, Any]) -> None:
        """Add a raw (OpenAI-style) message dict to history.

        Args:
            message: Message dict with "role" and "content"; may also carry
                "tool_calls" and/or "tool_call_id".
        """
        with self._lock:
            role = message.get("role", "user")
            content = message.get("content", "")

            # Convert any tool calls to ToolCall objects. Use a truthiness
            # check rather than `"tool_calls" in message` so that an explicit
            # "tool_calls": None does not raise TypeError when iterated.
            tool_calls = None
            if message.get("tool_calls"):
                tool_calls = [
                    ToolCall(
                        id=tc["id"],
                        name=tc["function"]["name"],
                        # Arguments arrive JSON-encoded from the API, but may
                        # already be a dict when the message was built locally.
                        arguments=json.loads(tc["function"]["arguments"])
                        if isinstance(tc["function"]["arguments"], str)
                        else tc["function"]["arguments"],
                        status="completed",
                    )
                    for tc in message["tool_calls"]
                ]

            # Handle tool_call_id for tool responses
            tool_call_id = message.get("tool_call_id")

            self._messages.append(
                ConversationMessage(
                    role=role, content=content, tool_calls=tool_calls, tool_call_id=tool_call_id
                )
            )
            self._trim()

    def to_openai_format(self) -> List[Dict[str, Any]]:
        """Export history in OpenAI format.

        Returns:
            List of message dicts in OpenAI format
        """
        with self._lock:
            return [msg.to_openai_format() for msg in self._messages]

    def clear(self) -> None:
        """Clear all conversation history."""
        with self._lock:
            self._messages.clear()

    def size(self) -> int:
        """Get number of messages in history.

        Returns:
            Number of messages
        """
        with self._lock:
            return len(self._messages)

    def _trim(self) -> None:
        """Drop oldest messages so at most max_size remain (caller holds lock)."""
        if len(self._messages) > self.max_size:
            # Keep the most recent messages
            self._messages = self._messages[-self.max_size :]

    def __repr__(self) -> str:
        with self._lock:
            return f"ConversationHistory(messages={len(self._messages)}, max_size={self.max_size})"
109 changes: 76 additions & 33 deletions dimos/agents/modules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from dimos.skills.skills import AbstractSkill, SkillLibrary
from dimos.utils.logging_config import setup_logger
from dimos.agents.agent_message import AgentMessage
from dimos.agents.agent_types import AgentResponse, ToolCall
from dimos.agents.agent_types import AgentResponse, ToolCall, ConversationHistory

try:
from .gateway import UnifiedGatewayClient
Expand Down Expand Up @@ -102,7 +102,7 @@ def __init__(
self.temperature = temperature
self.max_tokens = max_tokens
self.max_input_tokens = max_input_tokens
self.max_history = max_history
self._max_history = max_history
self.rag_n = rag_n
self.rag_threshold = rag_threshold
self.dev_name = dev_name
Expand Down Expand Up @@ -132,9 +132,8 @@ def __init__(
# Initialize gateway
self.gateway = UnifiedGatewayClient()

# Conversation history
self.history = []
self._history_lock = threading.Lock()
# Conversation history with proper format management
self.conversation = ConversationHistory(max_size=self._max_history)

# Thread pool for async operations
self._executor = ThreadPoolExecutor(max_workers=2)
Expand All @@ -148,6 +147,17 @@ def __init__(
# Initialize memory with default context
self._initialize_memory()

@property
def max_history(self) -> int:
"""Get max history size."""
return self._max_history

@max_history.setter
def max_history(self, value: int):
"""Set max history size and update conversation."""
self._max_history = value
self.conversation.max_size = value

def _check_vision_support(self) -> bool:
"""Check if the model supports vision."""
return self.model in VISION_MODELS
Expand Down Expand Up @@ -197,6 +207,23 @@ async def _process_query_async(self, agent_msg: AgentMessage) -> AgentResponse:
# Get tools if available
tools = self.skills.get_tools() if len(self.skills) > 0 else None

# Debug logging before gateway call
logger.debug("=== Gateway Request ===")
logger.debug(f"Model: {self.model}")
logger.debug(f"Number of messages: {len(messages)}")
for i, msg in enumerate(messages):
role = msg.get("role", "unknown")
content = msg.get("content", "")
if isinstance(content, str):
content_preview = content[:100]
elif isinstance(content, list):
content_preview = f"[{len(content)} content blocks]"
else:
content_preview = str(content)[:100]
logger.debug(f" Message {i}: role={role}, content={content_preview}...")
logger.debug(f"Tools available: {len(tools) if tools else 0}")
logger.debug("======================")

# Make inference call
response = await self.gateway.ainference(
model=self.model,
Expand Down Expand Up @@ -243,17 +270,17 @@ async def _process_query_async(self, agent_msg: AgentMessage) -> AgentResponse:
)
else:
# No tools, add both user and assistant messages to history
with self._history_lock:
# Add user message
user_msg = messages[-1] # Last message in messages is the user message
self.history.append(user_msg)

# Add assistant response
self.history.append(message)

# Trim history if needed
if len(self.history) > self.max_history:
self.history = self.history[-self.max_history :]
# Get the user message content from the built message
user_msg = messages[-1] # Last message in messages is the user message
user_content = user_msg["content"]

# Add to conversation history
logger.info(f"=== Adding to history (no tools) ===")
logger.info(f" Adding user message: {str(user_content)[:100]}...")
self.conversation.add_user_message(user_content)
logger.info(f" Adding assistant response: {content[:100]}...")
self.conversation.add_assistant_message(content)
logger.info(f" History size now: {self.conversation.size()}")

return AgentResponse(
content=content,
Expand Down Expand Up @@ -293,10 +320,23 @@ def _build_messages(
system_content += f"\n\nRelevant context: {rag_context}"
messages.append({"role": "system", "content": system_content})

# Add conversation history
with self._history_lock:
# History items should already be Message objects or dicts
messages.extend(self.history)
# Add conversation history in OpenAI format
history_messages = self.conversation.to_openai_format()
messages.extend(history_messages)

# Debug history state
logger.info(f"=== Building messages with {len(history_messages)} history messages ===")
if history_messages:
for i, msg in enumerate(history_messages):
role = msg.get("role", "unknown")
content = msg.get("content", "")
if isinstance(content, str):
preview = content[:100]
elif isinstance(content, list):
preview = f"[{len(content)} content blocks]"
else:
preview = str(content)[:100]
logger.info(f" History[{i}]: role={role}, content={preview}")

# Build user message content from AgentMessage
user_content = agent_msg.get_combined_text() if agent_msg.has_text() else ""
Expand Down Expand Up @@ -395,19 +435,22 @@ async def _handle_tool_calls(
final_message = response["choices"][0]["message"]

# Now add all messages to history in order (like Claude does)
with self._history_lock:
# Add user message
self.history.append(user_message)
# Add assistant message with tool calls
self.history.append(assistant_msg)
# Add all tool results
self.history.extend(tool_results)
# Add final assistant response
self.history.append(final_message)

# Trim history if needed
if len(self.history) > self.max_history:
self.history = self.history[-self.max_history :]
# Add user message
user_content = user_message["content"]
self.conversation.add_user_message(user_content)

# Add assistant message with tool calls
self.conversation.add_assistant_message("", tool_calls)

# Add tool results
for result in tool_results:
self.conversation.add_tool_result(
tool_call_id=result["tool_call_id"], content=result["content"]
)

# Add final assistant response
final_content = final_message.get("content", "")
self.conversation.add_assistant_message(final_content)

return final_message.get("content", "")

Expand Down
Loading