diff --git a/bin/foxglove-bridge b/bin/foxglove-bridge deleted file mode 100755 index 8d80ac52cd..0000000000 --- a/bin/foxglove-bridge +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash -# current script dir + ..dimos - - -script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -python $script_dir/../dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py "$@" diff --git a/bin/lcmspy b/bin/lcmspy deleted file mode 100755 index 64387aad98..0000000000 --- a/bin/lcmspy +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash -# current script dir + ..dimos - - -script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -python $script_dir/../dimos/utils/cli/lcmspy/run_lcmspy.py "$@" diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 9bb1a3dc68..179056d7af 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -12,7 +12,7 @@ from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport from dimos.protocol.rpc.lcmrpc import LCMRPC -from dimos.protocol.rpc.spec import RPC +from dimos.protocol.rpc.spec import RPCSpec from dimos.protocol.tf import LCMTF, TF, PubSubTF, TFConfig, TFSpec __all__ = ["TF", "LCMTF", "PubSubTF", "TFSpec", "TFConfig"] diff --git a/dimos/core/module.py b/dimos/core/module.py index c2a33869ce..e30df27a68 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import inspect +from enum import Enum from typing import ( Any, Callable, + Optional, + TypeVar, get_args, get_origin, get_type_hints, @@ -25,19 +28,57 @@ from dimos.core import colors from dimos.core.core import T, rpc from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport -from dimos.protocol.rpc.lcmrpc import LCMRPC +from dimos.protocol.rpc import LCMRPC, RPCSpec +from dimos.protocol.skill.comms import LCMSkillComms, SkillCommsSpec +from dimos.protocol.tf import LCMTF, TFSpec + + +class CommsSpec: + rpc: type[RPCSpec] + agent: type[SkillCommsSpec] + tf: type[TFSpec] + + +class LCMComms(CommsSpec): + rpc = LCMRPC + agent = LCMSkillComms + tf = LCMTF class ModuleBase: + comms: CommsSpec = LCMComms + _rpc: Optional[RPCSpec] = None + _agent: Optional[SkillCommsSpec] = None + _tf: Optional[TFSpec] = None + def __init__(self, *args, **kwargs): + # we can completely override comms protocols if we want + if kwargs.get("comms", None) is not None: + self.comms = kwargs["comms"] try: get_worker() - self.rpc = LCMRPC() + self.rpc = self.comms.rpc() self.rpc.serve_module_rpc(self) self.rpc.start() except ValueError: return + @property + def tf(self): + if self._tf is None: + self._tf = self.comms.tf() + return self._tf + + @tf.setter + def tf(self, value): + import warnings + + warnings.warn( + "tf is available on all modules. Call self.tf.start() to activate tf functionality. No need to assign it", + UserWarning, + stacklevel=2, + ) + @property def outputs(self) -> dict[str, Out]: return { diff --git a/dimos/protocol/rpc/__init.py b/dimos/protocol/rpc/__init__.py similarity index 90% rename from dimos/protocol/rpc/__init.py rename to dimos/protocol/rpc/__init__.py index b38609e9fd..4061c9e9cf 100644 --- a/dimos/protocol/rpc/__init.py +++ b/dimos/protocol/rpc/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. 
from dimos.protocol.rpc.lcmrpc import LCMRPC -from dimos.protocol.rpc.spec import RPC, RPCClient, RPCServer +from dimos.protocol.rpc.spec import RPCClient, RPCServer, RPCSpec diff --git a/dimos/protocol/rpc/pubsubrpc.py b/dimos/protocol/rpc/pubsubrpc.py index 67f0c245e1..138607b1ac 100644 --- a/dimos/protocol/rpc/pubsubrpc.py +++ b/dimos/protocol/rpc/pubsubrpc.py @@ -35,7 +35,7 @@ ) from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub -from dimos.protocol.rpc.spec import RPC, Args, RPCClient, RPCInspectable, RPCServer +from dimos.protocol.rpc.spec import Args, RPCClient, RPCInspectable, RPCServer, RPCSpec from dimos.protocol.service.spec import Service MsgT = TypeVar("MsgT") @@ -57,7 +57,7 @@ class RPCRes(TypedDict): res: Any -class PubSubRPCMixin(RPC, PubSub[TopicT, MsgT], Generic[TopicT, MsgT]): +class PubSubRPCMixin(RPCSpec, PubSub[TopicT, MsgT], Generic[TopicT, MsgT]): @abstractmethod def topicgen(self, name: str, req_or_res: bool) -> TopicT: ... @@ -82,7 +82,6 @@ def call(self, name: str, arguments: Args, cb: Optional[Callable]): def call_cb(self, name: str, arguments: Args, cb: Callable) -> Any: topic_req = self.topicgen(name, False) topic_res = self.topicgen(name, True) - msg_id = float(time.time()) req: RPCReq = {"name": name, "args": arguments, "id": msg_id} diff --git a/dimos/protocol/rpc/spec.py b/dimos/protocol/rpc/spec.py index 113b5a8531..fbb99d661d 100644 --- a/dimos/protocol/rpc/spec.py +++ b/dimos/protocol/rpc/spec.py @@ -86,4 +86,4 @@ def override_f(*args, fname=fname, **kwargs): self.serve_rpc(override_f, topic) -class RPC(RPCServer, RPCClient): ... +class RPCSpec(RPCServer, RPCClient): ... 
diff --git a/dimos/protocol/service/__init__.py b/dimos/protocol/service/__init__.py new file mode 100644 index 0000000000..ce8a823f86 --- /dev/null +++ b/dimos/protocol/service/__init__.py @@ -0,0 +1,2 @@ +from dimos.protocol.service.lcmservice import LCMService +from dimos.protocol.service.spec import Service diff --git a/dimos/protocol/skill/__init__.py b/dimos/protocol/skill/__init__.py new file mode 100644 index 0000000000..85b6146f56 --- /dev/null +++ b/dimos/protocol/skill/__init__.py @@ -0,0 +1,2 @@ +from dimos.protocol.skill.agent_interface import AgentInterface, SkillState +from dimos.protocol.skill.skill import SkillContainer, skill diff --git a/dimos/protocol/skill/agent_interface.py b/dimos/protocol/skill/agent_interface.py new file mode 100644 index 0000000000..8a9926d028 --- /dev/null +++ b/dimos/protocol/skill/agent_interface.py @@ -0,0 +1,236 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from copy import copy +from dataclasses import dataclass +from enum import Enum +from pprint import pformat +from typing import Any, Callable, Optional + +from dimos.protocol.skill.comms import AgentMsg, LCMSkillComms, MsgType, SkillCommsSpec +from dimos.protocol.skill.skill import SkillConfig, SkillContainer +from dimos.protocol.skill.types import Reducer, Return, Stream +from dimos.types.timestamped import TimestampedCollection +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.protocol.skill.agent_interface") + + +@dataclass +class AgentInputConfig: + agent_comms: type[SkillCommsSpec] = LCMSkillComms + + +class SkillStateEnum(Enum): + pending = 0 + running = 1 + returned = 2 + error = 3 + + +# TODO pending timeout, running timeout, etc. +class SkillState(TimestampedCollection): + name: str + state: SkillStateEnum + skill_config: SkillConfig + + def __init__(self, name: str, skill_config: Optional[SkillConfig] = None) -> None: + super().__init__() + self.skill_config = skill_config or SkillConfig( + name=name, stream=Stream.none, ret=Return.none, reducer=Reducer.none + ) + + self.state = SkillStateEnum.pending + self.name = name + + # returns True if the agent should be called for this message + def handle_msg(self, msg: AgentMsg) -> bool: + self.add(msg) + + if msg.type == MsgType.stream: + if ( + self.skill_config.stream == Stream.none + or self.skill_config.stream == Stream.passive + ): + return False + if self.skill_config.stream == Stream.call_agent: + return True + + if msg.type == MsgType.ret: + self.state = SkillStateEnum.returned + if self.skill_config.ret == Return.call_agent: + return True + return False + + if msg.type == MsgType.error: + self.state = SkillStateEnum.error + return True + + if msg.type == MsgType.start: + self.state = SkillStateEnum.running + return False + + return False + + def __str__(self) -> str: + head = f"SkillState(state={self.state}" + + if self.state == SkillStateEnum.returned or 
self.state == SkillStateEnum.error: + head += ", ran for=" + else: + head += ", running for=" + + head += f"{self.duration():.2f}s" + + if len(self): + return head + f", messages={list(self._items)})" + return head + ", No Messages)" + + +class AgentInterface(SkillContainer): + _static_containers: list[SkillContainer] + _dynamic_containers: list[SkillContainer] + _skill_state: dict[str, SkillState] + _skills: dict[str, SkillConfig] + _agent_callback: Optional[Callable[[dict[str, SkillState]], Any]] = None + + # Agent callback is called with a state snapshot once system decides + # that agents needs to be woken up, according to inputs from active skills + def __init__( + self, agent_callback: Optional[Callable[[dict[str, SkillState]], Any]] = None + ) -> None: + super().__init__() + self._agent_callback = agent_callback + self._static_containers = [] + self._dynamic_containers = [] + self._skills = {} + self._skill_state = {} + + def start(self) -> None: + self.agent_comms.start() + self.agent_comms.subscribe(self.handle_message) + + def stop(self) -> None: + self.agent_comms.stop() + + # This is used by agent to call skills + def execute_skill(self, skill_name: str, *args, **kwargs) -> None: + skill_config = self.get_skill_config(skill_name) + if not skill_config: + logger.error( + f"Skill {skill_name} not found in registered skills, but agent tried to call it (did a dynamic skill expire?)" + ) + return + + # This initializes the skill state if it doesn't exist + self._skill_state[skill_name] = SkillState(name=skill_name, skill_config=skill_config) + return skill_config.call(*args, **kwargs) + + # Receives a message from active skill + # Updates local skill state (appends to streamed data if needed etc) + # + # Checks if agent needs to be called (if ToolConfig has Return=call_agent or Stream=call_agent) + def handle_message(self, msg: AgentMsg) -> None: + logger.info(f"{msg.skill_name} - {msg}") + + if self._skill_state.get(msg.skill_name) is None: + logger.warn( + 
f"Skill state for {msg.skill_name} not found, (skill not called by our agent?) initializing. (message received: {msg})" + ) + self._skill_state[msg.skill_name] = SkillState(name=msg.skill_name) + + should_call_agent = self._skill_state[msg.skill_name].handle_msg(msg) + if should_call_agent: + self.call_agent() + + # Returns a snapshot of the current state of skill runs. + # + # If clear=True, it will assume the snapshot is being sent to an agent + # and will clear the finished skill runs from the state + def state_snapshot(self, clear: bool = True) -> dict[str, SkillState]: + if not clear: + return self._skill_state + + ret = copy(self._skill_state) + + to_delete = [] + # Since state is exported, we can clear the finished skill runs + for skill_name, skill_run in self._skill_state.items(): + if skill_run.state == SkillStateEnum.returned: + logger.info(f"Skill {skill_name} finished") + to_delete.append(skill_name) + if skill_run.state == SkillStateEnum.error: + logger.error(f"Skill run error for {skill_name}") + to_delete.append(skill_name) + + for skill_name in to_delete: + logger.debug(f"{skill_name} finished, removing from state") + del self._skill_state[skill_name] + + return ret + + def call_agent(self) -> None: + """Call the agent with the current state of skill runs.""" + logger.info(f"Calling agent with current skill state: {self.state_snapshot(clear=False)}") + + state = self.state_snapshot(clear=True) + + if self._agent_callback: + self._agent_callback(state) + + def __str__(self): + # Convert objects to their string representations + def stringify_value(obj): + if isinstance(obj, dict): + return {k: stringify_value(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [stringify_value(item) for item in obj] + else: + return str(obj) + + ret = stringify_value(self._skill_state) + + return f"AgentInput({pformat(ret, indent=2, depth=3, width=120, compact=True)})" + + # Given skillcontainers can run remotely, we are + # Caching available skills 
from static containers + # + # Dynamic containers will be queried at runtime via + # .skills() method + def register_skills(self, container: SkillContainer): + if not container.dynamic_skills: + logger.info(f"Registering static skill container, {container}") + self._static_containers.append(container) + for name, skill_config in container.skills().items(): + self._skills[name] = skill_config.bind(getattr(container, name)) + else: + logger.info(f"Registering dynamic skill container, {container}") + self._dynamic_containers.append(container) + + def get_skill_config(self, skill_name: str) -> Optional[SkillConfig]: + skill_config = self._skills.get(skill_name) + if not skill_config: + skill_config = self.skills().get(skill_name) + return skill_config + + def skills(self) -> dict[str, SkillConfig]: + # Static container skilling is already cached + all_skills: dict[str, SkillConfig] = {**self._skills} + + # Then aggregate skills from dynamic containers + for container in self._dynamic_containers: + for skill_name, skill_config in container.skills().items(): + all_skills[skill_name] = skill_config.bind(getattr(container, skill_name)) + + return all_skills diff --git a/dimos/protocol/skill/comms.py b/dimos/protocol/skill/comms.py new file mode 100644 index 0000000000..d6e9e73bf0 --- /dev/null +++ b/dimos/protocol/skill/comms.py @@ -0,0 +1,94 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import time +from abc import abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import Callable, Generic, Optional, TypeVar, Union + +from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.service import Service +from dimos.protocol.skill.types import AgentMsg, Call, MsgType, Reducer, SkillConfig, Stream +from dimos.types.timestamped import Timestamped + + +# defines a protocol for communication between skills and agents +class SkillCommsSpec: + @abstractmethod + def publish(self, msg: AgentMsg) -> None: ... + + @abstractmethod + def subscribe(self, cb: Callable[[AgentMsg], None]) -> None: ... + + @abstractmethod + def start(self) -> None: ... + + @abstractmethod + def stop(self) -> None: ... + + +MsgT = TypeVar("MsgT") +TopicT = TypeVar("TopicT") + + +@dataclass +class PubSubCommsConfig(Generic[TopicT, MsgT]): + topic: Optional[TopicT] = None # Required field but needs default for dataclass inheritance + pubsub: Union[type[PubSub[TopicT, MsgT]], PubSub[TopicT, MsgT], None] = None + autostart: bool = True + + +class PubSubComms(Service[PubSubCommsConfig], SkillCommsSpec): + default_config: type[PubSubCommsConfig] = PubSubCommsConfig + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + pubsub_config = getattr(self.config, "pubsub", None) + if pubsub_config is not None: + if callable(pubsub_config): + self.pubsub = pubsub_config() + else: + self.pubsub = pubsub_config + else: + raise ValueError("PubSub configuration is missing") + + if getattr(self.config, "autostart", True): + self.start() + + def start(self) -> None: + self.pubsub.start() + + def stop(self): + self.pubsub.stop() + + def publish(self, msg: AgentMsg) -> None: + self.pubsub.publish(self.config.topic, msg) + + def subscribe(self, cb: Callable[[AgentMsg], None]) -> None: + self.pubsub.subscribe(self.config.topic, lambda msg, topic: cb(msg)) + + +@dataclass +class 
LCMCommsConfig(PubSubCommsConfig[str, AgentMsg]): + topic: str = "/agent" + pubsub: Union[type[PubSub], PubSub, None] = PickleLCM + # lcm needs to be started only if receiving + # skill comms are broadcast only in modules so we don't autostart + autostart: bool = False + + +class LCMSkillComms(PubSubComms): + default_config: type[LCMCommsConfig] = LCMCommsConfig diff --git a/dimos/protocol/skill/skill.py b/dimos/protocol/skill/skill.py new file mode 100644 index 0000000000..e0f868b5f9 --- /dev/null +++ b/dimos/protocol/skill/skill.py @@ -0,0 +1,97 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import threading +from typing import Any, Callable, Optional + +from dimos.core import rpc +from dimos.protocol.skill.comms import LCMSkillComms, SkillCommsSpec +from dimos.protocol.skill.types import ( + AgentMsg, + MsgType, + Reducer, + Return, + SkillConfig, + Stream, +) + + +def skill(reducer=Reducer.latest, stream=Stream.none, ret=Return.call_agent): + def decorator(f: Callable[..., Any]) -> Any: + def wrapper(self, *args, **kwargs): + skill = f"{f.__name__}" + + if kwargs.get("skillcall"): + del kwargs["skillcall"] + + def run_function(): + self.agent_comms.publish(AgentMsg(skill, None, type=MsgType.start)) + try: + val = f(self, *args, **kwargs) + self.agent_comms.publish(AgentMsg(skill, val, type=MsgType.ret)) + except Exception as e: + self.agent_comms.publish(AgentMsg(skill, str(e), type=MsgType.error)) + + thread = threading.Thread(target=run_function) + thread.start() + return None + + return f(self, *args, **kwargs) + + skill_config = SkillConfig(name=f.__name__, reducer=reducer, stream=stream, ret=ret) + + # implicit RPC call as well + wrapper.__rpc__ = True # type: ignore[attr-defined] + wrapper._skill = skill_config # type: ignore[attr-defined] + wrapper.__name__ = f.__name__ # Preserve original function name + wrapper.__doc__ = f.__doc__ # Preserve original docstring + return wrapper + + return decorator + + +class CommsSpec: + agent: type[SkillCommsSpec] + + +class LCMComms(CommsSpec): + agent: type[SkillCommsSpec] = LCMSkillComms + + +# here we can have also dynamic skills potentially +# agent can check .skills each time when introspecting +class SkillContainer: + comms: CommsSpec = LCMComms + _agent_comms: Optional[SkillCommsSpec] = None + dynamic_skills = False + + def __str__(self) -> str: + return f"SkillContainer({self.__class__.__name__})" + + @rpc + def skills(self) -> dict[str, SkillConfig]: + # Avoid recursion by excluding this property itself + return { + name: getattr(self, name)._skill + for name in dir(self) + if not 
name.startswith("_") + and name != "skills" + and hasattr(getattr(self, name), "_skill") + } + + @property + def agent_comms(self) -> SkillCommsSpec: + if self._agent_comms is None: + self._agent_comms = self.comms.agent() + return self._agent_comms diff --git a/dimos/protocol/skill/test_skill.py b/dimos/protocol/skill/test_skill.py new file mode 100644 index 0000000000..9bf7e85a35 --- /dev/null +++ b/dimos/protocol/skill/test_skill.py @@ -0,0 +1,130 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import time + +from dimos.protocol.skill.agent_interface import AgentInterface +from dimos.protocol.skill.skill import SkillContainer, skill + + +class TestContainer(SkillContainer): + @skill() + def add(self, x: int, y: int) -> int: + return x + y + + @skill() + def delayadd(self, x: int, y: int) -> int: + time.sleep(0.5) + return x + y + + +def test_introspect_skill(): + testContainer = TestContainer() + print(testContainer.skills()) + + +def test_internals(): + agentInterface = AgentInterface() + agentInterface.start() + + testContainer = TestContainer() + + agentInterface.register_skills(testContainer) + + # skillcall=True makes the skill function exit early, + # it doesn't behave like a blocking function, + # + # return is passed as AgentMsg to the agent topic + testContainer.delayadd(2, 4, skillcall=True) + testContainer.add(1, 2, skillcall=True) + + time.sleep(0.25) + print(agentInterface) + + time.sleep(0.75) + print(agentInterface) + + print(agentInterface.state_snapshot()) + + print(agentInterface.skills()) + + print(agentInterface) + + agentInterface.execute_skill("delayadd", 1, 2) + + time.sleep(0.25) + print(agentInterface) + time.sleep(0.75) + + print(agentInterface) + + +def test_standard_usage(): + agentInterface = AgentInterface(agent_callback=print) + agentInterface.start() + + testContainer = TestContainer() + + agentInterface.register_skills(testContainer) + + # we can investigate skills + print(agentInterface.skills()) + + # we can execute a skill + agentInterface.execute_skill("delayadd", 1, 2) + + # while skill is executing, we can introspect the state + # (we see that the skill is running) + time.sleep(0.25) + print(agentInterface) + time.sleep(0.75) + + # after the skill has finished, we can see the result + # and the skill state + print(agentInterface) + + +def test_module(): + from dimos.core import Module, start + + class MockModule(Module, SkillContainer): + def __init__(self): + super().__init__() + SkillContainer.__init__(self) + + 
@skill() + def add(self, x: int, y: int) -> int: + time.sleep(0.5) + return x * y + + agentInterface = AgentInterface(agent_callback=print) + agentInterface.start() + + dimos = start(1) + mock_module = dimos.deploy(MockModule) + + agentInterface.register_skills(mock_module) + + # we can execute a skill + agentInterface.execute_skill("add", 1, 2) + + # while skill is executing, we can introspect the state + # (we see that the skill is running) + time.sleep(0.25) + print(agentInterface) + time.sleep(0.75) + + # after the skill has finished, we can see the result + # and the skill state + print(agentInterface) diff --git a/dimos/protocol/skill/types.py b/dimos/protocol/skill/types.py new file mode 100644 index 0000000000..e4b09a7ef9 --- /dev/null +++ b/dimos/protocol/skill/types.py @@ -0,0 +1,140 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import time +from dataclasses import dataclass +from enum import Enum +from typing import Any, Callable, Generic, Optional, TypeVar + +from dimos.types.timestamped import Timestamped + + +class Call(Enum): + Implicit = 0 + Explicit = 1 + + +class Reducer(Enum): + none = 0 + all = 1 + latest = 2 + average = 3 + + +class Stream(Enum): + # no streaming + none = 0 + # passive stream, doesn't schedule an agent call, but returns the value to the agent + passive = 1 + # calls the agent with every value emitted, schedules an agent call + call_agent = 2 + + +class Return(Enum): + # doesn't return anything to an agent + none = 0 + # returns the value to the agent, but doesn't schedule an agent call + passive = 1 + # calls the agent with the value, scheduling an agent call + call_agent = 2 + + +@dataclass +class SkillConfig: + name: str + reducer: Reducer + stream: Stream + ret: Return + f: Callable | None = None + autostart: bool = False + + def bind(self, f: Callable) -> "SkillConfig": + self.f = f + return self + + def call(self, *args, **kwargs) -> Any: + if self.f is None: + raise ValueError( + "Function is not bound to the SkillConfig. This should be called only within AgentListener." 
+ ) + + return self.f(*args, **kwargs, skillcall=True) + + def __str__(self): + parts = [f"name={self.name}"] + + # Only show reducer if stream is not none (streaming is happening) + if self.stream != Stream.none: + reducer_name = "unknown" + if self.reducer == Reducer.latest: + reducer_name = "latest" + elif self.reducer == Reducer.all: + reducer_name = "all" + elif self.reducer == Reducer.average: + reducer_name = "average" + parts.append(f"reducer={reducer_name}") + parts.append(f"stream={self.stream.name}") + + # Always show return mode + parts.append(f"ret={self.ret.name}") + return f"Skill({', '.join(parts)})" + + +class MsgType(Enum): + pending = 0 + start = 1 + stream = 2 + ret = 3 + error = 4 + + +class AgentMsg(Timestamped): + ts: float + type: MsgType + + def __init__( + self, + skill_name: str, + content: str | int | float | dict | list, + type: MsgType = MsgType.ret, + ) -> None: + self.ts = time.time() + self.skill_name = skill_name + self.content = content + self.type = type + + def __repr__(self): + return self.__str__() + + @property + def end(self) -> bool: + return self.type == MsgType.ret or self.type == MsgType.error + + @property + def start(self) -> bool: + return self.type == MsgType.start + + def __str__(self): + time_ago = time.time() - self.ts + + if self.type == MsgType.start: + return f"Start({time_ago:.1f}s ago)" + if self.type == MsgType.ret: + return f"Ret({time_ago:.1f}s ago, val={self.content})" + if self.type == MsgType.error: + return f"Error({time_ago:.1f}s ago, val={self.content})" + if self.type == MsgType.pending: + return f"Pending({time_ago:.1f}s ago)" + if self.type == MsgType.stream: + return f"Stream({time_ago:.1f}s ago, val={self.content})" diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index f948c63751..858a2bdaad 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the 
License. +import bisect from datetime import datetime, timezone -from typing import Generic, Iterable, List, Optional, Tuple, TypedDict, TypeVar, Union +from typing import Generic, Iterable, Optional, Tuple, TypedDict, TypeVar, Union + from sortedcontainers import SortedList -import bisect # any class that carries a timestamp should inherit from this # this allows us to work with timeseries in consistent way, allign messages, replay etc @@ -159,6 +160,16 @@ def slice_by_time(self, start: float, end: float) -> "TimestampedCollection[T]": end_idx = bisect.bisect_right(timestamps, end) return TimestampedCollection(self._items[start_idx:end_idx]) + @property + def start_ts(self) -> Optional[float]: + """Get the start timestamp of the collection.""" + return self._items[0].ts if self._items else None + + @property + def end_ts(self) -> Optional[float]: + """Get the end timestamp of the collection.""" + return self._items[-1].ts if self._items else None + def __len__(self) -> int: return len(self._items) diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py new file mode 100644 index 0000000000..0c25a89612 --- /dev/null +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -0,0 +1,372 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from typing import Callable, Dict, Optional + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Horizontal, Vertical +from textual.reactive import reactive +from textual.widgets import DataTable, Footer, Header, RichLog + +from dimos.protocol.skill.agent_interface import AgentInterface, SkillState, SkillStateEnum +from dimos.protocol.skill.comms import AgentMsg, LCMSkillComms +from dimos.protocol.skill.types import MsgType + + +class AgentSpy: + """Spy on agent skill executions via LCM messages.""" + + def __init__(self): + self.agent_interface = AgentInterface() + self.message_callbacks: list[Callable[[Dict[str, SkillState]], None]] = [] + self._lock = threading.Lock() + self._latest_state: Dict[str, SkillState] = {} + + def start(self): + """Start spying on agent messages.""" + # Start the agent interface + self.agent_interface.start() + + # Subscribe to the agent interface's comms + self.agent_interface.agent_comms.subscribe(self._handle_message) + + def stop(self): + """Stop spying.""" + self.agent_interface.stop() + + def _handle_message(self, msg: AgentMsg): + """Handle incoming agent messages.""" + + # Small delay to ensure agent_interface has processed the message + def delayed_update(): + time.sleep(0.1) + with self._lock: + self._latest_state = self.agent_interface.state_snapshot(clear=False) + for callback in self.message_callbacks: + callback(self._latest_state) + + # Run in separate thread to not block LCM + threading.Thread(target=delayed_update, daemon=True).start() + + def subscribe(self, callback: Callable[[Dict[str, SkillState]], None]): + """Subscribe to state updates.""" + self.message_callbacks.append(callback) + + def get_state(self) -> Dict[str, SkillState]: + """Get current state snapshot.""" + with self._lock: + return 
self._latest_state.copy() + + +def state_color(state: SkillStateEnum) -> str: + """Get color for skill state.""" + if state == SkillStateEnum.pending: + return "yellow" + elif state == SkillStateEnum.running: + return "green" + elif state == SkillStateEnum.returned: + return "cyan" + elif state == SkillStateEnum.error: + return "red" + return "white" + + +def format_duration(duration: float) -> str: + """Format duration in human readable format.""" + if duration < 1: + return f"{duration * 1000:.0f}ms" + elif duration < 60: + return f"{duration:.1f}s" + elif duration < 3600: + return f"{duration / 60:.1f}m" + else: + return f"{duration / 3600:.1f}h" + + +class AgentSpyLogFilter(logging.Filter): + """Filter to suppress specific log messages in agentspy.""" + + def filter(self, record): + # Suppress the "Skill state not found" warning as it's expected in agentspy + if ( + record.levelname == "WARNING" + and "Skill state for" in record.getMessage() + and "not found" in record.getMessage() + ): + return False + return True + + +class TextualLogHandler(logging.Handler): + """Custom log handler that sends logs to a Textual RichLog widget.""" + + def __init__(self, log_widget: RichLog): + super().__init__() + self.log_widget = log_widget + # Add filter to suppress expected warnings + self.addFilter(AgentSpyLogFilter()) + + def emit(self, record): + """Emit a log record to the RichLog widget.""" + try: + msg = self.format(record) + # Color based on level + if record.levelno >= logging.ERROR: + style = "bold red" + elif record.levelno >= logging.WARNING: + style = "yellow" + elif record.levelno >= logging.INFO: + style = "green" + else: + style = "dim" + + self.log_widget.write(Text(msg, style=style)) + except Exception: + self.handleError(record) + + +class AgentSpyApp(App): + """A real-time CLI dashboard for agent skill monitoring using Textual.""" + + CSS = """ + Screen { + layout: vertical; + } + Vertical { + height: 100%; + } + DataTable { + height: 70%; + border: 
none; + background: black; + } + RichLog { + height: 30%; + border: none; + background: black; + border-top: solid $primary; + } + """ + + BINDINGS = [ + Binding("q", "quit", "Quit"), + Binding("c", "clear", "Clear History"), + Binding("l", "toggle_logs", "Toggle Logs"), + Binding("ctrl+c", "quit", "Quit", show=False), + ] + + show_logs = reactive(True) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.spy = AgentSpy() + self.table: Optional[DataTable] = None + self.log_view: Optional[RichLog] = None + self.skill_history: list[tuple[str, SkillState, float]] = [] # (name, state, start_time) + self.log_handler: Optional[TextualLogHandler] = None + + def compose(self) -> ComposeResult: + self.table = DataTable(zebra_stripes=False, cursor_type=None) + self.table.add_column("Skill Name") + self.table.add_column("State") + self.table.add_column("Duration") + self.table.add_column("Start Time") + self.table.add_column("Messages") + self.table.add_column("Details") + + self.log_view = RichLog(markup=True, wrap=True) + + with Vertical(): + yield self.table + yield self.log_view + + yield Footer() + + def on_mount(self): + """Start the spy when app mounts.""" + self.theme = "flexoki" + + # Remove ALL existing handlers from ALL loggers to prevent console output + # This is needed because setup_logger creates loggers with propagate=False + for name in logging.root.manager.loggerDict: + logger = logging.getLogger(name) + logger.handlers.clear() + logger.propagate = True + + # Clear root logger handlers too + logging.root.handlers.clear() + + # Set up custom log handler to show logs in the UI + if self.log_view: + self.log_handler = TextualLogHandler(self.log_view) + + # Custom formatter that shortens the logger name + class ShortNameFormatter(logging.Formatter): + def format(self, record): + # Remove the common prefix from logger names + if record.name.startswith("dimos.protocol.skill."): + record.name = 
record.name.replace("dimos.protocol.skill.", "") + return super().format(record) + + self.log_handler.setFormatter( + ShortNameFormatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S" + ) + ) + # Add handler to root logger + root_logger = logging.getLogger() + root_logger.addHandler(self.log_handler) + root_logger.setLevel(logging.INFO) + + # Set initial visibility + if not self.show_logs: + self.log_view.visible = False + self.table.styles.height = "100%" + + self.spy.subscribe(self.update_state) + self.spy.start() + + # Also set up periodic refresh to update durations + self.set_interval(0.5, self.refresh_table) + + def on_unmount(self): + """Stop the spy when app unmounts.""" + self.spy.stop() + # Remove log handler to prevent errors on shutdown + if self.log_handler: + root_logger = logging.getLogger() + root_logger.removeHandler(self.log_handler) + + def update_state(self, state: Dict[str, SkillState]): + """Update state from spy callback.""" + # Update history with current state + current_time = time.time() + + # Add new skills or update existing ones + for skill_name, skill_state in state.items(): + # Find if skill already in history + found = False + for i, (name, old_state, start_time) in enumerate(self.skill_history): + if name == skill_name: + # Update existing entry + self.skill_history[i] = (skill_name, skill_state, start_time) + found = True + break + + if not found: + # Add new entry with current time as start + start_time = current_time + if len(skill_state) > 0: + # Use first message timestamp if available + start_time = skill_state._items[0].ts + self.skill_history.append((skill_name, skill_state, start_time)) + + # Schedule UI update + self.call_from_thread(self.refresh_table) + + def refresh_table(self): + """Refresh the table display.""" + if not self.table: + return + + # Clear table + self.table.clear(columns=False) + + # Sort by start time (newest first) + sorted_history = sorted(self.skill_history, key=lambda 
x: x[2], reverse=True) + + # Get terminal height and calculate how many rows we can show + height = self.size.height - 6 # Account for header, footer, column headers + max_rows = max(1, height) + + # Show only top N entries + for skill_name, skill_state, start_time in sorted_history[:max_rows]: + # Calculate how long ago it started + time_ago = time.time() - start_time + start_str = format_duration(time_ago) + " ago" + + # Duration + duration_str = format_duration(skill_state.duration()) + + # Message count + msg_count = len(skill_state) + + # Details based on state and last message + details = "" + if skill_state.state == SkillStateEnum.error and msg_count > 0: + # Show error message + last_msg = skill_state._items[-1] + if last_msg.type == MsgType.error: + details = str(last_msg.content)[:40] + elif skill_state.state == SkillStateEnum.returned and msg_count > 0: + # Show return value + last_msg = skill_state._items[-1] + if last_msg.type == MsgType.ret: + details = f"→ {str(last_msg.content)[:37]}" + elif skill_state.state == SkillStateEnum.running: + # Show progress indicator + details = "⋯ " + "▸" * min(int(time_ago), 20) + + # Add row with colored state + self.table.add_row( + Text(skill_name, style="white"), + Text(skill_state.state.name, style=state_color(skill_state.state)), + Text(duration_str, style="dim"), + Text(start_str, style="dim"), + Text(str(msg_count), style="dim"), + Text(details, style="dim white"), + ) + + def action_clear(self): + """Clear the skill history.""" + self.skill_history.clear() + self.refresh_table() + + def action_toggle_logs(self): + """Toggle the log view visibility.""" + self.show_logs = not self.show_logs + if self.show_logs: + self.table.styles.height = "70%" + else: + self.table.styles.height = "100%" + self.log_view.visible = self.show_logs + + +def main(): + """Main entry point for agentspy CLI.""" + import sys + + # Check if running in web mode + if len(sys.argv) > 1 and sys.argv[1] == "web": + import os + + from 
textual_serve.server import Server + + server = Server(f"python {os.path.abspath(__file__)}") + server.serve() + else: + app = AgentSpyApp() + app.run() + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/cli/agentspy/demo_agentspy.py b/dimos/utils/cli/agentspy/demo_agentspy.py new file mode 100644 index 0000000000..2b39674a7b --- /dev/null +++ b/dimos/utils/cli/agentspy/demo_agentspy.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Demo script that runs skills in the background while agentspy monitors them."""

import time
import threading
from dimos.protocol.skill.agent_interface import AgentInterface
from dimos.protocol.skill.skill import SkillContainer, skill


class DemoSkills(SkillContainer):
    """Toy skills with different runtimes and outcomes, used to feed agentspy."""

    @skill()
    def count_to(self, n: int) -> str:
        """Count to n with delays."""
        for _ in range(n):
            time.sleep(0.5)
        return f"Counted to {n}"

    @skill()
    def compute_fibonacci(self, n: int) -> int:
        """Compute nth fibonacci number."""
        if n <= 1:
            return n
        a, b = 0, 1
        for _ in range(2, n + 1):
            time.sleep(0.1)  # Simulate computation
            a, b = b, a + b
        return b

    @skill()
    def simulate_error(self) -> None:
        """Skill that always errors."""
        time.sleep(0.3)
        raise RuntimeError("Simulated error for testing")

    @skill()
    def quick_task(self, name: str) -> str:
        """Quick task that completes fast."""
        time.sleep(0.1)
        return f"Quick task '{name}' done!"


def run_demo_skills():
    """Run demo skills in the background until interrupted with Ctrl+C.

    Starts an ``AgentInterface``, registers ``DemoSkills`` with it and launches
    a daemon thread that invokes one of the four demo skills every two
    seconds, cycling through them in a fixed order.
    """
    # Create and start agent interface
    agent_interface = AgentInterface()
    agent_interface.start()

    # Register skills
    demo_skills = DemoSkills()
    agent_interface.register_skills(demo_skills)

    def run_failing_skill(_counter):
        """Invoke the always-failing skill, ignoring its expected failure."""
        try:
            demo_skills.simulate_error(skillcall=True)
        except Exception:
            # Expected to fail. Only ordinary exceptions are swallowed so that
            # SystemExit / KeyboardInterrupt are never hidden.
            pass

    # One entry per step of the four-step rotation; each receives the counter.
    rotation = (
        lambda _counter: demo_skills.count_to(3, skillcall=True),
        lambda _counter: demo_skills.compute_fibonacci(10, skillcall=True),
        lambda counter: demo_skills.quick_task(f"task-{counter}", skillcall=True),
        run_failing_skill,
    )

    def skill_runner():
        """Invoke the next skill in the rotation every two seconds, forever."""
        counter = 0
        while True:
            time.sleep(2)
            rotation[counter % len(rotation)](counter)
            counter += 1

    # Start skill runner in background
    thread = threading.Thread(target=skill_runner, daemon=True)
    thread.start()

    print("Demo skills running in background. Start agentspy in another terminal to monitor.")
    print("Run: agentspy")

    # Keep the main thread alive; the runner is a daemon and dies with it.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nDemo stopped.")


if __name__ == "__main__":
    run_demo_skills()


# ---------------------------------------------------------------------------
# Remainder of the original patch text on these lines, kept verbatim as
# comments (partial hunks for other files; original indentation was lost).
#
# diff --git a/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py b/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py
# index bbcb70faee..a0cf07ffb6 100644
# --- a/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py
# +++ b/dimos/utils/cli/foxglove_bridge/run_foxglove_bridge.py
# @@ -58,5 +58,9 @@ def bridge_thread():
#  print("Shutting down...")
#
#
# -if __name__ == "__main__":
# +def main():
#  run_bridge_example()
# +
# +
# +if __name__ == "__main__":
# + main()
# diff --git a/dimos/utils/cli/lcmspy/run_lcmspy.py b/dimos/utils/cli/lcmspy/run_lcmspy.py
# index 17a9d0bbc6..13288cafe9 100644
# --- a/dimos/utils/cli/lcmspy/run_lcmspy.py
# +++ b/dimos/utils/cli/lcmspy/run_lcmspy.py
# @@ -118,7 +118,7 @@ def refresh_table(self):
#  )
#
#
# -if __name__ == "__main__":
# +def main():
#  import sys
#
#  if len(sys.argv) > 1 and sys.argv[1] == "web":
# @@ -130,3 +130,7 @@ def refresh_table(self):
#  server.serve()
#  else:
#  LCMSpyApp().run()
# +
# +
# +if __name__ == "__main__":
# + main()
# diff --git a/pyproject.toml b/pyproject.toml
# index fa0c73cbce..fcc62bf476 100644
# --- a/pyproject.toml
# +++ b/pyproject.toml
# @@ -97,6 +97,10 @@ dependencies = [
#  "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@ba3445d16be75a7ade6fb2a516b39a3e44319d5c"
#  ]
#
# +[project.scripts]
# +lcmspy = "dimos.utils.cli.lcmspy.run_lcmspy:main"
# +foxglove-bridge = "dimos.utils.cli.foxglove_bridge.run_foxglove_bridge:main"
# +agentspy = "dimos.utils.cli.agentspy.agentspy:main"
#  [project.optional-dependencies]
#  manipulation = [