diff --git a/README.md b/README.md index e859c9924c..9a74d63aa7 100644 --- a/README.md +++ b/README.md @@ -260,7 +260,7 @@ yarn dev # you may need to run sudo if previously built via Docker from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.robot.unitree.unitree_ros_control import UnitreeROSControl -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent # Initialize robot robot = UnitreeGo2(ip=robot_ip, diff --git a/assets/agent/prompt_agents2.txt b/assets/agent/prompt_agents.txt similarity index 100% rename from assets/agent/prompt_agents2.txt rename to assets/agent/prompt_agents.txt diff --git a/dimos/agents/__init__.py b/dimos/agents/__init__.py index e69de29bb2..8e099a21b4 100644 --- a/dimos/agents/__init__.py +++ b/dimos/agents/__init__.py @@ -0,0 +1,13 @@ +from langchain_core.messages import ( + AIMessage, + HumanMessage, + MessageLikeRepresentation, + SystemMessage, + ToolCall, + ToolMessage, +) + +from dimos.agents.agent import Agent, deploy +from dimos.agents.spec import AgentSpec +from dimos.protocol.skill.skill import skill +from dimos.protocol.skill.type import Output, Reducer, Stream diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index fc771ffecf..a232876fec 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -11,907 +11,433 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import asyncio +import datetime +import json +from operator import itemgetter +import os +from typing import Any, TypedDict +import uuid + +from langchain.chat_models import init_chat_model +from langchain_core.messages import ( + AIMessage, + HumanMessage, + SystemMessage, + ToolCall, + ToolMessage, +) +from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline + +from dimos.agents.ollama_agent import ensure_ollama_model +from dimos.agents.spec import AgentSpec, Model, Provider +from dimos.agents.system_prompt import get_system_prompt +from dimos.core import DimosCluster, rpc +from dimos.protocol.skill.coordinator import ( + SkillCoordinator, + SkillState, + SkillStateDict, +) +from dimos.protocol.skill.skill import SkillContainer +from dimos.protocol.skill.type import Output +from dimos.utils.logging_config import setup_logger -"""Agent framework for LLM-based autonomous systems. +logger = setup_logger() -This module provides a flexible foundation for creating agents that can: -- Process image and text inputs through LLM APIs -- Store and retrieve contextual information using semantic memory -- Handle tool/function calling -- Process streaming inputs asynchronously -The module offers base classes (Agent, LLMAgent) and concrete implementations -like OpenAIAgent that connect to specific LLM providers. -""" +SYSTEM_MSG_APPEND = "\nYour message history will always be appended with a System Overview message that provides situational awareness." 
-from __future__ import annotations -# Standard library imports -import json -import os -import threading -from typing import TYPE_CHECKING, Any - -# Third-party imports -from dotenv import load_dotenv -from openai import NOT_GIVEN, OpenAI -from pydantic import BaseModel -from reactivex import Observable, Observer, create, empty, just, operators as RxOps -from reactivex.disposable import CompositeDisposable, Disposable -from reactivex.subject import Subject - -# Local imports -from dimos.agents.memory.chroma_impl import OpenAISemanticMemory -from dimos.agents.prompt_builder.impl import PromptBuilder -from dimos.agents.tokenizer.openai_tokenizer import OpenAITokenizer -from dimos.skills.skills import AbstractSkill, SkillLibrary -from dimos.stream.frame_processor import FrameProcessor -from dimos.stream.stream_merger import create_stream_merger -from dimos.stream.video_operators import Operators as MyOps, VideoOperators as MyVidOps -from dimos.utils.logging_config import setup_logger -from dimos.utils.threadpool import get_scheduler +def toolmsg_from_state(state: SkillState) -> ToolMessage: + if state.skill_config.output != Output.standard: + content = "output attached in separate messages" + else: + content = state.content() # type: ignore[assignment] -if TYPE_CHECKING: - from reactivex.scheduler import ThreadPoolScheduler + return ToolMessage( + # if agent call has been triggered by another skill, + # and this specific skill didn't finish yet but we need a tool call response + # we return a message explaining that execution is still ongoing + content=content + or "Running, you will be called with an update, no need for subsequent tool calls", + name=state.name, + tool_call_id=state.call_id, + ) - from dimos.agents.memory.base import AbstractAgentSemanticMemory - from dimos.agents.tokenizer.base import AbstractTokenizer -# Initialize environment variables -load_dotenv() +class SkillStateSummary(TypedDict): + name: str + call_id: str + state: str + data: Any -# 
Initialize logger for the agent module -logger = setup_logger() -# Constants -_TOKEN_BUDGET_PARTS = 4 # Number of parts to divide token budget -_MAX_SAVED_FRAMES = 100 # Maximum number of frames to save +def summary_from_state(state: SkillState, special_data: bool = False) -> SkillStateSummary: + content = state.content() + if isinstance(content, dict): + content = json.dumps(content) + if not isinstance(content, str): + content = str(content) -# ----------------------------------------------------------------------------- -# region Agent Base Class -# ----------------------------------------------------------------------------- -class Agent: - """Base agent that manages memory and subscriptions.""" + return { + "name": state.name, + "call_id": state.call_id, + "state": state.state.name, + "data": state.content() if not special_data else "data will be in a separate message", + } - def __init__( - self, - dev_name: str = "NA", - agent_type: str = "Base", - agent_memory: AbstractAgentSemanticMemory | None = None, - pool_scheduler: ThreadPoolScheduler | None = None, - ) -> None: - """ - Initializes a new instance of the Agent. - - Args: - dev_name (str): The device name of the agent. - agent_type (str): The type of the agent (e.g., 'Base', 'Vision'). - agent_memory (AbstractAgentSemanticMemory): The memory system for the agent. - pool_scheduler (ThreadPoolScheduler): The scheduler to use for thread pool operations. - If None, the global scheduler from get_scheduler() will be used. 
- """ - self.dev_name = dev_name - self.agent_type = agent_type - self.agent_memory = agent_memory or OpenAISemanticMemory() - self.disposables = CompositeDisposable() - self.pool_scheduler = pool_scheduler if pool_scheduler else get_scheduler() - - def dispose_all(self) -> None: - """Disposes of all active subscriptions managed by this agent.""" - if self.disposables: - self.disposables.dispose() - else: - logger.info("No disposables to dispose.") - - -# endregion Agent Base Class - - -# ----------------------------------------------------------------------------- -# region LLMAgent Base Class (Generic LLM Agent) -# ----------------------------------------------------------------------------- -class LLMAgent(Agent): - """Generic LLM agent containing common logic for LLM-based agents. - - This class implements functionality for: - - Updating the query - - Querying the agent's memory (for RAG) - - Building prompts via a prompt builder - - Handling tooling callbacks in responses - - Subscribing to image and query streams - - Emitting responses as an observable stream - - Subclasses must implement the `_send_query` method, which is responsible - for sending the prompt to a specific LLM API. - - Attributes: - query (str): The current query text to process. - prompt_builder (PromptBuilder): Handles construction of prompts. - system_query (str): System prompt for RAG context situations. - image_detail (str): Detail level for image processing ('low','high','auto'). - max_input_tokens_per_request (int): Maximum input token count. - max_output_tokens_per_request (int): Maximum output token count. - max_tokens_per_request (int): Total maximum token count. - rag_query_n (int): Number of results to fetch from memory. - rag_similarity_threshold (float): Minimum similarity for RAG results. - frame_processor (FrameProcessor): Processes video frames. - output_dir (str): Directory for output files. - response_subject (Subject): Subject that emits agent responses. 
- process_all_inputs (bool): Whether to process every input emission (True) or - skip emissions when the agent is busy processing a previous input (False). - """ - - logging_file_memory_lock = threading.Lock() - - def __init__( - self, - dev_name: str = "NA", - agent_type: str = "LLM", - agent_memory: AbstractAgentSemanticMemory | None = None, - pool_scheduler: ThreadPoolScheduler | None = None, - process_all_inputs: bool = False, - system_query: str | None = None, - max_output_tokens_per_request: int = 16384, - max_input_tokens_per_request: int = 128000, - input_query_stream: Observable | None = None, # type: ignore[type-arg] - input_data_stream: Observable | None = None, # type: ignore[type-arg] - input_video_stream: Observable | None = None, # type: ignore[type-arg] - ) -> None: - """ - Initializes a new instance of the LLMAgent. - - Args: - dev_name (str): The device name of the agent. - agent_type (str): The type of the agent. - agent_memory (AbstractAgentSemanticMemory): The memory system for the agent. - pool_scheduler (ThreadPoolScheduler): The scheduler to use for thread pool operations. - If None, the global scheduler from get_scheduler() will be used. - process_all_inputs (bool): Whether to process every input emission (True) or - skip emissions when the agent is busy processing a previous input (False). - """ - super().__init__(dev_name, agent_type, agent_memory, pool_scheduler) - # These attributes can be configured by a subclass if needed. 
- self.query: str | None = None - self.prompt_builder: PromptBuilder | None = None - self.system_query: str | None = system_query - self.image_detail: str = "low" - self.max_input_tokens_per_request: int = max_input_tokens_per_request - self.max_output_tokens_per_request: int = max_output_tokens_per_request - self.max_tokens_per_request: int = ( - self.max_input_tokens_per_request + self.max_output_tokens_per_request - ) - self.rag_query_n: int = 4 - self.rag_similarity_threshold: float = 0.45 - self.frame_processor: FrameProcessor | None = None - self.output_dir: str = os.path.join(os.getcwd(), "assets", "agent") - self.process_all_inputs: bool = process_all_inputs - os.makedirs(self.output_dir, exist_ok=True) - - # Subject for emitting responses - self.response_subject = Subject() # type: ignore[var-annotated] - - # Conversation history for maintaining context between calls - self.conversation_history = [] # type: ignore[var-annotated] - - # Initialize input streams - self.input_video_stream = input_video_stream - self.input_query_stream = ( - input_query_stream - if (input_data_stream is None) - else ( - input_query_stream.pipe( # type: ignore[misc, union-attr] - RxOps.with_latest_from(input_data_stream), - RxOps.map( - lambda combined: { - "query": combined[0], # type: ignore[index] - "objects": combined[1] # type: ignore[index] - if len(combined) > 1 # type: ignore[arg-type] - else "No object data available", - } - ), - RxOps.map( - lambda data: f"{data['query']}\n\nCurrent objects detected:\n{data['objects']}" # type: ignore[index] - ), - RxOps.do_action( - lambda x: print(f"\033[34mEnriched query: {x.split(chr(10))[0]}\033[0m") # type: ignore[arg-type] - or [print(f"\033[34m{line}\033[0m") for line in x.split(chr(10))[1:]] # type: ignore[var-annotated] - ), - ) - ) - ) - # Setup stream subscriptions based on inputs provided - if (self.input_video_stream is not None) and (self.input_query_stream is not None): - self.merged_stream = create_stream_merger( - 
data_input_stream=self.input_video_stream, text_query_stream=self.input_query_stream - ) +def _custom_json_serializers(obj): # type: ignore[no-untyped-def] + if isinstance(obj, datetime.date | datetime.datetime): + return obj.isoformat() + raise TypeError(f"Type {type(obj)} not serializable") - logger.info("Subscribing to merged input stream...") - # Define a query extractor for the merged stream - def query_extractor(emission): # type: ignore[no-untyped-def] - return (emission[0], emission[1][0]) +# takes an overview of running skills from the coorindator +# and builds messages to be sent to an agent +def snapshot_to_messages( + state: SkillStateDict, + tool_calls: list[ToolCall], +) -> tuple[list[ToolMessage], AIMessage | None]: + # builds a set of tool call ids from a previous agent request + tool_call_ids = set( + map(itemgetter("id"), tool_calls), + ) - self.disposables.add( - self.subscribe_to_image_processing( - self.merged_stream, query_extractor=query_extractor - ) - ) - else: - # If no merged stream, fall back to individual streams - if self.input_video_stream is not None: - logger.info("Subscribing to input video stream...") - self.disposables.add(self.subscribe_to_image_processing(self.input_video_stream)) - if self.input_query_stream is not None: - logger.info("Subscribing to input query stream...") - self.disposables.add(self.subscribe_to_query_processing(self.input_query_stream)) - - def _update_query(self, incoming_query: str | None) -> None: - """Updates the query if an incoming query is provided. - - Args: - incoming_query (str): The new query text. - """ - if incoming_query is not None: - self.query = incoming_query - - def _get_rag_context(self) -> tuple[str, str]: - """Queries the agent memory to retrieve RAG context. - - Returns: - Tuple[str, str]: A tuple containing the formatted results (for logging) - and condensed results (for use in the prompt). 
- """ - results = self.agent_memory.query( - query_texts=self.query, - n_results=self.rag_query_n, - similarity_threshold=self.rag_similarity_threshold, - ) - formatted_results = "\n".join( - f"Document ID: {doc.id}\nMetadata: {doc.metadata}\nContent: {doc.page_content}\nScore: {score}\n" - for (doc, score) in results + # build a tool msg responses + tool_msgs: list[ToolMessage] = [] + + # build a general skill state overview (for longer running skills) + state_overview: list[dict[str, SkillStateSummary]] = [] + + # for special skills that want to return a separate message + # (images for example, requires to be a HumanMessage) + special_msgs: list[HumanMessage] = [] + + # for special skills that want to return a separate message that should + # stay in history, like actual human messages, critical events + history_msgs: list[HumanMessage] = [] + + # Initialize state_msg + state_msg = None + + for skill_state in sorted( + state.values(), + key=lambda skill_state: skill_state.duration(), + ): + if skill_state.call_id in tool_call_ids: + tool_msgs.append(toolmsg_from_state(skill_state)) + + if skill_state.skill_config.output == Output.human: + content = skill_state.content() + if not content: + continue + history_msgs.append(HumanMessage(content=content)) # type: ignore[arg-type] + continue + + special_data = skill_state.skill_config.output == Output.image + if special_data: + content = skill_state.content() + if not content: + continue + special_msgs.append(HumanMessage(content=content)) # type: ignore[arg-type] + + if skill_state.call_id in tool_call_ids: + continue + + state_overview.append(summary_from_state(skill_state, special_data)) # type: ignore[arg-type] + + if state_overview: + state_overview_str = "\n".join( + json.dumps(s, default=_custom_json_serializers) for s in state_overview ) - condensed_results = " | ".join(f"{doc.page_content}" for (doc, _) in results) - logger.info(f"Agent Memory Query Results:\n{formatted_results}") - logger.info("=== Results 
End ===") - return formatted_results, condensed_results + state_msg = AIMessage("State Overview:\n" + state_overview_str) + + return { # type: ignore[return-value] + "tool_msgs": tool_msgs, + "history_msgs": history_msgs, + "state_msgs": ([state_msg] if state_msg else []) + special_msgs, + } + + +# Agent class job is to glue skill coordinator state to an agent, builds langchain messages +class Agent(AgentSpec): + system_message: SystemMessage + state_messages: list[AIMessage | HumanMessage] - def _build_prompt( + def __init__( # type: ignore[no-untyped-def] self, - base64_image: str | None, - dimensions: tuple[int, int] | None, - override_token_limit: bool, - condensed_results: str, - ) -> list: # type: ignore[type-arg] - """Builds a prompt message using the prompt builder. - - Args: - base64_image (str): Optional Base64-encoded image. - dimensions (Tuple[int, int]): Optional image dimensions. - override_token_limit (bool): Whether to override token limits. - condensed_results (str): The condensed RAG context. - - Returns: - list: A list of message dictionaries to be sent to the LLM. 
- """ - # Budget for each component of the prompt - budgets = { - "system_prompt": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, - "user_query": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, - "image": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, - "rag": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, - } - - # Define truncation policies for each component - policies = { - "system_prompt": "truncate_end", - "user_query": "truncate_middle", - "image": "do_not_truncate", - "rag": "truncate_end", - } - - return self.prompt_builder.build( # type: ignore[no-any-return, union-attr] - user_query=self.query, - override_token_limit=override_token_limit, - base64_image=base64_image, - image_width=dimensions[0] if dimensions is not None else None, - image_height=dimensions[1] if dimensions is not None else None, - image_detail=self.image_detail, - rag_context=condensed_results, - system_prompt=self.system_query, - budgets=budgets, - policies=policies, - ) + *args, + **kwargs, + ) -> None: + AgentSpec.__init__(self, *args, **kwargs) - def _handle_tooling(self, response_message, messages): # type: ignore[no-untyped-def] - """Handles tooling callbacks in the response message. - - If tool calls are present, the corresponding functions are executed and - a follow-up query is sent. - - Args: - response_message: The response message containing tool calls. - messages (list): The original list of messages sent. - - Returns: - The final response message after processing tool calls, if any. - """ - - # TODO: Make this more generic or move implementation to OpenAIAgent. - # This is presently OpenAI-specific. 
- def _tooling_callback(message, messages, response_message, skill_library: SkillLibrary): # type: ignore[no-untyped-def] - has_called_tools = False - new_messages = [] - for tool_call in message.tool_calls: - has_called_tools = True - name = tool_call.function.name - args = json.loads(tool_call.function.arguments) - result = skill_library.call(name, **args) - logger.info(f"Function Call Results: {result}") - new_messages.append( - { - "role": "tool", - "tool_call_id": tool_call.id, - "content": str(result), - "name": name, - } - ) - if has_called_tools: - logger.info("Sending Another Query.") - messages.append(response_message) - messages.extend(new_messages) - # Delegate to sending the query again. - return self._send_query(messages) + self.state_messages = [] + self.coordinator = SkillCoordinator() + self._history = [] # type: ignore[var-annotated] + self._agent_id = str(uuid.uuid4()) + self._agent_stopped = False + + if self.config.system_prompt: + if isinstance(self.config.system_prompt, str): + self.system_message = SystemMessage(self.config.system_prompt + SYSTEM_MSG_APPEND) else: - logger.info("No Need for Another Query.") - return None - - if response_message.tool_calls is not None: - return _tooling_callback( - response_message, - messages, - response_message, - self.skill_library, # type: ignore[attr-defined] - ) - return None + self.config.system_prompt.content += SYSTEM_MSG_APPEND # type: ignore[operator] + self.system_message = self.config.system_prompt + else: + self.system_message = SystemMessage(get_system_prompt() + SYSTEM_MSG_APPEND) - def _observable_query( # type: ignore[no-untyped-def] - self, - observer: Observer, # type: ignore[type-arg] - base64_image: str | None = None, - dimensions: tuple[int, int] | None = None, - override_token_limit: bool = False, - incoming_query: str | None = None, - ): - """Prepares and sends a query to the LLM, emitting the response to the observer. 
- - Args: - observer (Observer): The observer to emit responses to. - base64_image (str): Optional Base64-encoded image. - dimensions (Tuple[int, int]): Optional image dimensions. - override_token_limit (bool): Whether to override token limits. - incoming_query (str): Optional query to update the agent's query. - - Raises: - Exception: Propagates any exceptions encountered during processing. - """ - try: - self._update_query(incoming_query) - _, condensed_results = self._get_rag_context() - messages = self._build_prompt( - base64_image, dimensions, override_token_limit, condensed_results - ) - # logger.debug(f"Sending Query: {messages}") - logger.info("Sending Query.") - response_message = self._send_query(messages) - logger.info(f"Received Response: {response_message}") - if response_message is None: - raise Exception("Response message does not exist.") - - # TODO: Make this more generic. The parsed tag and tooling handling may be OpenAI-specific. - # If no skill library is provided or there are no tool calls, emit the response directly. 
- if ( - self.skill_library is None # type: ignore[attr-defined] - or self.skill_library.get_tools() in (None, NOT_GIVEN) # type: ignore[attr-defined] - or response_message.tool_calls is None - ): - final_msg = ( - response_message.parsed - if hasattr(response_message, "parsed") and response_message.parsed - else ( - response_message.content - if hasattr(response_message, "content") - else response_message - ) + self.publish(self.system_message) + + # Use provided model instance if available, otherwise initialize from config + if self.config.model_instance: + self._llm = self.config.model_instance + else: + # For Ollama provider, ensure the model is available before initializing + if self.config.provider.value.lower() == "ollama": + ensure_ollama_model(self.config.model) + + # For HuggingFace, we need to create a pipeline and wrap it in ChatHuggingFace + if self.config.provider.value.lower() == "huggingface": + llm = HuggingFacePipeline.from_model_id( + model_id=self.config.model, + task="text-generation", + pipeline_kwargs={ + "max_new_tokens": 512, + "temperature": 0.7, + }, ) - observer.on_next(final_msg) - self.response_subject.on_next(final_msg) + self._llm = ChatHuggingFace(llm=llm, model_id=self.config.model) else: - response_message_2 = self._handle_tooling(response_message, messages) # type: ignore[no-untyped-call] - final_msg = ( - response_message_2 if response_message_2 is not None else response_message + self._llm = init_chat_model( # type: ignore[call-overload] + model_provider=self.config.provider, model=self.config.model ) - if isinstance(final_msg, BaseModel): # TODO: Test - final_msg = str(final_msg.content) # type: ignore[attr-defined] - observer.on_next(final_msg) - self.response_subject.on_next(final_msg) - observer.on_completed() - except Exception as e: - logger.error(f"Query failed in {self.dev_name}: {e}") - observer.on_error(e) - self.response_subject.on_error(e) - - def _send_query(self, messages: list) -> Any: # type: ignore[type-arg] - 
"""Sends the query to the LLM API. - - This method must be implemented by subclasses with specifics of the LLM API. - - Args: - messages (list): The prompt messages to be sent. - - Returns: - Any: The response message from the LLM. - - Raises: - NotImplementedError: Always, unless overridden. - """ - raise NotImplementedError("Subclasses must implement _send_query method.") - - def _log_response_to_file(self, response, output_dir: str | None = None) -> None: # type: ignore[no-untyped-def] - """Logs the LLM response to a file. - - Args: - response: The response message to log. - output_dir (str): The directory where the log file is stored. - """ - if output_dir is None: - output_dir = self.output_dir - if response is not None: - with self.logging_file_memory_lock: - log_path = os.path.join(output_dir, "memory.txt") - with open(log_path, "a") as file: - file.write(f"{self.dev_name}: {response}\n") - logger.info(f"LLM Response [{self.dev_name}]: {response}") - - def subscribe_to_image_processing( # type: ignore[no-untyped-def] - self, - frame_observable: Observable, # type: ignore[type-arg] - query_extractor=None, - ) -> Disposable: - """Subscribes to a stream of video frames for processing. - - This method sets up a subscription to process incoming video frames. - Each frame is encoded and then sent to the LLM by directly calling the - _observable_query method. The response is then logged to a file. - - Args: - frame_observable (Observable): An observable emitting video frames or - (query, frame) tuples if query_extractor is provided. - query_extractor (callable, optional): Function to extract query and frame from - each emission. If None, assumes emissions are - raw frames and uses self.system_query. - - Returns: - Disposable: A disposable representing the subscription. 
- """ - # Initialize frame processor if not already set - if self.frame_processor is None: - self.frame_processor = FrameProcessor(delete_on_init=True) - - print_emission_args = {"enabled": True, "dev_name": self.dev_name, "counts": {}} - - def _process_frame(emission) -> Observable: # type: ignore[no-untyped-def, type-arg] - """ - Processes a frame or (query, frame) tuple. - """ - # Extract query and frame - if query_extractor: - query, frame = query_extractor(emission) - else: - query = self.system_query - frame = emission - return just(frame).pipe( # type: ignore[call-overload, no-any-return] - MyOps.print_emission(id="B", **print_emission_args), # type: ignore[arg-type] - RxOps.observe_on(self.pool_scheduler), - MyOps.print_emission(id="C", **print_emission_args), # type: ignore[arg-type] - RxOps.subscribe_on(self.pool_scheduler), - MyOps.print_emission(id="D", **print_emission_args), # type: ignore[arg-type] - MyVidOps.with_jpeg_export( - self.frame_processor, # type: ignore[arg-type] - suffix=f"{self.dev_name}_frame_", - save_limit=_MAX_SAVED_FRAMES, - ), - MyOps.print_emission(id="E", **print_emission_args), # type: ignore[arg-type] - MyVidOps.encode_image(), - MyOps.print_emission(id="F", **print_emission_args), # type: ignore[arg-type] - RxOps.filter( - lambda base64_and_dims: base64_and_dims is not None - and base64_and_dims[0] is not None # type: ignore[index] - and base64_and_dims[1] is not None # type: ignore[index] - ), - MyOps.print_emission(id="G", **print_emission_args), # type: ignore[arg-type] - RxOps.flat_map( - lambda base64_and_dims: create( # type: ignore[arg-type, return-value] - lambda observer, _: self._observable_query( - observer, # type: ignore[arg-type] - base64_image=base64_and_dims[0], - dimensions=base64_and_dims[1], - incoming_query=query, - ) - ) - ), # Use the extracted query - MyOps.print_emission(id="H", **print_emission_args), # type: ignore[arg-type] + + @rpc + def get_agent_id(self) -> str: + return self._agent_id + + @rpc + 
def start(self) -> None: + super().start() + self.coordinator.start() + + @rpc + def stop(self) -> None: + self.coordinator.stop() + self._agent_stopped = True + super().stop() + + def clear_history(self) -> None: + self._history.clear() + + def append_history(self, *msgs: list[AIMessage | HumanMessage]) -> None: + for msg in msgs: + self.publish(msg) # type: ignore[arg-type] + + self._history.extend(msgs) + + def history(self): # type: ignore[no-untyped-def] + return [self.system_message, *self._history, *self.state_messages] + + # Used by agent to execute tool calls + def execute_tool_calls(self, tool_calls: list[ToolCall]) -> None: + """Execute a list of tool calls from the agent.""" + if self._agent_stopped: + logger.warning("Agent is stopped, cannot execute tool calls.") + return + for tool_call in tool_calls: + logger.info(f"executing skill call {tool_call}") + self.coordinator.call_skill( + tool_call.get("id"), # type: ignore[arg-type] + tool_call.get("name"), # type: ignore[arg-type] + tool_call.get("args"), # type: ignore[arg-type] ) - # Use a mutable flag to ensure only one frame is processed at a time. 
- is_processing = [False] + # used to inject skill calls into the agent loop without agent asking for it + def run_implicit_skill(self, skill_name: str, **kwargs) -> None: # type: ignore[no-untyped-def] + if self._agent_stopped: + logger.warning("Agent is stopped, cannot execute implicit skill calls.") + return + self.coordinator.call_skill(False, skill_name, {"args": kwargs}) - def process_if_free(emission): # type: ignore[no-untyped-def] - if not self.process_all_inputs and is_processing[0]: - # Drop frame if a request is in progress and process_all_inputs is False - return empty() - else: - is_processing[0] = True - return _process_frame(emission).pipe( - MyOps.print_emission(id="I", **print_emission_args), # type: ignore[arg-type] - RxOps.observe_on(self.pool_scheduler), - MyOps.print_emission(id="J", **print_emission_args), # type: ignore[arg-type] - RxOps.subscribe_on(self.pool_scheduler), - MyOps.print_emission(id="K", **print_emission_args), # type: ignore[arg-type] - RxOps.do_action( - on_completed=lambda: is_processing.__setitem__(0, False), - on_error=lambda e: is_processing.__setitem__(0, False), - ), - MyOps.print_emission(id="L", **print_emission_args), # type: ignore[arg-type] - ) + async def agent_loop(self, first_query: str = ""): # type: ignore[no-untyped-def] + # TODO: Should I add a lock here to prevent concurrent calls to agent_loop? - observable = frame_observable.pipe( - MyOps.print_emission(id="A", **print_emission_args), # type: ignore[arg-type] - RxOps.flat_map(process_if_free), - MyOps.print_emission(id="M", **print_emission_args), # type: ignore[arg-type] - ) + if self._agent_stopped: + logger.warning("Agent is stopped, cannot run agent loop.") + # return "Agent is stopped." 
+ import traceback - disposable = observable.subscribe( - on_next=lambda response: self._log_response_to_file(response, self.output_dir), - on_error=lambda e: logger.error(f"Error encountered: {e}"), - on_completed=lambda: logger.info(f"Stream processing completed for {self.dev_name}"), - ) - self.disposables.add(disposable) - return disposable # type: ignore[no-any-return] - - def subscribe_to_query_processing(self, query_observable: Observable) -> Disposable: # type: ignore[type-arg] - """Subscribes to a stream of queries for processing. - - This method sets up a subscription to process incoming queries by directly - calling the _observable_query method. The responses are logged to a file. - - Args: - query_observable (Observable): An observable emitting queries. - - Returns: - Disposable: A disposable representing the subscription. - """ - print_emission_args = {"enabled": False, "dev_name": self.dev_name, "counts": {}} - - def _process_query(query) -> Observable: # type: ignore[no-untyped-def, type-arg] - """ - Processes a single query by logging it and passing it to _observable_query. - Returns an observable that emits the LLM response. - """ - return just(query).pipe( - MyOps.print_emission(id="Pr A", **print_emission_args), # type: ignore[arg-type] - RxOps.flat_map( - lambda query: create( # type: ignore[arg-type, return-value] - lambda observer, _: self._observable_query(observer, incoming_query=query) # type: ignore[arg-type] + traceback.print_stack() + return "Agent is stopped." 
+ + self.state_messages = [] + if first_query: + self.append_history(HumanMessage(first_query)) # type: ignore[arg-type] + + def _get_state() -> str: + # TODO: FIX THIS EXTREME HACK + update = self.coordinator.generate_snapshot(clear=False) + snapshot_msgs = snapshot_to_messages(update, msg.tool_calls) # type: ignore[attr-defined] + return json.dumps(snapshot_msgs, sort_keys=True, default=lambda o: repr(o)) + + try: + while True: + # we are getting tools from the coordinator on each turn + # since this allows for skillcontainers to dynamically provide new skills + tools = self.get_tools() # type: ignore[no-untyped-call] + self._llm = self._llm.bind_tools(tools) # type: ignore[assignment] + + # publish to /agent topic for observability + for state_msg in self.state_messages: + self.publish(state_msg) + + # history() builds our message history dynamically + # ensures we include latest system state, but not old ones. + messages = self.history() # type: ignore[no-untyped-call] + + # Some LLMs don't work without any human messages. Add an initial one. + if len(messages) == 1 and isinstance(messages[0], SystemMessage): + messages.append( + HumanMessage( + "Everything is initialized. I'll let you know when you should act." + ) ) - ), - MyOps.print_emission(id="Pr B", **print_emission_args), # type: ignore[arg-type] - ) + self.append_history(messages[-1]) - # A mutable flag indicating whether a query is currently being processed. 
- is_processing = [False] + msg = self._llm.invoke(messages) - def process_if_free(query): # type: ignore[no-untyped-def] - logger.info(f"Processing Query: {query}") - if not self.process_all_inputs and is_processing[0]: - # Drop query if a request is already in progress and process_all_inputs is False - return empty() - else: - is_processing[0] = True - logger.info("Processing Query.") - return _process_query(query).pipe( - MyOps.print_emission(id="B", **print_emission_args), # type: ignore[arg-type] - RxOps.observe_on(self.pool_scheduler), - MyOps.print_emission(id="C", **print_emission_args), # type: ignore[arg-type] - RxOps.subscribe_on(self.pool_scheduler), - MyOps.print_emission(id="D", **print_emission_args), # type: ignore[arg-type] - RxOps.do_action( - on_completed=lambda: is_processing.__setitem__(0, False), - on_error=lambda e: is_processing.__setitem__(0, False), - ), - MyOps.print_emission(id="E", **print_emission_args), # type: ignore[arg-type] - ) + self.append_history(msg) # type: ignore[arg-type] - observable = query_observable.pipe( - MyOps.print_emission(id="A", **print_emission_args), # type: ignore[arg-type] - RxOps.flat_map(lambda query: process_if_free(query)), # type: ignore[no-untyped-call] - MyOps.print_emission(id="F", **print_emission_args), # type: ignore[arg-type] - ) + logger.info(f"Agent response: {msg.content}") - disposable = observable.subscribe( - on_next=lambda response: self._log_response_to_file(response, self.output_dir), - on_error=lambda e: logger.error(f"Error processing query for {self.dev_name}: {e}"), - on_completed=lambda: logger.info(f"Stream processing completed for {self.dev_name}"), - ) - self.disposables.add(disposable) - return disposable # type: ignore[no-any-return] - - def get_response_observable(self) -> Observable: # type: ignore[type-arg] - """Gets an observable that emits responses from this agent. - - Returns: - Observable: An observable that emits string responses from the agent. 
- """ - return self.response_subject.pipe( - RxOps.observe_on(self.pool_scheduler), - RxOps.subscribe_on(self.pool_scheduler), - RxOps.share(), - ) + state = _get_state() - def run_observable_query(self, query_text: str, **kwargs) -> Observable: # type: ignore[no-untyped-def, type-arg] - """Creates an observable that processes a one-off text query to Agent and emits the response. - - This method provides a simple way to send a text query and get an observable - stream of the response. It's designed for one-off queries rather than - continuous processing of input streams. Useful for testing and development. - - Args: - query_text (str): The query text to process. - **kwargs: Additional arguments to pass to _observable_query. Supported args vary by agent type. - For example, ClaudeAgent supports: base64_image, dimensions, override_token_limit, - reset_conversation, thinking_budget_tokens - - Returns: - Observable: An observable that emits the response as a string. - """ - return create( - lambda observer, _: self._observable_query( - observer, # type: ignore[arg-type] - incoming_query=query_text, - **kwargs, - ) - ) + if msg.tool_calls: # type: ignore[attr-defined] + self.execute_tool_calls(msg.tool_calls) # type: ignore[attr-defined] - def dispose_all(self) -> None: - """Disposes of all active subscriptions managed by this agent.""" - super().dispose_all() - self.response_subject.on_completed() + # print(self) + # print(self.coordinator) + self._write_debug_history_file() -# endregion LLMAgent Base Class (Generic LLM Agent) + if not self.coordinator.has_active_skills(): + logger.info("No active tasks, exiting agent loop.") + return msg.content + # coordinator will continue once a skill state has changed in + # such a way that agent call needs to be executed -# ----------------------------------------------------------------------------- -# region OpenAIAgent Subclass (OpenAI-Specific Implementation) -# 
----------------------------------------------------------------------------- -class OpenAIAgent(LLMAgent): - """OpenAI agent implementation that uses OpenAI's API for processing. + if state == _get_state(): + await self.coordinator.wait_for_updates() - This class implements the _send_query method to interact with OpenAI's API. - It also sets up OpenAI-specific parameters, such as the client, model name, - tokenizer, and response model. - """ + # we request a full snapshot of currently running, finished or errored out skills + # we ask for removal of finished skills from subsequent snapshots (clear=True) + update = self.coordinator.generate_snapshot(clear=True) - def __init__( - self, - dev_name: str, - agent_type: str = "Vision", - query: str = "What do you see?", - input_query_stream: Observable | None = None, # type: ignore[type-arg] - input_data_stream: Observable | None = None, # type: ignore[type-arg] - input_video_stream: Observable | None = None, # type: ignore[type-arg] - output_dir: str = os.path.join(os.getcwd(), "assets", "agent"), - agent_memory: AbstractAgentSemanticMemory | None = None, - system_query: str | None = None, - max_input_tokens_per_request: int = 128000, - max_output_tokens_per_request: int = 16384, - model_name: str = "gpt-4o", - prompt_builder: PromptBuilder | None = None, - tokenizer: AbstractTokenizer | None = None, - rag_query_n: int = 4, - rag_similarity_threshold: float = 0.45, - skills: AbstractSkill | list[AbstractSkill] | SkillLibrary | None = None, - response_model: BaseModel | None = None, - frame_processor: FrameProcessor | None = None, - image_detail: str = "low", - pool_scheduler: ThreadPoolScheduler | None = None, - process_all_inputs: bool | None = None, - openai_client: OpenAI | None = None, - ) -> None: - """ - Initializes a new instance of the OpenAIAgent. - - Args: - dev_name (str): The device name of the agent. - agent_type (str): The type of the agent. - query (str): The default query text. 
- input_query_stream (Observable): An observable for query input. - input_data_stream (Observable): An observable for data input. - input_video_stream (Observable): An observable for video frames. - output_dir (str): Directory for output files. - agent_memory (AbstractAgentSemanticMemory): The memory system. - system_query (str): The system prompt to use with RAG context. - max_input_tokens_per_request (int): Maximum tokens for input. - max_output_tokens_per_request (int): Maximum tokens for output. - model_name (str): The OpenAI model name to use. - prompt_builder (PromptBuilder): Custom prompt builder. - tokenizer (AbstractTokenizer): Custom tokenizer for token counting. - rag_query_n (int): Number of results to fetch in RAG queries. - rag_similarity_threshold (float): Minimum similarity for RAG results. - skills (Union[AbstractSkill, List[AbstractSkill], SkillLibrary]): Skills available to the agent. - response_model (BaseModel): Optional Pydantic model for responses. - frame_processor (FrameProcessor): Custom frame processor. - image_detail (str): Detail level for images ("low", "high", "auto"). - pool_scheduler (ThreadPoolScheduler): The scheduler to use for thread pool operations. - If None, the global scheduler from get_scheduler() will be used. - process_all_inputs (bool): Whether to process all inputs or skip when busy. - If None, defaults to True for text queries and merged streams, False for video streams. - openai_client (OpenAI): The OpenAI client to use. This can be used to specify - a custom OpenAI client if targetting another provider. 
- """ - # Determine appropriate default for process_all_inputs if not provided - if process_all_inputs is None: - if input_query_stream is not None: - process_all_inputs = True - else: - process_all_inputs = False - - super().__init__( - dev_name=dev_name, - agent_type=agent_type, - agent_memory=agent_memory, - pool_scheduler=pool_scheduler, - process_all_inputs=process_all_inputs, - system_query=system_query, - input_query_stream=input_query_stream, - input_data_stream=input_data_stream, - input_video_stream=input_video_stream, - ) - self.client = openai_client or OpenAI() - self.query = query - self.output_dir = output_dir - os.makedirs(self.output_dir, exist_ok=True) - - # Configure skill library. - self.skills = skills - self.skill_library = None - if isinstance(self.skills, SkillLibrary): - self.skill_library = self.skills - elif isinstance(self.skills, list): - self.skill_library = SkillLibrary() - for skill in self.skills: - self.skill_library.add(skill) - elif isinstance(self.skills, AbstractSkill): - self.skill_library = SkillLibrary() - self.skill_library.add(self.skills) - - self.response_model = response_model if response_model is not None else NOT_GIVEN - self.model_name = model_name - self.tokenizer = tokenizer or OpenAITokenizer(model_name=self.model_name) - self.prompt_builder = prompt_builder or PromptBuilder( - self.model_name, tokenizer=self.tokenizer - ) - self.rag_query_n = rag_query_n - self.rag_similarity_threshold = rag_similarity_threshold - self.image_detail = image_detail - self.max_output_tokens_per_request = max_output_tokens_per_request - self.max_input_tokens_per_request = max_input_tokens_per_request - self.max_tokens_per_request = max_input_tokens_per_request + max_output_tokens_per_request - - # Add static context to memory. 
- self._add_context_to_memory() - - self.frame_processor = frame_processor or FrameProcessor(delete_on_init=True) - - logger.info("OpenAI Agent Initialized.") - - def _add_context_to_memory(self) -> None: - """Adds initial context to the agent's memory.""" - context_data = [ - ( - "id0", - "Optical Flow is a technique used to track the movement of objects in a video sequence.", - ), - ( - "id1", - "Edge Detection is a technique used to identify the boundaries of objects in an image.", - ), - ("id2", "Video is a sequence of frames captured at regular intervals."), - ( - "id3", - "Colors in Optical Flow are determined by the movement of light, and can be used to track the movement of objects.", - ), - ( - "id4", - "Json is a data interchange format that is easy for humans to read and write, and easy for machines to parse and generate.", - ), - ] - for doc_id, text in context_data: - self.agent_memory.add_vector(doc_id, text) # type: ignore[no-untyped-call] - - def _send_query(self, messages: list) -> Any: # type: ignore[type-arg] - """Sends the query to OpenAI's API. - - Depending on whether a response model is provided, the appropriate API - call is made. - - Args: - messages (list): The prompt messages to send. - - Returns: - The response message from OpenAI. - - Raises: - Exception: If no response message is returned. - ConnectionError: If there's an issue connecting to the API. - ValueError: If the messages or other parameters are invalid. 
- """ - try: - if self.response_model is not NOT_GIVEN: - response = self.client.beta.chat.completions.parse( - model=self.model_name, - messages=messages, - response_format=self.response_model, # type: ignore[arg-type] - tools=( - self.skill_library.get_tools() # type: ignore[arg-type] - if self.skill_library is not None - else NOT_GIVEN - ), - max_tokens=self.max_output_tokens_per_request, - ) - else: - response = self.client.chat.completions.create( # type: ignore[assignment] - model=self.model_name, - messages=messages, - max_tokens=self.max_output_tokens_per_request, - tools=( - self.skill_library.get_tools() # type: ignore[arg-type] - if self.skill_library is not None - else NOT_GIVEN - ), + # generate tool_msgs and general state update message, + # depending on a skill having associated tool call from previous interaction + # we will return a tool message, and not a general state message + snapshot_msgs = snapshot_to_messages(update, msg.tool_calls) # type: ignore[attr-defined] + + self.state_messages = snapshot_msgs.get("state_msgs", []) # type: ignore[attr-defined] + self.append_history( + *snapshot_msgs.get("tool_msgs", []), # type: ignore[attr-defined] + *snapshot_msgs.get("history_msgs", []), # type: ignore[attr-defined] ) - response_message = response.choices[0].message - if response_message is None: - logger.error("Response message does not exist.") - raise Exception("Response message does not exist.") - return response_message - except ConnectionError as ce: - logger.error(f"Connection error with API: {ce}") - raise - except ValueError as ve: - logger.error(f"Invalid parameters: {ve}") - raise + except Exception as e: - logger.error(f"Unexpected error in API call: {e}") - raise + logger.error(f"Error in agent loop: {e}") + import traceback - def stream_query(self, query_text: str) -> Observable: # type: ignore[type-arg] - """Creates an observable that processes a text query and emits the response. 
+ traceback.print_exc() - This method provides a simple way to send a text query and get an observable - stream of the response. It's designed for one-off queries rather than - continuous processing of input streams. + @rpc + def loop_thread(self) -> bool: + asyncio.run_coroutine_threadsafe(self.agent_loop(), self._loop) # type: ignore[arg-type] + return True - Args: - query_text (str): The query text to process. + @rpc + def query(self, query: str): # type: ignore[no-untyped-def] + # TODO: could this be + # from distributed.utils import sync + # return sync(self._loop, self.agent_loop, query) + return asyncio.run_coroutine_threadsafe(self.agent_loop(query), self._loop).result() # type: ignore[arg-type] - Returns: - Observable: An observable that emits the response as a string. - """ - return create( - lambda observer, _: self._observable_query(observer, incoming_query=query_text) # type: ignore[arg-type] - ) + async def query_async(self, query: str): # type: ignore[no-untyped-def] + return await self.agent_loop(query) + + @rpc + def register_skills(self, container, run_implicit_name: str | None = None): # type: ignore[no-untyped-def] + ret = self.coordinator.register_skills(container) # type: ignore[func-returns-value] + + if run_implicit_name: + self.run_implicit_skill(run_implicit_name) + + return ret + + def get_tools(self): # type: ignore[no-untyped-def] + return self.coordinator.get_tools() + + def _write_debug_history_file(self) -> None: + file_path = os.getenv("DEBUG_AGENT_HISTORY_FILE") + if not file_path: + return + + history = [x.__dict__ for x in self.history()] # type: ignore[no-untyped-call] + + with open(file_path, "w") as f: + json.dump(history, f, default=lambda x: repr(x), indent=2) + + +class LlmAgent(Agent): + @rpc + def start(self) -> None: + super().start() + self.loop_thread() + + @rpc + def stop(self) -> None: + super().stop() + + +llm_agent = LlmAgent.blueprint + + +def deploy( + dimos: DimosCluster, + system_prompt: str = "You are a 
helpful assistant for controlling a Unitree Go2 robot.", + model: Model = Model.GPT_4O, + provider: Provider = Provider.OPENAI, # type: ignore[attr-defined] + skill_containers: list[SkillContainer] | None = None, +) -> Agent: + from dimos.agents.cli.human import HumanInput + + if skill_containers is None: + skill_containers = [] + agent = dimos.deploy( # type: ignore[attr-defined] + Agent, + system_prompt=system_prompt, + model=model, + provider=provider, + ) + + human_input = dimos.deploy(HumanInput) # type: ignore[attr-defined] + human_input.start() + + agent.register_skills(human_input) + + for skill_container in skill_containers: + print("Registering skill container:", skill_container) + agent.register_skills(skill_container) + + agent.run_implicit_skill("human") + agent.start() + agent.loop_thread() + + return agent # type: ignore[no-any-return] -# endregion OpenAIAgent Subclass (OpenAI-Specific Implementation) +__all__ = ["Agent", "deploy", "llm_agent"] diff --git a/dimos/agents2/cli/human.py b/dimos/agents/cli/human.py similarity index 95% rename from dimos/agents2/cli/human.py rename to dimos/agents/cli/human.py index bbeee4961f..09e2ed24d4 100644 --- a/dimos/agents2/cli/human.py +++ b/dimos/agents/cli/human.py @@ -16,7 +16,7 @@ from reactivex.disposable import Disposable -from dimos.agents2 import Output, Reducer, Stream, skill # type: ignore[attr-defined] +from dimos.agents import Output, Reducer, Stream, skill # type: ignore[attr-defined] from dimos.core import pLCMTransport, rpc from dimos.core.module import Module from dimos.core.rpc_client import RpcCall diff --git a/dimos/agents2/cli/web.py b/dimos/agents/cli/web.py similarity index 100% rename from dimos/agents2/cli/web.py rename to dimos/agents/cli/web.py diff --git a/dimos/agents2/conftest.py b/dimos/agents/conftest.py similarity index 96% rename from dimos/agents2/conftest.py rename to dimos/agents/conftest.py index 769523f8c5..757fc839e9 100644 --- a/dimos/agents2/conftest.py +++ 
b/dimos/agents/conftest.py @@ -16,8 +16,8 @@ import pytest -from dimos.agents2.agent import Agent -from dimos.agents2.testing import MockModel +from dimos.agents.agent import Agent +from dimos.agents.testing import MockModel from dimos.protocol.skill.test_coordinator import SkillContainerTest diff --git a/dimos/agents2/constants.py b/dimos/agents/constants.py similarity index 97% rename from dimos/agents2/constants.py rename to dimos/agents/constants.py index 0d7d4832a0..2f78e92f96 100644 --- a/dimos/agents2/constants.py +++ b/dimos/agents/constants.py @@ -14,4 +14,4 @@ from dimos.constants import DIMOS_PROJECT_ROOT -AGENT_SYSTEM_PROMPT_PATH = DIMOS_PROJECT_ROOT / "assets/agent/prompt_agents2.txt" +AGENT_SYSTEM_PROMPT_PATH = DIMOS_PROJECT_ROOT / "assets/agent/prompt_agents.txt" diff --git a/dimos/agents2/fixtures/test_get_gps_position_for_queries.json b/dimos/agents/fixtures/test_get_gps_position_for_queries.json similarity index 100% rename from dimos/agents2/fixtures/test_get_gps_position_for_queries.json rename to dimos/agents/fixtures/test_get_gps_position_for_queries.json diff --git a/dimos/agents2/fixtures/test_go_to_object.json b/dimos/agents/fixtures/test_go_to_object.json similarity index 100% rename from dimos/agents2/fixtures/test_go_to_object.json rename to dimos/agents/fixtures/test_go_to_object.json diff --git a/dimos/agents2/fixtures/test_go_to_semantic_location.json b/dimos/agents/fixtures/test_go_to_semantic_location.json similarity index 100% rename from dimos/agents2/fixtures/test_go_to_semantic_location.json rename to dimos/agents/fixtures/test_go_to_semantic_location.json diff --git a/dimos/agents2/fixtures/test_how_much_is_124181112_plus_124124.json b/dimos/agents/fixtures/test_how_much_is_124181112_plus_124124.json similarity index 100% rename from dimos/agents2/fixtures/test_how_much_is_124181112_plus_124124.json rename to dimos/agents/fixtures/test_how_much_is_124181112_plus_124124.json diff --git a/dimos/agents2/fixtures/test_pounce.json 
b/dimos/agents/fixtures/test_pounce.json similarity index 100% rename from dimos/agents2/fixtures/test_pounce.json rename to dimos/agents/fixtures/test_pounce.json diff --git a/dimos/agents2/fixtures/test_set_gps_travel_points.json b/dimos/agents/fixtures/test_set_gps_travel_points.json similarity index 100% rename from dimos/agents2/fixtures/test_set_gps_travel_points.json rename to dimos/agents/fixtures/test_set_gps_travel_points.json diff --git a/dimos/agents2/fixtures/test_set_gps_travel_points_multiple.json b/dimos/agents/fixtures/test_set_gps_travel_points_multiple.json similarity index 100% rename from dimos/agents2/fixtures/test_set_gps_travel_points_multiple.json rename to dimos/agents/fixtures/test_set_gps_travel_points_multiple.json diff --git a/dimos/agents2/fixtures/test_show_your_love.json b/dimos/agents/fixtures/test_show_your_love.json similarity index 100% rename from dimos/agents2/fixtures/test_show_your_love.json rename to dimos/agents/fixtures/test_show_your_love.json diff --git a/dimos/agents2/fixtures/test_stop_movement.json b/dimos/agents/fixtures/test_stop_movement.json similarity index 100% rename from dimos/agents2/fixtures/test_stop_movement.json rename to dimos/agents/fixtures/test_stop_movement.json diff --git a/dimos/agents2/fixtures/test_take_a_look_around.json b/dimos/agents/fixtures/test_take_a_look_around.json similarity index 100% rename from dimos/agents2/fixtures/test_take_a_look_around.json rename to dimos/agents/fixtures/test_take_a_look_around.json diff --git a/dimos/agents2/fixtures/test_what_do_you_see_in_this_picture.json b/dimos/agents/fixtures/test_what_do_you_see_in_this_picture.json similarity index 100% rename from dimos/agents2/fixtures/test_what_do_you_see_in_this_picture.json rename to dimos/agents/fixtures/test_what_do_you_see_in_this_picture.json diff --git a/dimos/agents2/fixtures/test_what_is_your_name.json b/dimos/agents/fixtures/test_what_is_your_name.json similarity index 100% rename from 
dimos/agents2/fixtures/test_what_is_your_name.json rename to dimos/agents/fixtures/test_what_is_your_name.json diff --git a/dimos/agents2/fixtures/test_where_am_i.json b/dimos/agents/fixtures/test_where_am_i.json similarity index 100% rename from dimos/agents2/fixtures/test_where_am_i.json rename to dimos/agents/fixtures/test_where_am_i.json diff --git a/dimos/agents2/ollama_agent.py b/dimos/agents/ollama_agent.py similarity index 100% rename from dimos/agents2/ollama_agent.py rename to dimos/agents/ollama_agent.py diff --git a/dimos/agents2/skills/conftest.py b/dimos/agents/skills/conftest.py similarity index 92% rename from dimos/agents2/skills/conftest.py rename to dimos/agents/skills/conftest.py index f7d1500847..9b0b7b125a 100644 --- a/dimos/agents2/skills/conftest.py +++ b/dimos/agents/skills/conftest.py @@ -17,10 +17,10 @@ import pytest from reactivex.scheduler import ThreadPoolScheduler -from dimos.agents2.skills.google_maps_skill_container import GoogleMapsSkillContainer -from dimos.agents2.skills.gps_nav_skill import GpsNavSkillContainer -from dimos.agents2.skills.navigation import NavigationSkillContainer -from dimos.agents2.system_prompt import get_system_prompt +from dimos.agents.skills.google_maps_skill_container import GoogleMapsSkillContainer +from dimos.agents.skills.gps_nav_skill import GpsNavSkillContainer +from dimos.agents.skills.navigation import NavigationSkillContainer +from dimos.agents.system_prompt import get_system_prompt from dimos.robot.unitree_webrtc.unitree_skill_container import UnitreeSkillContainer system_prompt = get_system_prompt() diff --git a/dimos/agents2/skills/demo_calculator_skill.py b/dimos/agents/skills/demo_calculator_skill.py similarity index 100% rename from dimos/agents2/skills/demo_calculator_skill.py rename to dimos/agents/skills/demo_calculator_skill.py diff --git a/dimos/agents2/skills/demo_google_maps_skill.py b/dimos/agents/skills/demo_google_maps_skill.py similarity index 75% rename from 
dimos/agents2/skills/demo_google_maps_skill.py rename to dimos/agents/skills/demo_google_maps_skill.py index 4bee8691a3..52b5917f58 100644 --- a/dimos/agents2/skills/demo_google_maps_skill.py +++ b/dimos/agents/skills/demo_google_maps_skill.py @@ -15,11 +15,11 @@ from dotenv import load_dotenv -from dimos.agents2.agent import llm_agent -from dimos.agents2.cli.human import human_input -from dimos.agents2.skills.demo_robot import demo_robot -from dimos.agents2.skills.google_maps_skill_container import google_maps_skill -from dimos.agents2.system_prompt import get_system_prompt +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input +from dimos.agents.skills.demo_robot import demo_robot +from dimos.agents.skills.google_maps_skill_container import google_maps_skill +from dimos.agents.system_prompt import get_system_prompt from dimos.core.blueprints import autoconnect load_dotenv() diff --git a/dimos/agents2/skills/demo_gps_nav.py b/dimos/agents/skills/demo_gps_nav.py similarity index 76% rename from dimos/agents2/skills/demo_gps_nav.py rename to dimos/agents/skills/demo_gps_nav.py index 55ffd052ff..f0eebd7ee9 100644 --- a/dimos/agents2/skills/demo_gps_nav.py +++ b/dimos/agents/skills/demo_gps_nav.py @@ -15,11 +15,11 @@ from dotenv import load_dotenv -from dimos.agents2.agent import llm_agent -from dimos.agents2.cli.human import human_input -from dimos.agents2.skills.demo_robot import demo_robot -from dimos.agents2.skills.gps_nav_skill import gps_nav_skill -from dimos.agents2.system_prompt import get_system_prompt +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input +from dimos.agents.skills.demo_robot import demo_robot +from dimos.agents.skills.gps_nav_skill import gps_nav_skill +from dimos.agents.system_prompt import get_system_prompt from dimos.core.blueprints import autoconnect load_dotenv() diff --git a/dimos/agents2/skills/demo_robot.py b/dimos/agents/skills/demo_robot.py similarity index 100% 
rename from dimos/agents2/skills/demo_robot.py rename to dimos/agents/skills/demo_robot.py diff --git a/dimos/agents2/skills/demo_skill.py b/dimos/agents/skills/demo_skill.py similarity index 78% rename from dimos/agents2/skills/demo_skill.py rename to dimos/agents/skills/demo_skill.py index f549e6115c..835bd6b24f 100644 --- a/dimos/agents2/skills/demo_skill.py +++ b/dimos/agents/skills/demo_skill.py @@ -15,10 +15,10 @@ from dotenv import load_dotenv -from dimos.agents2.agent import llm_agent -from dimos.agents2.cli.human import human_input -from dimos.agents2.skills.demo_calculator_skill import demo_calculator_skill -from dimos.agents2.system_prompt import get_system_prompt +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input +from dimos.agents.skills.demo_calculator_skill import demo_calculator_skill +from dimos.agents.system_prompt import get_system_prompt from dimos.core.blueprints import autoconnect load_dotenv() diff --git a/dimos/agents2/skills/google_maps_skill_container.py b/dimos/agents/skills/google_maps_skill_container.py similarity index 100% rename from dimos/agents2/skills/google_maps_skill_container.py rename to dimos/agents/skills/google_maps_skill_container.py diff --git a/dimos/agents2/skills/gps_nav_skill.py b/dimos/agents/skills/gps_nav_skill.py similarity index 100% rename from dimos/agents2/skills/gps_nav_skill.py rename to dimos/agents/skills/gps_nav_skill.py diff --git a/dimos/agents2/skills/navigation.py b/dimos/agents/skills/navigation.py similarity index 100% rename from dimos/agents2/skills/navigation.py rename to dimos/agents/skills/navigation.py diff --git a/dimos/agents2/skills/osm.py b/dimos/agents/skills/osm.py similarity index 100% rename from dimos/agents2/skills/osm.py rename to dimos/agents/skills/osm.py diff --git a/dimos/agents2/skills/speak_skill.py b/dimos/agents/skills/speak_skill.py similarity index 100% rename from dimos/agents2/skills/speak_skill.py rename to 
dimos/agents/skills/speak_skill.py diff --git a/dimos/agents2/skills/test_google_maps_skill_container.py b/dimos/agents/skills/test_google_maps_skill_container.py similarity index 100% rename from dimos/agents2/skills/test_google_maps_skill_container.py rename to dimos/agents/skills/test_google_maps_skill_container.py diff --git a/dimos/agents2/skills/test_gps_nav_skills.py b/dimos/agents/skills/test_gps_nav_skills.py similarity index 100% rename from dimos/agents2/skills/test_gps_nav_skills.py rename to dimos/agents/skills/test_gps_nav_skills.py diff --git a/dimos/agents2/skills/test_navigation.py b/dimos/agents/skills/test_navigation.py similarity index 90% rename from dimos/agents2/skills/test_navigation.py rename to dimos/agents/skills/test_navigation.py index 93c0a4f5be..4a4388e661 100644 --- a/dimos/agents2/skills/test_navigation.py +++ b/dimos/agents/skills/test_navigation.py @@ -42,7 +42,7 @@ def test_take_a_look_around(create_navigation_agent, navigation_skill_container, navigation_skill_container._bound_rpc_calls[ "WavefrontFrontierExplorer.is_exploration_active" ] = is_exploration_active_mock - mocker.patch("dimos.agents2.skills.navigation.time.sleep") + mocker.patch("dimos.agents.skills.navigation.time.sleep") agent = create_navigation_agent(fixture="test_take_a_look_around.json") agent.query("take a look around for 10 seconds") @@ -54,15 +54,15 @@ def test_go_to_semantic_location( create_navigation_agent, navigation_skill_container, mocker ) -> None: mocker.patch( - "dimos.agents2.skills.navigation.NavigationSkillContainer._navigate_by_tagged_location", + "dimos.agents.skills.navigation.NavigationSkillContainer._navigate_by_tagged_location", return_value=None, ) mocker.patch( - "dimos.agents2.skills.navigation.NavigationSkillContainer._navigate_to_object", + "dimos.agents.skills.navigation.NavigationSkillContainer._navigate_to_object", return_value=None, ) navigate_to_mock = mocker.patch( - 
"dimos.agents2.skills.navigation.NavigationSkillContainer._navigate_to", + "dimos.agents.skills.navigation.NavigationSkillContainer._navigate_to", return_value=True, ) query_by_text_mock = mocker.Mock( diff --git a/dimos/agents2/skills/test_unitree_skill_container.py b/dimos/agents/skills/test_unitree_skill_container.py similarity index 100% rename from dimos/agents2/skills/test_unitree_skill_container.py rename to dimos/agents/skills/test_unitree_skill_container.py diff --git a/dimos/agents2/spec.py b/dimos/agents/spec.py similarity index 100% rename from dimos/agents2/spec.py rename to dimos/agents/spec.py diff --git a/dimos/agents2/system_prompt.py b/dimos/agents/system_prompt.py similarity index 92% rename from dimos/agents2/system_prompt.py rename to dimos/agents/system_prompt.py index 6b14f3e193..33bb887d63 100644 --- a/dimos/agents2/system_prompt.py +++ b/dimos/agents/system_prompt.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dimos.agents2.constants import AGENT_SYSTEM_PROMPT_PATH +from dimos.agents.constants import AGENT_SYSTEM_PROMPT_PATH _SYSTEM_PROMPT = None diff --git a/dimos/agents2/temp/webcam_agent.py b/dimos/agents/temp/webcam_agent.py similarity index 95% rename from dimos/agents2/temp/webcam_agent.py rename to dimos/agents/temp/webcam_agent.py index 2a70770e14..f3eae0ab0f 100644 --- a/dimos/agents2/temp/webcam_agent.py +++ b/dimos/agents/temp/webcam_agent.py @@ -14,7 +14,7 @@ # limitations under the License. """ -Run script for Unitree Go2 robot with agents2 framework. +Run script for Unitree Go2 robot with agents framework. This is the migrated version using the new LangChain-based agent system. 
""" @@ -24,9 +24,9 @@ import reactivex as rx import reactivex.operators as ops -from dimos.agents2 import Agent, Output, Reducer, Stream, skill # type: ignore[attr-defined] -from dimos.agents2.cli.human import HumanInput -from dimos.agents2.spec import Model, Provider +from dimos.agents import Agent, Output, Reducer, Stream, skill # type: ignore[attr-defined] +from dimos.agents.cli.human import HumanInput +from dimos.agents.spec import Model, Provider from dimos.core import LCMTransport, Module, rpc, start from dimos.hardware.camera import zed from dimos.hardware.camera.module import CameraModule diff --git a/dimos/agents2/test_agent.py b/dimos/agents/test_agent.py similarity index 99% rename from dimos/agents2/test_agent.py rename to dimos/agents/test_agent.py index 447d02e6e3..ed8ca74926 100644 --- a/dimos/agents2/test_agent.py +++ b/dimos/agents/test_agent.py @@ -15,7 +15,7 @@ import pytest import pytest_asyncio -from dimos.agents2.agent import Agent +from dimos.agents.agent import Agent from dimos.core import start from dimos.protocol.skill.test_coordinator import SkillContainerTest diff --git a/dimos/agents2/test_agent_direct.py b/dimos/agents/test_agent_direct.py similarity index 98% rename from dimos/agents2/test_agent_direct.py rename to dimos/agents/test_agent_direct.py index ee3f9aa091..ae9307457f 100644 --- a/dimos/agents2/test_agent_direct.py +++ b/dimos/agents/test_agent_direct.py @@ -16,7 +16,7 @@ from contextlib import contextmanager -from dimos.agents2.agent import Agent +from dimos.agents.agent import Agent from dimos.core import start from dimos.protocol.skill.test_coordinator import SkillContainerTest diff --git a/dimos/agents2/test_agent_fake.py b/dimos/agents/test_agent_fake.py similarity index 100% rename from dimos/agents2/test_agent_fake.py rename to dimos/agents/test_agent_fake.py diff --git a/dimos/agents2/test_mock_agent.py b/dimos/agents/test_mock_agent.py similarity index 98% rename from dimos/agents2/test_mock_agent.py rename to 
dimos/agents/test_mock_agent.py index 4b113b45a0..fdda8f52e6 100644 --- a/dimos/agents2/test_mock_agent.py +++ b/dimos/agents/test_mock_agent.py @@ -20,8 +20,8 @@ from langchain_core.messages import AIMessage, HumanMessage import pytest -from dimos.agents2.agent import Agent -from dimos.agents2.testing import MockModel +from dimos.agents.agent import Agent +from dimos.agents.testing import MockModel from dimos.core import LCMTransport, start from dimos.msgs.geometry_msgs import PoseStamped, Vector3 from dimos.msgs.sensor_msgs import Image diff --git a/dimos/agents2/test_stash_agent.py b/dimos/agents/test_stash_agent.py similarity index 98% rename from dimos/agents2/test_stash_agent.py rename to dimos/agents/test_stash_agent.py index 8e2972568a..4cd8ccf714 100644 --- a/dimos/agents2/test_stash_agent.py +++ b/dimos/agents/test_stash_agent.py @@ -14,7 +14,7 @@ import pytest -from dimos.agents2.agent import Agent +from dimos.agents.agent import Agent from dimos.protocol.skill.test_coordinator import SkillContainerTest diff --git a/dimos/agents2/testing.py b/dimos/agents/testing.py similarity index 100% rename from dimos/agents2/testing.py rename to dimos/agents/testing.py diff --git a/dimos/agents2/__init__.py b/dimos/agents2/__init__.py deleted file mode 100644 index c817bb3aee..0000000000 --- a/dimos/agents2/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from langchain_core.messages import ( - AIMessage, - HumanMessage, - MessageLikeRepresentation, - SystemMessage, - ToolCall, - ToolMessage, -) - -from dimos.agents2.agent import Agent, deploy -from dimos.agents2.spec import AgentSpec -from dimos.protocol.skill.skill import skill -from dimos.protocol.skill.type import Output, Reducer, Stream diff --git a/dimos/agents2/agent.py b/dimos/agents2/agent.py deleted file mode 100644 index 45852767d3..0000000000 --- a/dimos/agents2/agent.py +++ /dev/null @@ -1,443 +0,0 @@ -# Copyright 2025 Dimensional Inc. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import asyncio -import datetime -import json -from operator import itemgetter -import os -from typing import Any, TypedDict -import uuid - -from langchain.chat_models import init_chat_model -from langchain_core.messages import ( - AIMessage, - HumanMessage, - SystemMessage, - ToolCall, - ToolMessage, -) -from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline - -from dimos.agents2.ollama_agent import ensure_ollama_model -from dimos.agents2.spec import AgentSpec, Model, Provider -from dimos.agents2.system_prompt import get_system_prompt -from dimos.core import DimosCluster, rpc -from dimos.protocol.skill.coordinator import ( - SkillCoordinator, - SkillState, - SkillStateDict, -) -from dimos.protocol.skill.skill import SkillContainer -from dimos.protocol.skill.type import Output -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -SYSTEM_MSG_APPEND = "\nYour message history will always be appended with a System Overview message that provides situational awareness." 
- - -def toolmsg_from_state(state: SkillState) -> ToolMessage: - if state.skill_config.output != Output.standard: - content = "output attached in separate messages" - else: - content = state.content() # type: ignore[assignment] - - return ToolMessage( - # if agent call has been triggered by another skill, - # and this specific skill didn't finish yet but we need a tool call response - # we return a message explaining that execution is still ongoing - content=content - or "Running, you will be called with an update, no need for subsequent tool calls", - name=state.name, - tool_call_id=state.call_id, - ) - - -class SkillStateSummary(TypedDict): - name: str - call_id: str - state: str - data: Any - - -def summary_from_state(state: SkillState, special_data: bool = False) -> SkillStateSummary: - content = state.content() - if isinstance(content, dict): - content = json.dumps(content) - - if not isinstance(content, str): - content = str(content) - - return { - "name": state.name, - "call_id": state.call_id, - "state": state.state.name, - "data": state.content() if not special_data else "data will be in a separate message", - } - - -def _custom_json_serializers(obj): # type: ignore[no-untyped-def] - if isinstance(obj, datetime.date | datetime.datetime): - return obj.isoformat() - raise TypeError(f"Type {type(obj)} not serializable") - - -# takes an overview of running skills from the coorindator -# and builds messages to be sent to an agent -def snapshot_to_messages( - state: SkillStateDict, - tool_calls: list[ToolCall], -) -> tuple[list[ToolMessage], AIMessage | None]: - # builds a set of tool call ids from a previous agent request - tool_call_ids = set( - map(itemgetter("id"), tool_calls), - ) - - # build a tool msg responses - tool_msgs: list[ToolMessage] = [] - - # build a general skill state overview (for longer running skills) - state_overview: list[dict[str, SkillStateSummary]] = [] - - # for special skills that want to return a separate message - # (images for 
example, requires to be a HumanMessage) - special_msgs: list[HumanMessage] = [] - - # for special skills that want to return a separate message that should - # stay in history, like actual human messages, critical events - history_msgs: list[HumanMessage] = [] - - # Initialize state_msg - state_msg = None - - for skill_state in sorted( - state.values(), - key=lambda skill_state: skill_state.duration(), - ): - if skill_state.call_id in tool_call_ids: - tool_msgs.append(toolmsg_from_state(skill_state)) - - if skill_state.skill_config.output == Output.human: - content = skill_state.content() - if not content: - continue - history_msgs.append(HumanMessage(content=content)) # type: ignore[arg-type] - continue - - special_data = skill_state.skill_config.output == Output.image - if special_data: - content = skill_state.content() - if not content: - continue - special_msgs.append(HumanMessage(content=content)) # type: ignore[arg-type] - - if skill_state.call_id in tool_call_ids: - continue - - state_overview.append(summary_from_state(skill_state, special_data)) # type: ignore[arg-type] - - if state_overview: - state_overview_str = "\n".join( - json.dumps(s, default=_custom_json_serializers) for s in state_overview - ) - state_msg = AIMessage("State Overview:\n" + state_overview_str) - - return { # type: ignore[return-value] - "tool_msgs": tool_msgs, - "history_msgs": history_msgs, - "state_msgs": ([state_msg] if state_msg else []) + special_msgs, - } - - -# Agent class job is to glue skill coordinator state to an agent, builds langchain messages -class Agent(AgentSpec): - system_message: SystemMessage - state_messages: list[AIMessage | HumanMessage] - - def __init__( # type: ignore[no-untyped-def] - self, - *args, - **kwargs, - ) -> None: - AgentSpec.__init__(self, *args, **kwargs) - - self.state_messages = [] - self.coordinator = SkillCoordinator() - self._history = [] # type: ignore[var-annotated] - self._agent_id = str(uuid.uuid4()) - self._agent_stopped = False - - if 
self.config.system_prompt: - if isinstance(self.config.system_prompt, str): - self.system_message = SystemMessage(self.config.system_prompt + SYSTEM_MSG_APPEND) - else: - self.config.system_prompt.content += SYSTEM_MSG_APPEND # type: ignore[operator] - self.system_message = self.config.system_prompt - else: - self.system_message = SystemMessage(get_system_prompt() + SYSTEM_MSG_APPEND) - - self.publish(self.system_message) - - # Use provided model instance if available, otherwise initialize from config - if self.config.model_instance: - self._llm = self.config.model_instance - else: - # For Ollama provider, ensure the model is available before initializing - if self.config.provider.value.lower() == "ollama": - ensure_ollama_model(self.config.model) - - # For HuggingFace, we need to create a pipeline and wrap it in ChatHuggingFace - if self.config.provider.value.lower() == "huggingface": - llm = HuggingFacePipeline.from_model_id( - model_id=self.config.model, - task="text-generation", - pipeline_kwargs={ - "max_new_tokens": 512, - "temperature": 0.7, - }, - ) - self._llm = ChatHuggingFace(llm=llm, model_id=self.config.model) - else: - self._llm = init_chat_model( # type: ignore[call-overload] - model_provider=self.config.provider, model=self.config.model - ) - - @rpc - def get_agent_id(self) -> str: - return self._agent_id - - @rpc - def start(self) -> None: - super().start() - self.coordinator.start() - - @rpc - def stop(self) -> None: - self.coordinator.stop() - self._agent_stopped = True - super().stop() - - def clear_history(self) -> None: - self._history.clear() - - def append_history(self, *msgs: list[AIMessage | HumanMessage]) -> None: - for msg in msgs: - self.publish(msg) # type: ignore[arg-type] - - self._history.extend(msgs) - - def history(self): # type: ignore[no-untyped-def] - return [self.system_message, *self._history, *self.state_messages] - - # Used by agent to execute tool calls - def execute_tool_calls(self, tool_calls: list[ToolCall]) -> None: - 
"""Execute a list of tool calls from the agent.""" - if self._agent_stopped: - logger.warning("Agent is stopped, cannot execute tool calls.") - return - for tool_call in tool_calls: - logger.info(f"executing skill call {tool_call}") - self.coordinator.call_skill( - tool_call.get("id"), # type: ignore[arg-type] - tool_call.get("name"), # type: ignore[arg-type] - tool_call.get("args"), # type: ignore[arg-type] - ) - - # used to inject skill calls into the agent loop without agent asking for it - def run_implicit_skill(self, skill_name: str, **kwargs) -> None: # type: ignore[no-untyped-def] - if self._agent_stopped: - logger.warning("Agent is stopped, cannot execute implicit skill calls.") - return - self.coordinator.call_skill(False, skill_name, {"args": kwargs}) - - async def agent_loop(self, first_query: str = ""): # type: ignore[no-untyped-def] - # TODO: Should I add a lock here to prevent concurrent calls to agent_loop? - - if self._agent_stopped: - logger.warning("Agent is stopped, cannot run agent loop.") - # return "Agent is stopped." - import traceback - - traceback.print_stack() - return "Agent is stopped." 
- - self.state_messages = [] - if first_query: - self.append_history(HumanMessage(first_query)) # type: ignore[arg-type] - - def _get_state() -> str: - # TODO: FIX THIS EXTREME HACK - update = self.coordinator.generate_snapshot(clear=False) - snapshot_msgs = snapshot_to_messages(update, msg.tool_calls) # type: ignore[attr-defined] - return json.dumps(snapshot_msgs, sort_keys=True, default=lambda o: repr(o)) - - try: - while True: - # we are getting tools from the coordinator on each turn - # since this allows for skillcontainers to dynamically provide new skills - tools = self.get_tools() # type: ignore[no-untyped-call] - self._llm = self._llm.bind_tools(tools) # type: ignore[assignment] - - # publish to /agent topic for observability - for state_msg in self.state_messages: - self.publish(state_msg) - - # history() builds our message history dynamically - # ensures we include latest system state, but not old ones. - messages = self.history() # type: ignore[no-untyped-call] - - # Some LLMs don't work without any human messages. Add an initial one. - if len(messages) == 1 and isinstance(messages[0], SystemMessage): - messages.append( - HumanMessage( - "Everything is initialized. I'll let you know when you should act." 
- ) - ) - self.append_history(messages[-1]) - - msg = self._llm.invoke(messages) - - self.append_history(msg) # type: ignore[arg-type] - - logger.info(f"Agent response: {msg.content}") - - state = _get_state() - - if msg.tool_calls: # type: ignore[attr-defined] - self.execute_tool_calls(msg.tool_calls) # type: ignore[attr-defined] - - # print(self) - # print(self.coordinator) - - self._write_debug_history_file() - - if not self.coordinator.has_active_skills(): - logger.info("No active tasks, exiting agent loop.") - return msg.content - - # coordinator will continue once a skill state has changed in - # such a way that agent call needs to be executed - - if state == _get_state(): - await self.coordinator.wait_for_updates() - - # we request a full snapshot of currently running, finished or errored out skills - # we ask for removal of finished skills from subsequent snapshots (clear=True) - update = self.coordinator.generate_snapshot(clear=True) - - # generate tool_msgs and general state update message, - # depending on a skill having associated tool call from previous interaction - # we will return a tool message, and not a general state message - snapshot_msgs = snapshot_to_messages(update, msg.tool_calls) # type: ignore[attr-defined] - - self.state_messages = snapshot_msgs.get("state_msgs", []) # type: ignore[attr-defined] - self.append_history( - *snapshot_msgs.get("tool_msgs", []), # type: ignore[attr-defined] - *snapshot_msgs.get("history_msgs", []), # type: ignore[attr-defined] - ) - - except Exception as e: - logger.error(f"Error in agent loop: {e}") - import traceback - - traceback.print_exc() - - @rpc - def loop_thread(self) -> bool: - asyncio.run_coroutine_threadsafe(self.agent_loop(), self._loop) # type: ignore[arg-type] - return True - - @rpc - def query(self, query: str): # type: ignore[no-untyped-def] - # TODO: could this be - # from distributed.utils import sync - # return sync(self._loop, self.agent_loop, query) - return 
asyncio.run_coroutine_threadsafe(self.agent_loop(query), self._loop).result() # type: ignore[arg-type] - - async def query_async(self, query: str): # type: ignore[no-untyped-def] - return await self.agent_loop(query) - - @rpc - def register_skills(self, container, run_implicit_name: str | None = None): # type: ignore[no-untyped-def] - ret = self.coordinator.register_skills(container) # type: ignore[func-returns-value] - - if run_implicit_name: - self.run_implicit_skill(run_implicit_name) - - return ret - - def get_tools(self): # type: ignore[no-untyped-def] - return self.coordinator.get_tools() - - def _write_debug_history_file(self) -> None: - file_path = os.getenv("DEBUG_AGENT_HISTORY_FILE") - if not file_path: - return - - history = [x.__dict__ for x in self.history()] # type: ignore[no-untyped-call] - - with open(file_path, "w") as f: - json.dump(history, f, default=lambda x: repr(x), indent=2) - - -class LlmAgent(Agent): - @rpc - def start(self) -> None: - super().start() - self.loop_thread() - - @rpc - def stop(self) -> None: - super().stop() - - -llm_agent = LlmAgent.blueprint - - -def deploy( - dimos: DimosCluster, - system_prompt: str = "You are a helpful assistant for controlling a Unitree Go2 robot.", - model: Model = Model.GPT_4O, - provider: Provider = Provider.OPENAI, # type: ignore[attr-defined] - skill_containers: list[SkillContainer] | None = None, -) -> Agent: - from dimos.agents2.cli.human import HumanInput - - if skill_containers is None: - skill_containers = [] - agent = dimos.deploy( # type: ignore[attr-defined] - Agent, - system_prompt=system_prompt, - model=model, - provider=provider, - ) - - human_input = dimos.deploy(HumanInput) # type: ignore[attr-defined] - human_input.start() - - agent.register_skills(human_input) - - for skill_container in skill_containers: - print("Registering skill container:", skill_container) - agent.register_skills(skill_container) - - agent.run_implicit_skill("human") - agent.start() - agent.loop_thread() - - 
return agent # type: ignore[no-any-return] - - -__all__ = ["Agent", "deploy", "llm_agent"] diff --git a/dimos/agents/memory/__init__.py b/dimos/agents_deprecated/__init__.py similarity index 100% rename from dimos/agents/memory/__init__.py rename to dimos/agents_deprecated/__init__.py diff --git a/dimos/agents_deprecated/agent.py b/dimos/agents_deprecated/agent.py new file mode 100644 index 0000000000..01ef36ad24 --- /dev/null +++ b/dimos/agents_deprecated/agent.py @@ -0,0 +1,917 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Agent framework for LLM-based autonomous systems. + +This module provides a flexible foundation for creating agents that can: +- Process image and text inputs through LLM APIs +- Store and retrieve contextual information using semantic memory +- Handle tool/function calling +- Process streaming inputs asynchronously + +The module offers base classes (Agent, LLMAgent) and concrete implementations +like OpenAIAgent that connect to specific LLM providers. 
+""" + +from __future__ import annotations + +# Standard library imports +import json +import os +import threading +from typing import TYPE_CHECKING, Any + +# Third-party imports +from dotenv import load_dotenv +from openai import NOT_GIVEN, OpenAI +from pydantic import BaseModel +from reactivex import Observable, Observer, create, empty, just, operators as RxOps +from reactivex.disposable import CompositeDisposable, Disposable +from reactivex.subject import Subject + +# Local imports +from dimos.agents_deprecated.memory.chroma_impl import OpenAISemanticMemory +from dimos.agents_deprecated.prompt_builder.impl import PromptBuilder +from dimos.agents_deprecated.tokenizer.openai_tokenizer import OpenAITokenizer +from dimos.skills.skills import AbstractSkill, SkillLibrary +from dimos.stream.frame_processor import FrameProcessor +from dimos.stream.stream_merger import create_stream_merger +from dimos.stream.video_operators import Operators as MyOps, VideoOperators as MyVidOps +from dimos.utils.logging_config import setup_logger +from dimos.utils.threadpool import get_scheduler + +if TYPE_CHECKING: + from reactivex.scheduler import ThreadPoolScheduler + + from dimos.agents_deprecated.memory.base import AbstractAgentSemanticMemory + from dimos.agents_deprecated.tokenizer.base import AbstractTokenizer + +# Initialize environment variables +load_dotenv() + +# Initialize logger for the agent module +logger = setup_logger() + +# Constants +_TOKEN_BUDGET_PARTS = 4 # Number of parts to divide token budget +_MAX_SAVED_FRAMES = 100 # Maximum number of frames to save + + +# ----------------------------------------------------------------------------- +# region Agent Base Class +# ----------------------------------------------------------------------------- +class Agent: + """Base agent that manages memory and subscriptions.""" + + def __init__( + self, + dev_name: str = "NA", + agent_type: str = "Base", + agent_memory: AbstractAgentSemanticMemory | None = None, + pool_scheduler: 
ThreadPoolScheduler | None = None, + ) -> None: + """ + Initializes a new instance of the Agent. + + Args: + dev_name (str): The device name of the agent. + agent_type (str): The type of the agent (e.g., 'Base', 'Vision'). + agent_memory (AbstractAgentSemanticMemory): The memory system for the agent. + pool_scheduler (ThreadPoolScheduler): The scheduler to use for thread pool operations. + If None, the global scheduler from get_scheduler() will be used. + """ + self.dev_name = dev_name + self.agent_type = agent_type + self.agent_memory = agent_memory or OpenAISemanticMemory() + self.disposables = CompositeDisposable() + self.pool_scheduler = pool_scheduler if pool_scheduler else get_scheduler() + + def dispose_all(self) -> None: + """Disposes of all active subscriptions managed by this agent.""" + if self.disposables: + self.disposables.dispose() + else: + logger.info("No disposables to dispose.") + + +# endregion Agent Base Class + + +# ----------------------------------------------------------------------------- +# region LLMAgent Base Class (Generic LLM Agent) +# ----------------------------------------------------------------------------- +class LLMAgent(Agent): + """Generic LLM agent containing common logic for LLM-based agents. + + This class implements functionality for: + - Updating the query + - Querying the agent's memory (for RAG) + - Building prompts via a prompt builder + - Handling tooling callbacks in responses + - Subscribing to image and query streams + - Emitting responses as an observable stream + + Subclasses must implement the `_send_query` method, which is responsible + for sending the prompt to a specific LLM API. + + Attributes: + query (str): The current query text to process. + prompt_builder (PromptBuilder): Handles construction of prompts. + system_query (str): System prompt for RAG context situations. + image_detail (str): Detail level for image processing ('low','high','auto'). 
+ max_input_tokens_per_request (int): Maximum input token count. + max_output_tokens_per_request (int): Maximum output token count. + max_tokens_per_request (int): Total maximum token count. + rag_query_n (int): Number of results to fetch from memory. + rag_similarity_threshold (float): Minimum similarity for RAG results. + frame_processor (FrameProcessor): Processes video frames. + output_dir (str): Directory for output files. + response_subject (Subject): Subject that emits agent responses. + process_all_inputs (bool): Whether to process every input emission (True) or + skip emissions when the agent is busy processing a previous input (False). + """ + + logging_file_memory_lock = threading.Lock() + + def __init__( + self, + dev_name: str = "NA", + agent_type: str = "LLM", + agent_memory: AbstractAgentSemanticMemory | None = None, + pool_scheduler: ThreadPoolScheduler | None = None, + process_all_inputs: bool = False, + system_query: str | None = None, + max_output_tokens_per_request: int = 16384, + max_input_tokens_per_request: int = 128000, + input_query_stream: Observable | None = None, # type: ignore[type-arg] + input_data_stream: Observable | None = None, # type: ignore[type-arg] + input_video_stream: Observable | None = None, # type: ignore[type-arg] + ) -> None: + """ + Initializes a new instance of the LLMAgent. + + Args: + dev_name (str): The device name of the agent. + agent_type (str): The type of the agent. + agent_memory (AbstractAgentSemanticMemory): The memory system for the agent. + pool_scheduler (ThreadPoolScheduler): The scheduler to use for thread pool operations. + If None, the global scheduler from get_scheduler() will be used. + process_all_inputs (bool): Whether to process every input emission (True) or + skip emissions when the agent is busy processing a previous input (False). + """ + super().__init__(dev_name, agent_type, agent_memory, pool_scheduler) + # These attributes can be configured by a subclass if needed. 
+ self.query: str | None = None + self.prompt_builder: PromptBuilder | None = None + self.system_query: str | None = system_query + self.image_detail: str = "low" + self.max_input_tokens_per_request: int = max_input_tokens_per_request + self.max_output_tokens_per_request: int = max_output_tokens_per_request + self.max_tokens_per_request: int = ( + self.max_input_tokens_per_request + self.max_output_tokens_per_request + ) + self.rag_query_n: int = 4 + self.rag_similarity_threshold: float = 0.45 + self.frame_processor: FrameProcessor | None = None + self.output_dir: str = os.path.join(os.getcwd(), "assets", "agent") + self.process_all_inputs: bool = process_all_inputs + os.makedirs(self.output_dir, exist_ok=True) + + # Subject for emitting responses + self.response_subject = Subject() # type: ignore[var-annotated] + + # Conversation history for maintaining context between calls + self.conversation_history = [] # type: ignore[var-annotated] + + # Initialize input streams + self.input_video_stream = input_video_stream + self.input_query_stream = ( + input_query_stream + if (input_data_stream is None) + else ( + input_query_stream.pipe( # type: ignore[misc, union-attr] + RxOps.with_latest_from(input_data_stream), + RxOps.map( + lambda combined: { + "query": combined[0], # type: ignore[index] + "objects": combined[1] # type: ignore[index] + if len(combined) > 1 # type: ignore[arg-type] + else "No object data available", + } + ), + RxOps.map( + lambda data: f"{data['query']}\n\nCurrent objects detected:\n{data['objects']}" # type: ignore[index] + ), + RxOps.do_action( + lambda x: print(f"\033[34mEnriched query: {x.split(chr(10))[0]}\033[0m") # type: ignore[arg-type] + or [print(f"\033[34m{line}\033[0m") for line in x.split(chr(10))[1:]] # type: ignore[var-annotated] + ), + ) + ) + ) + + # Setup stream subscriptions based on inputs provided + if (self.input_video_stream is not None) and (self.input_query_stream is not None): + self.merged_stream = create_stream_merger( + 
data_input_stream=self.input_video_stream, text_query_stream=self.input_query_stream + ) + + logger.info("Subscribing to merged input stream...") + + # Define a query extractor for the merged stream + def query_extractor(emission): # type: ignore[no-untyped-def] + return (emission[0], emission[1][0]) + + self.disposables.add( + self.subscribe_to_image_processing( + self.merged_stream, query_extractor=query_extractor + ) + ) + else: + # If no merged stream, fall back to individual streams + if self.input_video_stream is not None: + logger.info("Subscribing to input video stream...") + self.disposables.add(self.subscribe_to_image_processing(self.input_video_stream)) + if self.input_query_stream is not None: + logger.info("Subscribing to input query stream...") + self.disposables.add(self.subscribe_to_query_processing(self.input_query_stream)) + + def _update_query(self, incoming_query: str | None) -> None: + """Updates the query if an incoming query is provided. + + Args: + incoming_query (str): The new query text. + """ + if incoming_query is not None: + self.query = incoming_query + + def _get_rag_context(self) -> tuple[str, str]: + """Queries the agent memory to retrieve RAG context. + + Returns: + Tuple[str, str]: A tuple containing the formatted results (for logging) + and condensed results (for use in the prompt). 
+ """ + results = self.agent_memory.query( + query_texts=self.query, + n_results=self.rag_query_n, + similarity_threshold=self.rag_similarity_threshold, + ) + formatted_results = "\n".join( + f"Document ID: {doc.id}\nMetadata: {doc.metadata}\nContent: {doc.page_content}\nScore: {score}\n" + for (doc, score) in results + ) + condensed_results = " | ".join(f"{doc.page_content}" for (doc, _) in results) + logger.info(f"Agent Memory Query Results:\n{formatted_results}") + logger.info("=== Results End ===") + return formatted_results, condensed_results + + def _build_prompt( + self, + base64_image: str | None, + dimensions: tuple[int, int] | None, + override_token_limit: bool, + condensed_results: str, + ) -> list: # type: ignore[type-arg] + """Builds a prompt message using the prompt builder. + + Args: + base64_image (str): Optional Base64-encoded image. + dimensions (Tuple[int, int]): Optional image dimensions. + override_token_limit (bool): Whether to override token limits. + condensed_results (str): The condensed RAG context. + + Returns: + list: A list of message dictionaries to be sent to the LLM. 
+ """ + # Budget for each component of the prompt + budgets = { + "system_prompt": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, + "user_query": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, + "image": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, + "rag": self.max_input_tokens_per_request // _TOKEN_BUDGET_PARTS, + } + + # Define truncation policies for each component + policies = { + "system_prompt": "truncate_end", + "user_query": "truncate_middle", + "image": "do_not_truncate", + "rag": "truncate_end", + } + + return self.prompt_builder.build( # type: ignore[no-any-return, union-attr] + user_query=self.query, + override_token_limit=override_token_limit, + base64_image=base64_image, + image_width=dimensions[0] if dimensions is not None else None, + image_height=dimensions[1] if dimensions is not None else None, + image_detail=self.image_detail, + rag_context=condensed_results, + system_prompt=self.system_query, + budgets=budgets, + policies=policies, + ) + + def _handle_tooling(self, response_message, messages): # type: ignore[no-untyped-def] + """Handles tooling callbacks in the response message. + + If tool calls are present, the corresponding functions are executed and + a follow-up query is sent. + + Args: + response_message: The response message containing tool calls. + messages (list): The original list of messages sent. + + Returns: + The final response message after processing tool calls, if any. + """ + + # TODO: Make this more generic or move implementation to OpenAIAgent. + # This is presently OpenAI-specific. 
+ def _tooling_callback(message, messages, response_message, skill_library: SkillLibrary): # type: ignore[no-untyped-def] + has_called_tools = False + new_messages = [] + for tool_call in message.tool_calls: + has_called_tools = True + name = tool_call.function.name + args = json.loads(tool_call.function.arguments) + result = skill_library.call(name, **args) + logger.info(f"Function Call Results: {result}") + new_messages.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": str(result), + "name": name, + } + ) + if has_called_tools: + logger.info("Sending Another Query.") + messages.append(response_message) + messages.extend(new_messages) + # Delegate to sending the query again. + return self._send_query(messages) + else: + logger.info("No Need for Another Query.") + return None + + if response_message.tool_calls is not None: + return _tooling_callback( + response_message, + messages, + response_message, + self.skill_library, # type: ignore[attr-defined] + ) + return None + + def _observable_query( # type: ignore[no-untyped-def] + self, + observer: Observer, # type: ignore[type-arg] + base64_image: str | None = None, + dimensions: tuple[int, int] | None = None, + override_token_limit: bool = False, + incoming_query: str | None = None, + ): + """Prepares and sends a query to the LLM, emitting the response to the observer. + + Args: + observer (Observer): The observer to emit responses to. + base64_image (str): Optional Base64-encoded image. + dimensions (Tuple[int, int]): Optional image dimensions. + override_token_limit (bool): Whether to override token limits. + incoming_query (str): Optional query to update the agent's query. + + Raises: + Exception: Propagates any exceptions encountered during processing. 
+ """ + try: + self._update_query(incoming_query) + _, condensed_results = self._get_rag_context() + messages = self._build_prompt( + base64_image, dimensions, override_token_limit, condensed_results + ) + # logger.debug(f"Sending Query: {messages}") + logger.info("Sending Query.") + response_message = self._send_query(messages) + logger.info(f"Received Response: {response_message}") + if response_message is None: + raise Exception("Response message does not exist.") + + # TODO: Make this more generic. The parsed tag and tooling handling may be OpenAI-specific. + # If no skill library is provided or there are no tool calls, emit the response directly. + if ( + self.skill_library is None # type: ignore[attr-defined] + or self.skill_library.get_tools() in (None, NOT_GIVEN) # type: ignore[attr-defined] + or response_message.tool_calls is None + ): + final_msg = ( + response_message.parsed + if hasattr(response_message, "parsed") and response_message.parsed + else ( + response_message.content + if hasattr(response_message, "content") + else response_message + ) + ) + observer.on_next(final_msg) + self.response_subject.on_next(final_msg) + else: + response_message_2 = self._handle_tooling(response_message, messages) # type: ignore[no-untyped-call] + final_msg = ( + response_message_2 if response_message_2 is not None else response_message + ) + if isinstance(final_msg, BaseModel): # TODO: Test + final_msg = str(final_msg.content) # type: ignore[attr-defined] + observer.on_next(final_msg) + self.response_subject.on_next(final_msg) + observer.on_completed() + except Exception as e: + logger.error(f"Query failed in {self.dev_name}: {e}") + observer.on_error(e) + self.response_subject.on_error(e) + + def _send_query(self, messages: list) -> Any: # type: ignore[type-arg] + """Sends the query to the LLM API. + + This method must be implemented by subclasses with specifics of the LLM API. + + Args: + messages (list): The prompt messages to be sent. 
+ + Returns: + Any: The response message from the LLM. + + Raises: + NotImplementedError: Always, unless overridden. + """ + raise NotImplementedError("Subclasses must implement _send_query method.") + + def _log_response_to_file(self, response, output_dir: str | None = None) -> None: # type: ignore[no-untyped-def] + """Logs the LLM response to a file. + + Args: + response: The response message to log. + output_dir (str): The directory where the log file is stored. + """ + if output_dir is None: + output_dir = self.output_dir + if response is not None: + with self.logging_file_memory_lock: + log_path = os.path.join(output_dir, "memory.txt") + with open(log_path, "a") as file: + file.write(f"{self.dev_name}: {response}\n") + logger.info(f"LLM Response [{self.dev_name}]: {response}") + + def subscribe_to_image_processing( # type: ignore[no-untyped-def] + self, + frame_observable: Observable, # type: ignore[type-arg] + query_extractor=None, + ) -> Disposable: + """Subscribes to a stream of video frames for processing. + + This method sets up a subscription to process incoming video frames. + Each frame is encoded and then sent to the LLM by directly calling the + _observable_query method. The response is then logged to a file. + + Args: + frame_observable (Observable): An observable emitting video frames or + (query, frame) tuples if query_extractor is provided. + query_extractor (callable, optional): Function to extract query and frame from + each emission. If None, assumes emissions are + raw frames and uses self.system_query. + + Returns: + Disposable: A disposable representing the subscription. + """ + # Initialize frame processor if not already set + if self.frame_processor is None: + self.frame_processor = FrameProcessor(delete_on_init=True) + + print_emission_args = {"enabled": True, "dev_name": self.dev_name, "counts": {}} + + def _process_frame(emission) -> Observable: # type: ignore[no-untyped-def, type-arg] + """ + Processes a frame or (query, frame) tuple. 
+ """ + # Extract query and frame + if query_extractor: + query, frame = query_extractor(emission) + else: + query = self.system_query + frame = emission + return just(frame).pipe( # type: ignore[call-overload, no-any-return] + MyOps.print_emission(id="B", **print_emission_args), # type: ignore[arg-type] + RxOps.observe_on(self.pool_scheduler), + MyOps.print_emission(id="C", **print_emission_args), # type: ignore[arg-type] + RxOps.subscribe_on(self.pool_scheduler), + MyOps.print_emission(id="D", **print_emission_args), # type: ignore[arg-type] + MyVidOps.with_jpeg_export( + self.frame_processor, # type: ignore[arg-type] + suffix=f"{self.dev_name}_frame_", + save_limit=_MAX_SAVED_FRAMES, + ), + MyOps.print_emission(id="E", **print_emission_args), # type: ignore[arg-type] + MyVidOps.encode_image(), + MyOps.print_emission(id="F", **print_emission_args), # type: ignore[arg-type] + RxOps.filter( + lambda base64_and_dims: base64_and_dims is not None + and base64_and_dims[0] is not None # type: ignore[index] + and base64_and_dims[1] is not None # type: ignore[index] + ), + MyOps.print_emission(id="G", **print_emission_args), # type: ignore[arg-type] + RxOps.flat_map( + lambda base64_and_dims: create( # type: ignore[arg-type, return-value] + lambda observer, _: self._observable_query( + observer, # type: ignore[arg-type] + base64_image=base64_and_dims[0], + dimensions=base64_and_dims[1], + incoming_query=query, + ) + ) + ), # Use the extracted query + MyOps.print_emission(id="H", **print_emission_args), # type: ignore[arg-type] + ) + + # Use a mutable flag to ensure only one frame is processed at a time. 
+ is_processing = [False] + + def process_if_free(emission): # type: ignore[no-untyped-def] + if not self.process_all_inputs and is_processing[0]: + # Drop frame if a request is in progress and process_all_inputs is False + return empty() + else: + is_processing[0] = True + return _process_frame(emission).pipe( + MyOps.print_emission(id="I", **print_emission_args), # type: ignore[arg-type] + RxOps.observe_on(self.pool_scheduler), + MyOps.print_emission(id="J", **print_emission_args), # type: ignore[arg-type] + RxOps.subscribe_on(self.pool_scheduler), + MyOps.print_emission(id="K", **print_emission_args), # type: ignore[arg-type] + RxOps.do_action( + on_completed=lambda: is_processing.__setitem__(0, False), + on_error=lambda e: is_processing.__setitem__(0, False), + ), + MyOps.print_emission(id="L", **print_emission_args), # type: ignore[arg-type] + ) + + observable = frame_observable.pipe( + MyOps.print_emission(id="A", **print_emission_args), # type: ignore[arg-type] + RxOps.flat_map(process_if_free), + MyOps.print_emission(id="M", **print_emission_args), # type: ignore[arg-type] + ) + + disposable = observable.subscribe( + on_next=lambda response: self._log_response_to_file(response, self.output_dir), + on_error=lambda e: logger.error(f"Error encountered: {e}"), + on_completed=lambda: logger.info(f"Stream processing completed for {self.dev_name}"), + ) + self.disposables.add(disposable) + return disposable # type: ignore[no-any-return] + + def subscribe_to_query_processing(self, query_observable: Observable) -> Disposable: # type: ignore[type-arg] + """Subscribes to a stream of queries for processing. + + This method sets up a subscription to process incoming queries by directly + calling the _observable_query method. The responses are logged to a file. + + Args: + query_observable (Observable): An observable emitting queries. + + Returns: + Disposable: A disposable representing the subscription. 
+ """ + print_emission_args = {"enabled": False, "dev_name": self.dev_name, "counts": {}} + + def _process_query(query) -> Observable: # type: ignore[no-untyped-def, type-arg] + """ + Processes a single query by logging it and passing it to _observable_query. + Returns an observable that emits the LLM response. + """ + return just(query).pipe( + MyOps.print_emission(id="Pr A", **print_emission_args), # type: ignore[arg-type] + RxOps.flat_map( + lambda query: create( # type: ignore[arg-type, return-value] + lambda observer, _: self._observable_query(observer, incoming_query=query) # type: ignore[arg-type] + ) + ), + MyOps.print_emission(id="Pr B", **print_emission_args), # type: ignore[arg-type] + ) + + # A mutable flag indicating whether a query is currently being processed. + is_processing = [False] + + def process_if_free(query): # type: ignore[no-untyped-def] + logger.info(f"Processing Query: {query}") + if not self.process_all_inputs and is_processing[0]: + # Drop query if a request is already in progress and process_all_inputs is False + return empty() + else: + is_processing[0] = True + logger.info("Processing Query.") + return _process_query(query).pipe( + MyOps.print_emission(id="B", **print_emission_args), # type: ignore[arg-type] + RxOps.observe_on(self.pool_scheduler), + MyOps.print_emission(id="C", **print_emission_args), # type: ignore[arg-type] + RxOps.subscribe_on(self.pool_scheduler), + MyOps.print_emission(id="D", **print_emission_args), # type: ignore[arg-type] + RxOps.do_action( + on_completed=lambda: is_processing.__setitem__(0, False), + on_error=lambda e: is_processing.__setitem__(0, False), + ), + MyOps.print_emission(id="E", **print_emission_args), # type: ignore[arg-type] + ) + + observable = query_observable.pipe( + MyOps.print_emission(id="A", **print_emission_args), # type: ignore[arg-type] + RxOps.flat_map(lambda query: process_if_free(query)), # type: ignore[no-untyped-call] + MyOps.print_emission(id="F", **print_emission_args), # 
type: ignore[arg-type] + ) + + disposable = observable.subscribe( + on_next=lambda response: self._log_response_to_file(response, self.output_dir), + on_error=lambda e: logger.error(f"Error processing query for {self.dev_name}: {e}"), + on_completed=lambda: logger.info(f"Stream processing completed for {self.dev_name}"), + ) + self.disposables.add(disposable) + return disposable # type: ignore[no-any-return] + + def get_response_observable(self) -> Observable: # type: ignore[type-arg] + """Gets an observable that emits responses from this agent. + + Returns: + Observable: An observable that emits string responses from the agent. + """ + return self.response_subject.pipe( + RxOps.observe_on(self.pool_scheduler), + RxOps.subscribe_on(self.pool_scheduler), + RxOps.share(), + ) + + def run_observable_query(self, query_text: str, **kwargs) -> Observable: # type: ignore[no-untyped-def, type-arg] + """Creates an observable that processes a one-off text query to Agent and emits the response. + + This method provides a simple way to send a text query and get an observable + stream of the response. It's designed for one-off queries rather than + continuous processing of input streams. Useful for testing and development. + + Args: + query_text (str): The query text to process. + **kwargs: Additional arguments to pass to _observable_query. Supported args vary by agent type. + For example, ClaudeAgent supports: base64_image, dimensions, override_token_limit, + reset_conversation, thinking_budget_tokens + + Returns: + Observable: An observable that emits the response as a string. 
+ """ + return create( + lambda observer, _: self._observable_query( + observer, # type: ignore[arg-type] + incoming_query=query_text, + **kwargs, + ) + ) + + def dispose_all(self) -> None: + """Disposes of all active subscriptions managed by this agent.""" + super().dispose_all() + self.response_subject.on_completed() + + +# endregion LLMAgent Base Class (Generic LLM Agent) + + +# ----------------------------------------------------------------------------- +# region OpenAIAgent Subclass (OpenAI-Specific Implementation) +# ----------------------------------------------------------------------------- +class OpenAIAgent(LLMAgent): + """OpenAI agent implementation that uses OpenAI's API for processing. + + This class implements the _send_query method to interact with OpenAI's API. + It also sets up OpenAI-specific parameters, such as the client, model name, + tokenizer, and response model. + """ + + def __init__( + self, + dev_name: str, + agent_type: str = "Vision", + query: str = "What do you see?", + input_query_stream: Observable | None = None, # type: ignore[type-arg] + input_data_stream: Observable | None = None, # type: ignore[type-arg] + input_video_stream: Observable | None = None, # type: ignore[type-arg] + output_dir: str = os.path.join(os.getcwd(), "assets", "agent"), + agent_memory: AbstractAgentSemanticMemory | None = None, + system_query: str | None = None, + max_input_tokens_per_request: int = 128000, + max_output_tokens_per_request: int = 16384, + model_name: str = "gpt-4o", + prompt_builder: PromptBuilder | None = None, + tokenizer: AbstractTokenizer | None = None, + rag_query_n: int = 4, + rag_similarity_threshold: float = 0.45, + skills: AbstractSkill | list[AbstractSkill] | SkillLibrary | None = None, + response_model: BaseModel | None = None, + frame_processor: FrameProcessor | None = None, + image_detail: str = "low", + pool_scheduler: ThreadPoolScheduler | None = None, + process_all_inputs: bool | None = None, + openai_client: OpenAI | None = 
None,
+    ) -> None:
+        """
+        Initializes a new instance of the OpenAIAgent.
+
+        Args:
+            dev_name (str): The device name of the agent.
+            agent_type (str): The type of the agent.
+            query (str): The default query text.
+            input_query_stream (Observable): An observable for query input.
+            input_data_stream (Observable): An observable for data input.
+            input_video_stream (Observable): An observable for video frames.
+            output_dir (str): Directory for output files.
+            agent_memory (AbstractAgentSemanticMemory): The memory system.
+            system_query (str): The system prompt to use with RAG context.
+            max_input_tokens_per_request (int): Maximum tokens for input.
+            max_output_tokens_per_request (int): Maximum tokens for output.
+            model_name (str): The OpenAI model name to use.
+            prompt_builder (PromptBuilder): Custom prompt builder.
+            tokenizer (AbstractTokenizer): Custom tokenizer for token counting.
+            rag_query_n (int): Number of results to fetch in RAG queries.
+            rag_similarity_threshold (float): Minimum similarity for RAG results.
+            skills (Union[AbstractSkill, List[AbstractSkill], SkillLibrary]): Skills available to the agent.
+            response_model (BaseModel): Optional Pydantic model for responses.
+            frame_processor (FrameProcessor): Custom frame processor.
+            image_detail (str): Detail level for images ("low", "high", "auto").
+            pool_scheduler (ThreadPoolScheduler): The scheduler to use for thread pool operations.
+                If None, the global scheduler from get_scheduler() will be used.
+            process_all_inputs (bool): Whether to process all inputs or skip when busy.
+                If None, defaults to True for text queries and merged streams, False for video streams.
+            openai_client (OpenAI): The OpenAI client to use. This can be used to specify
+                a custom OpenAI client if targeting another provider.
+ """ + # Determine appropriate default for process_all_inputs if not provided + if process_all_inputs is None: + if input_query_stream is not None: + process_all_inputs = True + else: + process_all_inputs = False + + super().__init__( + dev_name=dev_name, + agent_type=agent_type, + agent_memory=agent_memory, + pool_scheduler=pool_scheduler, + process_all_inputs=process_all_inputs, + system_query=system_query, + input_query_stream=input_query_stream, + input_data_stream=input_data_stream, + input_video_stream=input_video_stream, + ) + self.client = openai_client or OpenAI() + self.query = query + self.output_dir = output_dir + os.makedirs(self.output_dir, exist_ok=True) + + # Configure skill library. + self.skills = skills + self.skill_library = None + if isinstance(self.skills, SkillLibrary): + self.skill_library = self.skills + elif isinstance(self.skills, list): + self.skill_library = SkillLibrary() + for skill in self.skills: + self.skill_library.add(skill) + elif isinstance(self.skills, AbstractSkill): + self.skill_library = SkillLibrary() + self.skill_library.add(self.skills) + + self.response_model = response_model if response_model is not None else NOT_GIVEN + self.model_name = model_name + self.tokenizer = tokenizer or OpenAITokenizer(model_name=self.model_name) + self.prompt_builder = prompt_builder or PromptBuilder( + self.model_name, tokenizer=self.tokenizer + ) + self.rag_query_n = rag_query_n + self.rag_similarity_threshold = rag_similarity_threshold + self.image_detail = image_detail + self.max_output_tokens_per_request = max_output_tokens_per_request + self.max_input_tokens_per_request = max_input_tokens_per_request + self.max_tokens_per_request = max_input_tokens_per_request + max_output_tokens_per_request + + # Add static context to memory. 
+ self._add_context_to_memory() + + self.frame_processor = frame_processor or FrameProcessor(delete_on_init=True) + + logger.info("OpenAI Agent Initialized.") + + def _add_context_to_memory(self) -> None: + """Adds initial context to the agent's memory.""" + context_data = [ + ( + "id0", + "Optical Flow is a technique used to track the movement of objects in a video sequence.", + ), + ( + "id1", + "Edge Detection is a technique used to identify the boundaries of objects in an image.", + ), + ("id2", "Video is a sequence of frames captured at regular intervals."), + ( + "id3", + "Colors in Optical Flow are determined by the movement of light, and can be used to track the movement of objects.", + ), + ( + "id4", + "Json is a data interchange format that is easy for humans to read and write, and easy for machines to parse and generate.", + ), + ] + for doc_id, text in context_data: + self.agent_memory.add_vector(doc_id, text) # type: ignore[no-untyped-call] + + def _send_query(self, messages: list) -> Any: # type: ignore[type-arg] + """Sends the query to OpenAI's API. + + Depending on whether a response model is provided, the appropriate API + call is made. + + Args: + messages (list): The prompt messages to send. + + Returns: + The response message from OpenAI. + + Raises: + Exception: If no response message is returned. + ConnectionError: If there's an issue connecting to the API. + ValueError: If the messages or other parameters are invalid. 
+ """ + try: + if self.response_model is not NOT_GIVEN: + response = self.client.beta.chat.completions.parse( + model=self.model_name, + messages=messages, + response_format=self.response_model, # type: ignore[arg-type] + tools=( + self.skill_library.get_tools() # type: ignore[arg-type] + if self.skill_library is not None + else NOT_GIVEN + ), + max_tokens=self.max_output_tokens_per_request, + ) + else: + response = self.client.chat.completions.create( # type: ignore[assignment] + model=self.model_name, + messages=messages, + max_tokens=self.max_output_tokens_per_request, + tools=( + self.skill_library.get_tools() # type: ignore[arg-type] + if self.skill_library is not None + else NOT_GIVEN + ), + ) + response_message = response.choices[0].message + if response_message is None: + logger.error("Response message does not exist.") + raise Exception("Response message does not exist.") + return response_message + except ConnectionError as ce: + logger.error(f"Connection error with API: {ce}") + raise + except ValueError as ve: + logger.error(f"Invalid parameters: {ve}") + raise + except Exception as e: + logger.error(f"Unexpected error in API call: {e}") + raise + + def stream_query(self, query_text: str) -> Observable: # type: ignore[type-arg] + """Creates an observable that processes a text query and emits the response. + + This method provides a simple way to send a text query and get an observable + stream of the response. It's designed for one-off queries rather than + continuous processing of input streams. + + Args: + query_text (str): The query text to process. + + Returns: + Observable: An observable that emits the response as a string. 
+ """ + return create( + lambda observer, _: self._observable_query(observer, incoming_query=query_text) # type: ignore[arg-type] + ) + + +# endregion OpenAIAgent Subclass (OpenAI-Specific Implementation) diff --git a/dimos/agents/agent_config.py b/dimos/agents_deprecated/agent_config.py similarity index 97% rename from dimos/agents/agent_config.py rename to dimos/agents_deprecated/agent_config.py index 5b9027b072..ee6be0e3ad 100644 --- a/dimos/agents/agent_config.py +++ b/dimos/agents_deprecated/agent_config.py @@ -13,7 +13,7 @@ # limitations under the License. -from dimos.agents.agent import Agent +from dimos.agents_deprecated.agent import Agent class AgentConfig: diff --git a/dimos/agents/agent_message.py b/dimos/agents_deprecated/agent_message.py similarity index 98% rename from dimos/agents/agent_message.py rename to dimos/agents_deprecated/agent_message.py index 7be0ad84f2..e43f141bd4 100644 --- a/dimos/agents/agent_message.py +++ b/dimos/agents_deprecated/agent_message.py @@ -17,7 +17,7 @@ from dataclasses import dataclass, field import time -from dimos.agents.agent_types import AgentImage +from dimos.agents_deprecated.agent_types import AgentImage from dimos.msgs.sensor_msgs.Image import Image diff --git a/dimos/agents/agent_types.py b/dimos/agents_deprecated/agent_types.py similarity index 100% rename from dimos/agents/agent_types.py rename to dimos/agents_deprecated/agent_types.py diff --git a/dimos/agents/claude_agent.py b/dimos/agents_deprecated/claude_agent.py similarity index 99% rename from dimos/agents/claude_agent.py rename to dimos/agents_deprecated/claude_agent.py index 0e3fda3d59..cf1042909c 100644 --- a/dimos/agents/claude_agent.py +++ b/dimos/agents_deprecated/claude_agent.py @@ -29,7 +29,7 @@ from dotenv import load_dotenv # Local imports -from dimos.agents.agent import LLMAgent +from dimos.agents_deprecated.agent import LLMAgent from dimos.skills.skills import AbstractSkill, SkillLibrary from dimos.stream.frame_processor import 
FrameProcessor from dimos.utils.logging_config import setup_logger @@ -39,8 +39,8 @@ from reactivex import Observable from reactivex.scheduler import ThreadPoolScheduler - from dimos.agents.memory.base import AbstractAgentSemanticMemory - from dimos.agents.prompt_builder.impl import PromptBuilder + from dimos.agents_deprecated.memory.base import AbstractAgentSemanticMemory + from dimos.agents_deprecated.prompt_builder.impl import PromptBuilder # Initialize environment variables load_dotenv() diff --git a/dimos/agents/prompt_builder/__init__.py b/dimos/agents_deprecated/memory/__init__.py similarity index 100% rename from dimos/agents/prompt_builder/__init__.py rename to dimos/agents_deprecated/memory/__init__.py diff --git a/dimos/agents/memory/base.py b/dimos/agents_deprecated/memory/base.py similarity index 100% rename from dimos/agents/memory/base.py rename to dimos/agents_deprecated/memory/base.py diff --git a/dimos/agents/memory/chroma_impl.py b/dimos/agents_deprecated/memory/chroma_impl.py similarity index 98% rename from dimos/agents/memory/chroma_impl.py rename to dimos/agents_deprecated/memory/chroma_impl.py index 96cc32aa54..a594f4a682 100644 --- a/dimos/agents/memory/chroma_impl.py +++ b/dimos/agents_deprecated/memory/chroma_impl.py @@ -19,7 +19,7 @@ from langchain_openai import OpenAIEmbeddings import torch -from dimos.agents.memory.base import AbstractAgentSemanticMemory +from dimos.agents_deprecated.memory.base import AbstractAgentSemanticMemory class ChromaAgentSemanticMemory(AbstractAgentSemanticMemory): diff --git a/dimos/agents/memory/image_embedding.py b/dimos/agents_deprecated/memory/image_embedding.py similarity index 100% rename from dimos/agents/memory/image_embedding.py rename to dimos/agents_deprecated/memory/image_embedding.py diff --git a/dimos/agents/memory/spatial_vector_db.py b/dimos/agents_deprecated/memory/spatial_vector_db.py similarity index 98% rename from dimos/agents/memory/spatial_vector_db.py rename to 
dimos/agents_deprecated/memory/spatial_vector_db.py index 1e0611d643..69cbbbf9cd 100644 --- a/dimos/agents/memory/spatial_vector_db.py +++ b/dimos/agents_deprecated/memory/spatial_vector_db.py @@ -24,7 +24,7 @@ import chromadb import numpy as np -from dimos.agents.memory.visual_memory import VisualMemory +from dimos.agents_deprecated.memory.visual_memory import VisualMemory from dimos.types.robot_location import RobotLocation from dimos.utils.logging_config import setup_logger @@ -247,7 +247,7 @@ def query_by_text(self, text: str, limit: int = 5) -> list[dict]: # type: ignor List of results, each containing the image, its metadata, and similarity score """ if self.embedding_provider is None: - from dimos.agents.memory.image_embedding import ImageEmbeddingProvider + from dimos.agents_deprecated.memory.image_embedding import ImageEmbeddingProvider self.embedding_provider = ImageEmbeddingProvider(model_name="clip") diff --git a/dimos/agents/memory/test_image_embedding.py b/dimos/agents_deprecated/memory/test_image_embedding.py similarity index 99% rename from dimos/agents/memory/test_image_embedding.py rename to dimos/agents_deprecated/memory/test_image_embedding.py index b1e7cabf09..98574462d5 100644 --- a/dimos/agents/memory/test_image_embedding.py +++ b/dimos/agents_deprecated/memory/test_image_embedding.py @@ -23,7 +23,7 @@ import pytest from reactivex import operators as ops -from dimos.agents.memory.image_embedding import ImageEmbeddingProvider +from dimos.agents_deprecated.memory.image_embedding import ImageEmbeddingProvider from dimos.stream.video_provider import VideoProvider diff --git a/dimos/agents/memory/visual_memory.py b/dimos/agents_deprecated/memory/visual_memory.py similarity index 100% rename from dimos/agents/memory/visual_memory.py rename to dimos/agents_deprecated/memory/visual_memory.py diff --git a/dimos/agents/modules/__init__.py b/dimos/agents_deprecated/modules/__init__.py similarity index 100% rename from dimos/agents/modules/__init__.py 
rename to dimos/agents_deprecated/modules/__init__.py diff --git a/dimos/agents/modules/base.py b/dimos/agents_deprecated/modules/base.py similarity index 98% rename from dimos/agents/modules/base.py rename to dimos/agents_deprecated/modules/base.py index 8714378a89..898047e4d9 100644 --- a/dimos/agents/modules/base.py +++ b/dimos/agents_deprecated/modules/base.py @@ -21,17 +21,17 @@ from reactivex.subject import Subject -from dimos.agents.agent_message import AgentMessage -from dimos.agents.agent_types import AgentResponse, ConversationHistory, ToolCall -from dimos.agents.memory.base import AbstractAgentSemanticMemory -from dimos.agents.memory.chroma_impl import OpenAISemanticMemory +from dimos.agents_deprecated.agent_message import AgentMessage +from dimos.agents_deprecated.agent_types import AgentResponse, ConversationHistory, ToolCall +from dimos.agents_deprecated.memory.base import AbstractAgentSemanticMemory +from dimos.agents_deprecated.memory.chroma_impl import OpenAISemanticMemory from dimos.skills.skills import AbstractSkill, SkillLibrary from dimos.utils.logging_config import setup_logger try: from .gateway import UnifiedGatewayClient except ImportError: - from dimos.agents.modules.gateway import UnifiedGatewayClient + from dimos.agents_deprecated.modules.gateway import UnifiedGatewayClient logger = setup_logger() diff --git a/dimos/agents/modules/base_agent.py b/dimos/agents_deprecated/modules/base_agent.py similarity index 96% rename from dimos/agents/modules/base_agent.py rename to dimos/agents_deprecated/modules/base_agent.py index 2ebd253530..0516a5a0bc 100644 --- a/dimos/agents/modules/base_agent.py +++ b/dimos/agents_deprecated/modules/base_agent.py @@ -17,9 +17,9 @@ import threading from typing import Any -from dimos.agents.agent_message import AgentMessage -from dimos.agents.agent_types import AgentResponse -from dimos.agents.memory.base import AbstractAgentSemanticMemory +from dimos.agents_deprecated.agent_message import AgentMessage +from 
dimos.agents_deprecated.agent_types import AgentResponse +from dimos.agents_deprecated.memory.base import AbstractAgentSemanticMemory from dimos.core import In, Module, Out, rpc from dimos.skills.skills import AbstractSkill, SkillLibrary from dimos.utils.logging_config import setup_logger @@ -27,7 +27,7 @@ try: from .base import BaseAgent except ImportError: - from dimos.agents.modules.base import BaseAgent + from dimos.agents_deprecated.modules.base import BaseAgent logger = setup_logger() diff --git a/dimos/agents/modules/gateway/__init__.py b/dimos/agents_deprecated/modules/gateway/__init__.py similarity index 100% rename from dimos/agents/modules/gateway/__init__.py rename to dimos/agents_deprecated/modules/gateway/__init__.py diff --git a/dimos/agents/modules/gateway/client.py b/dimos/agents_deprecated/modules/gateway/client.py similarity index 100% rename from dimos/agents/modules/gateway/client.py rename to dimos/agents_deprecated/modules/gateway/client.py diff --git a/dimos/agents/modules/gateway/tensorzero_embedded.py b/dimos/agents_deprecated/modules/gateway/tensorzero_embedded.py similarity index 100% rename from dimos/agents/modules/gateway/tensorzero_embedded.py rename to dimos/agents_deprecated/modules/gateway/tensorzero_embedded.py diff --git a/dimos/agents/modules/gateway/tensorzero_simple.py b/dimos/agents_deprecated/modules/gateway/tensorzero_simple.py similarity index 100% rename from dimos/agents/modules/gateway/tensorzero_simple.py rename to dimos/agents_deprecated/modules/gateway/tensorzero_simple.py diff --git a/dimos/agents/modules/gateway/utils.py b/dimos/agents_deprecated/modules/gateway/utils.py similarity index 100% rename from dimos/agents/modules/gateway/utils.py rename to dimos/agents_deprecated/modules/gateway/utils.py diff --git a/dimos/agents/tokenizer/__init__.py b/dimos/agents_deprecated/prompt_builder/__init__.py similarity index 100% rename from dimos/agents/tokenizer/__init__.py rename to 
dimos/agents_deprecated/prompt_builder/__init__.py diff --git a/dimos/agents/prompt_builder/impl.py b/dimos/agents_deprecated/prompt_builder/impl.py similarity index 98% rename from dimos/agents/prompt_builder/impl.py rename to dimos/agents_deprecated/prompt_builder/impl.py index 42deea09dd..0714e39895 100644 --- a/dimos/agents/prompt_builder/impl.py +++ b/dimos/agents_deprecated/prompt_builder/impl.py @@ -15,8 +15,8 @@ from textwrap import dedent -from dimos.agents.tokenizer.base import AbstractTokenizer -from dimos.agents.tokenizer.openai_tokenizer import OpenAITokenizer +from dimos.agents_deprecated.tokenizer.base import AbstractTokenizer +from dimos.agents_deprecated.tokenizer.openai_tokenizer import OpenAITokenizer # TODO: Make class more generic when implementing other tokenizers. Presently its OpenAI specific. # TODO: Build out testing and logging diff --git a/dimos/agents_deprecated/tokenizer/__init__.py b/dimos/agents_deprecated/tokenizer/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dimos/agents/tokenizer/base.py b/dimos/agents_deprecated/tokenizer/base.py similarity index 100% rename from dimos/agents/tokenizer/base.py rename to dimos/agents_deprecated/tokenizer/base.py diff --git a/dimos/agents/tokenizer/huggingface_tokenizer.py b/dimos/agents_deprecated/tokenizer/huggingface_tokenizer.py similarity index 98% rename from dimos/agents/tokenizer/huggingface_tokenizer.py rename to dimos/agents_deprecated/tokenizer/huggingface_tokenizer.py index cf51347253..5b1eef7357 100644 --- a/dimos/agents/tokenizer/huggingface_tokenizer.py +++ b/dimos/agents_deprecated/tokenizer/huggingface_tokenizer.py @@ -14,7 +14,7 @@ from transformers import AutoTokenizer # type: ignore[import-untyped] -from dimos.agents.tokenizer.base import AbstractTokenizer +from dimos.agents_deprecated.tokenizer.base import AbstractTokenizer from dimos.utils.logging_config import setup_logger diff --git a/dimos/agents/tokenizer/openai_tokenizer.py 
b/dimos/agents_deprecated/tokenizer/openai_tokenizer.py similarity index 98% rename from dimos/agents/tokenizer/openai_tokenizer.py rename to dimos/agents_deprecated/tokenizer/openai_tokenizer.py index 15de95aafe..a8b75b8558 100644 --- a/dimos/agents/tokenizer/openai_tokenizer.py +++ b/dimos/agents_deprecated/tokenizer/openai_tokenizer.py @@ -14,7 +14,7 @@ import tiktoken -from dimos.agents.tokenizer.base import AbstractTokenizer +from dimos.agents_deprecated.tokenizer.base import AbstractTokenizer from dimos.utils.logging_config import setup_logger diff --git a/dimos/hardware/camera/module.py b/dimos/hardware/camera/module.py index 929e9548e5..1cb30b671a 100644 --- a/dimos/hardware/camera/module.py +++ b/dimos/hardware/camera/module.py @@ -23,7 +23,7 @@ from reactivex.observable import Observable from dimos import spec -from dimos.agents2 import Output, Reducer, Stream, skill # type: ignore[attr-defined] +from dimos.agents import Output, Reducer, Stream, skill # type: ignore[attr-defined] from dimos.core import Module, ModuleConfig, Out, rpc from dimos.hardware.camera.spec import CameraHardware from dimos.hardware.camera.webcam import Webcam diff --git a/dimos/mapping/osm/demo_osm.py b/dimos/mapping/osm/demo_osm.py index 20d9e40e74..d35794aa22 100644 --- a/dimos/mapping/osm/demo_osm.py +++ b/dimos/mapping/osm/demo_osm.py @@ -15,11 +15,11 @@ from dotenv import load_dotenv -from dimos.agents2.agent import llm_agent -from dimos.agents2.cli.human import human_input -from dimos.agents2.skills.demo_robot import demo_robot -from dimos.agents2.skills.osm import osm_skill -from dimos.agents2.system_prompt import get_system_prompt +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input +from dimos.agents.skills.demo_robot import demo_robot +from dimos.agents.skills.osm import osm_skill +from dimos.agents.system_prompt import get_system_prompt from dimos.core.blueprints import autoconnect load_dotenv() diff --git 
a/dimos/models/qwen/video_query.py b/dimos/models/qwen/video_query.py index 0b14bdfbc8..7ba80ae069 100644 --- a/dimos/models/qwen/video_query.py +++ b/dimos/models/qwen/video_query.py @@ -8,8 +8,8 @@ from reactivex import Observable, operators as ops from reactivex.subject import Subject -from dimos.agents.agent import OpenAIAgent -from dimos.agents.tokenizer.huggingface_tokenizer import HuggingFaceTokenizer +from dimos.agents_deprecated.agent import OpenAIAgent +from dimos.agents_deprecated.tokenizer.huggingface_tokenizer import HuggingFaceTokenizer from dimos.utils.threadpool import get_scheduler BBox = tuple[float, float, float, float] # (x1, y1, x2, y2) diff --git a/dimos/navigation/rosnav.py b/dimos/navigation/rosnav.py index bc58e2937c..5c112e52e3 100644 --- a/dimos/navigation/rosnav.py +++ b/dimos/navigation/rosnav.py @@ -45,7 +45,7 @@ from tf2_msgs.msg import TFMessage as ROSTFMessage # type: ignore[attr-defined, import-untyped] from dimos import spec -from dimos.agents2 import Reducer, Stream, skill # type: ignore[attr-defined] +from dimos.agents import Reducer, Stream, skill # type: ignore[attr-defined] from dimos.core import DimosCluster, In, LCMTransport, Module, Out, pSHMTransport, rpc from dimos.core.module import ModuleConfig from dimos.msgs.geometry_msgs import ( diff --git a/dimos/perception/detection/module3D.py b/dimos/perception/detection/module3D.py index a7602ce3b5..cb2d1fb326 100644 --- a/dimos/perception/detection/module3D.py +++ b/dimos/perception/detection/module3D.py @@ -21,7 +21,7 @@ from reactivex.observable import Observable from dimos import spec -from dimos.agents2 import skill # type: ignore[attr-defined] +from dimos.agents import skill # type: ignore[attr-defined] from dimos.core import DimosCluster, In, Out, rpc from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Transform, Vector3 from dimos.msgs.sensor_msgs import Image, PointCloud2 diff --git a/dimos/perception/spatial_perception.py 
b/dimos/perception/spatial_perception.py index 58b51563c8..3c6de9a429 100644 --- a/dimos/perception/spatial_perception.py +++ b/dimos/perception/spatial_perception.py @@ -28,9 +28,9 @@ from reactivex.disposable import Disposable from dimos import spec -from dimos.agents.memory.image_embedding import ImageEmbeddingProvider -from dimos.agents.memory.spatial_vector_db import SpatialVectorDB -from dimos.agents.memory.visual_memory import VisualMemory +from dimos.agents_deprecated.memory.image_embedding import ImageEmbeddingProvider +from dimos.agents_deprecated.memory.spatial_vector_db import SpatialVectorDB +from dimos.agents_deprecated.memory.visual_memory import VisualMemory from dimos.constants import DIMOS_PROJECT_ROOT from dimos.core import DimosCluster, In, Module, rpc from dimos.msgs.sensor_msgs import Image diff --git a/dimos/robot/agilex/README.md b/dimos/robot/agilex/README.md index 5d43fa3c3f..8342a6045e 100644 --- a/dimos/robot/agilex/README.md +++ b/dimos/robot/agilex/README.md @@ -135,7 +135,7 @@ The run file pattern for agent integration: #!/usr/bin/env python3 import asyncio import reactivex as rx -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.web.robot_web_interface import RobotWebInterface def main(): @@ -266,7 +266,7 @@ class MyRobot: import asyncio import os from my_robot import MyRobot -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.skills.basic import BasicSkill from dimos.web.robot_web_interface import RobotWebInterface import reactivex as rx diff --git a/dimos/robot/agilex/README_CN.md b/dimos/robot/agilex/README_CN.md index bc82697776..a8d79ebec1 100644 --- a/dimos/robot/agilex/README_CN.md +++ b/dimos/robot/agilex/README_CN.md @@ -135,7 +135,7 @@ self.manipulation.camera_info.connect(self.camera.camera_info) #!/usr/bin/env python3 import asyncio import reactivex as rx -from 
dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.web.robot_web_interface import RobotWebInterface def main(): @@ -266,7 +266,7 @@ class MyRobot: import asyncio import os from my_robot import MyRobot -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.skills.basic import BasicSkill from dimos.web.robot_web_interface import RobotWebInterface import reactivex as rx diff --git a/dimos/robot/agilex/run.py b/dimos/robot/agilex/run.py index b810272e0a..38a231b211 100644 --- a/dimos/robot/agilex/run.py +++ b/dimos/robot/agilex/run.py @@ -26,7 +26,7 @@ import reactivex as rx import reactivex.operators as ops -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.robot.agilex.piper_arm import PiperArmRobot from dimos.skills.kill_skill import KillSkill from dimos.skills.manipulation.pick_and_place import PickAndPlace diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 432e89e190..fd969f6d38 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -37,9 +37,9 @@ "unitree-g1-full": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:full_featured", "unitree-g1-detection": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:detection", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", - "demo-skill": "dimos.agents2.skills.demo_skill:demo_skill", - "demo-gps-nav": "dimos.agents2.skills.demo_gps_nav:demo_gps_nav_skill", - "demo-google-maps-skill": "dimos.agents2.skills.demo_google_maps_skill:demo_google_maps_skill", + "demo-skill": "dimos.agents.skills.demo_skill:demo_skill", + "demo-gps-nav": "dimos.agents.skills.demo_gps_nav:demo_gps_nav_skill", + "demo-google-maps-skill": "dimos.agents.skills.demo_google_maps_skill:demo_google_maps_skill", "demo-remapping": "dimos.robot.unitree_webrtc.demo_remapping:remapping", 
"demo-remapping-transport": "dimos.robot.unitree_webrtc.demo_remapping:remapping_and_transport", "demo-error-on-name-conflicts": "dimos.robot.unitree_webrtc.demo_error_on_name_conflicts:blueprint", @@ -56,25 +56,25 @@ "g1_connection": "dimos.robot.unitree.connection.g1", "g1_joystick": "dimos.robot.unitree_webrtc.g1_joystick_module", "g1_skills": "dimos.robot.unitree_webrtc.unitree_g1_skill_container", - "google_maps_skill": "dimos.agents2.skills.google_maps_skill_container", - "gps_nav_skill": "dimos.agents2.skills.gps_nav_skill", + "google_maps_skill": "dimos.agents.skills.google_maps_skill_container", + "gps_nav_skill": "dimos.agents.skills.gps_nav_skill", "holonomic_local_planner": "dimos.navigation.local_planner.holonomic_local_planner", - "human_input": "dimos.agents2.cli.human", + "human_input": "dimos.agents.cli.human", "keyboard_teleop": "dimos.robot.unitree_webrtc.keyboard_teleop", - "llm_agent": "dimos.agents2.agent", + "llm_agent": "dimos.agents.agent", "mapper": "dimos.robot.unitree_webrtc.type.map", - "navigation_skill": "dimos.agents2.skills.navigation", + "navigation_skill": "dimos.agents.skills.navigation", "object_tracking": "dimos.perception.object_tracker", - "osm_skill": "dimos.agents2.skills.osm", + "osm_skill": "dimos.agents.skills.osm", "replanning_a_star_planner": "dimos.navigation.replanning_a_star.module", "ros_nav": "dimos.navigation.rosnav", "spatial_memory": "dimos.perception.spatial_perception", - "speak_skill": "dimos.agents2.skills.speak_skill", + "speak_skill": "dimos.agents.skills.speak_skill", "unitree_skills": "dimos.robot.unitree_webrtc.unitree_skill_container", "utilization": "dimos.utils.monitoring", "wavefront_frontier_explorer": "dimos.navigation.frontier_exploration.wavefront_frontier_goal_selector", "websocket_vis": "dimos.web.websocket_vis.websocket_vis_module", - "web_input": "dimos.agents2.cli.web", + "web_input": "dimos.agents.cli.web", } diff --git a/dimos/robot/drone/drone.py b/dimos/robot/drone/drone.py index 
7816d6a9aa..40f6d1fcb9 100644 --- a/dimos/robot/drone/drone.py +++ b/dimos/robot/drone/drone.py @@ -28,8 +28,8 @@ from reactivex import Observable from dimos import core -from dimos.agents2.skills.google_maps_skill_container import GoogleMapsSkillContainer -from dimos.agents2.skills.osm import OsmSkill +from dimos.agents.skills.google_maps_skill_container import GoogleMapsSkillContainer +from dimos.agents.skills.osm import OsmSkill from dimos.mapping.types import LatLon from dimos.msgs.geometry_msgs import PoseStamped, Twist, Vector3 from dimos.msgs.sensor_msgs import Image @@ -451,9 +451,9 @@ def main() -> None: print(" • /drone/tracking_overlay - Object tracking visualization (Image)") print(" • /drone/tracking_status - Tracking status (String/JSON)") - from dimos.agents2 import Agent # type: ignore[attr-defined] - from dimos.agents2.cli.human import HumanInput - from dimos.agents2.spec import Model, Provider # type: ignore[attr-defined] + from dimos.agents import Agent # type: ignore[attr-defined] + from dimos.agents.cli.human import HumanInput + from dimos.agents.spec import Model, Provider # type: ignore[attr-defined] assert drone.dimos is not None human_input = drone.dimos.deploy(HumanInput) # type: ignore[attr-defined] diff --git a/dimos/robot/unitree/g1/g1agent.py b/dimos/robot/unitree/g1/g1agent.py index 9c94445a1d..ca89319f8e 100644 --- a/dimos/robot/unitree/g1/g1agent.py +++ b/dimos/robot/unitree/g1/g1agent.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from dimos import agents2 -from dimos.agents2.skills.navigation import NavigationSkillContainer +from dimos import agents +from dimos.agents.skills.navigation import NavigationSkillContainer from dimos.core import DimosCluster from dimos.perception import spatial_perception from dimos.robot.unitree.g1 import g1detector @@ -37,7 +37,7 @@ def deploy(dimos: DimosCluster, ip: str): # type: ignore[no-untyped-def] ) navskills.start() - agent = agents2.deploy( # type: ignore[attr-defined] + agent = agents.deploy( # type: ignore[attr-defined] dimos, "You are controling a humanoid robot", skill_containers=[connection, nav, camera, spatialmem, navskills], diff --git a/dimos/robot/unitree/go2/go2.py b/dimos/robot/unitree/go2/go2.py index eee38a3cad..11b369f12e 100644 --- a/dimos/robot/unitree/go2/go2.py +++ b/dimos/robot/unitree/go2/go2.py @@ -32,6 +32,6 @@ def deploy(dimos: DimosCluster, ip: str): # type: ignore[no-untyped-def] # lidar=connection, # ) - # agent = agents2.deploy(dimos) + # agent = agents.deploy(dimos) # agent.register_skills(detector) return connection diff --git a/dimos/robot/unitree_webrtc/modular/connection_module.py b/dimos/robot/unitree_webrtc/modular/connection_module.py index fb4e6ac570..0ff7528d41 100644 --- a/dimos/robot/unitree_webrtc/modular/connection_module.py +++ b/dimos/robot/unitree_webrtc/modular/connection_module.py @@ -27,7 +27,7 @@ from reactivex import operators as ops from reactivex.observable import Observable -from dimos.agents2 import Output, Reducer, Stream, skill # type: ignore[attr-defined] +from dimos.agents import Output, Reducer, Stream, skill # type: ignore[attr-defined] from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core import DimosCluster, In, LCMTransport, Module, ModuleConfig, Out, pSHMTransport, rpc from dimos.core.global_config import GlobalConfig diff --git a/dimos/robot/unitree_webrtc/modular/ivan_unitree.py b/dimos/robot/unitree_webrtc/modular/ivan_unitree.py index 9c79274eb2..18f676a8fe 100644 
--- a/dimos/robot/unitree_webrtc/modular/ivan_unitree.py +++ b/dimos/robot/unitree_webrtc/modular/ivan_unitree.py @@ -15,7 +15,7 @@ import logging import time -from dimos.agents2.spec import Model, Provider +from dimos.agents.spec import Model, Provider from dimos.core import LCMTransport, start from dimos.msgs.foxglove_msgs import ImageAnnotations from dimos.msgs.sensor_msgs import Image @@ -63,8 +63,8 @@ def goto(pose) -> bool: # type: ignore[no-untyped-def] connection.start() reid.start() - from dimos.agents2 import Agent # type: ignore[attr-defined] - from dimos.agents2.cli.human import HumanInput + from dimos.agents import Agent # type: ignore[attr-defined] + from dimos.agents.cli.human import HumanInput agent = Agent( system_prompt="You are a helpful assistant for controlling a Unitree Go2 robot.", diff --git a/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py b/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py index 0cc76ae51b..c4fd583aef 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py @@ -25,9 +25,9 @@ ) from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped] -from dimos.agents2.agent import llm_agent -from dimos.agents2.cli.human import human_input -from dimos.agents2.skills.navigation import navigation_skill +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input +from dimos.agents.skills.navigation import navigation_skill from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import LCMTransport, pSHMTransport diff --git a/dimos/robot/unitree_webrtc/unitree_g1_skill_container.py b/dimos/robot/unitree_webrtc/unitree_g1_skill_container.py index 7b1b0757e0..7e027afc0d 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1_skill_container.py +++ b/dimos/robot/unitree_webrtc/unitree_g1_skill_container.py @@ -13,7 +13,7 @@ # limitations under the License. 
""" -Unitree G1 skill container for the new agents2 framework. +Unitree G1 skill container for the new agents framework. Dynamically generates skills for G1 humanoid robot including arm controls and movement modes. """ diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 6f27909de3..d65d9b7b5c 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -18,13 +18,13 @@ from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped] -from dimos.agents2.agent import llm_agent -from dimos.agents2.cli.human import human_input -from dimos.agents2.cli.web import web_input -from dimos.agents2.ollama_agent import ollama_installed -from dimos.agents2.skills.navigation import navigation_skill -from dimos.agents2.skills.speak_skill import speak_skill -from dimos.agents2.spec import Provider +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input +from dimos.agents.cli.web import web_input +from dimos.agents.ollama_agent import ollama_installed +from dimos.agents.skills.navigation import navigation_skill +from dimos.agents.skills.speak_skill import speak_skill +from dimos.agents.spec import Provider from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport diff --git a/dimos/web/README.md b/dimos/web/README.md index c7bcd5df20..28f418bb55 100644 --- a/dimos/web/README.md +++ b/dimos/web/README.md @@ -85,7 +85,7 @@ The frontend will be available at http://localhost:3000 ### Unitree Go2 Example ```python -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.web.robot_web_interface 
import RobotWebInterface diff --git a/dimos/web/dimos_interface/api/README.md b/dimos/web/dimos_interface/api/README.md index 37cafd6e52..a2c15015e8 100644 --- a/dimos/web/dimos_interface/api/README.md +++ b/dimos/web/dimos_interface/api/README.md @@ -34,7 +34,7 @@ The server will start on `http://0.0.0.0:5555`. See DimOS Documentation for more info. ```python -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.web.robot_web_interface import RobotWebInterface diff --git a/tests/agent_manip_flow_flask_test.py b/tests/agent_manip_flow_flask_test.py index 7f7887004b..7a7504e628 100644 --- a/tests/agent_manip_flow_flask_test.py +++ b/tests/agent_manip_flow_flask_test.py @@ -31,7 +31,7 @@ from reactivex.scheduler import ThreadPoolScheduler # Local application imports -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.stream.frame_processor import FrameProcessor from dimos.stream.video_operators import VideoOperators as vops from dimos.stream.video_provider import VideoProvider diff --git a/tests/agent_memory_test.py b/tests/agent_memory_test.py index c2c41ad502..44f1a38397 100644 --- a/tests/agent_memory_test.py +++ b/tests/agent_memory_test.py @@ -18,7 +18,7 @@ load_dotenv() -from dimos.agents.memory.chroma_impl import OpenAISemanticMemory +from dimos.agents_deprecated.memory.chroma_impl import OpenAISemanticMemory agent_memory = OpenAISemanticMemory() print("Initialization done.") diff --git a/tests/simple_agent_test.py b/tests/simple_agent_test.py index f2cf8493d4..482f131f4e 100644 --- a/tests/simple_agent_test.py +++ b/tests/simple_agent_test.py @@ -14,7 +14,7 @@ import os -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from 
dimos.robot.unitree.unitree_ros_control import UnitreeROSControl from dimos.robot.unitree.unitree_skills import MyUnitreeSkills diff --git a/tests/test_agent_alibaba.py b/tests/test_agent_alibaba.py index fa4dfe80bf..1b26a7f009 100644 --- a/tests/test_agent_alibaba.py +++ b/tests/test_agent_alibaba.py @@ -16,8 +16,8 @@ from openai import OpenAI -from dimos.agents.agent import OpenAIAgent -from dimos.agents.tokenizer.huggingface_tokenizer import HuggingFaceTokenizer +from dimos.agents_deprecated.agent import OpenAIAgent +from dimos.agents_deprecated.tokenizer.huggingface_tokenizer import HuggingFaceTokenizer from dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.stream.video_provider import VideoProvider from dimos.utils.threadpool import get_scheduler diff --git a/tests/test_audio_agent.py b/tests/test_audio_agent.py index d79d2040c2..be1322be10 100644 --- a/tests/test_audio_agent.py +++ b/tests/test_audio_agent.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.stream.audio.pipelines import stt, tts from dimos.stream.audio.utils import keepalive from dimos.utils.threadpool import get_scheduler diff --git a/tests/test_audio_robot_agent.py b/tests/test_audio_robot_agent.py index 27340fcd80..68c523d0f9 100644 --- a/tests/test_audio_robot_agent.py +++ b/tests/test_audio_robot_agent.py @@ -14,7 +14,7 @@ import os -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_ros_control import UnitreeROSControl from dimos.robot.unitree.unitree_skills import MyUnitreeSkills diff --git a/tests/test_claude_agent_query.py b/tests/test_claude_agent_query.py index 05893a6b9d..548ed4351d 100644 --- a/tests/test_claude_agent_query.py +++ b/tests/test_claude_agent_query.py @@ -14,7 +14,7 @@ from dotenv import load_dotenv -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent # Load API key from environment load_dotenv() diff --git a/tests/test_claude_agent_skills_query.py b/tests/test_claude_agent_skills_query.py index bb5753d2db..c02e895bd7 100644 --- a/tests/test_claude_agent_skills_query.py +++ b/tests/test_claude_agent_skills_query.py @@ -19,7 +19,7 @@ import reactivex as rx import reactivex.operators as ops -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_ros_control import UnitreeROSControl from dimos.robot.unitree.unitree_skills import MyUnitreeSkills diff --git a/tests/test_manipulation_agent.py b/tests/test_manipulation_agent.py index 7651d38fef..312b530744 100644 --- a/tests/test_manipulation_agent.py +++ b/tests/test_manipulation_agent.py @@ -22,7 +22,7 @@ import reactivex.operators as ops 
from reactivex.subject import BehaviorSubject -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.perception.detection2d.detic_2d_det import Detic2DDetector from dimos.perception.object_detection_stream import ObjectDetectionStream from dimos.robot.robot import MockManipulationRobot diff --git a/tests/test_object_detection_agent_data_query_stream.py b/tests/test_object_detection_agent_data_query_stream.py index ca5671f78e..e788e92c13 100644 --- a/tests/test_object_detection_agent_data_query_stream.py +++ b/tests/test_object_detection_agent_data_query_stream.py @@ -20,7 +20,7 @@ from dotenv import load_dotenv from reactivex import operators as ops -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.perception.detection2d.detic_2d_det import Detic2DDetector from dimos.perception.object_detection_stream import ObjectDetectionStream from dimos.robot.unitree.unitree_go2 import UnitreeGo2 diff --git a/tests/test_semantic_seg_robot_agent.py b/tests/test_semantic_seg_robot_agent.py index f35fdb53d4..2791f23211 100644 --- a/tests/test_semantic_seg_robot_agent.py +++ b/tests/test_semantic_seg_robot_agent.py @@ -17,7 +17,7 @@ import cv2 from reactivex import Subject, operators as RxOps -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.perception.semantic_seg import SemanticSegmentationStream from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_ros_control import UnitreeROSControl diff --git a/tests/test_skills.py b/tests/test_skills.py index 139a4efe59..a4bdd5942e 100644 --- a/tests/test_skills.py +++ b/tests/test_skills.py @@ -17,7 +17,7 @@ import unittest from unittest import mock -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.robot import MockRobot from 
dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.skills.skills import AbstractSkill @@ -123,7 +123,7 @@ def setUp(self): skills=self.skill_library, ) - @mock.patch("dimos.agents.agent.OpenAIAgent.run_observable_query") + @mock.patch("dimos.agents_deprecated.agent.OpenAIAgent.run_observable_query") def test_agent_skill_identification(self, mock_query): """Test that the agent can identify skills based on natural language.""" # Mock the agent response @@ -139,7 +139,7 @@ def test_agent_skill_identification(self, mock_query): self.assertEqual(response, "I found the TestSkill and executed it.") @mock.patch.object(TestSkill, "__call__") - @mock.patch("dimos.agents.agent.OpenAIAgent.run_observable_query") + @mock.patch("dimos.agents_deprecated.agent.OpenAIAgent.run_observable_query") def test_agent_skill_execution(self, mock_query, mock_skill_call): """Test that the agent can execute skills properly.""" # Mock the agent and skill call diff --git a/tests/test_skills_rest.py b/tests/test_skills_rest.py index a9493e3c79..6d3afc522c 100644 --- a/tests/test_skills_rest.py +++ b/tests/test_skills_rest.py @@ -18,7 +18,7 @@ import reactivex as rx import reactivex.operators as ops -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.skills.rest.rest import GenericRestSkill from dimos.skills.skills import SkillLibrary from dimos.web.robot_web_interface import RobotWebInterface diff --git a/tests/test_spatial_memory.py b/tests/test_spatial_memory.py index e9c8c623b9..5cd587f28c 100644 --- a/tests/test_spatial_memory.py +++ b/tests/test_spatial_memory.py @@ -22,7 +22,7 @@ import reactivex from reactivex import operators as ops -from dimos.agents.memory.visual_memory import VisualMemory +from dimos.agents_deprecated.memory.visual_memory import VisualMemory from dimos.msgs.geometry_msgs import Quaternion, Vector3 # from dimos.robot.unitree_webrtc.unitree_go2 import UnitreeGo2 # Uncomment when 
properly configured diff --git a/tests/test_spatial_memory_query.py b/tests/test_spatial_memory_query.py index 539f5f5eb0..d6935a2116 100644 --- a/tests/test_spatial_memory_query.py +++ b/tests/test_spatial_memory_query.py @@ -28,7 +28,7 @@ import cv2 import matplotlib.pyplot as plt -from dimos.agents.memory.visual_memory import VisualMemory +from dimos.agents_deprecated.memory.visual_memory import VisualMemory from dimos.perception.spatial_perception import SpatialMemory diff --git a/tests/test_standalone_project_out.py b/tests/test_standalone_project_out.py index 9a924f7f42..13211d8032 100644 --- a/tests/test_standalone_project_out.py +++ b/tests/test_standalone_project_out.py @@ -122,14 +122,14 @@ def extract_function_info(filename): # Usage: -file_path = "./dimos/agents/memory/base.py" +file_path = "./dimos/agents_deprecated/memory/base.py" extracted_info = extract_function_info(file_path) print(extracted_info) -file_path = "./dimos/agents/memory/chroma_impl.py" +file_path = "./dimos/agents_deprecated/memory/chroma_impl.py" extracted_info = extract_function_info(file_path) print(extracted_info) -file_path = "./dimos/agents/agent.py" +file_path = "./dimos/agents_deprecated/agent.py" extracted_info = extract_function_info(file_path) print(extracted_info) diff --git a/tests/test_unitree_agent.py b/tests/test_unitree_agent.py index 5c4b6acb7b..c9999b6f20 100644 --- a/tests/test_unitree_agent.py +++ b/tests/test_unitree_agent.py @@ -21,7 +21,7 @@ # ----- -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.stream.data_provider import QueryDataProvider diff --git a/tests/test_unitree_agent_queries_fastapi.py b/tests/test_unitree_agent_queries_fastapi.py index e144b8856c..77c36c14ef 100644 --- a/tests/test_unitree_agent_queries_fastapi.py +++ b/tests/test_unitree_agent_queries_fastapi.py @@ -30,7 
+30,7 @@ import reactivex.operators as ops # Local application imports -from dimos.agents.agent import OpenAIAgent +from dimos.agents_deprecated.agent import OpenAIAgent from dimos.robot.unitree.unitree_go2 import UnitreeGo2 from dimos.robot.unitree.unitree_skills import MyUnitreeSkills from dimos.utils.logging_config import setup_logger diff --git a/tests/test_unitree_ros_v0.0.4.py b/tests/test_unitree_ros_v0.0.4.py index efb39be2bf..bee088b85a 100644 --- a/tests/test_unitree_ros_v0.0.4.py +++ b/tests/test_unitree_ros_v0.0.4.py @@ -18,7 +18,7 @@ import reactivex as rx import reactivex.operators as ops -from dimos.agents.claude_agent import ClaudeAgent +from dimos.agents_deprecated.claude_agent import ClaudeAgent from dimos.perception.detection2d.detic_2d_det import Detic2DDetector from dimos.perception.object_detection_stream import ObjectDetectionStream from dimos.robot.unitree.unitree_go2 import UnitreeGo2