Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
37bc2e8
agents2
leshy Aug 9, 2025
29d2538
fix skill tests
leshy Aug 9, 2025
9b7a2be
cleanup
leshy Aug 9, 2025
8ea2455
small fixes, restructure of configurable
leshy Aug 9, 2025
4796a5d
bugfixes
leshy Aug 10, 2025
21f82fe
get_loop functionality
leshy Aug 10, 2025
0d455ae
langchain dep
leshy Aug 10, 2025
b92446f
plucked ci changes from agent-refactor
leshy Aug 10, 2025
b6bf28c
skillcontainer hosts skill execution
leshy Aug 14, 2025
c37f611
better documentation
leshy Aug 14, 2025
34da551
skillcoordinator handles threading
leshy Aug 14, 2025
c7a7446
streaming skill sketch, async skill sketch
leshy Aug 14, 2025
c29e887
initial streaming implemented
leshy Aug 14, 2025
b45b83c
type fixes, work on reducers
leshy Aug 14, 2025
5c1cbfb
skill state bugfix
leshy Aug 14, 2025
07b153a
test bugfix
leshy Aug 14, 2025
b1f5d9d
reducer rewrite starting
leshy Aug 17, 2025
5cc4ebf
new reducer structure implemented
leshy Aug 17, 2025
6359e5c
reducer restructure checkpoint, tests passing
leshy Aug 17, 2025
c665a44
finished reducing
leshy Aug 17, 2025
c647960
__str__ for coordinator and skill state
leshy Aug 17, 2025
547e7d3
cleanup
leshy Aug 17, 2025
a05d1c0
passive skills tests
leshy Aug 17, 2025
95b250e
ToolMessage/situational awareness msg ordering
leshy Aug 17, 2025
237c4c6
agent initial working version
leshy Aug 17, 2025
5561c0f
coordinator -> agent interface still needs work
leshy Aug 17, 2025
789d8f6
major agent cleanup
leshy Aug 18, 2025
fa270e6
agent publishes messages exchanged, for observability
leshy Aug 18, 2025
f970c95
agentspy renamed to skillspy
leshy Aug 18, 2025
2d8ecc4
agentspy
leshy Aug 18, 2025
6c42069
implicit skills
leshy Aug 18, 2025
89be5d8
tests fix
leshy Aug 18, 2025
c2b4c7f
small comments cleanup
leshy Aug 19, 2025
7c84d5a
ci tests fix
leshy Aug 20, 2025
6bb2384
Merge branch 'dev' into agent2
leshy Aug 25, 2025
e98bf09
initial image implementation
leshy Aug 26, 2025
7090564
Remove type alias not supported in python 3.10
spomichter Aug 27, 2025
5094483
mock agent implementation
leshy Aug 27, 2025
5aea816
mock agent testing, image calls
leshy Aug 27, 2025
63572e5
reducers are pickleable
leshy Aug 27, 2025
3400b24
mock agent, rpc client inherits docstrings, all modules are skillcont…
leshy Aug 27, 2025
1af4bf4
enabled single-process mock agent test
leshy Aug 27, 2025
60b492c
agent encoding happens before message is sent from a skillcontainer
leshy Aug 27, 2025
7c32d2b
Merge branch 'dev' into agent2
leshy Aug 28, 2025
00d21be
test fix
leshy Aug 30, 2025
2488df6
Merge branch 'dev' into agent2
leshy Aug 30, 2025
4016e64
CI code cleanup
leshy Aug 30, 2025
86d05dc
skill threading bugfix
leshy Aug 30, 2025
b46fc3c
Merge branch 'agent2' of github.com:dimensionalOS/dimos into agent2
leshy Aug 30, 2025
a84b89c
core test fix
leshy Aug 31, 2025
812949c
Added temp development tests and files
spomichter Sep 2, 2025
f2c82a8
Rewrote LM generated tests, deleted reduntant tests
spomichter Sep 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions dimos/agents2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from langchain_core.messages import (
AIMessage,
HumanMessage,
MessageLikeRepresentation,
SystemMessage,
ToolCall,
ToolMessage,
)

from dimos.agents2.agent import Agent
from dimos.agents2.spec import AgentSpec
257 changes: 257 additions & 0 deletions dimos/agents2/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
from operator import itemgetter
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union

from langchain.chat_models import init_chat_model
from langchain_core.messages import (
AIMessage,
HumanMessage,
SystemMessage,
ToolCall,
ToolMessage,
)

from dimos.agents2.spec import AgentSpec
from dimos.core import rpc
from dimos.msgs.sensor_msgs import Image
from dimos.protocol.skill.coordinator import SkillCoordinator, SkillState, SkillStateDict
from dimos.protocol.skill.type import Output
from dimos.utils.logging_config import setup_logger

logger = setup_logger("dimos.protocol.agents2")


SYSTEM_MSG_APPEND = "\nYour message history will always be appended with a System Overview message that provides situational awareness."


def toolmsg_from_state(state: SkillState) -> ToolMessage:
return ToolMessage(
# if agent call has been triggered by another skill,
# and this specific skill didn't finish yet but we need a tool call response
# we return a message explaining that execution is still ongoing
content=state.content()
or "Running, you will be called with an update, no need for subsequent tool calls",
name=state.name,
tool_call_id=state.call_id,
)


class SkillStateSummary(TypedDict):
name: str
call_id: str
state: str
data: Any


def summary_from_state(state: SkillState, special_data: bool = False) -> SkillStateSummary:
content = state.content()
if isinstance(content, dict):
content = json.dumps(content)

if not isinstance(content, str):
content = str(content)

return {
"name": state.name,
"call_id": state.call_id,
"state": state.state.name,
"data": state.content() if not special_data else "data will be in a separate message",
}


# takes an overview of running skills from the coorindator
# and builds messages to be sent to an agent
def snapshot_to_messages(
state: SkillStateDict,
tool_calls: List[ToolCall],
) -> Tuple[List[ToolMessage], Optional[AIMessage]]:
# builds a set of tool call ids from a previous agent request
tool_call_ids = set(
map(itemgetter("id"), tool_calls),
)

# build a tool msg responses
tool_msgs: list[ToolMessage] = []

# build a general skill state overview (for longer running skills)
state_overview: list[Dict[str, SkillStateSummary]] = []

# for special skills that want to return a separate message
# (images for example, requires to be a HumanMessage)
special_msgs: List[HumanMessage] = []

# Initialize state_msg
state_msg = None

for skill_state in sorted(
state.values(),
key=lambda skill_state: skill_state.duration(),
):
if skill_state.call_id in tool_call_ids:
tool_msgs.append(toolmsg_from_state(skill_state))
continue

special_data = skill_state.skill_config.output != Output.standard
if special_data:
print("special data from skill", skill_state.name, skill_state.content())
special_msgs.append(HumanMessage(content=[skill_state.content()]))

state_overview.append(summary_from_state(skill_state, special_data))

if state_overview:
state_msg = AIMessage(
"State Overview:\n" + "\n".join(map(json.dumps, state_overview)),
)

return {
"tool_msgs": tool_msgs if tool_msgs else [],
"state_msgs": ([state_msg] if state_msg else []) + special_msgs,
}


# Agent class job is to glue skill coordinator state to an agent, builds langchain messages
class Agent(AgentSpec):
system_message: SystemMessage
state_messages: List[Union[AIMessage, HumanMessage]]

def __init__(
self,
*args,
**kwargs,
):
AgentSpec.__init__(self, *args, **kwargs)

self.state_messages = []
self.coordinator = SkillCoordinator()
self._history = []

if self.config.system_prompt:
if isinstance(self.config.system_prompt, str):
self.system_message = SystemMessage(self.config.system_prompt + SYSTEM_MSG_APPEND)
else:
self.config.system_prompt.content += SYSTEM_MSG_APPEND
self.system_message = self.config.system_prompt

self.publish(self.system_message)

# Use provided model instance if available, otherwise initialize from config
if self.config.model_instance:
self._llm = self.config.model_instance
else:
self._llm = init_chat_model(
model_provider=self.config.provider, model=self.config.model
)

@rpc
def start(self):
self.coordinator.start()

@rpc
def stop(self):
self.coordinator.stop()

def clear_history(self):
self._history.clear()

def append_history(self, *msgs: List[Union[AIMessage, HumanMessage]]):
for msg in msgs:
self.publish(msg)

self._history.extend(msgs)

def history(self):
return [self.system_message] + self._history + self.state_messages

# Used by agent to execute tool calls
def execute_tool_calls(self, tool_calls: List[ToolCall]) -> None:
"""Execute a list of tool calls from the agent."""
for tool_call in tool_calls:
logger.info(f"executing skill call {tool_call}")
self.coordinator.call_skill(
tool_call.get("id"),
tool_call.get("name"),
tool_call.get("args"),
)

# used to inject skill calls into the agent loop without agent asking for it
def run_implicit_skill(self, skill_name: str, *args, **kwargs) -> None:
self.coordinator.call_skill(False, skill_name, {"args": args, "kwargs": kwargs})

async def agent_loop(self, seed_query: str = ""):
self.append_history(HumanMessage(seed_query))

try:
while True:
# we are getting tools from the coordinator on each turn
# since this allows for skillcontainers to dynamically provide new skills
tools = self.get_tools()
self._llm = self._llm.bind_tools(tools)

# publish to /agent topic for observability
for state_msg in self.state_messages:
self.publish(state_msg)

# history() builds our message history dynamically
# ensures we include latest system state, but not old ones.
msg = self._llm.invoke(self.history())
self.append_history(msg)

logger.info(f"Agent response: {msg.content}")

if msg.tool_calls:
self.execute_tool_calls(msg.tool_calls)

print(self)
print(self.coordinator)

if not self.coordinator.has_active_skills():
logger.info("No active tasks, exiting agent loop.")
return msg.content

# coordinator will continue once a skill state has changed in
# such a way that agent call needs to be executed
await self.coordinator.wait_for_updates()

# we request a full snapshot of currently running, finished or errored out skills
# we ask for removal of finished skills from subsequent snapshots (clear=True)
update = self.coordinator.generate_snapshot(clear=True)

# generate tool_msgs and general state update message,
# depending on a skill having associated tool call from previous interaction
# we will return a tool message, and not a general state message
snapshot_msgs = snapshot_to_messages(update, msg.tool_calls)

self.state_messages = snapshot_msgs.get("state_msgs", [])
self.append_history(*snapshot_msgs.get("tool_msgs", []))

except Exception as e:
logger.error(f"Error in agent loop: {e}")
import traceback

traceback.print_exc()

def query_async(self, query: str):
return asyncio.ensure_future(self.agent_loop(query), loop=self._loop)

def query(self, query: str):
return asyncio.run_coroutine_threadsafe(self.agent_loop(query), self._loop).result()

def register_skills(self, container):
return self.coordinator.register_skills(container)

def get_tools(self):
return self.coordinator.get_tools()
Loading
Loading