From 3542495418a5568119476bd2f3d15accccbf17c3 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sat, 11 Oct 2025 08:39:51 +0300 Subject: [PATCH 1/3] add autoconnect back --- dimos/core/__init__.py | 56 ++-- dimos/core/blueprints.py | 154 +++++++++++ dimos/core/test_blueprints.py | 40 +++ dimos/navigation/bt_navigator/navigator.py | 15 ++ .../frontier_exploration/__init__.py | 2 +- .../wavefront_frontier_goal_selector.py | 5 + dimos/navigation/global_planner/__init__.py | 4 +- dimos/navigation/global_planner/planner.py | 5 + .../local_planner/holonomic_local_planner.py | 5 + .../navigation/local_planner/local_planner.py | 4 + dimos/perception/detection2d/module2D.py | 8 +- dimos/perception/object_tracker.py | 5 + dimos/perception/spatial_perception.py | 5 + dimos/robot/foxglove_bridge.py | 5 + dimos/robot/unitree_webrtc/depth_module.py | 5 + dimos/robot/unitree_webrtc/type/map.py | 5 + dimos/robot/unitree_webrtc/unitree_g1.py | 4 + dimos/robot/unitree_webrtc/unitree_go2.py | 4 + .../unitree_webrtc/unitree_go2_autoconnect.py | 249 ++++++++++++++++++ dimos/utils/monitoring.py | 6 +- .../web/websocket_vis/websocket_vis_module.py | 5 + 21 files changed, 563 insertions(+), 28 deletions(-) create mode 100644 dimos/core/blueprints.py create mode 100644 dimos/core/test_blueprints.py create mode 100644 dimos/robot/unitree_webrtc/unitree_go2_autoconnect.py diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 0bd3603126..249197f98a 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -49,6 +49,39 @@ ] +class RpcCall: + def __init__(self, original_method, rpc, name, remote_name, unsub_fns): + self.original_method = original_method + self.rpc = rpc + self.name = name + self.remote_name = remote_name + self._unsub_fns = unsub_fns + + if original_method: + self.__doc__ = original_method.__doc__ + self.__name__ = original_method.__name__ + self.__qualname__ = f"{self.__class__.__name__}.{original_method.__name__}" + + def __call__(self, *args, **kwargs): + # For stop/close/shutdown, use call_nowait to avoid deadlock + # (the remote side stops its RPC service before responding) + if self.name in ("stop"): + if self.rpc: + self.rpc.call_nowait(f"{self.remote_name}/{self.name}", (args, kwargs)) + self.stop_client() + return None + + result, unsub_fn = self.rpc.call_sync(f"{self.remote_name}/{self.name}", (args, kwargs)) + self._unsub_fns.append(unsub_fn) + return result + + def __getstate__(self): + return (self.original_method, self.name, self.remote_name, self._unsub_fns) + + def __setstate__(self, state): + self.original_method, self.name, self.remote_name, self._unsub_fns = state + + class CudaCleanupPlugin: """Dask worker plugin to cleanup CUDA resources on shutdown.""" @@ -125,29 +158,8 @@ def __getattr__(self, name: str): raise AttributeError(f"{name} is not found.") if name in self.rpcs: - # Get the original method to preserve its docstring original_method = getattr(self.actor_class, name, None) - - def rpc_call(*args, **kwargs): - # For stop/close/shutdown, use call_nowait to avoid deadlock - # (the remote side stops its RPC service before responding) - if name in ("stop", "close", "shutdown"): - if self.rpc: - self.rpc.call_nowait(f"{self.remote_name}/{name}", (args, kwargs)) - self.stop_client() - return None - - result, unsub_fn = self.rpc.call_sync(f"{self.remote_name}/{name}", (args, kwargs)) - self._unsub_fns.append(unsub_fn) - return result - - # Copy docstring and other attributes from original method - if original_method: - rpc_call.__doc__ = original_method.__doc__ - rpc_call.__name__ = original_method.__name__ - rpc_call.__qualname__ = f"{self.__class__.__name__}.{original_method.__name__}" - - return rpc_call + return RpcCall(original_method, self.rpc, name, self.remote_name, self._unsub_fns) # return super().__getattr__(name) # Try to avoid recursion by directly accessing attributes that are known diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py new file mode 100644 index 0000000000..fa91e34145 --- /dev/null +++ b/dimos/core/blueprints.py @@ -0,0 +1,154 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field +from collections import defaultdict +from functools import cached_property +from types import MappingProxyType +from typing import Any, Mapping, get_origin, get_args + +from dimos.core.dimos import Dimos +from dimos.core.module import Module +from dimos.core.stream import In, Out +from dimos.core.transport import LCMTransport, pLCMTransport +from dimos.utils.generic import short_id + + +@dataclass(frozen=True) +class ModuleBlueprint: + module: type[Module] + incoming: dict[str, type] + outgoing: dict[str, type] + args: tuple[Any] + kwargs: dict[str, Any] + + +@dataclass(frozen=True) +class ModuleBlueprintSet: + blueprints: tuple[ModuleBlueprint, ...] + # TODO: Replace Any + transports: Mapping[tuple[str, type], Any] = field(default_factory=lambda: MappingProxyType({})) + + def with_transports(self, transports: dict[tuple[str, type], Any]) -> "ModuleBlueprintSet": + return ModuleBlueprintSet( + blueprints=self.blueprints, + transports=MappingProxyType({**self.transports, **transports}), + ) + + def _get_transport_for(self, name: str, type: type) -> Any: + transport = self.transports.get((name, type), None) + if transport: + return transport + + use_pickled = "lcm_encode" not in type.__dict__ + topic = f"/{name}" if self._is_name_unique(name) else f"/{short_id()}" + transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, type) + + return transport + + @cached_property + def _all_name_types(self) -> set[tuple[str, type]]: + all_name_types = set() + for blueprint in self.blueprints: + for name, type in blueprint.incoming.items(): + all_name_types.add((name, type)) + for name, type in blueprint.outgoing.items(): + all_name_types.add((name, type)) + return all_name_types + + def _is_name_unique(self, name: str) -> bool: + return sum(1 for n, _ in self._all_name_types if n == name) == 1 + + def build(self, n: int | None = None) -> Dimos: + dimos = Dimos(n=n) + + dimos.start() + + # Deploy all modules. + for blueprint in self.blueprints: + dimos.deploy(blueprint.module, *blueprint.args, **blueprint.kwargs) + + # Gather all the In/Out connections. + incoming = defaultdict(list) + outgoing = defaultdict(list) + for blueprint in self.blueprints: + for name, type in blueprint.incoming.items(): + incoming[(name, type)].append(blueprint.module) + for name, type in blueprint.outgoing.items(): + outgoing[(name, type)].append(blueprint.module) + + # Connect all In/Out connections by name and type. + for name, type in set(incoming.keys()).union(outgoing.keys()): + transport = self._get_transport_for(name, type) + for module in incoming[(name, type)] + outgoing[(name, type)]: + instance = dimos.get_instance(module) + getattr(instance, name).transport = transport + + # Gather all RPC methods. + rpc_methods = {} + for blueprint in self.blueprints: + for method_name in blueprint.module.rpcs.keys(): + method = getattr(dimos.get_instance(blueprint.module), method_name) + rpc_methods[f"{blueprint.module.__name__}_{method_name}"] = method + + # Fulfil method requests (so modules can call each other). + for blueprint in self.blueprints: + for method_name, method in blueprint.module.rpcs.items(): + if not method_name.startswith("set_"): + continue + linked_name = method_name.removeprefix("set_") + if linked_name not in rpc_methods: + continue + instance = dimos.get_instance(blueprint.module) + getattr(instance, method_name)(rpc_methods[linked_name]) + + dimos.start_all_modules() + + return dimos + + +def make_module_blueprint( + module: type[Module], args: tuple[Any], kwargs: dict[str, Any] +) -> ModuleBlueprint: + incoming: dict[str, type] = {} + outgoing: dict[str, type] = {} + + all_annotations = {} + for base_class in reversed(module.__mro__): + if hasattr(base_class, "__annotations__"): + all_annotations.update(base_class.__annotations__) + + for name, annotation in all_annotations.items(): + origin = get_origin(annotation) + if origin not in (In, Out): + continue + dict_ = incoming if origin == In else outgoing + dict_[name] = get_args(annotation)[0] + + return ModuleBlueprint( + module=module, incoming=incoming, outgoing=outgoing, args=args, kwargs=kwargs + ) + + +def create_module_blueprint(module: type[Module], *args: Any, **kwargs: Any) -> ModuleBlueprintSet: + blueprint = make_module_blueprint(module, args, kwargs) + return ModuleBlueprintSet(blueprints=(blueprint,)) + + +def autoconnect(*blueprints: ModuleBlueprintSet) -> ModuleBlueprintSet: + all_blueprints = tuple(bp for bs in blueprints for bp in bs.blueprints) + all_transports = dict(sum([list(x.transports.items()) for x in blueprints], [])) + return ModuleBlueprintSet( + blueprints=all_blueprints, transports=MappingProxyType(all_transports) + ) diff --git a/dimos/core/test_blueprints.py b/dimos/core/test_blueprints.py new file mode 100644 index 0000000000..078dbea87e --- /dev/null +++ b/dimos/core/test_blueprints.py @@ -0,0 +1,40 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dimos.core.blueprints import ModuleBlueprint, make_module_blueprint +from dimos.core.module import Module +from dimos.core.stream import In, Out + + +class Scratch: + pass + + +class Petting: + pass + + +class CatModule(Module): + pet_cat: In[Petting] + scratches: Out[Scratch] + + +def test_get_connection_set(): + assert make_module_blueprint(CatModule, args=(), kwargs={}) == ModuleBlueprint( + module=CatModule, + incoming={"pet_cat": Petting}, + outgoing={"scratches": Scratch}, + args=(), + kwargs={}, + ) diff --git a/dimos/navigation/bt_navigator/navigator.py b/dimos/navigation/bt_navigator/navigator.py index 33d516106f..9c05475e70 100644 --- a/dimos/navigation/bt_navigator/navigator.py +++ b/dimos/navigation/bt_navigator/navigator.py @@ -18,12 +18,14 @@ Navigator module for coordinating global and local planning. """ +from functools import partial import threading import time from enum import Enum from typing import Callable, Optional from dimos.core import Module, In, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.nav_msgs import OccupancyGrid from dimos_lcm.std_msgs import String @@ -121,6 +123,16 @@ def __init__( logger.info("Navigator initialized with stuck detection") + @rpc + def set_HolonomicLocalPlanner_reset(self, callable) -> None: + self.reset_local_planner = callable + self.reset_local_planner.rpc = self.rpc + + @rpc + def set_HolonomicLocalPlanner_atgl(self, callable) -> None: + self.check_goal_reached = callable + self.check_goal_reached.rpc = self.rpc + @rpc def start(self): super().start() @@ -342,3 +354,6 @@ def stop_navigation(self) -> None: self.recovery_server.reset() # Reset recovery server when stopping logger.info("Navigator stopped") + + +behavior_tree_navigator = partial(create_module_blueprint, BehaviorTreeNavigator) diff --git a/dimos/navigation/frontier_exploration/__init__.py b/dimos/navigation/frontier_exploration/__init__.py index 388a5bfe6f..7236788842 100644 --- a/dimos/navigation/frontier_exploration/__init__.py +++ b/dimos/navigation/frontier_exploration/__init__.py @@ -1 +1 @@ -from .wavefront_frontier_goal_selector import WavefrontFrontierExplorer +from .wavefront_frontier_goal_selector import WavefrontFrontierExplorer, wavefront_frontier_explorer diff --git a/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py b/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py index 5acbf7b5bf..3d0d8a7f67 100644 --- a/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py +++ b/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py @@ -19,6 +19,7 @@ for autonomous navigation using the dimos Costmap and Vector types. """ +from functools import partial import threading from collections import deque from dataclasses import dataclass @@ -28,6 +29,7 @@ import numpy as np from dimos.core import Module, In, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.geometry_msgs import PoseStamped, Vector3 from dimos.msgs.nav_msgs import OccupancyGrid, CostValues from dimos.utils.logging_config import setup_logger @@ -810,3 +812,6 @@ def _exploration_loop(self): f"No frontier found (attempt {consecutive_failures}/{max_consecutive_failures}). Retrying in 2 seconds..." ) threading.Event().wait(2.0) + + +wavefront_frontier_explorer = partial(create_module_blueprint, WavefrontFrontierExplorer) diff --git a/dimos/navigation/global_planner/__init__.py b/dimos/navigation/global_planner/__init__.py index 0496f586b9..9aaf52e11e 100644 --- a/dimos/navigation/global_planner/__init__.py +++ b/dimos/navigation/global_planner/__init__.py @@ -1,2 +1,4 @@ -from dimos.navigation.global_planner.planner import AstarPlanner +from dimos.navigation.global_planner.planner import AstarPlanner, astar_planner from dimos.navigation.global_planner.algo import astar + +__all__ = ["AstarPlanner", "astar_planner", "astar"] diff --git a/dimos/navigation/global_planner/planner.py b/dimos/navigation/global_planner/planner.py index 08a00596aa..bab6aa817d 100644 --- a/dimos/navigation/global_planner/planner.py +++ b/dimos/navigation/global_planner/planner.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial from typing import Optional from dimos.core import In, Module, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.geometry_msgs import Pose, PoseStamped from dimos.msgs.nav_msgs import OccupancyGrid, Path from dimos.navigation.global_planner.algo import astar @@ -216,3 +218,6 @@ def plan(self, goal: Pose) -> Optional[Path]: logger.warning("No path found to the goal.") return None + + +astar_planner = partial(create_module_blueprint, AstarPlanner) diff --git a/dimos/navigation/local_planner/holonomic_local_planner.py b/dimos/navigation/local_planner/holonomic_local_planner.py index d74e272724..c851d2b47e 100644 --- a/dimos/navigation/local_planner/holonomic_local_planner.py +++ b/dimos/navigation/local_planner/holonomic_local_planner.py @@ -18,10 +18,12 @@ Gradient-Augmented Look-Ahead Pursuit (GLAP) holonomic local planner. """ +from functools import partial from typing import Optional, Tuple import numpy as np +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.geometry_msgs import Twist, Vector3 from dimos.navigation.local_planner import BaseLocalPlanner from dimos.utils.transform_utils import quaternion_to_euler, normalize_angle, get_distance @@ -260,3 +262,6 @@ def _find_lookahead_point(self, path: np.ndarray, start_idx: int) -> np.ndarray: def _clip(self, v: np.ndarray) -> np.ndarray: """Instance method to clip velocity with access to v_max.""" return np.clip(v, -self.v_max, self.v_max) + + +holonomic_local_planner = partial(create_module_blueprint, HolonomicLocalPlanner) diff --git a/dimos/navigation/local_planner/local_planner.py b/dimos/navigation/local_planner/local_planner.py index ac1a6ea744..9500c1c721 100644 --- a/dimos/navigation/local_planner/local_planner.py +++ b/dimos/navigation/local_planner/local_planner.py @@ -158,6 +158,10 @@ def compute_velocity(self) -> Optional[Twist]: """ pass + @rpc + def atgl(self) -> bool: + return self.is_goal_reached() + @rpc def is_goal_reached(self) -> bool: """ diff --git a/dimos/perception/detection2d/module2D.py b/dimos/perception/detection2d/module2D.py index c20e51cd9c..dddad29624 100644 --- a/dimos/perception/detection2d/module2D.py +++ b/dimos/perception/detection2d/module2D.py @@ -11,11 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import functools from dataclasses import dataclass +from functools import partial from typing import Any, Callable, Optional -import numpy as np from dimos_lcm.foxglove_msgs.ImageAnnotations import ( ImageAnnotations, ) @@ -24,13 +23,13 @@ from reactivex.subject import Subject from dimos.core import In, Module, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.sensor_msgs import Image from dimos.msgs.sensor_msgs.Image import sharpness_barrier from dimos.msgs.vision_msgs import Detection2DArray from dimos.perception.detection2d.type import ImageDetections2D from dimos.perception.detection2d.detectors import Detector, Yolo2DDetector from dimos.perception.detection2d.detectors.person.yolo import YoloPersonDetector -from dimos.perception.detection2d.type import ImageDetections2D from dimos.utils.decorators.decorators import simple_mcache from dimos.utils.reactive import backpressure @@ -109,3 +108,6 @@ def publish_cropped_images(detections: ImageDetections2D): @rpc def stop(self) -> None: super().stop() + + +detection_2d = partial(create_module_blueprint, Detection2DModule) diff --git a/dimos/perception/object_tracker.py b/dimos/perception/object_tracker.py index d59165cb06..e22f3a9df5 100644 --- a/dimos/perception/object_tracker.py +++ b/dimos/perception/object_tracker.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial import cv2 import numpy as np import time @@ -19,6 +20,7 @@ from typing import Dict, List, Optional from dimos.core import In, Out, Module, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.std_msgs import Header from dimos.msgs.sensor_msgs import Image, ImageFormat from dimos.msgs.vision_msgs import Detection2DArray, Detection3DArray @@ -622,3 +624,6 @@ def _get_depth_from_bbox(self, bbox: List[int], depth_frame: np.ndarray) -> Opti return depth_25th_percentile return None + + +object_tracking = partial(create_module_blueprint, ObjectTracking) \ No newline at end of file diff --git a/dimos/perception/spatial_perception.py b/dimos/perception/spatial_perception.py index 7d93e2e174..746ed7c9b4 100644 --- a/dimos/perception/spatial_perception.py +++ b/dimos/perception/spatial_perception.py @@ -16,6 +16,7 @@ Spatial Memory module for creating a semantic map of the environment. """ +from functools import partial import uuid import time import os @@ -29,6 +30,7 @@ from reactivex.disposable import Disposable from dimos.core import In, Module, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.sensor_msgs import Image from dimos.msgs.geometry_msgs import Vector3, Pose, PoseStamped from dimos.utils.logging_config import setup_logger @@ -649,3 +651,6 @@ def query_tagged_location(self, query: str) -> Optional[RobotLocation]: if semantic_distance < 0.3: return location return None + + +spatial_memory = partial(create_module_blueprint, SpatialMemory) diff --git a/dimos/robot/foxglove_bridge.py b/dimos/robot/foxglove_bridge.py index 18211f65c2..48ab7ee2da 100644 --- a/dimos/robot/foxglove_bridge.py +++ b/dimos/robot/foxglove_bridge.py @@ -13,12 +13,14 @@ # limitations under the License. import asyncio +from functools import partial import threading # this is missing, I'm just trying to import lcm_foxglove_bridge.py from dimos_lcm from dimos_lcm.foxglove_bridge import FoxgloveBridge as LCMFoxgloveBridge from dimos.core import Module, rpc +from dimos.core.blueprints import create_module_blueprint class FoxgloveBridge(Module): @@ -58,3 +60,6 @@ def stop(self): self._thread.join(timeout=2) super().stop() + + +foxglove_bridge = partial(create_module_blueprint, FoxgloveBridge) diff --git a/dimos/robot/unitree_webrtc/depth_module.py b/dimos/robot/unitree_webrtc/depth_module.py index b5b3b12738..b521e7e6ef 100644 --- a/dimos/robot/unitree_webrtc/depth_module.py +++ b/dimos/robot/unitree_webrtc/depth_module.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial import time import threading from typing import Optional @@ -21,6 +22,7 @@ import numpy as np from dimos.core import Module, In, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.sensor_msgs import Image, ImageFormat from dimos_lcm.sensor_msgs import CameraInfo from dimos.utils.logging_config import setup_logger @@ -232,3 +234,6 @@ def _publish_depth(self): except Exception as e: logger.error(f"Error publishing depth data: {e}", exc_info=True) + + +depth_module = partial(create_module_blueprint, DepthModule) diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index 52e2c62260..ac794716f4 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial import time from typing import Optional @@ -21,6 +22,7 @@ from reactivex.disposable import Disposable from dimos.core import In, Module, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.msgs.nav_msgs import OccupancyGrid from dimos.msgs.sensor_msgs import PointCloud2 from dimos.robot.unitree_webrtc.type.lidar import LidarMessage @@ -159,3 +161,6 @@ def splice_cylinder( survivors = map_pcd.select_by_index(victims, invert=True) return survivors + patch_pcd + + +mapper = partial(create_module_blueprint, Map) diff --git a/dimos/robot/unitree_webrtc/unitree_g1.py b/dimos/robot/unitree_webrtc/unitree_g1.py index f319e2c87c..92147c0f6c 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1.py +++ b/dimos/robot/unitree_webrtc/unitree_g1.py @@ -112,7 +112,11 @@ def start(self): self._disposables.add(Disposable(unsub)) @rpc +<<<<<<< HEAD def stop(self) -> None: +======= + def stop(self): +>>>>>>> deac770f (squash) self.connection.stop() super().stop() diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 35b54e5a5d..d3991d9462 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -28,6 +28,7 @@ from dimos import core from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core import In, Module, Out, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.core.dimos import Dimos from dimos.core.resource import Resource from dimos.mapping.types import LatLon @@ -325,6 +326,9 @@ def publish_request(self, topic: str, data: dict): return self.connection.publish_request(topic, data) +connection = functools.partial(create_module_blueprint, ConnectionModule) + + class UnitreeGo2(UnitreeRobot, Resource): """Full Unitree Go2 robot with navigation and perception capabilities.""" diff --git a/dimos/robot/unitree_webrtc/unitree_go2_autoconnect.py b/dimos/robot/unitree_webrtc/unitree_go2_autoconnect.py new file mode 100644 index 0000000000..0a828d5dd0 --- /dev/null +++ b/dimos/robot/unitree_webrtc/unitree_go2_autoconnect.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from functools import cached_property +import logging +import os +import time +from typing import Optional + +from reactivex import Observable +from reactivex.disposable import CompositeDisposable + +from dimos.core.blueprints import autoconnect +from dimos.core.dimos import Dimos +from dimos.core.resource import Resource +from dimos.core.transport import LCMTransport +from dimos.mapping.types import LatLon +from dimos.msgs.geometry_msgs import PoseStamped, Twist +from dimos.msgs.sensor_msgs import Image +from dimos_lcm.sensor_msgs import CameraInfo +from dimos.perception.spatial_perception import SpatialMemory, spatial_memory +from dimos.protocol import pubsub +from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.robot.foxglove_bridge import foxglove_bridge +from dimos.robot.unitree_webrtc.unitree_go2 import ConnectionModule, connection +from dimos.utils.monitoring import utilization +from dimos.web.websocket_vis.websocket_vis_module import websocket_vis +from dimos.navigation.global_planner import astar_planner +from dimos.navigation.local_planner.holonomic_local_planner import ( + holonomic_local_planner, +) +from dimos.navigation.bt_navigator.navigator import ( + NavigatorState, + behavior_tree_navigator, + BehaviorTreeNavigator, +) +from dimos.navigation.frontier_exploration import ( + WavefrontFrontierExplorer, + wavefront_frontier_explorer, +) +from dimos.robot.unitree_webrtc.type.map import mapper +from dimos.robot.unitree_webrtc.unitree_skills import MyUnitreeSkills +from dimos.robot.unitree_webrtc.depth_module import depth_module +from dimos.skills.skills import AbstractRobotSkill, SkillLibrary +from dimos.utils.logging_config import setup_logger +from dimos.perception.object_tracker import object_tracking +from dimos.robot.robot import UnitreeRobot +from dimos.types.robot_capabilities import RobotCapability + + +logger = setup_logger(__file__, level=logging.INFO) + + +class UnitreeGo2(UnitreeRobot, Resource): + _dimos: Dimos + _disposables: CompositeDisposable = CompositeDisposable() + + def __init__( + self, + ip: Optional[str], + output_dir: str = None, + websocket_port: int = 7779, + skill_library: Optional[SkillLibrary] = None, + connection_type: Optional[str] = "webrtc", + ): + super().__init__() + self.ip = ip + self.connection_type = connection_type or "webrtc" + if ip is None and self.connection_type == "webrtc": + self.connection_type = "fake" # Auto-enable playback if no IP provided + self.output_dir = output_dir or os.path.join(os.getcwd(), "assets", "output") + self.websocket_port = websocket_port + self.lcm = LCM() + + if skill_library is None: + skill_library = MyUnitreeSkills() + self.skill_library = skill_library + + self.capabilities = [RobotCapability.LOCOMOTION, RobotCapability.VISION] + + self._setup_directories() + + def _setup_directories(self): + """Setup directories for spatial memory storage.""" + os.makedirs(self.output_dir, exist_ok=True) + logger.info(f"Robot outputs will be saved to: {self.output_dir}") + + # Initialize memory directories + self.memory_dir = os.path.join(self.output_dir, "memory") + os.makedirs(self.memory_dir, exist_ok=True) + + # Initialize spatial memory properties + self.spatial_memory_dir = os.path.join(self.memory_dir, "spatial_memory") + self.db_path = os.path.join(self.spatial_memory_dir, "chromadb_data") + self.visual_memory_path = os.path.join(self.spatial_memory_dir, "visual_memory.pkl") + + # Create spatial memory directories + os.makedirs(self.spatial_memory_dir, exist_ok=True) + os.makedirs(self.db_path, exist_ok=True) + + def start(self): + self.lcm.start() + + min_height = 0.3 if self.connection_type == "mujoco" else 0.15 + gt_depth_scale = 1.0 if self.connection_type == "mujoco" else 0.5 + + basic_robot = autoconnect( + connection(self.ip, connection_type=self.connection_type), + mapper(voxel_size=0.5, global_publish_interval=2.5, min_height=min_height), + astar_planner(), + holonomic_local_planner(), + behavior_tree_navigator(), + wavefront_frontier_explorer(), + websocket_vis(self.websocket_port), + foxglove_bridge(), + ) + + enhanced_robot = autoconnect( + basic_robot, + spatial_memory( + db_path=self.db_path, + visual_memory_path=self.visual_memory_path, + output_dir=self.spatial_memory_dir, + ), + object_tracking(frame_id="camera_link"), + depth_module(gt_depth_scale=gt_depth_scale), + utilization(), + ) + + self._dimos = enhanced_robot.with_transports( + { + ("color_image", Image): LCMTransport("/go2/color_image", Image), + ("depth_image", Image): LCMTransport("/go2/depth_image", Image), + ("camera_pose", PoseStamped): LCMTransport("/go2/camera_pose", PoseStamped), + ("camera_info", CameraInfo): LCMTransport("/go2/camera_info", CameraInfo), + } + ).build() + + self._start_skills() + + def stop(self): + self._disposables.dispose() + self._dimos.stop() + self.lcm.stop() + + def _start_skills(self): + # Initialize skills after connection is established + if self.skill_library is not None: + for skill in self.skill_library: + if isinstance(skill, AbstractRobotSkill): + self.skill_library.create_instance(skill.__name__, robot=self) + if isinstance(self.skill_library, MyUnitreeSkills): + self.skill_library._robot = self + self.skill_library.init() + self.skill_library.initialize_skills() + + def get_single_rgb_frame(self, timeout: float = 2.0) -> Image: + topic = Topic("/go2/color_image", Image) + return self.lcm.wait_for_message(topic, timeout=timeout) + + def move(self, twist: Twist, duration: float = 0.0): + self._dimos.get_instance(ConnectionModule).move(twist, duration) + + def explore(self) -> bool: + return self._dimos.get_instance(WavefrontFrontierExplorer).explore() + + def navigate_to(self, pose: PoseStamped, blocking: bool = True): + logger.info( + f"Navigating to pose: ({pose.position.x:.2f}, {pose.position.y:.2f}, {pose.position.z:.2f})" + ) + self._dimos.get_instance(BehaviorTreeNavigator).set_goal(pose) + time.sleep(1.0) + + if blocking: + while ( + self._dimos.get_instance(BehaviorTreeNavigator).get_state() + == NavigatorState.FOLLOWING_PATH + ): + time.sleep(0.25) + + time.sleep(1.0) + if not self._dimos.get_instance(BehaviorTreeNavigator).is_goal_reached(): + logger.info("Navigation was cancelled or failed") + return False + else: + logger.info("Navigation goal reached") + return True + + return True + + def stop_exploration(self) -> bool: + self._dimos.get_instance(BehaviorTreeNavigator).cancel_goal() + return self._dimos.get_instance(WaveFrontFrontierExplorer).stop_exploration() + + def is_exploration_active(self) -> bool: + return self._dimos.get_instance(WaveFrontFrontierExplorer).is_exploration_active() + + def cancel_navigation(self) -> bool: + return self._dimos.get_instance(BehaviorTreeNavigator).cancel_goal() + + @property + def spatial_memory(self) -> Optional[SpatialMemory]: + return self._dimos.get_instance(SpatialMemory) + + @cached_property + def gps_position_stream(self) -> Observable[LatLon]: + return self._dimos.get_instance(ConnectionModule).gps_location.transport.pure_observable() + + def get_odom(self) -> PoseStamped: + return self._dimos.get_instance(ConnectionModule).get_odom() + + def navigate_to_object(self, pose, blocking=True): + pass + + +def main(): + ip = os.getenv("ROBOT_IP") + connection_type = os.getenv("CONNECTION_TYPE", "webrtc") + + pubsub.lcm.autoconf() + + robot = UnitreeGo2(ip=ip, websocket_port=7779, connection_type=connection_type) + robot.start() + + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + pass + finally: + robot.stop() + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/monitoring.py b/dimos/utils/monitoring.py index c13c274cac..e07d6dcfeb 100644 --- a/dimos/utils/monitoring.py +++ b/dimos/utils/monitoring.py @@ -23,12 +23,13 @@ import re import os import shutil -from functools import lru_cache +from functools import lru_cache, partial from typing import Optional from distributed.client import Client from distributed import get_client from dimos.core import Module, rpc +from dimos.core.blueprints import create_module_blueprint from dimos.utils.actor_registry import ActorRegistry from dimos.utils.logging_config import setup_logger @@ -185,6 +186,9 @@ def stop(self): super().stop() +utilization = partial(create_module_blueprint, UtilizationModule) + + def _can_use_py_spy(): try: with open("/proc/sys/kernel/yama/ptrace_scope") as f: diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index f8440d4b20..130fd75016 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -19,6 +19,7 @@ """ import asyncio +from functools import partial import threading import time from typing import Any, Dict, Optional @@ -33,6 +34,7 @@ from dimos.core import Module, In, Out, rpc from dimos_lcm.std_msgs import Bool +from dimos.core.blueprints import create_module_blueprint from dimos.mapping.types import LatLon from dimos.msgs.geometry_msgs import PoseStamped, Twist, TwistStamped, Vector3 from dimos.msgs.nav_msgs import OccupancyGrid, Path @@ -293,3 +295,6 @@ def _process_costmap(self, costmap: OccupancyGrid) -> Dict[str, Any]: def _emit(self, event: str, data: Any): if self._broadcast_loop and not self._broadcast_loop.is_closed(): asyncio.run_coroutine_threadsafe(self.sio.emit(event, data), self._broadcast_loop) + + +websocket_vis = partial(create_module_blueprint, WebsocketVisModule) From 72a8e7f2bdd323c90d7f623c4387ea6207cdc617 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Mon, 13 Oct 2025 22:28:54 +0300 Subject: [PATCH 2/3] remove merge failure --- dimos/core/__init__.py | 38 ++++++++++++++---------- dimos/robot/unitree_webrtc/unitree_g1.py | 4 --- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 249197f98a..fd0fd371b1 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -6,6 +6,7 @@ from dask.distributed import Client, LocalCluster from rich.console import Console +import signal import dimos.core.colors as colors from dimos.core.core import rpc from dimos.core.module import Module, ModuleBase, ModuleConfig @@ -50,12 +51,13 @@ class RpcCall: - def __init__(self, original_method, rpc, name, remote_name, unsub_fns): - self.original_method = original_method - self.rpc = rpc - self.name = name - self.remote_name = remote_name + def __init__(self, original_method, rpc, name, remote_name, unsub_fns, stop_client=None): + self._original_method = original_method + self._rpc = rpc + self._name = name + self._remote_name = remote_name self._unsub_fns = unsub_fns + self._stop_client = stop_client if original_method: self.__doc__ = original_method.__doc__ @@ -63,23 +65,29 @@ def __init__(self, original_method, rpc, name, remote_name, unsub_fns): self.__qualname__ = f"{self.__class__.__name__}.{original_method.__name__}" def __call__(self, *args, **kwargs): + if not self._rpc: + return None + + # TODO: This may not be needed anymore. + # # For stop/close/shutdown, use call_nowait to avoid deadlock # (the remote side stops its RPC service before responding) - if self.name in ("stop"): - if self.rpc: - self.rpc.call_nowait(f"{self.remote_name}/{self.name}", (args, kwargs)) - self.stop_client() + if self._name == "stop": + if self._rpc: + self._rpc.call_nowait(f"{self._remote_name}/{self._name}", (args, kwargs)) + if self._stop_client: + self._stop_client() return None - result, unsub_fn = self.rpc.call_sync(f"{self.remote_name}/{self.name}", (args, kwargs)) + result, unsub_fn = self._rpc.call_sync(f"{self._remote_name}/{self._name}", (args, kwargs)) self._unsub_fns.append(unsub_fn) return result def __getstate__(self): - return (self.original_method, self.name, self.remote_name, self._unsub_fns) + return (self._original_method, self._name, self._remote_name, self._unsub_fns) def __setstate__(self, state): - self.original_method, self.name, self.remote_name, self._unsub_fns = state + self._original_method, self._name, self._remote_name, self._unsub_fns = state class CudaCleanupPlugin: @@ -159,7 +167,9 @@ def __getattr__(self, name: str): if name in self.rpcs: original_method = getattr(self.actor_class, name, None) - return RpcCall(original_method, self.rpc, name, self.remote_name, self._unsub_fns) + return RpcCall( + original_method, self.rpc, name, self.remote_name, self._unsub_fns, self.stop_client + ) # return super().__getattr__(name) # Try to avoid recursion by directly accessing attributes that are known @@ -321,8 +331,6 @@ def start(n: Optional[int] = None, memory_limit: str = "auto") -> Client: n: Number of workers (defaults to CPU count) memory_limit: Memory limit per worker (e.g., '4GB', '2GiB', or 'auto' for Dask's default) """ - import signal - import atexit console = Console() if not n: diff --git a/dimos/robot/unitree_webrtc/unitree_g1.py b/dimos/robot/unitree_webrtc/unitree_g1.py index 92147c0f6c..f319e2c87c 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1.py +++ b/dimos/robot/unitree_webrtc/unitree_g1.py @@ -112,11 +112,7 @@ def start(self): self._disposables.add(Disposable(unsub)) @rpc -<<<<<<< HEAD def stop(self) -> None: -======= - def stop(self): ->>>>>>> deac770f (squash) self.connection.stop() super().stop() From 342c27588483c6da8e45d46a2bc9b340bac2b2d3 Mon Sep 17 00:00:00 2001 From: paul-nechifor <1262969+paul-nechifor@users.noreply.github.com> Date: Mon, 13 Oct 2025 19:29:25 +0000 Subject: [PATCH 3/3] CI code cleanup --- dimos/perception/object_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/perception/object_tracker.py b/dimos/perception/object_tracker.py index e22f3a9df5..4617344154 100644 --- a/dimos/perception/object_tracker.py +++ b/dimos/perception/object_tracker.py @@ -626,4 +626,4 @@ def _get_depth_from_bbox(self, bbox: List[int], depth_frame: np.ndarray) -> Opti return None -object_tracking = partial(create_module_blueprint, ObjectTracking) \ No newline at end of file +object_tracking = partial(create_module_blueprint, ObjectTracking)