Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
7 changes: 4 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ repos:
- --use-current-year

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.11
rev: v0.14.1
hooks:
#- id: ruff-check
# args: [--fix]
- id: ruff-format
stages: [pre-commit]
- id: ruff-check
args: [--fix, --unsafe-fixes]

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
Expand Down
5 changes: 2 additions & 3 deletions bin/filter-errors-after-date
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
# Used to filter errors to only show lines committed on or after a specific date
# Can be chained with filter-errors-for-user

import sys
from datetime import datetime
import re
import subprocess
from datetime import datetime

import sys

_blame = {}

Expand Down
3 changes: 1 addition & 2 deletions bin/filter-errors-for-user
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

# Used when running `./bin/mypy-strict --for-me`

import sys
import re
import subprocess

import sys

_blame = {}

Expand Down
98 changes: 52 additions & 46 deletions dimos/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,32 @@
import json
import os
import threading
from typing import Any, Tuple, Optional, Union
from typing import TYPE_CHECKING, Any

# Third-party imports
from dotenv import load_dotenv
from openai import NOT_GIVEN, OpenAI
from pydantic import BaseModel
from reactivex import Observer, create, Observable, empty, operators as RxOps, just
from reactivex import Observable, Observer, create, empty, just, operators as RxOps
from reactivex.disposable import CompositeDisposable, Disposable
from reactivex.scheduler import ThreadPoolScheduler
from reactivex.subject import Subject

# Local imports
from dimos.agents.memory.base import AbstractAgentSemanticMemory
from dimos.agents.memory.chroma_impl import OpenAISemanticMemory
from dimos.agents.prompt_builder.impl import PromptBuilder
from dimos.agents.tokenizer.base import AbstractTokenizer
from dimos.agents.tokenizer.openai_tokenizer import OpenAITokenizer
from dimos.skills.skills import AbstractSkill, SkillLibrary
from dimos.stream.frame_processor import FrameProcessor
from dimos.stream.stream_merger import create_stream_merger
from dimos.stream.video_operators import Operators as MyOps, VideoOperators as MyVidOps
from dimos.utils.threadpool import get_scheduler
from dimos.utils.logging_config import setup_logger
from dimos.utils.threadpool import get_scheduler

if TYPE_CHECKING:
from reactivex.scheduler import ThreadPoolScheduler

from dimos.agents.memory.base import AbstractAgentSemanticMemory
from dimos.agents.tokenizer.base import AbstractTokenizer

# Initialize environment variables
load_dotenv()
Expand All @@ -75,9 +78,9 @@ def __init__(
self,
dev_name: str = "NA",
agent_type: str = "Base",
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
pool_scheduler: Optional[ThreadPoolScheduler] = None,
):
agent_memory: AbstractAgentSemanticMemory | None = None,
pool_scheduler: ThreadPoolScheduler | None = None,
) -> None:
"""
Initializes a new instance of the Agent.

Expand All @@ -94,7 +97,7 @@ def __init__(
self.disposables = CompositeDisposable()
self.pool_scheduler = pool_scheduler if pool_scheduler else get_scheduler()

def dispose_all(self):
def dispose_all(self) -> None:
"""Disposes of all active subscriptions managed by this agent."""
if self.disposables:
self.disposables.dispose()
Expand Down Expand Up @@ -145,16 +148,16 @@ def __init__(
self,
dev_name: str = "NA",
agent_type: str = "LLM",
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
pool_scheduler: Optional[ThreadPoolScheduler] = None,
agent_memory: AbstractAgentSemanticMemory | None = None,
pool_scheduler: ThreadPoolScheduler | None = None,
process_all_inputs: bool = False,
system_query: Optional[str] = None,
system_query: str | None = None,
max_output_tokens_per_request: int = 16384,
max_input_tokens_per_request: int = 128000,
input_query_stream: Optional[Observable] = None,
input_data_stream: Optional[Observable] = None,
input_video_stream: Optional[Observable] = None,
):
input_query_stream: Observable | None = None,
input_data_stream: Observable | None = None,
input_video_stream: Observable | None = None,
) -> None:
"""
Initializes a new instance of the LLMAgent.

Expand All @@ -169,9 +172,9 @@ def __init__(
"""
super().__init__(dev_name, agent_type, agent_memory, pool_scheduler)
# These attributes can be configured by a subclass if needed.
self.query: Optional[str] = None
self.prompt_builder: Optional[PromptBuilder] = None
self.system_query: Optional[str] = system_query
self.query: str | None = None
self.prompt_builder: PromptBuilder | None = None
self.system_query: str | None = system_query
self.image_detail: str = "low"
self.max_input_tokens_per_request: int = max_input_tokens_per_request
self.max_output_tokens_per_request: int = max_output_tokens_per_request
Expand All @@ -180,7 +183,7 @@ def __init__(
)
self.rag_query_n: int = 4
self.rag_similarity_threshold: float = 0.45
self.frame_processor: Optional[FrameProcessor] = None
self.frame_processor: FrameProcessor | None = None
self.output_dir: str = os.path.join(os.getcwd(), "assets", "agent")
self.process_all_inputs: bool = process_all_inputs
os.makedirs(self.output_dir, exist_ok=True)
Expand Down Expand Up @@ -225,8 +228,11 @@ def __init__(
)

logger.info("Subscribing to merged input stream...")

# Define a query extractor for the merged stream
query_extractor = lambda emission: (emission[0], emission[1][0])
def query_extractor(emission):
return (emission[0], emission[1][0])

self.disposables.add(
self.subscribe_to_image_processing(
self.merged_stream, query_extractor=query_extractor
Expand All @@ -241,7 +247,7 @@ def __init__(
logger.info("Subscribing to input query stream...")
self.disposables.add(self.subscribe_to_query_processing(self.input_query_stream))

def _update_query(self, incoming_query: Optional[str]) -> None:
def _update_query(self, incoming_query: str | None) -> None:
"""Updates the query if an incoming query is provided.

Args:
Expand All @@ -250,7 +256,7 @@ def _update_query(self, incoming_query: Optional[str]) -> None:
if incoming_query is not None:
self.query = incoming_query

def _get_rag_context(self) -> Tuple[str, str]:
def _get_rag_context(self) -> tuple[str, str]:
"""Queries the agent memory to retrieve RAG context.

Returns:
Expand All @@ -273,8 +279,8 @@ def _get_rag_context(self) -> Tuple[str, str]:

def _build_prompt(
self,
base64_image: Optional[str],
dimensions: Optional[Tuple[int, int]],
base64_image: str | None,
dimensions: tuple[int, int] | None,
override_token_limit: bool,
condensed_results: str,
) -> list:
Expand Down Expand Up @@ -370,10 +376,10 @@ def _tooling_callback(message, messages, response_message, skill_library: SkillL
def _observable_query(
self,
observer: Observer,
base64_image: Optional[str] = None,
dimensions: Optional[Tuple[int, int]] = None,
base64_image: str | None = None,
dimensions: tuple[int, int] | None = None,
override_token_limit: bool = False,
incoming_query: Optional[str] = None,
incoming_query: str | None = None,
):
"""Prepares and sends a query to the LLM, emitting the response to the observer.

Expand Down Expand Up @@ -449,7 +455,7 @@ def _send_query(self, messages: list) -> Any:
"""
raise NotImplementedError("Subclasses must implement _send_query method.")

def _log_response_to_file(self, response, output_dir: str = None):
def _log_response_to_file(self, response, output_dir: str | None = None) -> None:
"""Logs the LLM response to a file.

Args:
Expand Down Expand Up @@ -670,7 +676,7 @@ def run_observable_query(self, query_text: str, **kwargs) -> Observable:
)
)

def dispose_all(self):
def dispose_all(self) -> None:
"""Disposes of all active subscriptions managed by this agent."""
super().dispose_all()
self.response_subject.on_completed()
Expand All @@ -695,27 +701,27 @@ def __init__(
dev_name: str,
agent_type: str = "Vision",
query: str = "What do you see?",
input_query_stream: Optional[Observable] = None,
input_data_stream: Optional[Observable] = None,
input_video_stream: Optional[Observable] = None,
input_query_stream: Observable | None = None,
input_data_stream: Observable | None = None,
input_video_stream: Observable | None = None,
output_dir: str = os.path.join(os.getcwd(), "assets", "agent"),
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
system_query: Optional[str] = None,
agent_memory: AbstractAgentSemanticMemory | None = None,
system_query: str | None = None,
max_input_tokens_per_request: int = 128000,
max_output_tokens_per_request: int = 16384,
model_name: str = "gpt-4o",
prompt_builder: Optional[PromptBuilder] = None,
tokenizer: Optional[AbstractTokenizer] = None,
prompt_builder: PromptBuilder | None = None,
tokenizer: AbstractTokenizer | None = None,
rag_query_n: int = 4,
rag_similarity_threshold: float = 0.45,
skills: Optional[Union[AbstractSkill, list[AbstractSkill], SkillLibrary]] = None,
response_model: Optional[BaseModel] = None,
frame_processor: Optional[FrameProcessor] = None,
skills: AbstractSkill | list[AbstractSkill] | SkillLibrary | None = None,
response_model: BaseModel | None = None,
frame_processor: FrameProcessor | None = None,
image_detail: str = "low",
pool_scheduler: Optional[ThreadPoolScheduler] = None,
process_all_inputs: Optional[bool] = None,
openai_client: Optional[OpenAI] = None,
):
pool_scheduler: ThreadPoolScheduler | None = None,
process_all_inputs: bool | None = None,
openai_client: OpenAI | None = None,
) -> None:
"""
Initializes a new instance of the OpenAIAgent.

Expand Down Expand Up @@ -803,7 +809,7 @@ def __init__(

logger.info("OpenAI Agent Initialized.")

def _add_context_to_memory(self):
def _add_context_to_memory(self) -> None:
"""Adds initial context to the agent's memory."""
context_data = [
(
Expand Down
10 changes: 5 additions & 5 deletions dimos/agents/agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

from dimos.agents.agent import Agent


class AgentConfig:
def __init__(self, agents: List[Agent] = None):
def __init__(self, agents: list[Agent] | None = None) -> None:
"""
Initialize an AgentConfig with a list of agents.

Expand All @@ -26,7 +26,7 @@ def __init__(self, agents: List[Agent] = None):
"""
self.agents = agents if agents is not None else []

def add_agent(self, agent: Agent):
def add_agent(self, agent: Agent) -> None:
"""
Add an agent to the configuration.

Expand All @@ -35,7 +35,7 @@ def add_agent(self, agent: Agent):
"""
self.agents.append(agent)

def remove_agent(self, agent: Agent):
def remove_agent(self, agent: Agent) -> None:
"""
Remove an agent from the configuration.

Expand All @@ -45,7 +45,7 @@ def remove_agent(self, agent: Agent):
if agent in self.agents:
self.agents.remove(agent)

def get_agents(self) -> List[Agent]:
def get_agents(self) -> list[Agent]:
"""
Get the list of configured agents.

Expand Down
Loading
Loading