From 8de71e2df259b83341be4124320c49218bcb6a1f Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 10:11:44 -0700 Subject: [PATCH 01/19] initial sketch of module-agent interface --- dimos/core/module.py | 41 ++++++++++++++- dimos/protocol/rpc/__init.py | 2 +- dimos/protocol/rpc/pubsubrpc.py | 4 +- dimos/protocol/rpc/spec.py | 2 +- dimos/protocol/tool/comms.py | 88 +++++++++++++++++++++++++++++++++ dimos/protocol/tool/tool.py | 48 ++++++++++++++++++ dimos/types/timestamped.py | 5 +- 7 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 dimos/protocol/tool/comms.py create mode 100644 dimos/protocol/tool/tool.py diff --git a/dimos/core/module.py b/dimos/core/module.py index c2a33869ce..05c7955aae 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import inspect +from enum import Enum from typing import ( Any, Callable, + Optional, + TypeVar, get_args, get_origin, get_type_hints, @@ -25,19 +28,53 @@ from dimos.core import colors from dimos.core.core import T, rpc from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport -from dimos.protocol.rpc.lcmrpc import LCMRPC +from dimos.protocol.rpc import LCMRPC, RPCSpec +from dimos.protocol.tf import LCMTF, TFSpec +from dimos.protocol.tool.comms import LCMToolComms, ToolCommsSpec + + +class CommsSpec(Enum): + rpc: RPCSpec + agent: ToolCommsSpec + tf: TFSpec + + +class LCMComms(CommsSpec): + rpc: LCMRPC + agent: LCMToolComms + tf: LCMTF class ModuleBase: + comms: CommsSpec = LCMComms + _rpc: Optional[RPCSpec] = None + _agent: Optional[ToolCommsSpec] = None + _tf: Optional[TFSpec] = None + def __init__(self, *args, **kwargs): + # we can completely override comms protocols if we want + if kwargs.get("comms", None) is not None: + self.comms = kwargs["comms"] try: get_worker() - self.rpc = LCMRPC() + self.rpc = self.comms.rpc() self.rpc.serve_module_rpc(self) self.rpc.start() except ValueError: return + @property + def agent(self): + if self._agent is None: + self._agent = self.comms.agent() + return self._agent + + @property + def tf(self): + if self._tf is None: + self._tf = self.comms.tf() + return self._tf + @property def outputs(self) -> dict[str, Out]: return { diff --git a/dimos/protocol/rpc/__init.py b/dimos/protocol/rpc/__init.py index b38609e9fd..4061c9e9cf 100644 --- a/dimos/protocol/rpc/__init.py +++ b/dimos/protocol/rpc/__init.py @@ -13,4 +13,4 @@ # limitations under the License. from dimos.protocol.rpc.lcmrpc import LCMRPC -from dimos.protocol.rpc.spec import RPC, RPCClient, RPCServer +from dimos.protocol.rpc.spec import RPCClient, RPCServer, RPCSpec diff --git a/dimos/protocol/rpc/pubsubrpc.py b/dimos/protocol/rpc/pubsubrpc.py index 67f0c245e1..c1dcbe7c61 100644 --- a/dimos/protocol/rpc/pubsubrpc.py +++ b/dimos/protocol/rpc/pubsubrpc.py @@ -35,7 +35,7 @@ ) from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub -from dimos.protocol.rpc.spec import RPC, Args, RPCClient, RPCInspectable, RPCServer +from dimos.protocol.rpc.spec import Args, RPCClient, RPCInspectable, RPCServer, RPCSpec from dimos.protocol.service.spec import Service MsgT = TypeVar("MsgT") @@ -57,7 +57,7 @@ class RPCRes(TypedDict): res: Any -class PubSubRPCMixin(RPC, PubSub[TopicT, MsgT], Generic[TopicT, MsgT]): +class PubSubRPCMixin(RPCSpec, PubSub[TopicT, MsgT], Generic[TopicT, MsgT]): @abstractmethod def topicgen(self, name: str, req_or_res: bool) -> TopicT: ... diff --git a/dimos/protocol/rpc/spec.py b/dimos/protocol/rpc/spec.py index 113b5a8531..fbb99d661d 100644 --- a/dimos/protocol/rpc/spec.py +++ b/dimos/protocol/rpc/spec.py @@ -86,4 +86,4 @@ def override_f(*args, fname=fname, **kwargs): self.serve_rpc(override_f, topic) -class RPC(RPCServer, RPCClient): ... +class RPCSpec(RPCServer, RPCClient): ... diff --git a/dimos/protocol/tool/comms.py b/dimos/protocol/tool/comms.py new file mode 100644 index 0000000000..71e83a0186 --- /dev/null +++ b/dimos/protocol/tool/comms.py @@ -0,0 +1,88 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from abc import abstractmethod +from dataclasses import dataclass +from typing import Generic, Optional, TypeVar, Union + +from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.service import Service +from dimos.types.timestamped import Timestamped + + +class AgentMessage(Timestamped): + ts: float + + def __init__(self, content: str): + self.ts = time.time() + self.content = content + + def __repr__(self): + return f"AgentMessage(content={self.content})" + + +class ToolCommsSpec: + @abstractmethod + def publish(self, msg: AgentMessage) -> None: ... + + +MsgT = TypeVar("MsgT") +TopicT = TypeVar("TopicT") + + +@dataclass +class PubSubCommsConfig(Generic[TopicT, MsgT]): + topic: Optional[TopicT] = None # Required field but needs default for dataclass inheritance + pubsub: Union[type[PubSub[TopicT, MsgT]], PubSub[TopicT, MsgT], None] = None + autostart: bool = True + + +class PubSubComms(Service, ToolCommsSpec): + default_config: type[PubSubCommsConfig] = PubSubCommsConfig + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + pubsub_config = getattr(self.config, "pubsub", None) + if pubsub_config is not None: + if callable(pubsub_config): + self.pubsub = pubsub_config() + else: + self.pubsub = pubsub_config + else: + raise ValueError("PubSub configuration is missing") + + if getattr(self.config, "autostart", True): + self.start() + + def start(self) -> None: + self.pubsub.start() + + def stop(self): + self.pubsub.stop() + + def publish(self, msg: AgentMessage) -> None: + self.pubsub.publish(self.config.topic, msg) + + +@dataclass +class LCMCommsConfig(PubSubCommsConfig[str, AgentMessage]): + topic: str = "/agent" + pubsub: Union[type[PubSub], PubSub, None] = PickleLCM + autostart: bool = True + + +class LCMToolComms(LCMCommsConfig): + default_config: type[LCMCommsConfig] = LCMCommsConfig diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py new file mode 100644 index 0000000000..ed80c5f42a --- /dev/null +++ b/dimos/protocol/tool/tool.py @@ -0,0 +1,48 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Any, Callable, Generic, Optional, TypeVar + + +class Call(Enum): + Implicit = "implicit" + Explicit = "explicit" + + +class Reducer(Enum): + latest = lambda data: data[-1] if data else None + all = lambda data: data + average = lambda data: sum(data) / len(data) if data else None + + +class Stream(Enum): + none = "none" + passive = "passive" + call_agent = "call_agent" + + +class Return(Enum): + none = "none" + passive = "passive" + call_agent = "call_agent" + + +def tool(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): + def decorator(f: Callable[..., Any]) -> Any: + def wrapper(self, *args, **kwargs): + return f(self, *args, **kwargs) + + wrapper._tool = {reducer: reducer, stream: stream, ret: ret} + return wrapper diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index f948c63751..27d755ac61 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import bisect from datetime import datetime, timezone -from typing import Generic, Iterable, List, Optional, Tuple, TypedDict, TypeVar, Union +from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union + from sortedcontainers import SortedList -import bisect # any class that carries a timestamp should inherit from this # this allows us to work with timeseries in consistent way, allign messages, replay etc From 95e618520893e2925c04f0cc9f4fc7cf95c142a9 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 10:48:48 -0700 Subject: [PATCH 02/19] message passing established --- dimos/core/__init__.py | 2 +- dimos/core/module.py | 6 --- dimos/protocol/tool/agent_listener.py | 41 +++++++++++++++++++ dimos/protocol/tool/comms.py | 29 ++++++++----- .../{rpc/__init.py => tool/test_tool.py} | 24 ++++++++++- dimos/protocol/tool/tool.py | 40 +++++++++++++++++- 6 files changed, 122 insertions(+), 20 deletions(-) create mode 100644 dimos/protocol/tool/agent_listener.py rename dimos/protocol/{rpc/__init.py => tool/test_tool.py} (53%) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 9bb1a3dc68..179056d7af 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -12,7 +12,7 @@ from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport from dimos.protocol.rpc.lcmrpc import LCMRPC -from dimos.protocol.rpc.spec import RPC +from dimos.protocol.rpc.spec import RPCSpec from dimos.protocol.tf import LCMTF, TF, PubSubTF, TFConfig, TFSpec __all__ = ["TF", "LCMTF", "PubSubTF", "TFSpec", "TFConfig"] diff --git a/dimos/core/module.py b/dimos/core/module.py index 05c7955aae..794cc664a6 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -63,12 +63,6 @@ def __init__(self, *args, **kwargs): except ValueError: return - @property - def agent(self): - if self._agent is None: - self._agent = self.comms.agent() - return self._agent - @property def tf(self): if self._tf is None: diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_listener.py new file mode 100644 index 0000000000..533a524c6c --- /dev/null +++ b/dimos/protocol/tool/agent_listener.py @@ -0,0 +1,41 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from dimos.protocol.service import Service +from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, ToolCommsSpec + + +@dataclass +class AgentInputConfig: + comms: ToolCommsSpec = LCMToolComms + + +class AgentInput(Service[AgentInputConfig]): + default_config: type[AgentInputConfig] = AgentInputConfig + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.comms = self.config.comms() + + def start(self) -> None: + self.comms.start() + self.comms.subscribe(self.handle_message) + + def stop(self) -> None: + self.comms.stop() + + def handle_message(self, msg: AgentMsg) -> None: + print(f"Received message: {msg}") diff --git a/dimos/protocol/tool/comms.py b/dimos/protocol/tool/comms.py index 71e83a0186..5607c91a56 100644 --- a/dimos/protocol/tool/comms.py +++ b/dimos/protocol/tool/comms.py @@ -15,7 +15,7 @@ import time from abc import abstractmethod from dataclasses import dataclass -from typing import Generic, Optional, TypeVar, Union +from typing import Callable, Generic, Optional, TypeVar, Union from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic from dimos.protocol.pubsub.spec import PubSub @@ -23,20 +23,24 @@ from dimos.types.timestamped import Timestamped -class AgentMessage(Timestamped): +class AgentMsg(Timestamped): ts: float - def __init__(self, content: str): + def __init__(self, tool: str, content: str | int | float | dict | list) -> None: self.ts = time.time() + self.tool = tool self.content = content def __repr__(self): - return f"AgentMessage(content={self.content})" + return f"AgentMsg(tool={self.tool}, content={self.content})" class ToolCommsSpec: @abstractmethod - def publish(self, msg: AgentMessage) -> None: ... + def publish(self, msg: AgentMsg) -> None: ... + + @abstractmethod + def subscribe(self, cb: Callable[[AgentMsg], None]) -> None: ... MsgT = TypeVar("MsgT") @@ -50,7 +54,7 @@ class PubSubCommsConfig(Generic[TopicT, MsgT]): autostart: bool = True -class PubSubComms(Service, ToolCommsSpec): +class PubSubComms(Service[PubSubCommsConfig], ToolCommsSpec): default_config: type[PubSubCommsConfig] = PubSubCommsConfig def __init__(self, **kwargs) -> None: @@ -73,16 +77,21 @@ def start(self) -> None: def stop(self): self.pubsub.stop() - def publish(self, msg: AgentMessage) -> None: + def publish(self, msg: AgentMsg) -> None: self.pubsub.publish(self.config.topic, msg) + def subscribe(self, cb: Callable[[AgentMsg], None]) -> None: + self.pubsub.subscribe(self.config.topic, lambda msg, topic: cb(msg)) + @dataclass -class LCMCommsConfig(PubSubCommsConfig[str, AgentMessage]): +class LCMCommsConfig(PubSubCommsConfig[str, AgentMsg]): topic: str = "/agent" pubsub: Union[type[PubSub], PubSub, None] = PickleLCM - autostart: bool = True + # lcm needs to be started only if receiving + # tool comms are broadcast only in modules so we don't autostart + autostart: bool = False -class LCMToolComms(LCMCommsConfig): +class LCMToolComms(PubSubComms): default_config: type[LCMCommsConfig] = LCMCommsConfig diff --git a/dimos/protocol/rpc/__init.py b/dimos/protocol/tool/test_tool.py similarity index 53% rename from dimos/protocol/rpc/__init.py rename to dimos/protocol/tool/test_tool.py index 4061c9e9cf..4e7ebd10b9 100644 --- a/dimos/protocol/rpc/__init.py +++ b/dimos/protocol/tool/test_tool.py @@ -12,5 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dimos.protocol.rpc.lcmrpc import LCMRPC -from dimos.protocol.rpc.spec import RPCClient, RPCServer, RPCSpec +from dimos.core import rpc, start +from dimos.protocol.tool.agent_listener import AgentInput +from dimos.protocol.tool.tool import ToolContainer, tool + + +class TestContainer(ToolContainer): + @rpc + @tool() + def add(self, x: int, y: int) -> int: + return x + y + + +def test_introspect_tool(): + testContainer = TestContainer() + print(testContainer.tools) + + +def test_deploy(): + agentInput = AgentInput() + agentInput.start() + testContainer = TestContainer() + print(testContainer.add(1, 2)) diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index ed80c5f42a..6149936cd8 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect from enum import Enum from typing import Any, Callable, Generic, Optional, TypeVar +from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, ToolCommsSpec + class Call(Enum): Implicit = "implicit" @@ -42,7 +45,42 @@ class Return(Enum): def tool(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): def decorator(f: Callable[..., Any]) -> Any: def wrapper(self, *args, **kwargs): - return f(self, *args, **kwargs) + val = f(self, *args, **kwargs) + tool = f"{self.__class__.__name__}.{f.__name__}" + self.agent.publish(AgentMsg(tool, val)) + return val wrapper._tool = {reducer: reducer, stream: stream, ret: ret} return wrapper + + return decorator + + +class CommsSpec: + agent: ToolCommsSpec + + +class LCMComms(CommsSpec): + agent: ToolCommsSpec = LCMToolComms + + +class ToolContainer: + comms: CommsSpec = LCMComms() + _agent: ToolCommsSpec = None + + @property + def tools(self): + # Avoid recursion by excluding this property itself + return { + name: getattr(self, name) + for name in dir(self) + if not name.startswith("_") + and name != "tools" + and hasattr(getattr(self, name), "_tool") + } + + @property + def agent(self): + if self._agent is None: + self._agent = self.comms.agent() + return self._agent From 3cd52a5d21e07d8a212313f2e4e18ae70cc03a83 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 12:06:55 -0700 Subject: [PATCH 03/19] tool config propagation --- dimos/protocol/tool/agent_listener.py | 44 +++++++++--- dimos/protocol/tool/comms.py | 26 +++++++- dimos/protocol/tool/test_tool.py | 18 ++++- dimos/protocol/tool/tool.py | 96 ++++++++++++++++++++------- 4 files changed, 145 insertions(+), 39 deletions(-) diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_listener.py index 533a524c6c..f8cca58080 100644 --- a/dimos/protocol/tool/agent_listener.py +++ b/dimos/protocol/tool/agent_listener.py @@ -16,26 +16,52 @@ from dimos.protocol.service import Service from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, ToolCommsSpec +from dimos.protocol.tool.tool import ToolContainer, ToolConfig @dataclass class AgentInputConfig: - comms: ToolCommsSpec = LCMToolComms + agent_comms: type[ToolCommsSpec] = LCMToolComms -class AgentInput(Service[AgentInputConfig]): - default_config: type[AgentInputConfig] = AgentInputConfig +class AgentInput(ToolContainer): + running_tools: dict[str, ToolContainer] = {} - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.comms = self.config.comms() + def __init__(self) -> None: + super().__init__() def start(self) -> None: - self.comms.start() - self.comms.subscribe(self.handle_message) + self.agent_comms.start() + self.agent_comms.subscribe(self.handle_message) def stop(self) -> None: - self.comms.stop() + self.agent_comms.stop() + # updates local tool state (appends to streamed data if needed etc) + # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent def handle_message(self, msg: AgentMsg) -> None: print(f"Received message: {msg}") + + def get_state(self): ... + + # outputs data for the agent call + # clears the local state (finished tool calls) + def agent_call(self): ... + + # outputs a list of tools that are registered + # for the agent to introspect + def get_tools(self): ... + + def register_tools(self, container: ToolContainer): + for tool_name, tool in container.tools.items(): + print(f"Registering tool: {tool_name}, {tool}") + + @property + def tools(self) -> dict[str, ToolConfig]: + """Returns a dictionary of tools registered in this container.""" + # Aggregate all tools from registered containers + all_tools: dict[str, ToolConfig] = {} + for container_name, container in self.running_tools.items(): + for tool_name, tool_config in container.tools.items(): + all_tools[f"{container_name}.{tool_name}"] = tool_config + return all_tools diff --git a/dimos/protocol/tool/comms.py b/dimos/protocol/tool/comms.py index 5607c91a56..d371b72db6 100644 --- a/dimos/protocol/tool/comms.py +++ b/dimos/protocol/tool/comms.py @@ -15,6 +15,7 @@ import time from abc import abstractmethod from dataclasses import dataclass +from enum import Enum from typing import Callable, Generic, Optional, TypeVar, Union from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic @@ -23,16 +24,29 @@ from dimos.types.timestamped import Timestamped +class MsgType(Enum): + start = 0 + stream = 1 + ret = 2 + + class AgentMsg(Timestamped): ts: float - - def __init__(self, tool: str, content: str | int | float | dict | list) -> None: + type: MsgType + + def __init__( + self, + tool: str, + content: str | int | float | dict | list, + type: Optional[MsgType] = MsgType.ret, + ) -> None: self.ts = time.time() self.tool = tool self.content = content + self.type = type def __repr__(self): - return f"AgentMsg(tool={self.tool}, content={self.content})" + return f"AgentMsg(tool={self.tool}, content={self.content}, type={self.type})" class ToolCommsSpec: @@ -42,6 +56,12 @@ def publish(self, msg: AgentMsg) -> None: ... @abstractmethod def subscribe(self, cb: Callable[[AgentMsg], None]) -> None: ... + @abstractmethod + def start(self) -> None: ... + + @abstractmethod + def stop(self) -> None: ... + MsgT = TypeVar("MsgT") TopicT = TypeVar("TopicT") diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py index 4e7ebd10b9..1884b35cb7 100644 --- a/dimos/protocol/tool/test_tool.py +++ b/dimos/protocol/tool/test_tool.py @@ -12,25 +12,37 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dimos.core import rpc, start +import time + from dimos.protocol.tool.agent_listener import AgentInput from dimos.protocol.tool.tool import ToolContainer, tool class TestContainer(ToolContainer): - @rpc @tool() def add(self, x: int, y: int) -> int: return x + y + @tool() + def delayadd(self, x: int, y: int) -> int: + time.sleep(1) + return x + y + def test_introspect_tool(): testContainer = TestContainer() print(testContainer.tools) -def test_deploy(): +def test_comms(): agentInput = AgentInput() agentInput.start() + testContainer = TestContainer() + + agentInput.register_tools(testContainer) + + print(testContainer.delayadd(2, 4, toolcall=True)) print(testContainer.add(1, 2)) + + time.sleep(1.3) diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index 6149936cd8..53fe26a7de 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -13,15 +13,17 @@ # limitations under the License. import inspect +import threading from enum import Enum -from typing import Any, Callable, Generic, Optional, TypeVar +from typing import Any, Callable, Generic, Optional, TypedDict, TypeVar, cast -from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, ToolCommsSpec +from dimos.core import colors +from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec class Call(Enum): - Implicit = "implicit" - Explicit = "explicit" + Implicit = 0 + Explicit = 1 class Reducer(Enum): @@ -31,48 +33,94 @@ class Reducer(Enum): class Stream(Enum): - none = "none" - passive = "passive" - call_agent = "call_agent" + # no streaming + none = 0 + # passive stream, doesn't schedule an agent call, but returns the value to the agent + passive = 1 + # calls the agent with every value emitted, schedules an agent call + call_agent = 2 class Return(Enum): - none = "none" - passive = "passive" - call_agent = "call_agent" + # doesn't return anything to an agent + none = 0 + # returns the value to the agent, but doesn't schedule an agent call + passive = 1 + # calls the agent with the value, scheduling an agent call + call_agent = 2 + + +class ToolConfig: + def __init__(self, name: str, reducer: Reducer, stream: Stream, ret: Return): + self.name = name + self.reducer = reducer + self.stream = stream + self.ret = ret + + def __str__(self): + parts = [f"name={colors.yellow(self.name)}"] + + # Only show reducer if stream is not none (streaming is happening) + if self.stream != Stream.none: + reducer_name = "unknown" + if self.reducer == Reducer.latest: + reducer_name = "latest" + elif self.reducer == Reducer.all: + reducer_name = "all" + elif self.reducer == Reducer.average: + reducer_name = "average" + parts.append(f"reducer={colors.green(reducer_name)}") + parts.append(f"stream={colors.red(self.stream.name)}") + + # Always show return mode + parts.append(f"ret={colors.blue(self.ret.name)}") + + return f"Tool({', '.join(parts)})" def tool(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): def decorator(f: Callable[..., Any]) -> Any: def wrapper(self, *args, **kwargs): - val = f(self, *args, **kwargs) - tool = f"{self.__class__.__name__}.{f.__name__}" - self.agent.publish(AgentMsg(tool, val)) - return val + tool = f"{f.__name__}" - wrapper._tool = {reducer: reducer, stream: stream, ret: ret} + def run_function(): + self.agent_comms.publish(AgentMsg(tool, None, type=MsgType.start)) + val = f(self, *args, **kwargs) + self.agent_comms.publish(AgentMsg(tool, val, type=MsgType.ret)) + + if kwargs.get("toolcall"): + del kwargs["toolcall"] + thread = threading.Thread(target=run_function) + thread.start() + return None + + return run_function() + + wrapper._tool = ToolConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) # type: ignore[attr-defined] + wrapper.__name__ = f.__name__ # Preserve original function name + wrapper.__doc__ = f.__doc__ # Preserve original docstring return wrapper return decorator class CommsSpec: - agent: ToolCommsSpec + agent_comms_class: type[ToolCommsSpec] class LCMComms(CommsSpec): - agent: ToolCommsSpec = LCMToolComms + agent_comms_class: type[ToolCommsSpec] = LCMToolComms class ToolContainer: comms: CommsSpec = LCMComms() - _agent: ToolCommsSpec = None + _agent_comms: Optional[ToolCommsSpec] = None @property - def tools(self): + def tools(self) -> dict[str, ToolConfig]: # Avoid recursion by excluding this property itself return { - name: getattr(self, name) + name: getattr(self, name)._tool for name in dir(self) if not name.startswith("_") and name != "tools" @@ -80,7 +128,7 @@ def tools(self): } @property - def agent(self): - if self._agent is None: - self._agent = self.comms.agent() - return self._agent + def agent_comms(self) -> ToolCommsSpec: + if self._agent_comms is None: + self._agent_comms = self.comms.agent_comms_class() + return self._agent_comms From ac21dca07c86fecc951dc7ef65d88669c1f567f9 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 13:55:43 -0700 Subject: [PATCH 04/19] types extracted, tool config --- dimos/protocol/tool/agent_listener.py | 87 +++++++++++++++++----- dimos/protocol/tool/comms.py | 26 +------ dimos/protocol/tool/test_tool.py | 10 +-- dimos/protocol/tool/tool.py | 75 ++++--------------- dimos/protocol/tool/types.py | 101 ++++++++++++++++++++++++++ 5 files changed, 190 insertions(+), 109 deletions(-) create mode 100644 dimos/protocol/tool/types.py diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_listener.py index f8cca58080..56e1ad179b 100644 --- a/dimos/protocol/tool/agent_listener.py +++ b/dimos/protocol/tool/agent_listener.py @@ -15,8 +15,12 @@ from dataclasses import dataclass from dimos.protocol.service import Service -from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, ToolCommsSpec -from dimos.protocol.tool.tool import ToolContainer, ToolConfig +from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec +from dimos.protocol.tool.tool import ToolConfig, ToolContainer +from dimos.protocol.tool.types import Stream +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.protocol.tool.agent_input") @dataclass @@ -25,10 +29,17 @@ class AgentInputConfig: class AgentInput(ToolContainer): - running_tools: dict[str, ToolContainer] = {} + _static_containers: list[ToolContainer] + _dynamic_containers: list[ToolContainer] + _tool_state: dict[str, list[AgentMsg]] + _tools: dict[str, ToolConfig] def __init__(self) -> None: super().__init__() + self._static_containers = [] + self._dynamic_containers = [] + self._tools = {} + self._tool_state = {} def start(self) -> None: self.agent_comms.start() @@ -40,28 +51,66 @@ def stop(self) -> None: # updates local tool state (appends to streamed data if needed etc) # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent def handle_message(self, msg: AgentMsg) -> None: - print(f"Received message: {msg}") + print("AgentInput received message", msg) + self.update_state(msg.tool_name, msg) + + def update_state(self, tool_name: str, msg: AgentMsg) -> None: + if tool_name not in self._tool_state: + self._tool_state[tool_name] = [] + self._tool_state[tool_name].append(msg) + + # we check if message should trigger an agent call + if self.should_call_agent(msg): + self.call_agent() + + def should_call_agent(self, msg) -> bool: + tool_config = self._tools.get(msg.tool_name) + if not tool_config: + logger.warning( + f"Tool {msg.tool_name} not found in registered tools but tool message received {msg}" + ) + return False + + if msg.type == MsgType.start: + return False - def get_state(self): ... + if msg.type == MsgType.stream: + if tool_config.stream == Stream.none or tool_config.stream == Stream.passive: + return False + if tool_config.stream == Stream.call_agent: + return True + + def collect_state(self): + ... + # return {"tool_name": {"state": tool_state, messages: list[AgentMsg]}} # outputs data for the agent call # clears the local state (finished tool calls) - def agent_call(self): ... - - # outputs a list of tools that are registered - # for the agent to introspect - def get_tools(self): ... + def get_agent_query(self): + state = self.collect_state() + ... + # given toolcontainers can run remotely, we are + # caching available tools from static containers + # + # dynamic containers will be queried at runtime via + # .tools() method def register_tools(self, container: ToolContainer): - for tool_name, tool in container.tools.items(): - print(f"Registering tool: {tool_name}, {tool}") + print("registering tool container", container) + if not container.dynamic_tools: + self._static_containers.append(container) + for name, tool_config in container.tools().items(): + self._tools[name] = tool_config + else: + self._dynamic_containers.append(container) - @property def tools(self) -> dict[str, ToolConfig]: - """Returns a dictionary of tools registered in this container.""" - # Aggregate all tools from registered containers - all_tools: dict[str, ToolConfig] = {} - for container_name, container in self.running_tools.items(): - for tool_name, tool_config in container.tools.items(): - all_tools[f"{container_name}.{tool_name}"] = tool_config + # static container tooling is already cached + all_tools: dict[str, ToolConfig] = {**self._tools} + + # Then aggregate tools from dynamic containers + for container in self._dynamic_containers: + for tool_name, tool_config in container.tools().items(): + all_tools[tool_name] = tool_config + return all_tools diff --git a/dimos/protocol/tool/comms.py b/dimos/protocol/tool/comms.py index d371b72db6..c68a6ed188 100644 --- a/dimos/protocol/tool/comms.py +++ b/dimos/protocol/tool/comms.py @@ -21,34 +21,10 @@ from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service import Service +from dimos.protocol.tool.types import AgentMsg, Call, MsgType, Reducer, Stream, ToolConfig from dimos.types.timestamped import Timestamped -class MsgType(Enum): - start = 0 - stream = 1 - ret = 2 - - -class AgentMsg(Timestamped): - ts: float - type: MsgType - - def __init__( - self, - tool: str, - content: str | int | float | dict | list, - type: Optional[MsgType] = MsgType.ret, - ) -> None: - self.ts = time.time() - self.tool = tool - self.content = content - self.type = type - - def __repr__(self): - return f"AgentMsg(tool={self.tool}, content={self.content}, type={self.type})" - - class ToolCommsSpec: @abstractmethod def publish(self, msg: AgentMsg) -> None: ... diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py index 1884b35cb7..8565b759a9 100644 --- a/dimos/protocol/tool/test_tool.py +++ b/dimos/protocol/tool/test_tool.py @@ -16,6 +16,7 @@ from dimos.protocol.tool.agent_listener import AgentInput from dimos.protocol.tool.tool import ToolContainer, tool +from dimos.protocol.tool.types import Return, Stream class TestContainer(ToolContainer): @@ -25,7 +26,6 @@ def add(self, x: int, y: int) -> int: @tool() def delayadd(self, x: int, y: int) -> int: - time.sleep(1) return x + y @@ -41,8 +41,8 @@ def test_comms(): testContainer = TestContainer() agentInput.register_tools(testContainer) + print("AGENT TOOLS", agentInput.tools()) + testContainer.delayadd(2, 4, toolcall=True) + testContainer.add(1, 2) - print(testContainer.delayadd(2, 4, toolcall=True)) - print(testContainer.add(1, 2)) - - time.sleep(1.3) + time.sleep(2) diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index 53fe26a7de..00126c4a90 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -17,65 +17,16 @@ from enum import Enum from typing import Any, Callable, Generic, Optional, TypedDict, TypeVar, cast -from dimos.core import colors -from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec - - -class Call(Enum): - Implicit = 0 - Explicit = 1 - - -class Reducer(Enum): - latest = lambda data: data[-1] if data else None - all = lambda data: data - average = lambda data: sum(data) / len(data) if data else None - - -class Stream(Enum): - # no streaming - none = 0 - # passive stream, doesn't schedule an agent call, but returns the value to the agent - passive = 1 - # calls the agent with every value emitted, schedules an agent call - call_agent = 2 - - -class Return(Enum): - # doesn't return anything to an agent - none = 0 - # returns the value to the agent, but doesn't schedule an agent call - passive = 1 - # calls the agent with the value, scheduling an agent call - call_agent = 2 - - -class ToolConfig: - def __init__(self, name: str, reducer: Reducer, stream: Stream, ret: Return): - self.name = name - self.reducer = reducer - self.stream = stream - self.ret = ret - - def __str__(self): - parts = [f"name={colors.yellow(self.name)}"] - - # Only show reducer if stream is not none (streaming is happening) - if self.stream != Stream.none: - reducer_name = "unknown" - if self.reducer == Reducer.latest: - reducer_name = "latest" - elif self.reducer == Reducer.all: - reducer_name = "all" - elif self.reducer == Reducer.average: - reducer_name = "average" - parts.append(f"reducer={colors.green(reducer_name)}") - parts.append(f"stream={colors.red(self.stream.name)}") - - # Always show return mode - parts.append(f"ret={colors.blue(self.ret.name)}") - - return f"Tool({', '.join(parts)})" +from dimos.core import colors, rpc +from dimos.protocol.tool.comms import LCMToolComms, ToolCommsSpec +from dimos.protocol.tool.types import ( + AgentMsg, + MsgType, + Reducer, + Return, + Stream, + ToolConfig, +) def tool(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): @@ -112,11 +63,15 @@ class LCMComms(CommsSpec): agent_comms_class: type[ToolCommsSpec] = LCMToolComms +# here we can have also dynamic tools potentially +# agent can check .tools each time when introspecting class ToolContainer: comms: CommsSpec = LCMComms() _agent_comms: Optional[ToolCommsSpec] = None - @property + dynamic_tools = False + + @rpc def tools(self) -> dict[str, ToolConfig]: # Avoid recursion by excluding this property itself return { diff --git a/dimos/protocol/tool/types.py b/dimos/protocol/tool/types.py new file mode 100644 index 0000000000..64fea48316 --- /dev/null +++ b/dimos/protocol/tool/types.py @@ -0,0 +1,101 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from enum import Enum +from typing import Optional + +from dimos.types.timestamped import Timestamped + + +class Call(Enum): + Implicit = 0 + Explicit = 1 + + +class Reducer(Enum): + all = lambda data: data + latest = lambda data: data[-1] if data else None + average = lambda data: sum(data) / len(data) if data else None + + +class Stream(Enum): + # no streaming + none = 0 + # passive stream, doesn't schedule an agent call, but returns the value to the agent + passive = 1 + # calls the agent with every value emitted, schedules an agent call + call_agent = 2 + + +class Return(Enum): + # doesn't return anything to an agent + none = 0 + # returns the value to the agent, but doesn't schedule an agent call + passive = 1 + # calls the agent with the value, scheduling an agent call + call_agent = 2 + + +class ToolConfig: + def __init__(self, name: str, reducer: Reducer, stream: Stream, ret: Return): + self.name = name + self.reducer = reducer + self.stream = stream + self.ret = ret + + def __str__(self): + parts = [f"name={self.name}"] + + # Only show reducer if stream is not none (streaming is happening) + if self.stream != Stream.none: + reducer_name = "unknown" + if self.reducer == Reducer.latest: + reducer_name = "latest" + elif self.reducer == Reducer.all: + reducer_name = "all" + elif self.reducer == Reducer.average: + reducer_name = "average" + parts.append(f"reducer={reducer_name}") + parts.append(f"stream={self.stream.name}") + + # Always show return mode + parts.append(f"ret={self.ret.name}") + return f"Tool({', '.join(parts)})" + + +class MsgType(Enum): + start = 0 + stream = 1 + ret = 2 + error = 3 + + +class AgentMsg(Timestamped): + ts: float + type: MsgType + + def __init__( + self, + tool_name: str, + content: str | int | float | dict | list, + type: Optional[MsgType] = MsgType.ret, + ) -> None: + self.ts = time.time() + self.tool_name = tool_name + self.content = content + self.type = type + + def __repr__(self): + return f"AgentMsg(tool={self.tool_name}, content={self.content}, type={self.type})" From d80b1ce3e8feae6b830adf16fed4c51f38510b2b Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 14:09:48 -0700 Subject: [PATCH 05/19] __init__ files --- dimos/protocol/rpc/__init__.py | 16 ++++++++++++++++ dimos/protocol/service/__init__.py | 2 ++ 2 files changed, 18 insertions(+) create mode 100644 dimos/protocol/rpc/__init__.py create mode 100644 dimos/protocol/service/__init__.py diff --git a/dimos/protocol/rpc/__init__.py b/dimos/protocol/rpc/__init__.py new file mode 100644 index 0000000000..4061c9e9cf --- /dev/null +++ b/dimos/protocol/rpc/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dimos.protocol.rpc.lcmrpc import LCMRPC +from dimos.protocol.rpc.spec import RPCClient, RPCServer, RPCSpec diff --git a/dimos/protocol/service/__init__.py b/dimos/protocol/service/__init__.py new file mode 100644 index 0000000000..ce8a823f86 --- /dev/null +++ b/dimos/protocol/service/__init__.py @@ -0,0 +1,2 @@ +from dimos.protocol.service.lcmservice import LCMService +from dimos.protocol.service.spec import Service From 6468d964ada590bac0c5b3a230e6d6c396075ec4 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 15:54:54 -0700 Subject: [PATCH 06/19] agent interface work --- dimos/protocol/rpc/pubsubrpc.py | 1 - dimos/protocol/tool/agent_listener.py | 143 ++++++++++++++++++++------ dimos/protocol/tool/test_tool.py | 10 +- dimos/protocol/tool/tool.py | 8 +- dimos/protocol/tool/types.py | 58 +++++++++-- dimos/types/timestamped.py | 10 ++ 6 files changed, 184 insertions(+), 46 deletions(-) diff --git a/dimos/protocol/rpc/pubsubrpc.py b/dimos/protocol/rpc/pubsubrpc.py index c1dcbe7c61..138607b1ac 100644 --- a/dimos/protocol/rpc/pubsubrpc.py +++ b/dimos/protocol/rpc/pubsubrpc.py @@ -82,7 +82,6 @@ def call(self, name: str, arguments: Args, cb: Optional[Callable]): def call_cb(self, name: str, arguments: Args, cb: Callable) -> Any: topic_req = self.topicgen(name, False) topic_res = self.topicgen(name, True) - msg_id = float(time.time()) req: RPCReq = {"name": name, "args": arguments, "id": msg_id} diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_listener.py index 56e1ad179b..c0b5bdc356 100644 --- a/dimos/protocol/tool/agent_listener.py +++ b/dimos/protocol/tool/agent_listener.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy from dataclasses import dataclass +from enum import Enum +from pprint import pformat +from typing import Callable, Optional -from dimos.protocol.service import Service from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec from dimos.protocol.tool.tool import ToolConfig, ToolContainer from dimos.protocol.tool.types import Stream +from dimos.types.timestamped import TimestampedCollection from dimos.utils.logging_config import setup_logger logger = setup_logger("dimos.protocol.tool.agent_input") @@ -28,10 +32,62 @@ class AgentInputConfig: agent_comms: type[ToolCommsSpec] = LCMToolComms +class ToolStateEnum(Enum): + pending = 0 + running = 1 + finished = 2 + error = 3 + + +class ToolState(TimestampedCollection): + name: str + state: ToolStateEnum + + def __init__(self, name: str) -> None: + super().__init__() + self.state = ToolStateEnum.pending + self.name = name + + def handle_msg(self, msg: AgentMsg) -> None: + self.add(msg) + + if msg.type == MsgType.stream: + if self.tool_config.stream == Stream.none or self.tool_config.stream == Stream.passive: + return False + if self.tool_config.stream == Stream.call_agent: + return True + + if msg.type == MsgType.ret: + self.state = ToolStateEnum.finished + return False + + if msg.type == MsgType.error: + self.state = ToolStateEnum.error + return False + + if msg.type == MsgType.start: + self.state = ToolStateEnum.running + return False + + def __str__(self) -> str: + head = f"ToolState(state={self.state}" + + if self.state == ToolStateEnum.finished or self.state == ToolStateEnum.error: + head += ", ran for=" + else: + head += ", running for=" + + head += f"{self.duration():.2f}s" + + if len(self): + return head + f", messages={list(self._items)})" + return head + ", No Messages)" + + class AgentInput(ToolContainer): _static_containers: list[ToolContainer] _dynamic_containers: list[ToolContainer] - _tool_state: dict[str, list[AgentMsg]] + _tool_state: dict[str, ToolState] _tools: dict[str, ToolConfig] def __init__(self) -> None: @@ -51,59 +107,84 @@ def stop(self) -> None: # updates local tool state (appends to streamed data if needed etc) # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent def handle_message(self, msg: AgentMsg) -> None: - print("AgentInput received message", msg) - self.update_state(msg.tool_name, msg) + logger.debug("tool message", msg) - def update_state(self, tool_name: str, msg: AgentMsg) -> None: - if tool_name not in self._tool_state: - self._tool_state[tool_name] = [] - self._tool_state[tool_name].append(msg) + if self._tool_state.get(msg.tool_name) is None: + logger.warn( + f"Tool state for {msg.tool_name} not found, (tool not called by our agent?) initializing..." + ) + self._tool_state[msg.tool_name] = ToolState(name=msg.tool_name) - # we check if message should trigger an agent call - if self.should_call_agent(msg): + should_call_agent = self._tool_state[msg.tool_name].handle_msg(msg) + if should_call_agent: self.call_agent() - def should_call_agent(self, msg) -> bool: - tool_config = self._tools.get(msg.tool_name) + def execute_tool(self, tool_name: str, *args, **kwargs) -> None: + tool_config = self.get_tool_config(tool_name) if not tool_config: - logger.warning( - f"Tool {msg.tool_name} not found in registered tools but tool message received {msg}" + logger.error( + f"Tool {tool_name} not found in registered tools, but agent tried to call it (did a dynamic tool expire?)" ) - return False + return - if msg.type == MsgType.start: - return False + # This initializes the tool state if it doesn't exist + self._tool_state[tool_name] = ToolState(name=tool_name) + return tool_config.call(*args, **kwargs) - if msg.type == MsgType.stream: - if tool_config.stream == Stream.none or tool_config.stream == Stream.passive: - return False - if tool_config.stream == Stream.call_agent: - return True + def state_snapshot(self) -> dict[str, list[AgentMsg]]: + ret = copy(self._tool_state) + + # Since state is exported, we can clear the finished tool runs + for tool_name, tool_run in self._tool_state.items(): + if tool_run.state == ToolState.finished: + logger.log("Tool run finished", tool_name) + del self._tool_state[tool_name] + if tool_run.state == ToolState.error: + logger.error(f"Tool run error for {tool_name}") + del self._tool_state[tool_name] + + return ret - def collect_state(self): - ... - # return {"tool_name": {"state": tool_state, messages: list[AgentMsg]}} + def __str__(self): + # Convert objects to their string representations + def stringify_value(obj): + if isinstance(obj, dict): + return {k: stringify_value(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [stringify_value(item) for item in obj] + else: + return str(obj) - # outputs data for the agent call + ret = stringify_value(self._tool_state) + + return f"AgentInput(\n{pformat(ret, indent=2, depth=3, width=120, compact=True)}\n)" + + # Outputs data for the agent call # clears the local state (finished tool calls) def get_agent_query(self): - state = self.collect_state() - ... + return self.state_snapshot() - # given toolcontainers can run remotely, we are + # Given toolcontainers can run remotely, we are # caching available tools from static containers # # dynamic containers will be queried at runtime via # .tools() method def register_tools(self, container: ToolContainer): - print("registering tool container", container) if not container.dynamic_tools: + logger.info(f"Registering static tool container, {container}") self._static_containers.append(container) for name, tool_config in container.tools().items(): self._tools[name] = tool_config else: + logger.info(f"Registering dynamic tool container, {container}") self._dynamic_containers.append(container) + def get_tool_config(self, tool_name: str) -> Optional[ToolConfig]: + tool_config = self._tools.get(tool_name) + if not tool_config: + tool_config = self.tools().get(tool_name) + return tool_config + def tools(self) -> dict[str, ToolConfig]: # static container tooling is already cached all_tools: dict[str, ToolConfig] = {**self._tools} @@ -111,6 +192,6 @@ def tools(self) -> dict[str, ToolConfig]: # Then aggregate tools from dynamic containers for container in self._dynamic_containers: for tool_name, tool_config in container.tools().items(): - all_tools[tool_name] = tool_config + all_tools[tool_name] = tool_config.bind(getattr(container, tool_name)) return all_tools diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py index 8565b759a9..6cf06f75d9 100644 --- a/dimos/protocol/tool/test_tool.py +++ b/dimos/protocol/tool/test_tool.py @@ -41,8 +41,16 @@ def test_comms(): testContainer = TestContainer() agentInput.register_tools(testContainer) - print("AGENT TOOLS", agentInput.tools()) + + # toolcall=True makes the tool function exit early, + # it doesn't behave like a blocking function, + # + # return is passed as AgentMsg to the agent topic testContainer.delayadd(2, 4, toolcall=True) testContainer.add(1, 2) + time.sleep(0.5) + print(agentInput) + time.sleep(2) + print(agentInput) diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index 00126c4a90..a9a4b607dd 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -47,7 +47,9 @@ def run_function(): return run_function() - wrapper._tool = ToolConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) # type: ignore[attr-defined] + tool_config = ToolConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) + + wrapper._tool = tool_config # type: ignore[attr-defined] wrapper.__name__ = f.__name__ # Preserve original function name wrapper.__doc__ = f.__doc__ # Preserve original docstring return wrapper @@ -68,9 +70,11 @@ class LCMComms(CommsSpec): class ToolContainer: comms: CommsSpec = LCMComms() _agent_comms: Optional[ToolCommsSpec] = None - dynamic_tools = False + def __str__(self) -> str: + return f"ToolContainer({self.__class__.__name__})" + @rpc def tools(self) -> dict[str, ToolConfig]: # Avoid recursion by excluding this property itself diff --git a/dimos/protocol/tool/types.py b/dimos/protocol/tool/types.py index 64fea48316..b3f940fef0 100644 --- a/dimos/protocol/tool/types.py +++ b/dimos/protocol/tool/types.py @@ -13,8 +13,9 @@ # limitations under the License. import time +from dataclasses import dataclass from enum import Enum -from typing import Optional +from typing import Any, Callable, Generic, Optional, TypeVar from dimos.types.timestamped import Timestamped @@ -48,12 +49,24 @@ class Return(Enum): call_agent = 2 +@dataclass class ToolConfig: - def __init__(self, name: str, reducer: Reducer, stream: Stream, ret: Return): - self.name = name - self.reducer = reducer - self.stream = stream - self.ret = ret + name: str + reducer: Reducer + stream: Stream + ret: Return + f: Callable | None = None + + def bind(self, f: Callable) -> "ToolConfig": + self.f = f + return self + + def call(self, *args, **kwargs) -> Any: + if self.f is None: + raise ValueError( + "Function is not bound to the ToolConfig. This shiould be called only within AgentListener." + ) + return self.f(*args, **kwargs) def __str__(self): parts = [f"name={self.name}"] @@ -76,10 +89,11 @@ def __str__(self): class MsgType(Enum): - start = 0 - stream = 1 - ret = 2 - error = 3 + pending = 0 + start = 1 + stream = 2 + ret = 3 + error = 4 class AgentMsg(Timestamped): @@ -98,4 +112,26 @@ def __init__( self.type = type def __repr__(self): - return f"AgentMsg(tool={self.tool_name}, content={self.content}, type={self.type})" + return self.__str__() + + @property + def end(self) -> bool: + return self.type == MsgType.ret or self.type == MsgType.error + + @property + def start(self) -> bool: + return self.type == MsgType.start + + def __str__(self): + time_ago = time.time() - self.ts + + if self.type == MsgType.start: + return f"Start({time_ago:.1f}s ago)" + if self.type == MsgType.ret: + return f"Ret({time_ago:.1f}s ago, val={self.content})" + if self.type == MsgType.error: + return f"Error({time_ago:.1f}s ago, val={self.content})" + if self.type == MsgType.pending: + return f"Pending({time_ago:.1f}s ago)" + if self.type == MsgType.stream: + return f"Stream({time_ago:.1f}s ago, val={self.content})" diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 27d755ac61..858a2bdaad 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -160,6 +160,16 @@ def slice_by_time(self, start: float, end: float) -> "TimestampedCollection[T]": end_idx = bisect.bisect_right(timestamps, end) return TimestampedCollection(self._items[start_idx:end_idx]) + @property + def start_ts(self) -> Optional[float]: + """Get the start timestamp of the collection.""" + return self._items[0].ts if self._items else None + + @property + def end_ts(self) -> Optional[float]: + """Get the end timestamp of the collection.""" + return self._items[-1].ts if self._items else None + def __len__(self) -> int: return len(self._items) From cceafb41bca9a0834f1aa693847a252aa934c44e Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 15:59:32 -0700 Subject: [PATCH 07/19] test fix --- dimos/protocol/tool/test_tool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py index 6cf06f75d9..d7312a17cc 100644 --- a/dimos/protocol/tool/test_tool.py +++ b/dimos/protocol/tool/test_tool.py @@ -26,6 +26,7 @@ def add(self, x: int, y: int) -> int: @tool() def delayadd(self, x: int, y: int) -> int: + time.sleep(0.5) return x + y @@ -49,8 +50,8 @@ def test_comms(): testContainer.delayadd(2, 4, toolcall=True) testContainer.add(1, 2) - time.sleep(0.5) + time.sleep(0.25) print(agentInput) - time.sleep(2) + time.sleep(0.75) print(agentInput) From debc902a06a0b3863ede11c5146ed4db0720ad07 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 16:27:22 -0700 Subject: [PATCH 08/19] working system --- dimos/protocol/tool/agent_listener.py | 58 ++++++++++++++++++--------- dimos/protocol/tool/test_tool.py | 16 +++++++- dimos/protocol/tool/tool.py | 13 +++--- dimos/protocol/tool/types.py | 4 +- 4 files changed, 64 insertions(+), 27 deletions(-) diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_listener.py index c0b5bdc356..5b5a90db6d 100644 --- a/dimos/protocol/tool/agent_listener.py +++ b/dimos/protocol/tool/agent_listener.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy +from copy import copy from dataclasses import dataclass from enum import Enum from pprint import pformat -from typing import Callable, Optional +from typing import Optional from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec from dimos.protocol.tool.tool import ToolConfig, ToolContainer -from dimos.protocol.tool.types import Stream +from dimos.protocol.tool.types import Reducer, Return, Stream from dimos.types.timestamped import TimestampedCollection from dimos.utils.logging_config import setup_logger @@ -35,20 +35,29 @@ class AgentInputConfig: class ToolStateEnum(Enum): pending = 0 running = 1 - finished = 2 + ret = 2 error = 3 class ToolState(TimestampedCollection): name: str state: ToolStateEnum + tool_config: ToolConfig - def __init__(self, name: str) -> None: + def __init__(self, name: str, tool_config: Optional[ToolConfig] = None) -> None: super().__init__() + if tool_config is None: + self.tool_config = ToolConfig( + name=name, stream=Stream.none, ret=Return.none, reducer=Reducer.none + ) + else: + self.tool_config = tool_config + self.state = ToolStateEnum.pending self.name = name - def handle_msg(self, msg: AgentMsg) -> None: + # returns True if the agent should be called for this message + def handle_msg(self, msg: AgentMsg) -> bool: self.add(msg) if msg.type == MsgType.stream: @@ -58,12 +67,14 @@ def handle_msg(self, msg: AgentMsg) -> None: return True if msg.type == MsgType.ret: - self.state = ToolStateEnum.finished + self.state = ToolStateEnum.ret + if self.tool_config.ret == Return.call_agent: + return True return False if msg.type == MsgType.error: self.state = ToolStateEnum.error - return False + return True if msg.type == MsgType.start: self.state = ToolStateEnum.running @@ -72,7 +83,7 @@ def handle_msg(self, msg: AgentMsg) -> None: def __str__(self) -> str: head = f"ToolState(state={self.state}" - if self.state == ToolStateEnum.finished or self.state == ToolStateEnum.error: + if self.state == ToolStateEnum.ret or self.state == ToolStateEnum.error: head += ", ran for=" else: head += ", running for=" @@ -107,11 +118,11 @@ def stop(self) -> None: # updates local tool state (appends to streamed data if needed etc) # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent def handle_message(self, msg: AgentMsg) -> None: - logger.debug("tool message", msg) + logger.info(f"Tool msg {msg}") if self._tool_state.get(msg.tool_name) is None: logger.warn( - f"Tool state for {msg.tool_name} not found, (tool not called by our agent?) initializing..." + f"Tool state for {msg.tool_name} not found, (tool not called by our agent?) initializing. (message received: {msg})" ) self._tool_state[msg.tool_name] = ToolState(name=msg.tool_name) @@ -128,23 +139,32 @@ def execute_tool(self, tool_name: str, *args, **kwargs) -> None: return # This initializes the tool state if it doesn't exist - self._tool_state[tool_name] = ToolState(name=tool_name) + self._tool_state[tool_name] = ToolState(name=tool_name, tool_config=tool_config) return tool_config.call(*args, **kwargs) def state_snapshot(self) -> dict[str, list[AgentMsg]]: ret = copy(self._tool_state) + to_delete = [] # Since state is exported, we can clear the finished tool runs for tool_name, tool_run in self._tool_state.items(): - if tool_run.state == ToolState.finished: - logger.log("Tool run finished", tool_name) - del self._tool_state[tool_name] - if tool_run.state == ToolState.error: + if tool_run.state == ToolStateEnum.ret: + logger.info(f"Tool {tool_name} finished") + to_delete.append(tool_name) + if tool_run.state == ToolStateEnum.error: logger.error(f"Tool run error for {tool_name}") - del self._tool_state[tool_name] + to_delete.append(tool_name) + + for tool_name in to_delete: + logger.debug(f"Tool {tool_name} finished, removing from state") + del self._tool_state[tool_name] return ret + def call_agent(self) -> None: + """Call the agent with the current state of tool runs.""" + logger.info(f"Calling agent with current tool state: {self.state_snapshot()}") + def __str__(self): # Convert objects to their string representations def stringify_value(obj): @@ -157,7 +177,7 @@ def stringify_value(obj): ret = stringify_value(self._tool_state) - return f"AgentInput(\n{pformat(ret, indent=2, depth=3, width=120, compact=True)}\n)" + return f"AgentInput({pformat(ret, indent=2, depth=3, width=120, compact=True)})" # Outputs data for the agent call # clears the local state (finished tool calls) @@ -174,7 +194,7 @@ def register_tools(self, container: ToolContainer): logger.info(f"Registering static tool container, {container}") self._static_containers.append(container) for name, tool_config in container.tools().items(): - self._tools[name] = tool_config + self._tools[name] = tool_config.bind(getattr(container, name)) else: logger.info(f"Registering dynamic tool container, {container}") self._dynamic_containers.append(container) diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py index d7312a17cc..0cdf665139 100644 --- a/dimos/protocol/tool/test_tool.py +++ b/dimos/protocol/tool/test_tool.py @@ -48,10 +48,24 @@ def test_comms(): # # return is passed as AgentMsg to the agent topic testContainer.delayadd(2, 4, toolcall=True) - testContainer.add(1, 2) + testContainer.add(1, 2, toolcall=True) time.sleep(0.25) print(agentInput) time.sleep(0.75) print(agentInput) + + print(agentInput.state_snapshot()) + + print(agentInput.tools()) + + print(agentInput) + + agentInput.execute_tool("delayadd", 1, 2) + + time.sleep(0.25) + print(agentInput) + time.sleep(0.75) + + print(agentInput) diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index a9a4b607dd..c3631f55c8 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -34,18 +34,19 @@ def decorator(f: Callable[..., Any]) -> Any: def wrapper(self, *args, **kwargs): tool = f"{f.__name__}" - def run_function(): - self.agent_comms.publish(AgentMsg(tool, None, type=MsgType.start)) - val = f(self, *args, **kwargs) - self.agent_comms.publish(AgentMsg(tool, val, type=MsgType.ret)) - if kwargs.get("toolcall"): del kwargs["toolcall"] + + def run_function(): + self.agent_comms.publish(AgentMsg(tool, None, type=MsgType.start)) + val = f(self, *args, **kwargs) + self.agent_comms.publish(AgentMsg(tool, val, type=MsgType.ret)) + thread = threading.Thread(target=run_function) thread.start() return None - return run_function() + return f(self, *args, **kwargs) tool_config = ToolConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) diff --git a/dimos/protocol/tool/types.py b/dimos/protocol/tool/types.py index b3f940fef0..f791b56fe9 100644 --- a/dimos/protocol/tool/types.py +++ b/dimos/protocol/tool/types.py @@ -26,6 +26,7 @@ class Call(Enum): class Reducer(Enum): + none = 0 all = lambda data: data latest = lambda data: data[-1] if data else None average = lambda data: sum(data) / len(data) if data else None @@ -66,7 +67,8 @@ def call(self, *args, **kwargs) -> Any: raise ValueError( "Function is not bound to the ToolConfig. This shiould be called only within AgentListener." ) - return self.f(*args, **kwargs) + + return self.f(*args, **kwargs, toolcall=True) def __str__(self): parts = [f"name={self.name}"] From a6e9443d7e106eb73378d896578566077a623ac0 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 16:34:48 -0700 Subject: [PATCH 09/19] agent callback, tool test --- dimos/protocol/tool/agent_listener.py | 46 ++++++++++++++++++--------- dimos/protocol/tool/test_tool.py | 27 +++++++++++++++- 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_listener.py index 5b5a90db6d..e42835bdd2 100644 --- a/dimos/protocol/tool/agent_listener.py +++ b/dimos/protocol/tool/agent_listener.py @@ -16,7 +16,7 @@ from dataclasses import dataclass from enum import Enum from pprint import pformat -from typing import Optional +from typing import Callable, Optional from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec from dimos.protocol.tool.tool import ToolConfig, ToolContainer @@ -100,9 +100,13 @@ class AgentInput(ToolContainer): _dynamic_containers: list[ToolContainer] _tool_state: dict[str, ToolState] _tools: dict[str, ToolConfig] + _agent_callback: Optional[Callable[[dict[str, ToolState]], any]] = None - def __init__(self) -> None: + # agent callback is called with a state snapshot once system decides that agents needs + # to be woken up + def __init__(self, agent_callback: Callable[[dict[str, ToolState]], any] = None) -> None: super().__init__() + self._agent_callback = agent_callback self._static_containers = [] self._dynamic_containers = [] self._tools = {} @@ -115,6 +119,19 @@ def start(self) -> None: def stop(self) -> None: self.agent_comms.stop() + # this is used by agent to call tools + def execute_tool(self, tool_name: str, *args, **kwargs) -> None: + tool_config = self.get_tool_config(tool_name) + if not tool_config: + logger.error( + f"Tool {tool_name} not found in registered tools, but agent tried to call it (did a dynamic tool expire?)" + ) + return + + # This initializes the tool state if it doesn't exist + self._tool_state[tool_name] = ToolState(name=tool_name, tool_config=tool_config) + return tool_config.call(*args, **kwargs) + # updates local tool state (appends to streamed data if needed etc) # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent def handle_message(self, msg: AgentMsg) -> None: @@ -130,19 +147,13 @@ def handle_message(self, msg: AgentMsg) -> None: if should_call_agent: self.call_agent() - def execute_tool(self, tool_name: str, *args, **kwargs) -> None: - tool_config = self.get_tool_config(tool_name) - if not tool_config: - logger.error( - f"Tool {tool_name} not found in registered tools, but agent tried to call it (did a dynamic tool expire?)" - ) - return + # Returns a snapshot of the current state of tool runs. + # If clear is True, it will assume the snapshot is being sent to an agent + # and will clear the finished tool runs. + def state_snapshot(self, clear: bool = True) -> dict[str, ToolState]: + if not clear: + return self._tool_state - # This initializes the tool state if it doesn't exist - self._tool_state[tool_name] = ToolState(name=tool_name, tool_config=tool_config) - return tool_config.call(*args, **kwargs) - - def state_snapshot(self) -> dict[str, list[AgentMsg]]: ret = copy(self._tool_state) to_delete = [] @@ -163,7 +174,12 @@ def state_snapshot(self) -> dict[str, list[AgentMsg]]: def call_agent(self) -> None: """Call the agent with the current state of tool runs.""" - logger.info(f"Calling agent with current tool state: {self.state_snapshot()}") + logger.info(f"Calling agent with current tool state: {self.state_snapshot(clear=False)}") + + state = self.state_snapshot(clear=True) + + if self._agent_callback: + self._agent_callback(state) def __str__(self): # Convert objects to their string representations diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py index 0cdf665139..5b5ee22862 100644 --- a/dimos/protocol/tool/test_tool.py +++ b/dimos/protocol/tool/test_tool.py @@ -35,7 +35,7 @@ def test_introspect_tool(): print(testContainer.tools) -def test_comms(): +def test_internals(): agentInput = AgentInput() agentInput.start() @@ -69,3 +69,28 @@ def test_comms(): time.sleep(0.75) print(agentInput) + + +def test_standard_usage(): + agentInput = AgentInput(agent_callback=print) + agentInput.start() + + testContainer = TestContainer() + + agentInput.register_tools(testContainer) + + # we can investigate tools + print(agentInput.tools()) + + # we can execute a tool + agentInput.execute_tool("delayadd", 1, 2) + + # while tool is executing, we can introspect the state + # (we see that the tool is running) + time.sleep(0.25) + print(agentInput) + time.sleep(0.75) + + # after the tool has finished, we can see the result + # and the tool state + print(agentInput) From 33f86cc42ec0365fc9345420ab968e33af2cf644 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 16:39:49 -0700 Subject: [PATCH 10/19] tool decorator implies RPC decorator --- dimos/protocol/tool/tool.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index c3631f55c8..1cdcacb832 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -50,6 +50,8 @@ def run_function(): tool_config = ToolConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) + # implicit RPC call as well + wrapper.__rpc__ = True wrapper._tool = tool_config # type: ignore[attr-defined] wrapper.__name__ = f.__name__ # Preserve original function name wrapper.__doc__ = f.__doc__ # Preserve original docstring From 7ab913ccdd44954db9667b08d08de51051c7c7b9 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 18:11:40 -0700 Subject: [PATCH 11/19] small cleanup --- dimos/protocol/tool/__init__.py | 2 ++ .../protocol/tool/{agent_listener.py => agent_interface.py} | 2 +- dimos/protocol/tool/tool.py | 6 ++---- 3 files changed, 5 insertions(+), 5 deletions(-) create mode 100644 dimos/protocol/tool/__init__.py rename dimos/protocol/tool/{agent_listener.py => agent_interface.py} (99%) diff --git a/dimos/protocol/tool/__init__.py b/dimos/protocol/tool/__init__.py new file mode 100644 index 0000000000..1e819a9061 --- /dev/null +++ b/dimos/protocol/tool/__init__.py @@ -0,0 +1,2 @@ +from dimos.protcol.tool.agent_interface import AgentInterface, ToolState +from dimos.protocol.tool.tool import ToolContainer, tool diff --git a/dimos/protocol/tool/agent_listener.py b/dimos/protocol/tool/agent_interface.py similarity index 99% rename from dimos/protocol/tool/agent_listener.py rename to dimos/protocol/tool/agent_interface.py index e42835bdd2..d144f19695 100644 --- a/dimos/protocol/tool/agent_listener.py +++ b/dimos/protocol/tool/agent_interface.py @@ -95,7 +95,7 @@ def __str__(self) -> str: return head + ", No Messages)" -class AgentInput(ToolContainer): +class AgentInterface(ToolContainer): _static_containers: list[ToolContainer] _dynamic_containers: list[ToolContainer] _tool_state: dict[str, ToolState] diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/tool/tool.py index 1cdcacb832..0f1fdebd19 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/tool/tool.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import inspect import threading -from enum import Enum -from typing import Any, Callable, Generic, Optional, TypedDict, TypeVar, cast +from typing import Any, Callable, Optional -from dimos.core import colors, rpc +from dimos.core import rpc from dimos.protocol.tool.comms import LCMToolComms, ToolCommsSpec from dimos.protocol.tool.types import ( AgentMsg, From 6a47966be4c2703649e7e25bd45eb90abdc2905c Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 18:21:00 -0700 Subject: [PATCH 12/19] tool -> skill rename --- dimos/core/module.py | 8 +- dimos/protocol/skill/__init__.py | 2 + dimos/protocol/skill/agent_interface.py | 236 ++++++++++++++++++ dimos/protocol/{tool => skill}/comms.py | 11 +- .../protocol/{tool/tool.py => skill/skill.py} | 48 ++-- dimos/protocol/skill/test_skill.py | 96 +++++++ dimos/protocol/{tool => skill}/types.py | 14 +- dimos/protocol/tool/__init__.py | 2 - dimos/protocol/tool/agent_interface.py | 233 ----------------- dimos/protocol/tool/test_tool.py | 96 ------- 10 files changed, 375 insertions(+), 371 deletions(-) create mode 100644 dimos/protocol/skill/__init__.py create mode 100644 dimos/protocol/skill/agent_interface.py rename dimos/protocol/{tool => skill}/comms.py (88%) rename dimos/protocol/{tool/tool.py => skill/skill.py} (58%) create mode 100644 dimos/protocol/skill/test_skill.py rename dimos/protocol/{tool => skill}/types.py (91%) delete mode 100644 dimos/protocol/tool/__init__.py delete mode 100644 dimos/protocol/tool/agent_interface.py delete mode 100644 dimos/protocol/tool/test_tool.py diff --git a/dimos/core/module.py b/dimos/core/module.py index 794cc664a6..943eb9b523 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -30,25 +30,25 @@ from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport from dimos.protocol.rpc import LCMRPC, RPCSpec from dimos.protocol.tf import LCMTF, TFSpec -from dimos.protocol.tool.comms import LCMToolComms, ToolCommsSpec +from dimos.protocol.skill.comms import LCMSkillComms, SkillCommsSpec class CommsSpec(Enum): rpc: RPCSpec - agent: ToolCommsSpec + agent: SkillCommsSpec tf: TFSpec class LCMComms(CommsSpec): rpc: LCMRPC - agent: LCMToolComms + agent: LCMSkillComms tf: LCMTF class ModuleBase: comms: CommsSpec = LCMComms _rpc: Optional[RPCSpec] = None - _agent: Optional[ToolCommsSpec] = None + _agent: Optional[SkillCommsSpec] = None _tf: Optional[TFSpec] = None def __init__(self, *args, **kwargs): diff --git a/dimos/protocol/skill/__init__.py b/dimos/protocol/skill/__init__.py new file mode 100644 index 0000000000..85b6146f56 --- /dev/null +++ b/dimos/protocol/skill/__init__.py @@ -0,0 +1,2 @@ +from dimos.protocol.skill.agent_interface import AgentInterface, SkillState +from dimos.protocol.skill.skill import SkillContainer, skill diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py new file mode 100644 index 0000000000..da821d8f4e --- /dev/null +++ b/dimos/protocol/skill/agent_interface.py @@ -0,0 +1,236 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from copy import copy +from dataclasses import dataclass +from enum import Enum +from pprint import pformat +from typing import Callable, Optional + +from dimos.protocol.skill.comms import AgentMsg, LCMSkillComms, MsgType, SkillCommsSpec +from dimos.protocol.skill.skill import SkillConfig, SkillContainer +from dimos.protocol.skill.types import Reducer, Return, Stream +from dimos.types.timestamped import TimestampedCollection +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.protocol.skill.agent_input") + + +@dataclass +class AgentInputConfig: + agent_comms: type[SkillCommsSpec] = LCMSkillComms + + +class SkillStateEnum(Enum): + pending = 0 + running = 1 + ret = 2 + error = 3 + + +class SkillState(TimestampedCollection): + name: str + state: SkillStateEnum + skill_config: SkillConfig + + def __init__(self, name: str, skill_config: Optional[SkillConfig] = None) -> None: + super().__init__() + if skill_config is None: + self.skill_config = SkillConfig( + name=name, stream=Stream.none, ret=Return.none, reducer=Reducer.none + ) + else: + self.skill_config = skill_config + + self.state = SkillStateEnum.pending + self.name = name + + # returns True if the agent should be called for this message + def handle_msg(self, msg: AgentMsg) -> bool: + self.add(msg) + + if msg.type == MsgType.stream: + if ( + self.skill_config.stream == Stream.none + or self.skill_config.stream == Stream.passive + ): + return False + if self.skill_config.stream == Stream.call_agent: + return True + + if msg.type == MsgType.ret: + self.state = SkillStateEnum.ret + if self.skill_config.ret == Return.call_agent: + return True + return False + + if msg.type == MsgType.error: + self.state = SkillStateEnum.error + return True + + if msg.type == MsgType.start: + self.state = SkillStateEnum.running + return False + + def __str__(self) -> str: + head = f"SkillState(state={self.state}" + + if self.state == SkillStateEnum.ret or self.state == SkillStateEnum.error: + head += ", ran for=" + else: + head += ", running for=" + + head += f"{self.duration():.2f}s" + + if len(self): + return head + f", messages={list(self._items)})" + return head + ", No Messages)" + + +class AgentInterface(SkillContainer): + _static_containers: list[SkillContainer] + _dynamic_containers: list[SkillContainer] + _skill_state: dict[str, SkillState] + _skills: dict[str, SkillConfig] + _agent_callback: Optional[Callable[[dict[str, SkillState]], any]] = None + + # agent callback is called with a state snapshot once system decides that agents needs + # to be woken up + def __init__(self, agent_callback: Callable[[dict[str, SkillState]], any] = None) -> None: + super().__init__() + self._agent_callback = agent_callback + self._static_containers = [] + self._dynamic_containers = [] + self._skills = {} + self._skill_state = {} + + def start(self) -> None: + self.agent_comms.start() + self.agent_comms.subscribe(self.handle_message) + + def stop(self) -> None: + self.agent_comms.stop() + + # this is used by agent to call skills + def execute_skill(self, skill_name: str, *args, **kwargs) -> None: + skill_config = self.get_skill_config(skill_name) + if not skill_config: + logger.error( + f"Skill {skill_name} not found in registered skills, but agent tried to call it (did a dynamic skill expire?)" + ) + return + + # This initializes the skill state if it doesn't exist + self._skill_state[skill_name] = SkillState(name=skill_name, skill_config=skill_config) + return skill_config.call(*args, **kwargs) + + # updates local skill state (appends to streamed data if needed etc) + # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent + def handle_message(self, msg: AgentMsg) -> None: + logger.info(f"Skill msg {msg}") + + if self._skill_state.get(msg.skill_name) is None: + logger.warn( + f"Skill state for {msg.skill_name} not found, (skill not called by our agent?) initializing. (message received: {msg})" + ) + self._skill_state[msg.skill_name] = SkillState(name=msg.skill_name) + + should_call_agent = self._skill_state[msg.skill_name].handle_msg(msg) + if should_call_agent: + self.call_agent() + + # Returns a snapshot of the current state of skill runs. + # If clear is True, it will assume the snapshot is being sent to an agent + # and will clear the finished skill runs. + def state_snapshot(self, clear: bool = True) -> dict[str, SkillState]: + if not clear: + return self._skill_state + + ret = copy(self._skill_state) + + to_delete = [] + # Since state is exported, we can clear the finished skill runs + for skill_name, skill_run in self._skill_state.items(): + if skill_run.state == SkillStateEnum.ret: + logger.info(f"Skill {skill_name} finished") + to_delete.append(skill_name) + if skill_run.state == SkillStateEnum.error: + logger.error(f"Skill run error for {skill_name}") + to_delete.append(skill_name) + + for skill_name in to_delete: + logger.debug(f"Skill {skill_name} finished, removing from state") + del self._skill_state[skill_name] + + return ret + + def call_agent(self) -> None: + """Call the agent with the current state of skill runs.""" + logger.info(f"Calling agent with current skill state: {self.state_snapshot(clear=False)}") + + state = self.state_snapshot(clear=True) + + if self._agent_callback: + self._agent_callback(state) + + def __str__(self): + # Convert objects to their string representations + def stringify_value(obj): + if isinstance(obj, dict): + return {k: stringify_value(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [stringify_value(item) for item in obj] + else: + return str(obj) + + ret = stringify_value(self._skill_state) + + return f"AgentInput({pformat(ret, indent=2, depth=3, width=120, compact=True)})" + + # Outputs data for the agent call + # clears the local state (finished skill calls) + def get_agent_query(self): + return self.state_snapshot() + + # Given skillcontainers can run remotely, we are + # caching available skills from static containers + # + # dynamic containers will be queried at runtime via + # .skills() method + def register_skills(self, container: SkillContainer): + if not container.dynamic_skills: + logger.info(f"Registering static skill container, {container}") + self._static_containers.append(container) + for name, skill_config in container.skills().items(): + self._skills[name] = skill_config.bind(getattr(container, name)) + else: + logger.info(f"Registering dynamic skill container, {container}") + self._dynamic_containers.append(container) + + def get_skill_config(self, skill_name: str) -> Optional[SkillConfig]: + skill_config = self._skills.get(skill_name) + if not skill_config: + skill_config = self.skills().get(skill_name) + return skill_config + + def skills(self) -> dict[str, SkillConfig]: + # static container skilling is already cached + all_skills: dict[str, SkillConfig] = {**self._skills} + + # Then aggregate skills from dynamic containers + for container in self._dynamic_containers: + for skill_name, skill_config in container.skills().items(): + all_skills[skill_name] = skill_config.bind(getattr(container, skill_name)) + + return all_skills diff --git a/dimos/protocol/tool/comms.py b/dimos/protocol/skill/comms.py similarity index 88% rename from dimos/protocol/tool/comms.py rename to dimos/protocol/skill/comms.py index c68a6ed188..d6e9e73bf0 100644 --- a/dimos/protocol/tool/comms.py +++ b/dimos/protocol/skill/comms.py @@ -21,11 +21,12 @@ from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service import Service -from dimos.protocol.tool.types import AgentMsg, Call, MsgType, Reducer, Stream, ToolConfig +from dimos.protocol.skill.types import AgentMsg, Call, MsgType, Reducer, SkillConfig, Stream from dimos.types.timestamped import Timestamped -class ToolCommsSpec: +# defines a protocol for communication between skills and agents +class SkillCommsSpec: @abstractmethod def publish(self, msg: AgentMsg) -> None: ... @@ -50,7 +51,7 @@ class PubSubCommsConfig(Generic[TopicT, MsgT]): autostart: bool = True -class PubSubComms(Service[PubSubCommsConfig], ToolCommsSpec): +class PubSubComms(Service[PubSubCommsConfig], SkillCommsSpec): default_config: type[PubSubCommsConfig] = PubSubCommsConfig def __init__(self, **kwargs) -> None: @@ -85,9 +86,9 @@ class LCMCommsConfig(PubSubCommsConfig[str, AgentMsg]): topic: str = "/agent" pubsub: Union[type[PubSub], PubSub, None] = PickleLCM # lcm needs to be started only if receiving - # tool comms are broadcast only in modules so we don't autostart + # skill comms are broadcast only in modules so we don't autostart autostart: bool = False -class LCMToolComms(PubSubComms): +class LCMSkillComms(PubSubComms): default_config: type[LCMCommsConfig] = LCMCommsConfig diff --git a/dimos/protocol/tool/tool.py b/dimos/protocol/skill/skill.py similarity index 58% rename from dimos/protocol/tool/tool.py rename to dimos/protocol/skill/skill.py index 0f1fdebd19..ac9fc6bc47 100644 --- a/dimos/protocol/tool/tool.py +++ b/dimos/protocol/skill/skill.py @@ -16,29 +16,29 @@ from typing import Any, Callable, Optional from dimos.core import rpc -from dimos.protocol.tool.comms import LCMToolComms, ToolCommsSpec -from dimos.protocol.tool.types import ( +from dimos.protocol.skill.comms import LCMSkillComms, SkillCommsSpec +from dimos.protocol.skill.types import ( AgentMsg, MsgType, Reducer, Return, Stream, - ToolConfig, + SkillConfig, ) -def tool(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): +def skill(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): def decorator(f: Callable[..., Any]) -> Any: def wrapper(self, *args, **kwargs): - tool = f"{f.__name__}" + skill = f"{f.__name__}" - if kwargs.get("toolcall"): - del kwargs["toolcall"] + if kwargs.get("skillcall"): + del kwargs["skillcall"] def run_function(): - self.agent_comms.publish(AgentMsg(tool, None, type=MsgType.start)) + self.agent_comms.publish(AgentMsg(skill, None, type=MsgType.start)) val = f(self, *args, **kwargs) - self.agent_comms.publish(AgentMsg(tool, val, type=MsgType.ret)) + self.agent_comms.publish(AgentMsg(skill, val, type=MsgType.ret)) thread = threading.Thread(target=run_function) thread.start() @@ -46,11 +46,11 @@ def run_function(): return f(self, *args, **kwargs) - tool_config = ToolConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) + skill_config = SkillConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) # implicit RPC call as well wrapper.__rpc__ = True - wrapper._tool = tool_config # type: ignore[attr-defined] + wrapper._skill = skill_config # type: ignore[attr-defined] wrapper.__name__ = f.__name__ # Preserve original function name wrapper.__doc__ = f.__doc__ # Preserve original docstring return wrapper @@ -59,36 +59,36 @@ def run_function(): class CommsSpec: - agent_comms_class: type[ToolCommsSpec] + agent_comms_class: type[SkillCommsSpec] class LCMComms(CommsSpec): - agent_comms_class: type[ToolCommsSpec] = LCMToolComms + agent_comms_class: type[SkillCommsSpec] = LCMSkillComms -# here we can have also dynamic tools potentially -# agent can check .tools each time when introspecting -class ToolContainer: +# here we can have also dynamic skills potentially +# agent can check .skills each time when introspecting +class SkillContainer: comms: CommsSpec = LCMComms() - _agent_comms: Optional[ToolCommsSpec] = None - dynamic_tools = False + _agent_comms: Optional[SkillCommsSpec] = None + dynamic_skills = False def __str__(self) -> str: - return f"ToolContainer({self.__class__.__name__})" + return f"SkillContainer({self.__class__.__name__})" @rpc - def tools(self) -> dict[str, ToolConfig]: + def skills(self) -> dict[str, SkillConfig]: # Avoid recursion by excluding this property itself return { - name: getattr(self, name)._tool + name: getattr(self, name)._skill for name in dir(self) if not name.startswith("_") - and name != "tools" - and hasattr(getattr(self, name), "_tool") + and name != "skills" + and hasattr(getattr(self, name), "_skill") } @property - def agent_comms(self) -> ToolCommsSpec: + def agent_comms(self) -> SkillCommsSpec: if self._agent_comms is None: self._agent_comms = self.comms.agent_comms_class() return self._agent_comms diff --git a/dimos/protocol/skill/test_skill.py b/dimos/protocol/skill/test_skill.py new file mode 100644 index 0000000000..d8f854bb6e --- /dev/null +++ b/dimos/protocol/skill/test_skill.py @@ -0,0 +1,96 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from dimos.protocol.skill.agent_interface import AgentInterface +from dimos.protocol.skill.skill import SkillContainer, skill +from dimos.protocol.skill.types import Return, Stream + + +class TestContainer(SkillContainer): + @skill() + def add(self, x: int, y: int) -> int: + return x + y + + @skill() + def delayadd(self, x: int, y: int) -> int: + time.sleep(0.5) + return x + y + + +def test_introspect_skill(): + testContainer = TestContainer() + print(testContainer.skills()) + + +def test_internals(): + agentInterface = AgentInterface() + agentInterface.start() + + testContainer = TestContainer() + + agentInterface.register_skills(testContainer) + + # skillcall=True makes the skill function exit early, + # it doesn't behave like a blocking function, + # + # return is passed as AgentMsg to the agent topic + testContainer.delayadd(2, 4, skillcall=True) + testContainer.add(1, 2, skillcall=True) + + time.sleep(0.25) + print(agentInterface) + + time.sleep(0.75) + print(agentInterface) + + print(agentInterface.state_snapshot()) + + print(agentInterface.skills()) + + print(agentInterface) + + agentInterface.execute_skill("delayadd", 1, 2) + + time.sleep(0.25) + print(agentInterface) + time.sleep(0.75) + + print(agentInterface) + + +def test_standard_usage(): + agentInterface = AgentInterface(agent_callback=print) + agentInterface.start() + + testContainer = TestContainer() + + agentInterface.register_skills(testContainer) + + # we can investigate skills + print(agentInterface.skills()) + + # we can execute a skill + agentInterface.execute_skill("delayadd", 1, 2) + + # while skill is executing, we can introspect the state + # (we see that the skill is running) + time.sleep(0.25) + print(agentInterface) + time.sleep(0.75) + + # after the skill has finished, we can see the result + # and the skill state + print(agentInterface) diff --git a/dimos/protocol/tool/types.py b/dimos/protocol/skill/types.py similarity index 91% rename from dimos/protocol/tool/types.py rename to dimos/protocol/skill/types.py index f791b56fe9..c6ee7ee7c2 100644 --- a/dimos/protocol/tool/types.py +++ b/dimos/protocol/skill/types.py @@ -51,24 +51,24 @@ class Return(Enum): @dataclass -class ToolConfig: +class SkillConfig: name: str reducer: Reducer stream: Stream ret: Return f: Callable | None = None - def bind(self, f: Callable) -> "ToolConfig": + def bind(self, f: Callable) -> "SkillConfig": self.f = f return self def call(self, *args, **kwargs) -> Any: if self.f is None: raise ValueError( - "Function is not bound to the ToolConfig. This shiould be called only within AgentListener." + "Function is not bound to the SkillConfig. This should be called only within AgentListener." ) - return self.f(*args, **kwargs, toolcall=True) + return self.f(*args, **kwargs, skillcall=True) def __str__(self): parts = [f"name={self.name}"] @@ -87,7 +87,7 @@ def __str__(self): # Always show return mode parts.append(f"ret={self.ret.name}") - return f"Tool({', '.join(parts)})" + return f"Skill({', '.join(parts)})" class MsgType(Enum): @@ -104,12 +104,12 @@ class AgentMsg(Timestamped): def __init__( self, - tool_name: str, + skill_name: str, content: str | int | float | dict | list, type: Optional[MsgType] = MsgType.ret, ) -> None: self.ts = time.time() - self.tool_name = tool_name + self.skill_name = skill_name self.content = content self.type = type diff --git a/dimos/protocol/tool/__init__.py b/dimos/protocol/tool/__init__.py deleted file mode 100644 index 1e819a9061..0000000000 --- a/dimos/protocol/tool/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from dimos.protcol.tool.agent_interface import AgentInterface, ToolState -from dimos.protocol.tool.tool import ToolContainer, tool diff --git a/dimos/protocol/tool/agent_interface.py b/dimos/protocol/tool/agent_interface.py deleted file mode 100644 index d144f19695..0000000000 --- a/dimos/protocol/tool/agent_interface.py +++ /dev/null @@ -1,233 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from copy import copy -from dataclasses import dataclass -from enum import Enum -from pprint import pformat -from typing import Callable, Optional - -from dimos.protocol.tool.comms import AgentMsg, LCMToolComms, MsgType, ToolCommsSpec -from dimos.protocol.tool.tool import ToolConfig, ToolContainer -from dimos.protocol.tool.types import Reducer, Return, Stream -from dimos.types.timestamped import TimestampedCollection -from dimos.utils.logging_config import setup_logger - -logger = setup_logger("dimos.protocol.tool.agent_input") - - -@dataclass -class AgentInputConfig: - agent_comms: type[ToolCommsSpec] = LCMToolComms - - -class ToolStateEnum(Enum): - pending = 0 - running = 1 - ret = 2 - error = 3 - - -class ToolState(TimestampedCollection): - name: str - state: ToolStateEnum - tool_config: ToolConfig - - def __init__(self, name: str, tool_config: Optional[ToolConfig] = None) -> None: - super().__init__() - if tool_config is None: - self.tool_config = ToolConfig( - name=name, stream=Stream.none, ret=Return.none, reducer=Reducer.none - ) - else: - self.tool_config = tool_config - - self.state = ToolStateEnum.pending - self.name = name - - # returns True if the agent should be called for this message - def handle_msg(self, msg: AgentMsg) -> bool: - self.add(msg) - - if msg.type == MsgType.stream: - if self.tool_config.stream == Stream.none or self.tool_config.stream == Stream.passive: - return False - if self.tool_config.stream == Stream.call_agent: - return True - - if msg.type == MsgType.ret: - self.state = ToolStateEnum.ret - if self.tool_config.ret == Return.call_agent: - return True - return False - - if msg.type == MsgType.error: - self.state = ToolStateEnum.error - return True - - if msg.type == MsgType.start: - self.state = ToolStateEnum.running - return False - - def __str__(self) -> str: - head = f"ToolState(state={self.state}" - - if self.state == ToolStateEnum.ret or self.state == ToolStateEnum.error: - head += ", ran for=" - else: - head += ", running for=" - - head += f"{self.duration():.2f}s" - - if len(self): - return head + f", messages={list(self._items)})" - return head + ", No Messages)" - - -class AgentInterface(ToolContainer): - _static_containers: list[ToolContainer] - _dynamic_containers: list[ToolContainer] - _tool_state: dict[str, ToolState] - _tools: dict[str, ToolConfig] - _agent_callback: Optional[Callable[[dict[str, ToolState]], any]] = None - - # agent callback is called with a state snapshot once system decides that agents needs - # to be woken up - def __init__(self, agent_callback: Callable[[dict[str, ToolState]], any] = None) -> None: - super().__init__() - self._agent_callback = agent_callback - self._static_containers = [] - self._dynamic_containers = [] - self._tools = {} - self._tool_state = {} - - def start(self) -> None: - self.agent_comms.start() - self.agent_comms.subscribe(self.handle_message) - - def stop(self) -> None: - self.agent_comms.stop() - - # this is used by agent to call tools - def execute_tool(self, tool_name: str, *args, **kwargs) -> None: - tool_config = self.get_tool_config(tool_name) - if not tool_config: - logger.error( - f"Tool {tool_name} not found in registered tools, but agent tried to call it (did a dynamic tool expire?)" - ) - return - - # This initializes the tool state if it doesn't exist - self._tool_state[tool_name] = ToolState(name=tool_name, tool_config=tool_config) - return tool_config.call(*args, **kwargs) - - # updates local tool state (appends to streamed data if needed etc) - # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent - def handle_message(self, msg: AgentMsg) -> None: - logger.info(f"Tool msg {msg}") - - if self._tool_state.get(msg.tool_name) is None: - logger.warn( - f"Tool state for {msg.tool_name} not found, (tool not called by our agent?) initializing. (message received: {msg})" - ) - self._tool_state[msg.tool_name] = ToolState(name=msg.tool_name) - - should_call_agent = self._tool_state[msg.tool_name].handle_msg(msg) - if should_call_agent: - self.call_agent() - - # Returns a snapshot of the current state of tool runs. - # If clear is True, it will assume the snapshot is being sent to an agent - # and will clear the finished tool runs. - def state_snapshot(self, clear: bool = True) -> dict[str, ToolState]: - if not clear: - return self._tool_state - - ret = copy(self._tool_state) - - to_delete = [] - # Since state is exported, we can clear the finished tool runs - for tool_name, tool_run in self._tool_state.items(): - if tool_run.state == ToolStateEnum.ret: - logger.info(f"Tool {tool_name} finished") - to_delete.append(tool_name) - if tool_run.state == ToolStateEnum.error: - logger.error(f"Tool run error for {tool_name}") - to_delete.append(tool_name) - - for tool_name in to_delete: - logger.debug(f"Tool {tool_name} finished, removing from state") - del self._tool_state[tool_name] - - return ret - - def call_agent(self) -> None: - """Call the agent with the current state of tool runs.""" - logger.info(f"Calling agent with current tool state: {self.state_snapshot(clear=False)}") - - state = self.state_snapshot(clear=True) - - if self._agent_callback: - self._agent_callback(state) - - def __str__(self): - # Convert objects to their string representations - def stringify_value(obj): - if isinstance(obj, dict): - return {k: stringify_value(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [stringify_value(item) for item in obj] - else: - return str(obj) - - ret = stringify_value(self._tool_state) - - return f"AgentInput({pformat(ret, indent=2, depth=3, width=120, compact=True)})" - - # Outputs data for the agent call - # clears the local state (finished tool calls) - def get_agent_query(self): - return self.state_snapshot() - - # Given toolcontainers can run remotely, we are - # caching available tools from static containers - # - # dynamic containers will be queried at runtime via - # .tools() method - def register_tools(self, container: ToolContainer): - if not container.dynamic_tools: - logger.info(f"Registering static tool container, {container}") - self._static_containers.append(container) - for name, tool_config in container.tools().items(): - self._tools[name] = tool_config.bind(getattr(container, name)) - else: - logger.info(f"Registering dynamic tool container, {container}") - self._dynamic_containers.append(container) - - def get_tool_config(self, tool_name: str) -> Optional[ToolConfig]: - tool_config = self._tools.get(tool_name) - if not tool_config: - tool_config = self.tools().get(tool_name) - return tool_config - - def tools(self) -> dict[str, ToolConfig]: - # static container tooling is already cached - all_tools: dict[str, ToolConfig] = {**self._tools} - - # Then aggregate tools from dynamic containers - for container in self._dynamic_containers: - for tool_name, tool_config in container.tools().items(): - all_tools[tool_name] = tool_config.bind(getattr(container, tool_name)) - - return all_tools diff --git a/dimos/protocol/tool/test_tool.py b/dimos/protocol/tool/test_tool.py deleted file mode 100644 index 5b5ee22862..0000000000 --- a/dimos/protocol/tool/test_tool.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time - -from dimos.protocol.tool.agent_listener import AgentInput -from dimos.protocol.tool.tool import ToolContainer, tool -from dimos.protocol.tool.types import Return, Stream - - -class TestContainer(ToolContainer): - @tool() - def add(self, x: int, y: int) -> int: - return x + y - - @tool() - def delayadd(self, x: int, y: int) -> int: - time.sleep(0.5) - return x + y - - -def test_introspect_tool(): - testContainer = TestContainer() - print(testContainer.tools) - - -def test_internals(): - agentInput = AgentInput() - agentInput.start() - - testContainer = TestContainer() - - agentInput.register_tools(testContainer) - - # toolcall=True makes the tool function exit early, - # it doesn't behave like a blocking function, - # - # return is passed as AgentMsg to the agent topic - testContainer.delayadd(2, 4, toolcall=True) - testContainer.add(1, 2, toolcall=True) - - time.sleep(0.25) - print(agentInput) - - time.sleep(0.75) - print(agentInput) - - print(agentInput.state_snapshot()) - - print(agentInput.tools()) - - print(agentInput) - - agentInput.execute_tool("delayadd", 1, 2) - - time.sleep(0.25) - print(agentInput) - time.sleep(0.75) - - print(agentInput) - - -def test_standard_usage(): - agentInput = AgentInput(agent_callback=print) - agentInput.start() - - testContainer = TestContainer() - - agentInput.register_tools(testContainer) - - # we can investigate tools - print(agentInput.tools()) - - # we can execute a tool - agentInput.execute_tool("delayadd", 1, 2) - - # while tool is executing, we can introspect the state - # (we see that the tool is running) - time.sleep(0.25) - print(agentInput) - time.sleep(0.75) - - # after the tool has finished, we can see the result - # and the tool state - print(agentInput) From 9b7909661fc2e70aadde40e403e84ce0de985fa4 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 18:32:32 -0700 Subject: [PATCH 13/19] type fixes --- dimos/protocol/skill/agent_interface.py | 10 +++++++--- dimos/protocol/skill/skill.py | 2 +- dimos/protocol/skill/test_skill.py | 1 - dimos/protocol/skill/types.py | 3 ++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py index da821d8f4e..bf878a979f 100644 --- a/dimos/protocol/skill/agent_interface.py +++ b/dimos/protocol/skill/agent_interface.py @@ -16,7 +16,7 @@ from dataclasses import dataclass from enum import Enum from pprint import pformat -from typing import Callable, Optional +from typing import Any, Callable, Optional from dimos.protocol.skill.comms import AgentMsg, LCMSkillComms, MsgType, SkillCommsSpec from dimos.protocol.skill.skill import SkillConfig, SkillContainer @@ -83,6 +83,8 @@ def handle_msg(self, msg: AgentMsg) -> bool: self.state = SkillStateEnum.running return False + return False + def __str__(self) -> str: head = f"SkillState(state={self.state}" @@ -103,11 +105,13 @@ class AgentInterface(SkillContainer): _dynamic_containers: list[SkillContainer] _skill_state: dict[str, SkillState] _skills: dict[str, SkillConfig] - _agent_callback: Optional[Callable[[dict[str, SkillState]], any]] = None + _agent_callback: Optional[Callable[[dict[str, SkillState]], Any]] = None # agent callback is called with a state snapshot once system decides that agents needs # to be woken up - def __init__(self, agent_callback: Callable[[dict[str, SkillState]], any] = None) -> None: + def __init__( + self, agent_callback: Optional[Callable[[dict[str, SkillState]], Any]] = None + ) -> None: super().__init__() self._agent_callback = agent_callback self._static_containers = [] diff --git a/dimos/protocol/skill/skill.py b/dimos/protocol/skill/skill.py index ac9fc6bc47..d30da330dc 100644 --- a/dimos/protocol/skill/skill.py +++ b/dimos/protocol/skill/skill.py @@ -49,7 +49,7 @@ def run_function(): skill_config = SkillConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) # implicit RPC call as well - wrapper.__rpc__ = True + wrapper.__rpc__ = True # type: ignore[attr-defined] wrapper._skill = skill_config # type: ignore[attr-defined] wrapper.__name__ = f.__name__ # Preserve original function name wrapper.__doc__ = f.__doc__ # Preserve original docstring diff --git a/dimos/protocol/skill/test_skill.py b/dimos/protocol/skill/test_skill.py index d8f854bb6e..46e9a8ad47 100644 --- a/dimos/protocol/skill/test_skill.py +++ b/dimos/protocol/skill/test_skill.py @@ -16,7 +16,6 @@ from dimos.protocol.skill.agent_interface import AgentInterface from dimos.protocol.skill.skill import SkillContainer, skill -from dimos.protocol.skill.types import Return, Stream class TestContainer(SkillContainer): diff --git a/dimos/protocol/skill/types.py b/dimos/protocol/skill/types.py index c6ee7ee7c2..a2c8bde9a2 100644 --- a/dimos/protocol/skill/types.py +++ b/dimos/protocol/skill/types.py @@ -57,6 +57,7 @@ class SkillConfig: stream: Stream ret: Return f: Callable | None = None + autostart: bool = False def bind(self, f: Callable) -> "SkillConfig": self.f = f @@ -106,7 +107,7 @@ def __init__( self, skill_name: str, content: str | int | float | dict | list, - type: Optional[MsgType] = MsgType.ret, + type: MsgType = MsgType.ret, ) -> None: self.ts = time.time() self.skill_name = skill_name From 1e33b96c024ead2299fa959b9b36a12a6a7e8a94 Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 19:09:15 -0700 Subject: [PATCH 14/19] module test --- dimos/core/module.py | 16 +++++------ dimos/protocol/skill/agent_interface.py | 26 ++++++++++-------- dimos/protocol/skill/skill.py | 10 +++---- dimos/protocol/skill/test_skill.py | 35 +++++++++++++++++++++++++ dimos/protocol/skill/types.py | 6 ++--- 5 files changed, 66 insertions(+), 27 deletions(-) diff --git a/dimos/core/module.py b/dimos/core/module.py index 943eb9b523..81db39e4ab 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -29,20 +29,20 @@ from dimos.core.core import T, rpc from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport from dimos.protocol.rpc import LCMRPC, RPCSpec -from dimos.protocol.tf import LCMTF, TFSpec from dimos.protocol.skill.comms import LCMSkillComms, SkillCommsSpec +from dimos.protocol.tf import LCMTF, TFSpec -class CommsSpec(Enum): - rpc: RPCSpec - agent: SkillCommsSpec - tf: TFSpec +class CommsSpec: + rpc: type[RPCSpec] + agent: type[SkillCommsSpec] + tf: type[TFSpec] class LCMComms(CommsSpec): - rpc: LCMRPC - agent: LCMSkillComms - tf: LCMTF + rpc = LCMRPC + agent = LCMSkillComms + tf = LCMTF class ModuleBase: diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py index bf878a979f..13c78a6960 100644 --- a/dimos/protocol/skill/agent_interface.py +++ b/dimos/protocol/skill/agent_interface.py @@ -39,6 +39,7 @@ class SkillStateEnum(Enum): error = 3 +# TODO pending timeout, running timeout, etc. class SkillState(TimestampedCollection): name: str state: SkillStateEnum @@ -107,8 +108,8 @@ class AgentInterface(SkillContainer): _skills: dict[str, SkillConfig] _agent_callback: Optional[Callable[[dict[str, SkillState]], Any]] = None - # agent callback is called with a state snapshot once system decides that agents needs - # to be woken up + # Agent callback is called with a state snapshot once system decides + # that agents needs to be woken up, according to inputs from active skills def __init__( self, agent_callback: Optional[Callable[[dict[str, SkillState]], Any]] = None ) -> None: @@ -126,7 +127,7 @@ def start(self) -> None: def stop(self) -> None: self.agent_comms.stop() - # this is used by agent to call skills + # This is used by agent to call skills def execute_skill(self, skill_name: str, *args, **kwargs) -> None: skill_config = self.get_skill_config(skill_name) if not skill_config: @@ -139,8 +140,10 @@ def execute_skill(self, skill_name: str, *args, **kwargs) -> None: self._skill_state[skill_name] = SkillState(name=skill_name, skill_config=skill_config) return skill_config.call(*args, **kwargs) - # updates local skill state (appends to streamed data if needed etc) - # checks if agent needs to be called if AgentMsg has Return call_agent or Stream call_agent + # Receives a message from active skill + # Updates local skill state (appends to streamed data if needed etc) + # + # Checks if agent needs to be called (if ToolConfig has Return=call_agent or Stream=call_agent) def handle_message(self, msg: AgentMsg) -> None: logger.info(f"Skill msg {msg}") @@ -155,8 +158,9 @@ def handle_message(self, msg: AgentMsg) -> None: self.call_agent() # Returns a snapshot of the current state of skill runs. - # If clear is True, it will assume the snapshot is being sent to an agent - # and will clear the finished skill runs. + # + # If clear=True, it will assume the snapshot is being sent to an agent + # and will clear the finished skill runs from the state def state_snapshot(self, clear: bool = True) -> dict[str, SkillState]: if not clear: return self._skill_state @@ -203,14 +207,14 @@ def stringify_value(obj): return f"AgentInput({pformat(ret, indent=2, depth=3, width=120, compact=True)})" # Outputs data for the agent call - # clears the local state (finished skill calls) + # Clears the local state (finished skill calls) def get_agent_query(self): return self.state_snapshot() # Given skillcontainers can run remotely, we are - # caching available skills from static containers + # Caching available skills from static containers # - # dynamic containers will be queried at runtime via + # Dynamic containers will be queried at runtime via # .skills() method def register_skills(self, container: SkillContainer): if not container.dynamic_skills: @@ -229,7 +233,7 @@ def get_skill_config(self, skill_name: str) -> Optional[SkillConfig]: return skill_config def skills(self) -> dict[str, SkillConfig]: - # static container skilling is already cached + # Static container skilling is already cached all_skills: dict[str, SkillConfig] = {**self._skills} # Then aggregate skills from dynamic containers diff --git a/dimos/protocol/skill/skill.py b/dimos/protocol/skill/skill.py index d30da330dc..46f3e769f2 100644 --- a/dimos/protocol/skill/skill.py +++ b/dimos/protocol/skill/skill.py @@ -22,8 +22,8 @@ MsgType, Reducer, Return, - Stream, SkillConfig, + Stream, ) @@ -59,17 +59,17 @@ def run_function(): class CommsSpec: - agent_comms_class: type[SkillCommsSpec] + agent: type[SkillCommsSpec] class LCMComms(CommsSpec): - agent_comms_class: type[SkillCommsSpec] = LCMSkillComms + agent: type[SkillCommsSpec] = LCMSkillComms # here we can have also dynamic skills potentially # agent can check .skills each time when introspecting class SkillContainer: - comms: CommsSpec = LCMComms() + comms: CommsSpec = LCMComms _agent_comms: Optional[SkillCommsSpec] = None dynamic_skills = False @@ -90,5 +90,5 @@ def skills(self) -> dict[str, SkillConfig]: @property def agent_comms(self) -> SkillCommsSpec: if self._agent_comms is None: - self._agent_comms = self.comms.agent_comms_class() + self._agent_comms = self.comms.agent() return self._agent_comms diff --git a/dimos/protocol/skill/test_skill.py b/dimos/protocol/skill/test_skill.py index 46e9a8ad47..9bf7e85a35 100644 --- a/dimos/protocol/skill/test_skill.py +++ b/dimos/protocol/skill/test_skill.py @@ -93,3 +93,38 @@ def test_standard_usage(): # after the skill has finished, we can see the result # and the skill state print(agentInterface) + + +def test_module(): + from dimos.core import Module, start + + class MockModule(Module, SkillContainer): + def __init__(self): + super().__init__() + SkillContainer.__init__(self) + + @skill() + def add(self, x: int, y: int) -> int: + time.sleep(0.5) + return x * y + + agentInterface = AgentInterface(agent_callback=print) + agentInterface.start() + + dimos = start(1) + mock_module = dimos.deploy(MockModule) + + agentInterface.register_skills(mock_module) + + # we can execute a skill + agentInterface.execute_skill("add", 1, 2) + + # while skill is executing, we can introspect the state + # (we see that the skill is running) + time.sleep(0.25) + print(agentInterface) + time.sleep(0.75) + + # after the skill has finished, we can see the result + # and the skill state + print(agentInterface) diff --git a/dimos/protocol/skill/types.py b/dimos/protocol/skill/types.py index a2c8bde9a2..e4b09a7ef9 100644 --- a/dimos/protocol/skill/types.py +++ b/dimos/protocol/skill/types.py @@ -27,9 +27,9 @@ class Call(Enum): class Reducer(Enum): none = 0 - all = lambda data: data - latest = lambda data: data[-1] if data else None - average = lambda data: sum(data) / len(data) if data else None + all = 1 + latest = 2 + average = 3 class Stream(Enum): From afa7bcf3bf44a221ae571b34ef49f5590b3ad0af Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 19:19:01 -0700 Subject: [PATCH 15/19] modules provide tf by default --- dimos/core/module.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dimos/core/module.py b/dimos/core/module.py index 81db39e4ab..e30df27a68 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -69,6 +69,16 @@ def tf(self): self._tf = self.comms.tf() return self._tf + @tf.setter + def tf(self, value): + import warnings + + warnings.warn( + "tf is available on all modules. Call self.tf.start() to activate tf functionality. No need to assign it", + UserWarning, + stacklevel=2, + ) + @property def outputs(self) -> dict[str, Out]: return { From 76eedee5f627b4c5e6df976e5cb683408847244a Mon Sep 17 00:00:00 2001 From: lesh Date: Tue, 5 Aug 2025 22:58:59 -0700 Subject: [PATCH 16/19] agentspy cli, other cli tools installing corectly via pyproject --- bin/foxglove-bridge | 7 - bin/lcmspy | 7 - dimos/protocol/skill/agent_interface.py | 5 - dimos/utils/cli/agentspy/agentspy.py | 366 ++++++++++++++++++ dimos/utils/cli/agentspy/demo_agentspy.py | 103 +++++ .../foxglove_bridge/run_foxglove_bridge.py | 6 +- dimos/utils/cli/lcmspy/run_lcmspy.py | 6 +- pyproject.toml | 4 + 8 files changed, 483 insertions(+), 21 deletions(-) delete mode 100755 bin/foxglove-bridge delete mode 100755 bin/lcmspy create mode 100644 dimos/utils/cli/agentspy/agentspy.py create mode 100644 dimos/utils/cli/agentspy/demo_agentspy.py diff --git a/bin/foxglove-bridge b/bin/foxglove-bridge deleted file mode 100755 index 8d80ac52cd..0000000000 --- a/bin/foxglove-bridge +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash -# current script dir + ..dimos - - -script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -python $script_dir/../dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py "$@" diff --git a/bin/lcmspy b/bin/lcmspy deleted file mode 100755 index 64387aad98..0000000000 --- a/bin/lcmspy +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash -# current script dir + ..dimos - - -script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -python $script_dir/../dimos/utils/cli/lcmspy/run_lcmspy.py "$@" diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py index 13c78a6960..9f186f99ec 100644 --- a/dimos/protocol/skill/agent_interface.py +++ b/dimos/protocol/skill/agent_interface.py @@ -206,11 +206,6 @@ def stringify_value(obj): return f"AgentInput({pformat(ret, indent=2, depth=3, width=120, compact=True)})" - # Outputs data for the agent call - # Clears the local state (finished skill calls) - def get_agent_query(self): - return self.state_snapshot() - # Given skillcontainers can run remotely, we are # Caching available skills from static containers # diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py new file mode 100644 index 0000000000..fa02540aa2 --- /dev/null +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -0,0 +1,366 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from typing import Callable, Dict, Optional + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Horizontal, Vertical +from textual.reactive import reactive +from textual.widgets import DataTable, Footer, Header, RichLog + +from dimos.protocol.skill.agent_interface import AgentInterface, SkillState, SkillStateEnum +from dimos.protocol.skill.comms import AgentMsg, LCMSkillComms +from dimos.protocol.skill.types import MsgType + +logger = logging.getLogger(__name__) + + +class AgentSpy: + """Spy on agent skill executions via LCM messages.""" + + def __init__(self): + self.agent_interface = AgentInterface() + self.message_callbacks: list[Callable[[Dict[str, SkillState]], None]] = [] + self._lock = threading.Lock() + self._latest_state: Dict[str, SkillState] = {} + + def start(self): + """Start spying on agent messages.""" + # Start the agent interface + self.agent_interface.start() + + # Subscribe to the agent interface's comms + self.agent_interface.agent_comms.subscribe(self._handle_message) + logger.info("AgentSpy started, subscribed to agent messages") + + def stop(self): + """Stop spying.""" + # Nothing to stop since we're using agent_interface's comms + pass + + def _handle_message(self, msg: AgentMsg): + """Handle incoming agent messages.""" + logger.debug(f"AgentSpy received message: {msg}") + + # Small delay to ensure agent_interface has processed the message + def delayed_update(): + time.sleep(0.1) + with self._lock: + self._latest_state = self.agent_interface.state_snapshot(clear=False) + logger.debug(f"State snapshot has {len(self._latest_state)} skills") + for callback in self.message_callbacks: + logger.debug(f"Calling callback with state") + callback(self._latest_state) + + # Run in separate thread to not block LCM + threading.Thread(target=delayed_update, daemon=True).start() + + def subscribe(self, callback: Callable[[Dict[str, SkillState]], None]): + """Subscribe to state updates.""" + self.message_callbacks.append(callback) + + def get_state(self) -> Dict[str, SkillState]: + """Get current state snapshot.""" + with self._lock: + return self._latest_state.copy() + + +def state_color(state: SkillStateEnum) -> str: + """Get color for skill state.""" + if state == SkillStateEnum.pending: + return "yellow" + elif state == SkillStateEnum.running: + return "cyan" + elif state == SkillStateEnum.ret: + return "green" + elif state == SkillStateEnum.error: + return "red" + return "white" + + +def format_duration(duration: float) -> str: + """Format duration in human readable format.""" + if duration < 1: + return f"{duration * 1000:.0f}ms" + elif duration < 60: + return f"{duration:.1f}s" + elif duration < 3600: + return f"{duration / 60:.1f}m" + else: + return f"{duration / 3600:.1f}h" + + +class TextualLogHandler(logging.Handler): + """Custom log handler that sends logs to a Textual RichLog widget.""" + + def __init__(self, log_widget: RichLog): + super().__init__() + self.log_widget = log_widget + + def emit(self, record): + """Emit a log record to the RichLog widget.""" + try: + msg = self.format(record) + # Color based on level + if record.levelno >= logging.ERROR: + style = "bold red" + elif record.levelno >= logging.WARNING: + style = "yellow" + elif record.levelno >= logging.INFO: + style = "green" + else: + style = "dim" + + self.log_widget.write(Text(msg, style=style)) + except Exception: + self.handleError(record) + + +class AgentSpyApp(App): + """A real-time CLI dashboard for agent skill monitoring using Textual.""" + + CSS = """ + Screen { + layout: vertical; + } + Vertical { + height: 100%; + } + DataTable { + height: 70%; + border: none; + background: black; + } + RichLog { + height: 30%; + border: none; + background: black; + border-top: solid $primary; + } + """ + + BINDINGS = [ + Binding("q", "quit", "Quit"), + Binding("c", "clear", "Clear History"), + Binding("l", "toggle_logs", "Toggle Logs"), + ] + + show_logs = reactive(True) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.spy = AgentSpy() + self.table: Optional[DataTable] = None + self.log_view: Optional[RichLog] = None + self.skill_history: list[tuple[str, SkillState, float]] = [] # (name, state, start_time) + self.log_handler: Optional[TextualLogHandler] = None + + def compose(self) -> ComposeResult: + self.table = DataTable(zebra_stripes=False, cursor_type=None) + self.table.add_column("Skill Name", width=30) + self.table.add_column("State", width=10) + self.table.add_column("Duration", width=10) + self.table.add_column("Start Time", width=12) + self.table.add_column("Messages", width=10) + self.table.add_column("Details", width=40) + + self.log_view = RichLog(markup=True, wrap=True) + + with Vertical(): + yield self.table + yield self.log_view + + yield Footer() + + def on_mount(self): + """Start the spy when app mounts.""" + self.theme = "flexoki" + + # Set up custom log handler to show logs in the UI + if self.log_view: + self.log_handler = TextualLogHandler(self.log_view) + self.log_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" + ) + ) + # Add handler to root logger + root_logger = logging.getLogger() + root_logger.addHandler(self.log_handler) + root_logger.setLevel(logging.INFO) + + # Set initial visibility + if not self.show_logs: + self.log_view.visible = False + self.table.styles.height = "100%" + + self.spy.subscribe(self.update_state) + self.spy.start() + + # Also set up periodic refresh to update durations + self.set_interval(0.5, self.refresh_table) + + async def on_unmount(self): + """Stop the spy when app unmounts.""" + self.spy.stop() + + def update_state(self, state: Dict[str, SkillState]): + """Update state from spy callback.""" + logger.info(f"AgentSpyApp.update_state called with {len(state)} skills") + + # Update history with current state + current_time = time.time() + + # Add new skills or update existing ones + for skill_name, skill_state in state.items(): + logger.debug(f"Processing skill {skill_name} in state {skill_state.state}") + + # Find if skill already in history + found = False + for i, (name, old_state, start_time) in enumerate(self.skill_history): + if name == skill_name: + # Update existing entry + self.skill_history[i] = (skill_name, skill_state, start_time) + found = True + break + + if not found: + # Add new entry with current time as start + start_time = current_time + if len(skill_state) > 0: + # Use first message timestamp if available + start_time = skill_state._items[0].ts + self.skill_history.append((skill_name, skill_state, start_time)) + logger.info(f"Added new skill to history: {skill_name}") + + logger.info(f"History now has {len(self.skill_history)} skills") + + # Schedule UI update + self.call_from_thread(self.refresh_table) + + def refresh_table(self): + """Refresh the table display.""" + logger.debug(f"refresh_table called, history has {len(self.skill_history)} items") + + if not self.table: + logger.warning("Table not initialized yet") + return + + # Clear table + self.table.clear(columns=False) + + # Sort by start time (newest first) + sorted_history = sorted(self.skill_history, key=lambda x: x[2], reverse=True) + + # Get terminal height and calculate how many rows we can show + height = self.size.height - 6 # Account for header, footer, column headers + max_rows = max(1, height) + + logger.debug( + f"Showing {min(len(sorted_history), max_rows)} of {len(sorted_history)} skills" + ) + + # Show only top N entries + for skill_name, skill_state, start_time in sorted_history[:max_rows]: + # Calculate how long ago it started + time_ago = time.time() - start_time + start_str = format_duration(time_ago) + " ago" + + # Duration + duration_str = format_duration(skill_state.duration()) + + # Message count + msg_count = len(skill_state) + + # Details based on state and last message + details = "" + if skill_state.state == SkillStateEnum.error and msg_count > 0: + # Show error message + last_msg = skill_state._items[-1] + if last_msg.type == MsgType.error: + details = str(last_msg.content)[:40] + elif skill_state.state == SkillStateEnum.ret and msg_count > 0: + # Show return value + last_msg = skill_state._items[-1] + if last_msg.type == MsgType.ret: + details = f"→ {str(last_msg.content)[:37]}" + elif skill_state.state == SkillStateEnum.running: + # Show progress indicator + details = "⋯ " + "▸" * min(int(time_ago), 20) + + # Add row with colored state + self.table.add_row( + Text(skill_name, style="white"), + Text(skill_state.state.name, style=state_color(skill_state.state)), + Text(duration_str, style="dim"), + Text(start_str, style="dim"), + Text(str(msg_count), style="dim"), + Text(details, style="dim white"), + ) + + def action_clear(self): + """Clear the skill history.""" + self.skill_history.clear() + self.refresh_table() + + def action_toggle_logs(self): + """Toggle the log view visibility.""" + self.show_logs = not self.show_logs + if self.show_logs: + self.table.styles.height = "70%" + else: + self.table.styles.height = "100%" + self.log_view.visible = self.show_logs + + +def main(): + """Main entry point for agentspy CLI.""" + # Set up logging to file for debugging + import os + import sys + + log_file = os.path.join(os.path.dirname(__file__), "agentspy_debug.log") + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + filename=log_file, + filemode="w", + ) + + # Check if running in web mode + if len(sys.argv) > 1 and sys.argv[1] == "web": + import os + + from textual_serve.server import Server + + server = Server(f"python {os.path.abspath(__file__)}") + server.serve() + else: + logger.info("Starting AgentSpy app...") + + # Don't disable logging - we'll show it in the UI instead + app = AgentSpyApp() + app.run() + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/cli/agentspy/demo_agentspy.py b/dimos/utils/cli/agentspy/demo_agentspy.py new file mode 100644 index 0000000000..2b39674a7b --- /dev/null +++ b/dimos/utils/cli/agentspy/demo_agentspy.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Demo script that runs skills in the background while agentspy monitors them.""" + +import time +import threading +from dimos.protocol.skill.agent_interface import AgentInterface +from dimos.protocol.skill.skill import SkillContainer, skill + + +class DemoSkills(SkillContainer): + @skill() + def count_to(self, n: int) -> str: + """Count to n with delays.""" + for i in range(n): + time.sleep(0.5) + return f"Counted to {n}" + + @skill() + def compute_fibonacci(self, n: int) -> int: + """Compute nth fibonacci number.""" + if n <= 1: + return n + a, b = 0, 1 + for _ in range(2, n + 1): + time.sleep(0.1) # Simulate computation + a, b = b, a + b + return b + + @skill() + def simulate_error(self) -> None: + """Skill that always errors.""" + time.sleep(0.3) + raise RuntimeError("Simulated error for testing") + + @skill() + def quick_task(self, name: str) -> str: + """Quick task that completes fast.""" + time.sleep(0.1) + return f"Quick task '{name}' done!" + + +def run_demo_skills(): + """Run demo skills in background.""" + # Create and start agent interface + agent_interface = AgentInterface() + agent_interface.start() + + # Register skills + demo_skills = DemoSkills() + agent_interface.register_skills(demo_skills) + + # Run various skills periodically + def skill_runner(): + counter = 0 + while True: + time.sleep(2) + + # Run different skills based on counter + if counter % 4 == 0: + demo_skills.count_to(3, skillcall=True) + elif counter % 4 == 1: + demo_skills.compute_fibonacci(10, skillcall=True) + elif counter % 4 == 2: + demo_skills.quick_task(f"task-{counter}", skillcall=True) + else: + try: + demo_skills.simulate_error(skillcall=True) + except: + pass # Expected to fail + + counter += 1 + + # Start skill runner in background + thread = threading.Thread(target=skill_runner, daemon=True) + thread.start() + + print("Demo skills running in background. Start agentspy in another terminal to monitor.") + print("Run: agentspy") + + # Keep running + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nDemo stopped.") + + +if __name__ == "__main__": + run_demo_skills() diff --git a/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py b/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py index bbcb70faee..a0cf07ffb6 100644 --- a/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py +++ b/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py @@ -58,5 +58,9 @@ def bridge_thread(): print("Shutting down...") -if __name__ == "__main__": +def main(): run_bridge_example() + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/cli/lcmspy/run_lcmspy.py b/dimos/utils/cli/lcmspy/run_lcmspy.py index 17a9d0bbc6..13288cafe9 100644 --- a/dimos/utils/cli/lcmspy/run_lcmspy.py +++ b/dimos/utils/cli/lcmspy/run_lcmspy.py @@ -118,7 +118,7 @@ def refresh_table(self): ) -if __name__ == "__main__": +def main(): import sys if len(sys.argv) > 1 and sys.argv[1] == "web": @@ -130,3 +130,7 @@ def refresh_table(self): server.serve() else: LCMSpyApp().run() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index fa0c73cbce..fcc62bf476 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,10 @@ dependencies = [ "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@ba3445d16be75a7ade6fb2a516b39a3e44319d5c" ] +[project.scripts] +lcmspy = "dimos.utils.cli.lcmspy.run_lcmspy:main" +foxglove-bridge = "dimos.utils.cli.foxglove_bridge.run_foxglove_bridge:main" +agentspy = "dimos.utils.cli.agentspy.agentspy:main" [project.optional-dependencies] manipulation = [ From bf35f92aceae36263cfc75f171b84536fee83c6a Mon Sep 17 00:00:00 2001 From: lesh Date: Wed, 6 Aug 2025 00:53:47 -0700 Subject: [PATCH 17/19] small fixes --- dimos/protocol/skill/agent_interface.py | 12 ++-- dimos/protocol/skill/skill.py | 7 +- dimos/utils/cli/agentspy/agentspy.py | 86 +++++++++++++------------ 3 files changed, 55 insertions(+), 50 deletions(-) diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py index 9f186f99ec..7694d7b807 100644 --- a/dimos/protocol/skill/agent_interface.py +++ b/dimos/protocol/skill/agent_interface.py @@ -24,7 +24,7 @@ from dimos.types.timestamped import TimestampedCollection from dimos.utils.logging_config import setup_logger -logger = setup_logger("dimos.protocol.skill.agent_input") +logger = setup_logger("dimos.protocol.skill.agent_interface") @dataclass @@ -35,7 +35,7 @@ class AgentInputConfig: class SkillStateEnum(Enum): pending = 0 running = 1 - ret = 2 + returned = 2 error = 3 @@ -71,7 +71,7 @@ def handle_msg(self, msg: AgentMsg) -> bool: return True if msg.type == MsgType.ret: - self.state = SkillStateEnum.ret + self.state = SkillStateEnum.returned if self.skill_config.ret == Return.call_agent: return True return False @@ -89,7 +89,7 @@ def handle_msg(self, msg: AgentMsg) -> bool: def __str__(self) -> str: head = f"SkillState(state={self.state}" - if self.state == SkillStateEnum.ret or self.state == SkillStateEnum.error: + if self.state == SkillStateEnum.returned or self.state == SkillStateEnum.error: head += ", ran for=" else: head += ", running for=" @@ -145,7 +145,7 @@ def execute_skill(self, skill_name: str, *args, **kwargs) -> None: # # Checks if agent needs to be called (if ToolConfig has Return=call_agent or Stream=call_agent) def handle_message(self, msg: AgentMsg) -> None: - logger.info(f"Skill msg {msg}") + logger.info(f"Skill '{msg.skill_name}' - {msg}") if self._skill_state.get(msg.skill_name) is None: logger.warn( @@ -170,7 +170,7 @@ def state_snapshot(self, clear: bool = True) -> dict[str, SkillState]: to_delete = [] # Since state is exported, we can clear the finished skill runs for skill_name, skill_run in self._skill_state.items(): - if skill_run.state == SkillStateEnum.ret: + if skill_run.state == SkillStateEnum.returned: logger.info(f"Skill {skill_name} finished") to_delete.append(skill_name) if skill_run.state == SkillStateEnum.error: diff --git a/dimos/protocol/skill/skill.py b/dimos/protocol/skill/skill.py index 46f3e769f2..e0f868b5f9 100644 --- a/dimos/protocol/skill/skill.py +++ b/dimos/protocol/skill/skill.py @@ -37,8 +37,11 @@ def wrapper(self, *args, **kwargs): def run_function(): self.agent_comms.publish(AgentMsg(skill, None, type=MsgType.start)) - val = f(self, *args, **kwargs) - self.agent_comms.publish(AgentMsg(skill, val, type=MsgType.ret)) + try: + val = f(self, *args, **kwargs) + self.agent_comms.publish(AgentMsg(skill, val, type=MsgType.ret)) + except Exception as e: + self.agent_comms.publish(AgentMsg(skill, str(e), type=MsgType.error)) thread = threading.Thread(target=run_function) thread.start() diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py index fa02540aa2..fba1e6ce5f 100644 --- a/dimos/utils/cli/agentspy/agentspy.py +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -31,8 +31,6 @@ from dimos.protocol.skill.comms import AgentMsg, LCMSkillComms from dimos.protocol.skill.types import MsgType -logger = logging.getLogger(__name__) - class AgentSpy: """Spy on agent skill executions via LCM messages.""" @@ -50,7 +48,6 @@ def start(self): # Subscribe to the agent interface's comms self.agent_interface.agent_comms.subscribe(self._handle_message) - logger.info("AgentSpy started, subscribed to agent messages") def stop(self): """Stop spying.""" @@ -59,16 +56,13 @@ def stop(self): def _handle_message(self, msg: AgentMsg): """Handle incoming agent messages.""" - logger.debug(f"AgentSpy received message: {msg}") # Small delay to ensure agent_interface has processed the message def delayed_update(): time.sleep(0.1) with self._lock: self._latest_state = self.agent_interface.state_snapshot(clear=False) - logger.debug(f"State snapshot has {len(self._latest_state)} skills") for callback in self.message_callbacks: - logger.debug(f"Calling callback with state") callback(self._latest_state) # Run in separate thread to not block LCM @@ -90,7 +84,7 @@ def state_color(state: SkillStateEnum) -> str: return "yellow" elif state == SkillStateEnum.running: return "cyan" - elif state == SkillStateEnum.ret: + elif state == SkillStateEnum.returned: return "green" elif state == SkillStateEnum.error: return "red" @@ -109,12 +103,28 @@ def format_duration(duration: float) -> str: return f"{duration / 3600:.1f}h" +class AgentSpyLogFilter(logging.Filter): + """Filter to suppress specific log messages in agentspy.""" + + def filter(self, record): + # Suppress the "Skill state not found" warning as it's expected in agentspy + if ( + record.levelname == "WARNING" + and "Skill state for" in record.getMessage() + and "not found" in record.getMessage() + ): + return False + return True + + class TextualLogHandler(logging.Handler): """Custom log handler that sends logs to a Textual RichLog widget.""" def __init__(self, log_widget: RichLog): super().__init__() self.log_widget = log_widget + # Add filter to suppress expected warnings + self.addFilter(AgentSpyLogFilter()) def emit(self, record): """Emit a log record to the RichLog widget.""" @@ -176,12 +186,12 @@ def __init__(self, *args, **kwargs): def compose(self) -> ComposeResult: self.table = DataTable(zebra_stripes=False, cursor_type=None) - self.table.add_column("Skill Name", width=30) - self.table.add_column("State", width=10) - self.table.add_column("Duration", width=10) - self.table.add_column("Start Time", width=12) - self.table.add_column("Messages", width=10) - self.table.add_column("Details", width=40) + self.table.add_column("Skill Name") + self.table.add_column("State") + self.table.add_column("Duration") + self.table.add_column("Start Time") + self.table.add_column("Messages") + self.table.add_column("Details") self.log_view = RichLog(markup=True, wrap=True) @@ -195,11 +205,30 @@ def on_mount(self): """Start the spy when app mounts.""" self.theme = "flexoki" + # Remove ALL existing handlers from ALL loggers to prevent console output + # This is needed because setup_logger creates loggers with propagate=False + for name in logging.root.manager.loggerDict: + logger = logging.getLogger(name) + logger.handlers.clear() + logger.propagate = True + + # Clear root logger handlers too + logging.root.handlers.clear() + # Set up custom log handler to show logs in the UI if self.log_view: self.log_handler = TextualLogHandler(self.log_view) + + # Custom formatter that shortens the logger name + class ShortNameFormatter(logging.Formatter): + def format(self, record): + # Remove the common prefix from logger names + if record.name.startswith("dimos.protocol.skill."): + record.name = record.name.replace("dimos.protocol.skill.", "") + return super().format(record) + self.log_handler.setFormatter( - logging.Formatter( + ShortNameFormatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" ) ) @@ -225,15 +254,11 @@ async def on_unmount(self): def update_state(self, state: Dict[str, SkillState]): """Update state from spy callback.""" - logger.info(f"AgentSpyApp.update_state called with {len(state)} skills") - # Update history with current state current_time = time.time() # Add new skills or update existing ones for skill_name, skill_state in state.items(): - logger.debug(f"Processing skill {skill_name} in state {skill_state.state}") - # Find if skill already in history found = False for i, (name, old_state, start_time) in enumerate(self.skill_history): @@ -250,19 +275,13 @@ def update_state(self, state: Dict[str, SkillState]): # Use first message timestamp if available start_time = skill_state._items[0].ts self.skill_history.append((skill_name, skill_state, start_time)) - logger.info(f"Added new skill to history: {skill_name}") - - logger.info(f"History now has {len(self.skill_history)} skills") # Schedule UI update self.call_from_thread(self.refresh_table) def refresh_table(self): """Refresh the table display.""" - logger.debug(f"refresh_table called, history has {len(self.skill_history)} items") - if not self.table: - logger.warning("Table not initialized yet") return # Clear table @@ -275,10 +294,6 @@ def refresh_table(self): height = self.size.height - 6 # Account for header, footer, column headers max_rows = max(1, height) - logger.debug( - f"Showing {min(len(sorted_history), max_rows)} of {len(sorted_history)} skills" - ) - # Show only top N entries for skill_name, skill_state, start_time in sorted_history[:max_rows]: # Calculate how long ago it started @@ -298,7 +313,7 @@ def refresh_table(self): last_msg = skill_state._items[-1] if last_msg.type == MsgType.error: details = str(last_msg.content)[:40] - elif skill_state.state == SkillStateEnum.ret and msg_count > 0: + elif skill_state.state == SkillStateEnum.returned and msg_count > 0: # Show return value last_msg = skill_state._items[-1] if last_msg.type == MsgType.ret: @@ -334,18 +349,8 @@ def action_toggle_logs(self): def main(): """Main entry point for agentspy CLI.""" - # Set up logging to file for debugging - import os import sys - log_file = os.path.join(os.path.dirname(__file__), "agentspy_debug.log") - logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - filename=log_file, - filemode="w", - ) - # Check if running in web mode if len(sys.argv) > 1 and sys.argv[1] == "web": import os @@ -355,9 +360,6 @@ def main(): server = Server(f"python {os.path.abspath(__file__)}") server.serve() else: - logger.info("Starting AgentSpy app...") - - # Don't disable logging - we'll show it in the UI instead app = AgentSpyApp() app.run() From 21acf760ed6743cb0aabc90b17b09f8e03ff5b22 Mon Sep 17 00:00:00 2001 From: lesh Date: Wed, 6 Aug 2025 09:35:36 -0700 Subject: [PATCH 18/19] cleanup --- dimos/utils/cli/agentspy/agentspy.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py index fba1e6ce5f..b5d49d4ea2 100644 --- a/dimos/utils/cli/agentspy/agentspy.py +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -51,8 +51,7 @@ def start(self): def stop(self): """Stop spying.""" - # Nothing to stop since we're using agent_interface's comms - pass + self.agent_interface.stop() def _handle_message(self, msg: AgentMsg): """Handle incoming agent messages.""" @@ -172,6 +171,7 @@ class AgentSpyApp(App): Binding("q", "quit", "Quit"), Binding("c", "clear", "Clear History"), Binding("l", "toggle_logs", "Toggle Logs"), + Binding("ctrl+c", "quit", "Quit", show=False), ] show_logs = reactive(True) @@ -248,9 +248,13 @@ def format(self, record): # Also set up periodic refresh to update durations self.set_interval(0.5, self.refresh_table) - async def on_unmount(self): + def on_unmount(self): """Stop the spy when app unmounts.""" self.spy.stop() + # Remove log handler to prevent errors on shutdown + if self.log_handler: + root_logger = logging.getLogger() + root_logger.removeHandler(self.log_handler) def update_state(self, state: Dict[str, SkillState]): """Update state from spy callback.""" From 5f6c5db6b20b3be7e8c6408dd95b33794e47313e Mon Sep 17 00:00:00 2001 From: lesh Date: Wed, 6 Aug 2025 13:11:45 -0700 Subject: [PATCH 19/19] small changes --- dimos/protocol/skill/agent_interface.py | 13 +++++-------- dimos/utils/cli/agentspy/agentspy.py | 4 ++-- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py index 7694d7b807..8a9926d028 100644 --- a/dimos/protocol/skill/agent_interface.py +++ b/dimos/protocol/skill/agent_interface.py @@ -47,12 +47,9 @@ class SkillState(TimestampedCollection): def __init__(self, name: str, skill_config: Optional[SkillConfig] = None) -> None: super().__init__() - if skill_config is None: - self.skill_config = SkillConfig( - name=name, stream=Stream.none, ret=Return.none, reducer=Reducer.none - ) - else: - self.skill_config = skill_config + self.skill_config = skill_config or SkillConfig( + name=name, stream=Stream.none, ret=Return.none, reducer=Reducer.none + ) self.state = SkillStateEnum.pending self.name = name @@ -145,7 +142,7 @@ def execute_skill(self, skill_name: str, *args, **kwargs) -> None: # # Checks if agent needs to be called (if ToolConfig has Return=call_agent or Stream=call_agent) def handle_message(self, msg: AgentMsg) -> None: - logger.info(f"Skill '{msg.skill_name}' - {msg}") + logger.info(f"{msg.skill_name} - {msg}") if self._skill_state.get(msg.skill_name) is None: logger.warn( @@ -178,7 +175,7 @@ def state_snapshot(self, clear: bool = True) -> dict[str, SkillState]: to_delete.append(skill_name) for skill_name in to_delete: - logger.debug(f"Skill {skill_name} finished, removing from state") + logger.debug(f"{skill_name} finished, removing from state") del self._skill_state[skill_name] return ret diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py index b5d49d4ea2..0c25a89612 100644 --- a/dimos/utils/cli/agentspy/agentspy.py +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -82,9 +82,9 @@ def state_color(state: SkillStateEnum) -> str: if state == SkillStateEnum.pending: return "yellow" elif state == SkillStateEnum.running: - return "cyan" - elif state == SkillStateEnum.returned: return "green" + elif state == SkillStateEnum.returned: + return "cyan" elif state == SkillStateEnum.error: return "red" return "white"