Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9f8c0e0
Unitree go2 runfile integration tool call issues
spomichter Sep 8, 2025
3f8cd96
webcam agent
leshy Sep 11, 2025
ad82529
old date stream fix
leshy Sep 12, 2025
97ea354
no pytest agent test
leshy Sep 12, 2025
a5e7b0b
human cli, fixed module/partial/local deployment
leshy Sep 12, 2025
9ba84ae
truncate so messages are more readable
paul-nechifor Sep 12, 2025
72d6616
do not publish none message
paul-nechifor Sep 12, 2025
a56f244
temp fix for pickling skills
paul-nechifor Sep 12, 2025
590909f
fix date json encoding
paul-nechifor Sep 12, 2025
db5a0de
CI code cleanup
paul-nechifor Sep 12, 2025
82080bc
add small cafe image
paul-nechifor Sep 13, 2025
51dd78b
rewriting cli
leshy Sep 13, 2025
36bc304
better human cli
leshy Sep 13, 2025
56eca4e
nicer human cli
leshy Sep 13, 2025
e0c5125
shutdown lcm
paul-nechifor Sep 14, 2025
4332bd8
agent lock
paul-nechifor Sep 14, 2025
5833e96
remove agent lock
paul-nechifor Sep 14, 2025
6e299d6
extreme hack
paul-nechifor Sep 14, 2025
6cc083f
Merge branch 'unitree-agents2-skill-integration' into unitree-agents2…
paul-nechifor Sep 14, 2025
832ecdb
add human input through human cli
paul-nechifor Sep 15, 2025
63ee782
remove panels from mujoco window
paul-nechifor Sep 15, 2025
8d0e943
fix kwargs calls
paul-nechifor Sep 15, 2025
14520c9
recorded tests
paul-nechifor Sep 13, 2025
f536f92
CI code cleanup
paul-nechifor Sep 13, 2025
8c5ac1e
fix
paul-nechifor Sep 13, 2025
49f62fe
fix imports
paul-nechifor Sep 15, 2025
601801f
Merge branch 'dev' into unitree-agents2-skill-integration-paul
paul-nechifor Sep 15, 2025
973ff40
fix
paul-nechifor Sep 16, 2025
cf53a3a
CI code cleanup
paul-nechifor Sep 16, 2025
fceb7b2
test fix
leshy Sep 15, 2025
4bd67d5
fix getattr
paul-nechifor Sep 16, 2025
19eaa9d
use one loop per test
paul-nechifor Sep 16, 2025
141f818
revert unitree_go2
paul-nechifor Sep 16, 2025
ec182b4
remove prints
paul-nechifor Sep 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data/.lfs/cafe-smol.jpg.tar.gz
Git LFS file not shown
2 changes: 2 additions & 0 deletions dimos/agents2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@

from dimos.agents2.agent import Agent
from dimos.agents2.spec import AgentSpec
from dimos.protocol.skill.skill import skill
from dimos.protocol.skill.type import Output, Reducer, Stream
96 changes: 81 additions & 15 deletions dimos/agents2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.
import asyncio
import json
import datetime
import uuid
from operator import itemgetter
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union

Expand Down Expand Up @@ -40,7 +42,7 @@

def toolmsg_from_state(state: SkillState) -> ToolMessage:
if state.skill_config.output != Output.standard:
content = "Special output, see separate message"
content = "output attached in separate messages"
else:
content = state.content()

Expand Down Expand Up @@ -78,6 +80,12 @@ def summary_from_state(state: SkillState, special_data: bool = False) -> SkillSt
}


def _custom_json_serializers(obj):
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.isoformat()
raise TypeError(f"Type {type(obj)} not serializable")


# takes an overview of running skills from the coorindator
# and builds messages to be sent to an agent
def snapshot_to_messages(
Expand All @@ -99,6 +107,10 @@ def snapshot_to_messages(
# (images for example, requires to be a HumanMessage)
special_msgs: List[HumanMessage] = []

# for special skills that want to return a separate message that should
# stay in history, like actual human messages, critical events
history_msgs: List[HumanMessage] = []

# Initialize state_msg
state_msg = None

Expand All @@ -109,25 +121,34 @@ def snapshot_to_messages(
if skill_state.call_id in tool_call_ids:
tool_msgs.append(toolmsg_from_state(skill_state))

special_data = skill_state.skill_config.output != Output.standard
if skill_state.skill_config.output == Output.human:
content = skill_state.content()
if not content:
continue
history_msgs.append(HumanMessage(content=content))
continue

special_data = skill_state.skill_config.output == Output.image
if special_data:
content = skill_state.content()
if not content:
continue
special_msgs.append(HumanMessage(content=[content]))
special_msgs.append(HumanMessage(content=content))

if skill_state.call_id in tool_call_ids:
continue

state_overview.append(summary_from_state(skill_state, special_data))

if state_overview:
state_msg = AIMessage(
"State Overview:\n" + "\n".join(map(json.dumps, state_overview)),
state_overview_str = "\n".join(
json.dumps(s, default=_custom_json_serializers) for s in state_overview
)
state_msg = AIMessage("State Overview:\n" + state_overview_str)

return {
"tool_msgs": tool_msgs if tool_msgs else [],
"tool_msgs": tool_msgs,
"history_msgs": history_msgs,
"state_msgs": ([state_msg] if state_msg else []) + special_msgs,
}

Expand All @@ -147,6 +168,8 @@ def __init__(
self.state_messages = []
self.coordinator = SkillCoordinator()
self._history = []
self._agent_id = str(uuid.uuid4())
self._agent_stopped = False

if self.config.system_prompt:
if isinstance(self.config.system_prompt, str):
Expand All @@ -165,13 +188,18 @@ def __init__(
model_provider=self.config.provider, model=self.config.model
)

@rpc
def get_agent_id(self) -> str:
return self._agent_id

@rpc
def start(self):
self.coordinator.start()

@rpc
def stop(self):
self.coordinator.stop()
self._agent_stopped = True

def clear_history(self):
self._history.clear()
Expand All @@ -188,6 +216,9 @@ def history(self):
# Used by agent to execute tool calls
def execute_tool_calls(self, tool_calls: List[ToolCall]) -> None:
"""Execute a list of tool calls from the agent."""
if self._agent_stopped:
logger.warning("Agent is stopped, cannot execute tool calls.")
return
for tool_call in tool_calls:
logger.info(f"executing skill call {tool_call}")
self.coordinator.call_skill(
Expand All @@ -197,12 +228,32 @@ def execute_tool_calls(self, tool_calls: List[ToolCall]) -> None:
)

# used to inject skill calls into the agent loop without agent asking for it
def run_implicit_skill(self, skill_name: str, *args, **kwargs) -> None:
self.coordinator.call_skill(False, skill_name, {"args": args, "kwargs": kwargs})
def run_implicit_skill(self, skill_name: str, **kwargs) -> None:
if self._agent_stopped:
logger.warning("Agent is stopped, cannot execute implicit skill calls.")
return
self.coordinator.call_skill(False, skill_name, {"args": kwargs})

async def agent_loop(self, first_query: str = ""):
# TODO: Should I add a lock here to prevent concurrent calls to agent_loop?

if self._agent_stopped:
logger.warning("Agent is stopped, cannot run agent loop.")
# return "Agent is stopped."
import traceback

traceback.print_stack()
return "Agent is stopped."

async def agent_loop(self, seed_query: str = ""):
self.state_messages = []
self.append_history(HumanMessage(seed_query))
if first_query:
self.append_history(HumanMessage(first_query))

def _get_state() -> str:
# TODO: FIX THIS EXTREME HACK
update = self.coordinator.generate_snapshot(clear=False)
snapshot_msgs = snapshot_to_messages(update, msg.tool_calls)
return json.dumps(snapshot_msgs, sort_keys=True, default=lambda o: repr(o))

try:
while True:
Expand All @@ -222,6 +273,8 @@ async def agent_loop(self, seed_query: str = ""):

logger.info(f"Agent response: {msg.content}")

state = _get_state()

if msg.tool_calls:
self.execute_tool_calls(msg.tool_calls)

Expand All @@ -234,7 +287,9 @@ async def agent_loop(self, seed_query: str = ""):

# coordinator will continue once a skill state has changed in
# such a way that agent call needs to be executed
await self.coordinator.wait_for_updates()

if state == _get_state():
await self.coordinator.wait_for_updates()

# we request a full snapshot of currently running, finished or errored out skills
# we ask for removal of finished skills from subsequent snapshots (clear=True)
Expand All @@ -246,19 +301,30 @@ async def agent_loop(self, seed_query: str = ""):
snapshot_msgs = snapshot_to_messages(update, msg.tool_calls)

self.state_messages = snapshot_msgs.get("state_msgs", [])
self.append_history(*snapshot_msgs.get("tool_msgs", []))
self.append_history(
*snapshot_msgs.get("tool_msgs", []), *snapshot_msgs.get("history_msgs", [])
)

except Exception as e:
logger.error(f"Error in agent loop: {e}")
import traceback

traceback.print_exc()

@rpc
def loop_thread(self):
asyncio.run_coroutine_threadsafe(self.agent_loop(), self._loop)
return True

@rpc
def query(self, query: str):
return asyncio.ensure_future(self.agent_loop(query), loop=self._loop)
# TODO: could this be
# from distributed.utils import sync
# return sync(self._loop, self.agent_loop, query)
return asyncio.run_coroutine_threadsafe(self.agent_loop(query), self._loop).result()

def query_async(self, query: str):
return self.agent_loop(query)
async def query_async(self, query: str):
return await self.agent_loop(query)

def register_skills(self, container):
return self.coordinator.register_skills(container)
Expand Down
35 changes: 35 additions & 0 deletions dimos/agents2/cli/human.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import queue

from dimos.agents2 import Output, Reducer, Stream, skill
from dimos.core import Module, pLCMTransport


class HumanInput(Module):
running: bool = False

@skill(stream=Stream.call_agent, reducer=Reducer.string, output=Output.human)
def human(self):
"""receives human input, no need to run this, it's running implicitly"""
if self.running:
return "already running"
self.running = True
transport = pLCMTransport("/human_input")

msg_queue = queue.Queue()
transport.subscribe(msg_queue.put)
for message in iter(msg_queue.get, None):
yield message
Loading