diff --git a/assets/foxglove_g1_detections.json b/assets/foxglove_dashboards/old/foxglove_g1_detections.json similarity index 100% rename from assets/foxglove_g1_detections.json rename to assets/foxglove_dashboards/old/foxglove_g1_detections.json diff --git a/assets/foxglove_image_sharpness_test.json b/assets/foxglove_dashboards/old/foxglove_image_sharpness_test.json similarity index 100% rename from assets/foxglove_image_sharpness_test.json rename to assets/foxglove_dashboards/old/foxglove_image_sharpness_test.json diff --git a/assets/foxglove_unitree_lcm_dashboard.json b/assets/foxglove_dashboards/old/foxglove_unitree_lcm_dashboard.json similarity index 100% rename from assets/foxglove_unitree_lcm_dashboard.json rename to assets/foxglove_dashboards/old/foxglove_unitree_lcm_dashboard.json diff --git a/assets/foxglove_unitree_yolo.json b/assets/foxglove_dashboards/old/foxglove_unitree_yolo.json similarity index 100% rename from assets/foxglove_unitree_yolo.json rename to assets/foxglove_dashboards/old/foxglove_unitree_yolo.json diff --git a/assets/foxglove_dashboards/unitree.json b/assets/foxglove_dashboards/unitree.json new file mode 100644 index 0000000000..24a3c3c339 --- /dev/null +++ b/assets/foxglove_dashboards/unitree.json @@ -0,0 +1,486 @@ +{ + "configById": { + "3D!3ezwzdr": { + "cameraState": { + "perspective": true, + "distance": 11.974738784563408, + "phi": 46.81551198543342, + "thetaOffset": 71.81370622323534, + "targetOffset": [ + 0.7911075559165568, + 1.8116053369121197, + 3.4025317970075274e-16 + ], + "target": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": { + "transforms": { + "labelSize": 0.1 + } + }, + "transforms": { + "frame:sensor_at_scan": { + "visible": false + }, + "frame:camera_optical": { + "visible": false + }, + "frame:camera_link": { + "visible": false + }, + "frame:base_link": { + "visible": true + }, + "frame:sensor": { + "visible": false + }, + "frame:map": { + "visible": true + }, + "frame:world": { + "visible": true + } + }, + "topics": { + "/lidar": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointSize": 2.85, + "decayTime": 6, + "pointShape": "cube" + }, + "/detectorDB/scene_update": { + "visible": true + }, + "/path_active": { + "visible": true, + "lineWidth": 0.05, + "gradient": [ + "#00ff1eff", + "#6bff6e80" + ] + }, + "/map": { + "visible": false, + "colorField": "intensity", + "colorMode": "colormap", + "colorMap": "turbo" + }, + "/image": { + "visible": false + }, + "/camera_info": { + "visible": true + }, + "/detectorDB/pointcloud/0": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointShape": "cube", + "pointSize": 2, + "flatColor": "#00ff00ff", + "cubeSize": 0.03 + }, + "/detectorDB/pointcloud/1": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointShape": "cube", + "cubeSize": 0.03, + "flatColor": "#ff0000ff" + }, + "/detectorDB/pointcloud/2": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointShape": "cube", + "cubeSize": 0.03, + "flatColor": "#00aaffff" + }, + "/global_map": { + "visible": true, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointSize": 3.66, + "pointShape": "cube", + "explicitAlpha": 1, + "cubeSize": 0.05, + "cubeOutline": false, + "flatColor": "#ed8080ff" + }, + "/global_costmap": { + "visible": false, + "colorMode": "custom", + "unknownColor": "#80808000", + "minColor": "#282975ff", + "maxColor": "#ff0000ff", + "frameLocked": false, + "drawBehind": false + }, + "/go2/color_image": { + "visible": false, + "cameraInfoTopic": "/go2/camera_info" + }, + "/go2/camera_info": { + "visible": true + }, + "/color_image": { + "visible": false, + "cameraInfoTopic": "/camera_info" + }, + "color_image": { + "visible": false, + "cameraInfoTopic": "/camera_info" + }, + "lidar": { + "visible": false, + "colorField": "z", + "colorMode": "flat", + "colorMap": "turbo", + "pointSize": 2.76, + "pointShape": "cube" + }, + "odom": { + "visible": false + }, + "global_map": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointShape": "cube" + }, + "prev_lidar": { + "visible": false, + "pointShape": "cube", + "colorField": "z", + "colorMode": "flat", + "colorMap": "turbo", + "gradient": [ + "#b70000ff", + "#ff0000ff" + ], + "flatColor": "#80eda2ff" + }, + "additive_global_map": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointShape": "cube" + }, + "height_costmap": { + "visible": false + }, + "/odom": { + "visible": false + }, + "/costmap": { + "visible": false, + "colorMode": "custom", + "alpha": 1, + "frameLocked": false, + "maxColor": "#ff2222ff", + "minColor": "#00006bff", + "unknownColor": "#80808000" + } + }, + "layers": { + "grid": { + "visible": true, + "drawBehind": false, + "frameLocked": true, + "label": "Grid", + "instanceId": "8cb9fe46-7478-4aa6-95c5-75c511fee62d", + "layerId": "foxglove.Grid", + "size": 50, + "color": "#24b6ffff", + "position": [ + 0, + 0, + 0 + ], + "rotation": [ + 0, + 0, + 0 + ], + "frameId": "world", + "divisions": 25, + "lineWidth": 1 + } + }, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/initialpose", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": {}, + "followTf": "world" + }, + "Image!3mnp456": { + "cameraState": { + "distance": 20, + "perspective": true, + "phi": 60, + "target": [ + 0, + 0, + 0 + ], + "targetOffset": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "thetaOffset": 45, + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": { + "enableStats": false, + "transforms": { + "showLabel": false, + "visible": false + } + }, + "transforms": { + "frame:world": { + "visible": true + }, + "frame:camera_optical": { + "visible": false + }, + "frame:camera_link": { + "visible": false + }, + "frame:base_link": { + "visible": false + } + }, + "topics": { + "/lidar": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointSize": 6, + "explicitAlpha": 0.6, + "pointShape": "circle", + "cubeSize": 0.016 + }, + "/odom": { + "visible": false + }, + "/local_costmap": { + "visible": false + }, + "/global_costmap": { + "visible": true, + "minColor": "#ffffff00", + "maxColor": "#ff0000ff", + "unknownColor": "#80808000" + }, + "/detected_0": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointSize": 23, + "pointShape": "cube", + "cubeSize": 0.04, + "flatColor": "#ff0000ff", + "stixelsEnabled": false + }, + "/detected_1": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointSize": 20.51, + "flatColor": "#34ff00ff", + "pointShape": "cube", + "cubeSize": 0.04, + "cubeOutline": false + }, + "/filtered_pointcloud": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "rainbow", + "pointSize": 1.5, + "pointShape": "cube", + "flatColor": "#ff0000ff", + "cubeSize": 0.1 + }, + "/global_map": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointShape": "cube", + "pointSize": 5 + }, + "/detected/pointcloud/1": { + "visible": false, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointShape": "cube", + "cubeSize": 0.01, + "flatColor": "#00ff1eff", + "pointSize": 15, + "decayTime": 0, + "cubeOutline": true + }, + "/detected/pointcloud/2": { + "visible": true, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointShape": "circle", + "cubeSize": 0.1, + "flatColor": "#00fbffff", + "pointSize": 0.01 + }, + "/detected/pointcloud/0": { + "visible": false, + "colorField": "intensity", + "colorMode": "flat", + "colorMap": "turbo", + "pointShape": "cube", + "flatColor": "#ff0000ff", + "pointSize": 15, + "cubeOutline": true, + "cubeSize": 0.03 + }, + "/registered_scan": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointShape": "circle", + "pointSize": 6.49 + }, + "/detection3d/markers": { + "visible": false + }, + "/foxglove/scene_update": { + "visible": true + }, + "/scene_update": { + "visible": false + }, + "/map": { + "visible": false, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointSize": 8 + }, + "/detection3d/scene_update": { + "visible": true + }, + "/detectorDB/scene_update": { + "visible": false + }, + "/detectorDB/pointcloud/0": { + "visible": false, + "colorField": "intensity", + "colorMode": "colormap", + "colorMap": "turbo" + }, + "/detectorDB/pointcloud/1": { + "visible": false, + "colorField": "intensity", + "colorMode": "colormap", + "colorMap": "turbo" + } + }, + "layers": {}, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/initialpose", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": { + "imageTopic": "/color_image", + "colorMode": "gradient", + "annotations": { + "/detections": { + "visible": true + }, + "/annotations": { + "visible": true + }, + "/reid/annotations": { + "visible": true + }, + "/objectdb/annotations": { + "visible": true + }, + "/detector3d/annotations": { + "visible": true + }, + "/detectorDB/annotations": { + "visible": true + } + }, + "synchronize": false, + "rotation": 0, + "calibrationTopic": "/camera_info" + }, + "foxglovePanelTitle": "" + } + }, + "globalVariables": {}, + "userNodes": {}, + "playbackConfig": { + "speed": 1 + }, + "drawerConfig": { + "tracks": [] + }, + "layout": { + "direction": "row", + "first": "3D!3ezwzdr", + "second": "Image!3mnp456", + "splitPercentage": 70.55232558139535 + } +} diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index c8bb091596..e8c4364a21 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -9,7 +9,7 @@ import dimos.core.colors as colors from dimos.core.core import rpc -from dimos.core.module import Module, ModuleBase, ModuleConfig +from dimos.core.module import Module, ModuleBase, ModuleConfig, ModuleConfigT from dimos.core.rpc_client import RPCClient from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport from dimos.core.transport import ( @@ -37,6 +37,7 @@ "Module", "ModuleBase", "ModuleConfig", + "ModuleConfigT", "Out", "PubSubTF", "RPCSpec", diff --git a/dimos/core/module.py b/dimos/core/module.py index c6d8a69302..06bc1f1cae 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -28,6 +28,7 @@ from dask.distributed import Actor, get_worker from reactivex.disposable import CompositeDisposable +from typing_extensions import TypeVar from dimos.core import colors from dimos.core.core import T, rpc @@ -73,9 +74,14 @@ def get_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread | None]: class ModuleConfig: rpc_transport: type[RPCSpec] = LCMRPC tf_transport: type[TFSpec] = LCMTF + frame_id_prefix: str | None = None + frame_id: str | None = None -class ModuleBase(Configurable[ModuleConfig], SkillContainer, Resource): +ModuleConfigT = TypeVar("ModuleConfigT", bound=ModuleConfig, default=ModuleConfig) + + +class ModuleBase(Configurable[ModuleConfigT], SkillContainer, Resource): _rpc: RPCSpec | None = None _tf: TFSpec | None = None _loop: asyncio.AbstractEventLoop | None = None @@ -85,7 +91,7 @@ class ModuleBase(Configurable[ModuleConfig], SkillContainer, Resource): rpc_calls: list[str] = [] - default_config = ModuleConfig + default_config: type[ModuleConfigT] = ModuleConfig # type: ignore[assignment] def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) @@ -102,6 +108,13 @@ def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] except ValueError: ... + @property + def frame_id(self) -> str: + base = self.config.frame_id or self.__class__.__name__ + if self.config.frame_id_prefix: + return f"{self.config.frame_id_prefix}/{base}" + return base + @rpc def start(self) -> None: pass @@ -276,7 +289,7 @@ def get_rpc_calls(self, *methods: str) -> RpcCall | tuple[RpcCall, ...]: # type return result[0] if len(result) == 1 else result -class DaskModule(ModuleBase): +class DaskModule(ModuleBase[ModuleConfigT]): ref: Actor worker: int diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 2eab6de710..b0e96434b6 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -28,6 +28,7 @@ from reactivex.disposable import Disposable import dimos.core.colors as colors +from dimos.core.resource import Resource from dimos.utils.logging_config import setup_logger import dimos.utils.reactive as reactive from dimos.utils.reactive import backpressure @@ -79,7 +80,7 @@ class State(enum.Enum): FLOWING = "flowing" # runtime: data observed -class Transport(ObservableMixin[T]): +class Transport(Resource, ObservableMixin[T]): # used by local Output def broadcast(self, selfstream: Out[T], value: T) -> None: ... diff --git a/dimos/core/transport.py b/dimos/core/transport.py index d5c7a090df..8980c1806c 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -73,6 +73,11 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> self._started = True return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] + def start(self) -> None: ... + + def stop(self) -> None: + self.lcm.stop() + class LCMTransport(PubSubTransport[T]): _started: bool = False @@ -82,6 +87,11 @@ def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no if not hasattr(self, "lcm"): self.lcm = LCM(**kwargs) + def start(self) -> None: ... + + def stop(self) -> None: + self.lcm.stop() + def __reduce__(self): # type: ignore[no-untyped-def] return (LCMTransport, (self.topic.topic, self.topic.lcm_type)) @@ -107,6 +117,11 @@ def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no def __reduce__(self): # type: ignore[no-untyped-def] return (JpegLcmTransport, (self.topic.topic, self.topic.lcm_type)) + def start(self) -> None: ... + + def stop(self) -> None: + self.lcm.stop() + class pSHMTransport(PubSubTransport[T]): _started: bool = False @@ -131,6 +146,11 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> self._started = True return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] + def start(self) -> None: ... + + def stop(self) -> None: + self.shm.stop() + class SHMTransport(PubSubTransport[T]): _started: bool = False @@ -155,6 +175,11 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] | None = No self._started = True return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[arg-type, return-value] + def start(self) -> None: ... + + def stop(self) -> None: + self.shm.stop() + class JpegShmTransport(PubSubTransport[T]): _started: bool = False @@ -180,5 +205,9 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] | None = No self._started = True return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[arg-type, return-value] + def start(self) -> None: ... + + def stop(self) -> None: ... + class ZenohTransport(PubSubTransport[T]): ... diff --git a/dimos/hardware/camera/module.py b/dimos/hardware/camera/module.py index f1727d32b2..929e9548e5 100644 --- a/dimos/hardware/camera/module.py +++ b/dimos/hardware/camera/module.py @@ -49,18 +49,16 @@ class CameraModuleConfig(ModuleConfig): frequency: float = 5.0 -class CameraModule(Module, spec.Camera): +class CameraModule(Module[CameraModuleConfig], spec.Camera): color_image: Out[Image] camera_info: Out[CameraInfo] hardware: Callable[[], CameraHardware] | CameraHardware = None # type: ignore[assignment, type-arg] _skill_stream: Observable[Image] | None = None + config: CameraModuleConfig default_config = CameraModuleConfig - def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] - super().__init__(*args, **kwargs) - @property def hardware_camera_info(self) -> CameraInfo: return self.hardware.camera_info # type: ignore[union-attr] diff --git a/dimos/hardware/camera/zed/camera.py b/dimos/hardware/camera/zed/camera.py index d4b84aff87..d9586890af 100644 --- a/dimos/hardware/camera/zed/camera.py +++ b/dimos/hardware/camera/zed/camera.py @@ -547,7 +547,6 @@ def __init__( # type: ignore[no-untyped-def] enable_imu_fusion: bool = True, set_floor_as_origin: bool = True, publish_rate: float = 30.0, - frame_id: str = "zed_camera", recording_path: str | None = None, **kwargs, ) -> None: @@ -574,7 +573,6 @@ def __init__( # type: ignore[no-untyped-def] self.enable_imu_fusion = enable_imu_fusion self.set_floor_as_origin = set_floor_as_origin self.publish_rate = publish_rate - self.frame_id = frame_id self.recording_path = recording_path # Convert string parameters to ZED enums diff --git a/dimos/hardware/fake_zed_module.py b/dimos/hardware/fake_zed_module.py index b1a2c9cedc..2be71e7b9a 100644 --- a/dimos/hardware/fake_zed_module.py +++ b/dimos/hardware/fake_zed_module.py @@ -17,13 +17,14 @@ FakeZEDModule - Replays recorded ZED data for testing without hardware. """ +from dataclasses import dataclass import functools import logging from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped] import numpy as np -from dimos.core import Module, Out, rpc +from dimos.core import Module, ModuleConfig, Out, rpc from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.sensor_msgs import Image, ImageFormat from dimos.msgs.std_msgs import Header @@ -34,7 +35,12 @@ logger = setup_logger(level=logging.INFO) -class FakeZEDModule(Module): +@dataclass +class FakeZEDModuleConfig(ModuleConfig): + frame_id: str = "zed_camera" + + +class FakeZEDModule(Module[FakeZEDModuleConfig]): """ Fake ZED module that replays recorded data instead of real camera. """ @@ -45,18 +51,19 @@ class FakeZEDModule(Module): camera_info: Out[CameraInfo] pose: Out[PoseStamped] - def __init__(self, recording_path: str, frame_id: str = "zed_camera", **kwargs) -> None: # type: ignore[no-untyped-def] + default_config = FakeZEDModuleConfig + config: FakeZEDModuleConfig + + def __init__(self, recording_path: str, **kwargs: object) -> None: """ Initialize FakeZEDModule with recording path. Args: recording_path: Path to recorded data directory - frame_id: TF frame ID for messages """ super().__init__(**kwargs) self.recording_path = recording_path - self.frame_id = frame_id self._running = False # Initialize TF publisher diff --git a/dimos/hardware/gstreamer_camera.py b/dimos/hardware/gstreamer_camera.py index b94d22e7eb..dbd6de4a31 100644 --- a/dimos/hardware/gstreamer_camera.py +++ b/dimos/hardware/gstreamer_camera.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from dataclasses import dataclass import logging import sys import threading @@ -21,7 +22,7 @@ import numpy as np -from dimos.core import Module, Out, rpc +from dimos.core import Module, ModuleConfig, Out, rpc from dimos.msgs.sensor_msgs import Image, ImageFormat from dimos.utils.logging_config import setup_logger @@ -40,16 +41,23 @@ Gst.init(None) +@dataclass +class Config(ModuleConfig): + frame_id: str = "camera" + + class GstreamerCameraModule(Module): """Module that captures frames from a remote camera using GStreamer TCP with absolute timestamps.""" + default_config = Config + config: Config + video: Out[Image] def __init__( # type: ignore[no-untyped-def] self, host: str = "localhost", port: int = 5000, - frame_id: str = "camera", timestamp_offset: float = 0.0, reconnect_interval: float = 5.0, *args, @@ -66,7 +74,6 @@ def __init__( # type: ignore[no-untyped-def] """ self.host = host self.port = port - self.frame_id = frame_id self.timestamp_offset = timestamp_offset self.reconnect_interval = reconnect_interval @@ -79,8 +86,7 @@ def __init__( # type: ignore[no-untyped-def] self.frame_count = 0 self.last_log_time = time.time() self.reconnect_timer_id = None - - Module.__init__(self, *args, **kwargs) + super().__init__(**kwargs) @rpc def start(self) -> None: diff --git a/dimos/mapping/pointclouds/occupancy.py b/dimos/mapping/pointclouds/occupancy.py index 849be722bc..6b4180ab03 100644 --- a/dimos/mapping/pointclouds/occupancy.py +++ b/dimos/mapping/pointclouds/occupancy.py @@ -26,6 +26,156 @@ from dimos.msgs.sensor_msgs import PointCloud2 +def height_cost_occupancy( + cloud: PointCloud2, + resolution: float = 0.05, + can_pass_under: float = 0.6, + can_climb: float = 0.15, + ignore_noise: float = 0.05, + smoothing: float = 1.0, + frame_id: str | None = None, +) -> OccupancyGrid: + """Create a costmap based on terrain slope (rate of change of height). + + Costs are assigned based on the gradient magnitude of the terrain height. + Steeper slopes get higher costs, with max_step height change mapping to cost 100. + Cells without observations are marked unknown (-1). + + Args: + cloud: PointCloud2 message containing 3D points + resolution: Grid resolution in meters/cell (default: 0.05) + can_pass_under: Max height to consider - points above are ignored (default: 0.6) + can_climb: Height change in meters that maps to cost 100 (default: 0.15) + smoothing: Gaussian smoothing sigma in cells for filling gaps (default: 1.0) + frame_id: Reference frame for the grid (default: uses cloud's frame_id) + + Returns: + OccupancyGrid with costs 0-100 based on terrain slope, -1 for unknown + """ + points = cloud.as_numpy() + ts = cloud.ts if hasattr(cloud, "ts") and cloud.ts is not None else 0.0 + + if len(points) == 0: + return OccupancyGrid( + width=1, height=1, resolution=resolution, frame_id=frame_id or cloud.frame_id + ) + + # Find bounds of the point cloud in X-Y plane (use all points) + min_x = np.min(points[:, 0]) + max_x = np.max(points[:, 0]) + min_y = np.min(points[:, 1]) + max_y = np.max(points[:, 1]) + + # Add padding + padding = 1.0 + min_x -= padding + max_x += padding + min_y -= padding + max_y += padding + + # Calculate grid dimensions + width = int(np.ceil((max_x - min_x) / resolution)) + height = int(np.ceil((max_y - min_y) / resolution)) + + # Create origin pose + origin = Pose() + origin.position.x = min_x + origin.position.y = min_y + origin.position.z = 0.0 + origin.orientation.w = 1.0 + + # Step 1: Build min and max height maps for each cell + # Initialize with NaN to track which cells have observations + min_height_map = np.full((height, width), np.nan, dtype=np.float32) + max_height_map = np.full((height, width), np.nan, dtype=np.float32) + + # Convert point XY to grid indices + grid_x = np.round((points[:, 0] - min_x) / resolution).astype(np.int32) + grid_y = np.round((points[:, 1] - min_y) / resolution).astype(np.int32) + + # Clip to grid bounds + grid_x = np.clip(grid_x, 0, width - 1) + grid_y = np.clip(grid_y, 0, height - 1) + + # Use np.fmax/fmin.at which ignore NaN + np.fmax.at(max_height_map, (grid_y, grid_x), points[:, 2]) + np.fmin.at(min_height_map, (grid_y, grid_x), points[:, 2]) + + # Step 2: Determine effective height for each cell + # If gap between min and max > can_pass_under, robot can pass under - use min (ground) + # Otherwise use max (solid obstacle) + height_gap = max_height_map - min_height_map + height_map = np.where(height_gap > can_pass_under, min_height_map, max_height_map) + + # Track which cells have observations + observed_mask = ~np.isnan(height_map) + + # Step 3: Apply smoothing to fill gaps while preserving unknown space + if smoothing > 0 and np.any(observed_mask): + # Use a weighted smoothing approach that only interpolates from known cells + # Create a weight map (1 for observed, 0 for unknown) + weights = observed_mask.astype(np.float32) + height_map_filled = np.where(observed_mask, height_map, 0.0) + + # Smooth both height values and weights + smoothed_heights = ndimage.gaussian_filter(height_map_filled, sigma=smoothing) + smoothed_weights = ndimage.gaussian_filter(weights, sigma=smoothing) + + # Avoid division by zero (use np.divide with where to prevent warning) + valid_smooth = smoothed_weights > 0.01 + height_map_smoothed = np.full_like(smoothed_heights, np.nan) + np.divide(smoothed_heights, smoothed_weights, out=height_map_smoothed, where=valid_smooth) + + # Keep original values where we had observations, use smoothed elsewhere + height_map = np.where(observed_mask, height_map, height_map_smoothed) + + # Update observed mask to include smoothed cells + observed_mask = ~np.isnan(height_map) + + # Step 4: Calculate rate of change (gradient magnitude) + # Use Sobel filters for gradient calculation + if np.any(observed_mask): + # Replace NaN with 0 for gradient calculation + height_for_grad = np.where(observed_mask, height_map, 0.0) + + # Calculate gradients (Sobel gives gradient in pixels, scale by resolution) + grad_x = ndimage.sobel(height_for_grad, axis=1) / (8.0 * resolution) + grad_y = ndimage.sobel(height_for_grad, axis=0) / (8.0 * resolution) + + # Gradient magnitude = height change per meter + gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2) + + # Map gradient to cost: can_climb height change over one cell maps to cost 100 + # gradient_magnitude is in m/m, so multiply by resolution to get height change per cell + height_change_per_cell = gradient_magnitude * resolution + + # Ignore height changes below noise threshold (lidar floor noise) + height_change_per_cell = np.where( + height_change_per_cell < ignore_noise, 0.0, height_change_per_cell + ) + + cost_float = (height_change_per_cell / can_climb) * 100.0 + cost_float = np.clip(cost_float, 0, 100) + + # Erode observed mask - only trust gradients where all neighbors are observed + # This prevents false high costs at boundaries with unknown regions + structure = ndimage.generate_binary_structure(2, 1) # 4-connectivity + valid_gradient_mask = ndimage.binary_erosion(observed_mask, structure=structure) + + # Convert to int8, marking cells without valid gradients as -1 + cost = np.where(valid_gradient_mask, cost_float.astype(np.int8), -1) + else: + cost = np.full((height, width), -1, dtype=np.int8) + + return OccupancyGrid( + grid=cost, + resolution=resolution, + origin=origin, + frame_id=frame_id or cloud.frame_id, + ts=ts, + ) + + def general_occupancy( cloud: PointCloud2, resolution: float = 0.05, diff --git a/dimos/mapping/pointclouds/test_occupancy.py b/dimos/mapping/pointclouds/test_occupancy.py index 6c057527d9..5ab712ec08 100644 --- a/dimos/mapping/pointclouds/test_occupancy.py +++ b/dimos/mapping/pointclouds/test_occupancy.py @@ -18,10 +18,18 @@ from open3d.geometry import PointCloud # type: ignore[import-untyped] import pytest -from dimos.mapping.pointclouds.occupancy import general_occupancy, simple_occupancy +from dimos.core import LCMTransport +from dimos.mapping.pointclouds.occupancy import ( + general_occupancy, + height_cost_occupancy, + simple_occupancy, +) from dimos.mapping.pointclouds.util import read_pointcloud +from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid from dimos.msgs.sensor_msgs import PointCloud2 from dimos.utils.data import get_data +from dimos.utils.testing.moment import OutputMoment +from dimos.utils.testing.test_moment import Go2Moment @pytest.fixture @@ -46,3 +54,52 @@ def test_occupancy(apartment: PointCloud, occupancy_fn, output_name: str) -> Non computed_image = (occupancy_grid.grid + 1).astype(np.uint8) np.testing.assert_array_equal(computed_image, expected_image) + + +class HeightCostMoment(Go2Moment): + costmap: OutputMoment[OccupancyGrid] = OutputMoment(LCMTransport("/costmap", OccupancyGrid)) + + +@pytest.fixture +def height_cost_moment(): + moment = HeightCostMoment() + + def get_moment(ts: float, publish: bool = True) -> HeightCostMoment: + moment.seek(ts) + if moment.lidar.value is not None: + costmap = height_cost_occupancy( + moment.lidar.value, + resolution=0.05, + can_pass_under=0.6, + can_climb=0.15, + ) + moment.costmap.set(costmap) + if publish: + moment.publish() + return moment + + yield get_moment + + moment.stop() + + +def test_height_cost_occupancy_from_lidar(height_cost_moment) -> None: + """Test height_cost_occupancy with real lidar data.""" + moment = height_cost_moment(1.0) + + costmap = moment.costmap.value + assert costmap is not None + + # Basic sanity checks + assert costmap.grid is not None + assert costmap.width > 0 + assert costmap.height > 0 + + # Costs should be in range -1 to 100 (-1 = unknown) + assert costmap.grid.min() >= -1 + assert costmap.grid.max() <= 100 + + # Check we have some unknown, some known + known_mask = costmap.grid >= 0 + assert known_mask.sum() > 0, "Expected some known cells" + assert (~known_mask).sum() > 0, "Expected some unknown cells" diff --git a/dimos/mapping/test_voxels.py b/dimos/mapping/test_voxels.py new file mode 100644 index 0000000000..bbb63b1c10 --- /dev/null +++ b/dimos/mapping/test_voxels.py @@ -0,0 +1,212 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable, Generator +import time + +import numpy as np +import open3d as o3d # type: ignore[import-untyped] +import pytest + +from dimos.core import LCMTransport, Transport +from dimos.mapping.voxels import VoxelGridMapper +from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid +from dimos.msgs.sensor_msgs import PointCloud2 +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.utils.data import get_data +from dimos.utils.testing.moment import OutputMoment +from dimos.utils.testing.replay import TimedSensorReplay +from dimos.utils.testing.test_moment import Go2Moment + + +@pytest.fixture +def mapper() -> Generator[VoxelGridMapper, None, None]: + mapper = VoxelGridMapper() + yield mapper + mapper.stop() + + +class Go2MapperMoment(Go2Moment): + global_map: OutputMoment[PointCloud2] = OutputMoment(LCMTransport("/global_map", PointCloud2)) + costmap: OutputMoment[OccupancyGrid] = OutputMoment(LCMTransport("/costmap", OccupancyGrid)) + + +MomentFactory = Callable[[float, bool], Go2MapperMoment] + + +@pytest.fixture +def moment() -> Generator[MomentFactory, None, None]: + instances: list[Go2MapperMoment] = [] + + def get_moment(ts: float, publish: bool = True) -> Go2MapperMoment: + m = Go2MapperMoment() + m.seek(ts) + if publish: + m.publish() + instances.append(m) + return m + + yield get_moment + for m in instances: + m.stop() + + +@pytest.fixture +def moment1(moment: MomentFactory) -> Go2MapperMoment: + return moment(10, False) + + +@pytest.fixture +def moment2(moment: MomentFactory) -> Go2MapperMoment: + return moment(85, False) + + +@pytest.mark.tool +def two_perspectives_loop(moment: MomentFactory) -> None: + while True: + moment(10, True) + time.sleep(1) + moment(85, True) + time.sleep(1) + + +def test_carving( + mapper: VoxelGridMapper, moment1: Go2MapperMoment, moment2: Go2MapperMoment +) -> None: + lidar_frame1 = moment1.lidar.value + assert lidar_frame1 is not None + lidar_frame1_transport: LCMTransport[PointCloud2] = LCMTransport("/prev_lidar", PointCloud2) + lidar_frame1_transport.publish(lidar_frame1) + lidar_frame1_transport.stop() + + lidar_frame2 = moment2.lidar.value + assert lidar_frame2 is not None + + # Debug: check XY overlap + pts1 = np.asarray(lidar_frame1.pointcloud.points) + pts2 = np.asarray(lidar_frame2.pointcloud.points) + + voxel_size = mapper.config.voxel_size + xy1 = set(map(tuple, (pts1[:, :2] / voxel_size).astype(int))) + xy2 = set(map(tuple, (pts2[:, :2] / voxel_size).astype(int))) + + overlap = xy1 & xy2 + print(f"\nFrame1 XY columns: {len(xy1)}") + print(f"Frame2 XY columns: {len(xy2)}") + print(f"Overlapping XY columns: {len(overlap)}") + + # Carving mapper (default, carve_columns=True) + mapper.add_frame(lidar_frame1) + mapper.add_frame(lidar_frame2) + + moment2.global_map.set(mapper.get_global_pointcloud2()) + moment2.costmap.set(mapper.get_global_occupancygrid()) + moment2.publish() + + count_carving = mapper.size() + # Additive mapper (carve_columns=False) + additive_mapper = VoxelGridMapper(carve_columns=False) + additive_mapper.add_frame(lidar_frame1) + additive_mapper.add_frame(lidar_frame2) + count_additive = additive_mapper.size() + + print("\n=== Carving comparison ===") + print(f"Additive (no carving): {count_additive}") + print(f"With carving: {count_carving}") + print(f"Voxels carved: {count_additive - count_carving}") + + # Carving should result in fewer voxels + assert count_carving < count_additive, ( + f"Carving should remove some voxels. Additive: {count_additive}, Carving: {count_carving}" + ) + + additive_global_map: LCMTransport[PointCloud2] = LCMTransport( + "additive_global_map", PointCloud2 + ) + additive_global_map.publish(additive_mapper.get_global_pointcloud2()) + additive_global_map.stop() + additive_mapper.stop() + + +def test_injest_a_few(mapper: VoxelGridMapper) -> None: + data_dir = get_data("unitree_go2_office_walk2") + lidar_store = TimedSensorReplay(f"{data_dir}/lidar") # type: ignore[var-annotated] + + for i in [1, 4, 8]: + frame = lidar_store.find_closest_seek(i) + assert frame is not None + print("add", frame) + mapper.add_frame(frame) + + assert len(mapper.get_global_pointcloud2()) == 30136 + + +@pytest.mark.parametrize( + "voxel_size, expected_points", + [ + (0.5, 277), + (0.1, 7290), + (0.05, 28199), + ], +) +def test_roundtrip(moment1: Go2MapperMoment, voxel_size: float, expected_points: int) -> None: + lidar_frame = moment1.lidar.value + assert lidar_frame is not None + + mapper = VoxelGridMapper(voxel_size=voxel_size) + mapper.add_frame(lidar_frame) + + global1 = mapper.get_global_pointcloud2() + assert len(global1) == expected_points + + # loseless roundtrip + if voxel_size == 0.05: + assert len(global1) == len(lidar_frame) + # TODO: we want __eq__ on PointCloud2 - should actually compare + # all points in both frames + + mapper.add_frame(global1) + # no new information, no global map change + assert len(mapper.get_global_pointcloud2()) == len(global1) + + moment1.publish() + mapper.stop() + + +def test_roundtrip_range_preserved(mapper: VoxelGridMapper) -> None: + """Test that input coordinate ranges are preserved in output.""" + data_dir = get_data("unitree_go2_office_walk2") + lidar_store = TimedSensorReplay(f"{data_dir}/lidar") # type: ignore[var-annotated] + + frame = lidar_store.find_closest_seek(1.0) + assert frame is not None + input_pts = np.asarray(frame.pointcloud.points) + + mapper.add_frame(frame) + + out_pcd = mapper.get_global_pointcloud().to_legacy() + out_pts = np.asarray(out_pcd.points) + + voxel_size = mapper.config.voxel_size + tolerance = voxel_size # Allow one voxel of difference at boundaries + + # TODO: we want __eq__ on PointCloud2 - should actually compare + # all points in both frames + + for axis, name in enumerate(["X", "Y", "Z"]): + in_min, in_max = input_pts[:, axis].min(), input_pts[:, axis].max() + out_min, out_max = out_pts[:, axis].min(), out_pts[:, axis].max() + + assert abs(in_min - out_min) < tolerance, f"{name} min mismatch: in={in_min}, out={out_min}" + assert abs(in_max - out_max) < tolerance, f"{name} max mismatch: in={in_max}, out={out_max}" diff --git a/dimos/mapping/voxels.py b/dimos/mapping/voxels.py new file mode 100644 index 0000000000..b5d1d235ff --- /dev/null +++ b/dimos/mapping/voxels.py @@ -0,0 +1,283 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable +from dataclasses import dataclass, field +import functools +import time + +import numpy as np +import open3d as o3d # type: ignore[import-untyped] +import open3d.core as o3c # type: ignore[import-untyped] +from reactivex import interval +from reactivex.disposable import Disposable + +from dimos.core import DimosCluster, In, LCMTransport, Module, Out, rpc +from dimos.core.global_config import GlobalConfig +from dimos.core.module import ModuleConfig +from dimos.mapping.pointclouds.occupancy import height_cost_occupancy +from dimos.msgs.nav_msgs import OccupancyGrid +from dimos.msgs.sensor_msgs import PointCloud2 +from dimos.robot.unitree.connection.go2 import Go2ConnectionProtocol +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.spec.map import Global3DMap, GlobalCostmap +from dimos.utils.decorators import simple_mcache +from dimos.utils.metrics import timed + + +@dataclass +class CostmapConfig: + publish: bool = True + resolution: float = 0.05 + can_pass_under: float = 0.6 + can_climb: float = 0.15 + + +@dataclass +class Config(ModuleConfig): + frame_id: str = "world" + publish_interval: float = 0 + voxel_size: float = 0.05 + block_count: int = 2_000_000 + device: str = "CUDA:0" + carve_columns: bool = True + costmap: CostmapConfig = field(default_factory=CostmapConfig) + + +class VoxelGridMapper(Module): + default_config = Config + config: Config + + lidar: In[LidarMessage] + global_map: Out[LidarMessage] + global_costmap: Out[OccupancyGrid] + + def __init__(self, **kwargs: object) -> None: + super().__init__(**kwargs) + dev = ( + o3c.Device(self.config.device) + if (self.config.device.startswith("CUDA") and o3c.cuda.is_available()) + else o3c.Device("CPU:0") + ) + + print(f"VoxelGridMapper using device: {dev}") + + self.vbg = o3d.t.geometry.VoxelBlockGrid( + attr_names=("dummy",), + attr_dtypes=(o3c.uint8,), + attr_channels=(o3c.SizeVector([1]),), + voxel_size=self.config.voxel_size, + block_resolution=1, + block_count=self.config.block_count, + device=dev, + ) + + self._dev = dev + self._voxel_hashmap = self.vbg.hashmap() + self._key_dtype = self._voxel_hashmap.key_tensor().dtype + + @rpc + def start(self) -> None: + super().start() + + lidar_unsub = self.lidar.subscribe(self._on_frame) + self._disposables.add(Disposable(lidar_unsub)) + + # If publish_interval > 0, publish on timer; otherwise publish on each frame + if self.config.publish_interval > 0: + disposable = interval(self.config.publish_interval).subscribe( + lambda _: self.publish_global_map() + ) + self._disposables.add(disposable) + + @rpc + def stop(self) -> None: + super().stop() + + def _on_frame(self, frame: LidarMessage) -> None: + self.add_frame(frame) + if self.config.publish_interval <= 0: + self.publish_global_map() + + def publish_global_map(self) -> None: + self.global_map.publish(self.get_global_pointcloud2()) + if self.config.costmap.publish: + self.global_costmap.publish(self.get_global_occupancygrid()) + + def size(self) -> int: + return self._voxel_hashmap.size() # type: ignore[no-any-return] + + def __len__(self) -> int: + return self.size() + + # @timed() # TODO: fix thread leak in timed decorator + def add_frame(self, frame: PointCloud2) -> None: + # we are potentially moving into CUDA here + pcd = ensure_tensor_pcd(frame.pointcloud, self._dev) + + if pcd.is_empty(): + return + + pts = pcd.point["positions"].to(self._dev, o3c.float32) + vox = (pts / self.config.voxel_size).floor().to(self._key_dtype) + keys_Nx3 = vox.contiguous() + + if self.config.carve_columns: + self._carve_and_insert(keys_Nx3) + else: + self._voxel_hashmap.activate(keys_Nx3) + + self.get_global_pointcloud.invalidate_cache(self) # type: ignore[attr-defined] + self.get_global_pointcloud2.invalidate_cache(self) # type: ignore[attr-defined] + self.get_global_occupancygrid.invalidate_cache(self) # type: ignore[attr-defined] + + def _carve_and_insert(self, new_keys: o3c.Tensor) -> None: + """Column carving: remove all existing voxels sharing (X,Y) with new_keys, then insert.""" + if new_keys.shape[0] == 0: + self._voxel_hashmap.activate(new_keys) + return + + # Extract (X, Y) from incoming keys + xy_keys = new_keys[:, :2].contiguous() + + # Build temp hashmap for O(1) (X,Y) membership lookup + xy_hashmap = o3c.HashMap( + init_capacity=xy_keys.shape[0], + key_dtype=self._key_dtype, + key_element_shape=o3c.SizeVector([2]), + value_dtypes=[o3c.uint8], + value_element_shapes=[o3c.SizeVector([1])], + device=self._dev, + ) + dummy_vals = o3c.Tensor.zeros((xy_keys.shape[0], 1), o3c.uint8, self._dev) + xy_hashmap.insert(xy_keys, dummy_vals) + + # Get existing keys from main hashmap + active_indices = self._voxel_hashmap.active_buf_indices() + if active_indices.shape[0] == 0: + self._voxel_hashmap.activate(new_keys) + return + + existing_keys = self._voxel_hashmap.key_tensor()[active_indices] + existing_xy = existing_keys[:, :2].contiguous() + + # Find which existing keys have (X,Y) in the incoming set + _, found_mask = xy_hashmap.find(existing_xy) + + # Erase those columns + to_erase = existing_keys[found_mask] + if to_erase.shape[0] > 0: + self._voxel_hashmap.erase(to_erase) + + # Insert new keys + self._voxel_hashmap.activate(new_keys) + + # returns PointCloud2 message (ready to send off down the pipeline) + @simple_mcache + def get_global_pointcloud2(self) -> PointCloud2: + return PointCloud2( + # we are potentially moving out of CUDA here + ensure_legacy_pcd(self.get_global_pointcloud()), + frame_id=self.frame_id, + ts=time.time(), + ) + + @simple_mcache + def get_global_pointcloud(self) -> o3d.t.geometry.PointCloud: + voxel_coords, _ = self.vbg.voxel_coordinates_and_flattened_indices() + pts = voxel_coords + (self.config.voxel_size * 0.5) + out = o3d.t.geometry.PointCloud(device=self._dev) + out.point["positions"] = pts + return out + + @simple_mcache + def get_global_occupancygrid(self) -> OccupancyGrid: + return height_cost_occupancy( + self.get_global_pointcloud2(), + resolution=self.config.costmap.resolution, + can_pass_under=self.config.costmap.can_pass_under, + can_climb=self.config.costmap.can_climb, + ) + + +# @timed() +def splice_cylinder( + map_pcd: o3d.geometry.PointCloud, + patch_pcd: o3d.geometry.PointCloud, + axis: int = 2, + shrink: float = 0.95, +) -> o3d.geometry.PointCloud: + center = patch_pcd.get_center() + patch_pts = np.asarray(patch_pcd.points) + + # Axes perpendicular to cylinder + axes = [0, 1, 2] + axes.remove(axis) + + planar_dists = np.linalg.norm(patch_pts[:, axes] - center[axes], axis=1) + radius = planar_dists.max() * shrink + + axis_min = (patch_pts[:, axis].min() - center[axis]) * shrink + center[axis] + axis_max = (patch_pts[:, axis].max() - center[axis]) * shrink + center[axis] + + map_pts = np.asarray(map_pcd.points) + planar_dists_map = np.linalg.norm(map_pts[:, axes] - center[axes], axis=1) + + victims = np.nonzero( + (planar_dists_map < radius) + & (map_pts[:, axis] >= axis_min) + & (map_pts[:, axis] <= axis_max) + )[0] + + survivors = map_pcd.select_by_index(victims, invert=True) + return survivors + patch_pcd + + +def ensure_tensor_pcd( + pcd_any: o3d.t.geometry.PointCloud | o3d.geometry.PointCloud, + device: o3c.Device, +) -> o3d.t.geometry.PointCloud: + """Convert legacy / cuda.pybind point clouds into o3d.t.geometry.PointCloud on `device`.""" + + if isinstance(pcd_any, o3d.t.geometry.PointCloud): + return pcd_any.to(device) + + assert isinstance(pcd_any, o3d.geometry.PointCloud), ( + "Input must be a legacy PointCloud or a tensor PointCloud" + ) + + # Legacy CPU point cloud -> tensor + if isinstance(pcd_any, o3d.geometry.PointCloud): + return o3d.t.geometry.PointCloud.from_legacy(pcd_any, o3c.float32, device) + + pts = np.asarray(pcd_any.points, dtype=np.float32) + pcd_t = o3d.t.geometry.PointCloud(device=device) + pcd_t.point["positions"] = o3c.Tensor(pts, o3c.float32, device) + return pcd_t + + +def ensure_legacy_pcd( + pcd_any: o3d.t.geometry.PointCloud | o3d.geometry.PointCloud, +) -> o3d.geometry.PointCloud: + if isinstance(pcd_any, o3d.geometry.PointCloud): + return pcd_any + + assert isinstance(pcd_any, o3d.t.geometry.PointCloud), ( + "Input must be a legacy PointCloud or a tensor PointCloud" + ) + + return pcd_any.to_legacy() + + +voxel_mapper = VoxelGridMapper.blueprint diff --git a/dimos/msgs/nav_msgs/test_OccupancyGrid.py b/dimos/msgs/nav_msgs/test_OccupancyGrid.py index 247aa9cc7f..092f28223e 100644 --- a/dimos/msgs/nav_msgs/test_OccupancyGrid.py +++ b/dimos/msgs/nav_msgs/test_OccupancyGrid.py @@ -27,7 +27,7 @@ from dimos.msgs.nav_msgs import OccupancyGrid from dimos.msgs.sensor_msgs import PointCloud2 from dimos.protocol.pubsub.lcmpubsub import LCM, Topic -from dimos.utils.testing import get_data +from dimos.utils.data import get_data def test_empty_grid() -> None: diff --git a/dimos/perception/object_tracker.py b/dimos/perception/object_tracker.py index 1aaa3f709b..eefd14e414 100644 --- a/dimos/perception/object_tracker.py +++ b/dimos/perception/object_tracker.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from dataclasses import dataclass import threading import time import cv2 -from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped] # Import LCM messages from dimos_lcm.vision_msgs import ( # type: ignore[import-untyped] @@ -27,10 +27,14 @@ import numpy as np from reactivex.disposable import Disposable -from dimos.core import In, Module, Out, rpc +from dimos.core import In, Module, ModuleConfig, Out, rpc from dimos.manipulation.visual_servoing.utils import visualize_detections_3d from dimos.msgs.geometry_msgs import Pose, Quaternion, Transform, Vector3 -from dimos.msgs.sensor_msgs import Image, ImageFormat +from dimos.msgs.sensor_msgs import ( + CameraInfo, # type: ignore[import-untyped] + Image, + ImageFormat, +) from dimos.msgs.std_msgs import Header from dimos.msgs.vision_msgs import Detection2DArray, Detection3DArray from dimos.protocol.tf import TF @@ -45,7 +49,12 @@ logger = setup_logger() -class ObjectTracking(Module): +@dataclass +class ObjectTrackingConfig(ModuleConfig): + frame_id: str = "camera_link" + + +class ObjectTracking(Module[ObjectTrackingConfig]): """Module for object tracking with LCM input/output.""" # LCM inputs @@ -58,11 +67,11 @@ class ObjectTracking(Module): detection3darray: Out[Detection3DArray] tracked_overlay: Out[Image] # Visualization output + default_config = ObjectTrackingConfig + config: ObjectTrackingConfig + def __init__( - self, - reid_threshold: int = 10, - reid_fail_tolerance: int = 5, - frame_id: str = "camera_link", + self, reid_threshold: int = 10, reid_fail_tolerance: int = 5, **kwargs: object ) -> None: """ Initialize an object tracking module using OpenCV's CSRT tracker with ORB re-ID. @@ -73,15 +82,13 @@ def __init__( reid_threshold: Minimum good feature matches needed to confirm re-ID. reid_fail_tolerance: Number of consecutive frames Re-ID can fail before tracking is stopped. - frame_id: TF frame ID for the camera (default: "camera_link") """ # Call parent Module init - super().__init__() + super().__init__(**kwargs) self.camera_intrinsics = None self.reid_threshold = reid_threshold self.reid_fail_tolerance = reid_fail_tolerance - self.frame_id = frame_id self.tracker = None self.tracking_bbox = None # Stores (x, y, w, h) for tracker initialization diff --git a/dimos/perception/object_tracker_2d.py b/dimos/perception/object_tracker_2d.py index 02d7166861..878685ff74 100644 --- a/dimos/perception/object_tracker_2d.py +++ b/dimos/perception/object_tracker_2d.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from dataclasses import dataclass import logging import threading import time @@ -30,7 +31,7 @@ import numpy as np from reactivex.disposable import Disposable -from dimos.core import In, Module, Out, rpc +from dimos.core import In, Module, ModuleConfig, Out, rpc from dimos.msgs.sensor_msgs import Image, ImageFormat from dimos.msgs.std_msgs import Header from dimos.msgs.vision_msgs import Detection2DArray @@ -39,7 +40,12 @@ logger = setup_logger(level=logging.INFO) -class ObjectTracker2D(Module): +@dataclass +class ObjectTracker2DConfig(ModuleConfig): + frame_id: str = "camera_link" + + +class ObjectTracker2D(Module[ObjectTracker2DConfig]): """Pure 2D object tracking module using OpenCV's CSRT tracker.""" color_image: In[Image] @@ -47,19 +53,12 @@ class ObjectTracker2D(Module): detection2darray: Out[Detection2DArray] tracked_overlay: Out[Image] # Visualization output - def __init__( - self, - frame_id: str = "camera_link", - ) -> None: - """ - Initialize 2D object tracking module using OpenCV's CSRT tracker. - - Args: - frame_id: TF frame ID for the camera (default: "camera_link") - """ - super().__init__() + default_config = ObjectTracker2DConfig + config: ObjectTracker2DConfig - self.frame_id = frame_id + def __init__(self, **kwargs: object) -> None: + """Initialize 2D object tracking module using OpenCV's CSRT tracker.""" + super().__init__(**kwargs) # Tracker state self.tracker = None diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 04cdf435f4..432e89e190 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -17,6 +17,7 @@ # The blueprints are defined as import strings so as not to trigger unnecessary imports. all_blueprints = { "unitree-go2": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard", + "unitree-go2-newmapper": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:newmapper", "unitree-go2-basic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:basic", "unitree-go2-test-nav": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:test_new_nav", "unitree-go2-shm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_shm", diff --git a/dimos/robot/drone/mavlink_connection.py b/dimos/robot/drone/mavlink_connection.py index 92bbcc0ec8..d9bb6d470f 100644 --- a/dimos/robot/drone/mavlink_connection.py +++ b/dimos/robot/drone/mavlink_connection.py @@ -20,7 +20,7 @@ import time from typing import Any -from pymavlink import mavutil # type: ignore[import-untyped] +from pymavlink import mavutil # type: ignore[import-untyped,import-not-found] from reactivex import Subject from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Twist, Vector3 diff --git a/dimos/robot/unitree/connection/connection.py b/dimos/robot/unitree/connection/connection.py index c8360cbfd2..6052b36cf1 100644 --- a/dimos/robot/unitree/connection/connection.py +++ b/dimos/robot/unitree/connection/connection.py @@ -350,7 +350,7 @@ def switch_video_channel_off() -> None: return subject.pipe(ops.finally_action(stop)) - def get_video_stream(self, fps: int = 30) -> Observable[VideoMessage]: + def get_video_stream(self, fps: int = 30) -> Observable[Image]: """Get the video stream from the robot's camera. Implements the AbstractRobot interface method. diff --git a/dimos/robot/unitree/connection/go2.py b/dimos/robot/unitree/connection/go2.py index 6717d4dd48..5fda73fba7 100644 --- a/dimos/robot/unitree/connection/go2.py +++ b/dimos/robot/unitree/connection/go2.py @@ -17,7 +17,6 @@ import time from typing import Any, Protocol -from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped] from reactivex.disposable import Disposable from reactivex.observable import Observable @@ -31,7 +30,7 @@ Twist, Vector3, ) -from dimos.msgs.sensor_msgs import Image, PointCloud2 +from dimos.msgs.sensor_msgs import CameraInfo, Image, PointCloud2 from dimos.msgs.std_msgs import Header from dimos.robot.unitree.connection.connection import UnitreeWebRTCConnection from dimos.robot.unitree_webrtc.type.lidar import LidarMessage @@ -61,32 +60,18 @@ def _camera_info_static() -> CameraInfo: fx, fy, cx, cy = (819.553492, 820.646595, 625.284099, 336.808987) width, height = (1280, 720) - # Camera matrix K (3x3) - K = [fx, 0, cx, 0, fy, cy, 0, 0, 1] - - # No distortion coefficients for now - D = [0.0, 0.0, 0.0, 0.0, 0.0] - - # Identity rotation matrix - R = [1, 0, 0, 0, 1, 0, 0, 0, 1] - - # Projection matrix P (3x4) - P = [fx, 0, cx, 0, 0, fy, cy, 0, 0, 0, 1, 0] - - base_msg = { - "D_length": len(D), - "height": height, - "width": width, - "distortion_model": "plumb_bob", - "D": D, - "K": K, - "R": R, - "P": P, - "binning_x": 0, - "binning_y": 0, - } - - return CameraInfo(**base_msg, header=Header("camera_optical")) + return CameraInfo( + frame_id="camera_optical", + height=height, + width=width, + distortion_model="plumb_bob", + D=[0.0, 0.0, 0.0, 0.0, 0.0], + K=[fx, 0.0, cx, 0.0, fy, cy, 0.0, 0.0, 1.0], + R=[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0], + P=[fx, 0.0, cx, 0.0, 0.0, fy, cy, 0.0, 0.0, 0.0, 1.0, 0.0], + binning_x=0, + binning_y=0, + ) class ReplayConnection(UnitreeWebRTCConnection): diff --git a/dimos/robot/unitree_webrtc/modular/connection_module.py b/dimos/robot/unitree_webrtc/modular/connection_module.py index bb53f5ec92..fb4e6ac570 100644 --- a/dimos/robot/unitree_webrtc/modular/connection_module.py +++ b/dimos/robot/unitree_webrtc/modular/connection_module.py @@ -37,6 +37,7 @@ from dimos.robot.unitree.connection.connection import UnitreeWebRTCConnection from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.utils.data import get_data +from dimos.utils.decorators import simple_mcache from dimos.utils.logging_config import setup_logger from dimos.utils.testing import TimedSensorReplay, TimedSensorStorage @@ -86,20 +87,20 @@ def standup(self) -> None: def liedown(self) -> None: print("liedown suppressed") - @functools.cache + @simple_mcache def lidar_stream(self): # type: ignore[no-untyped-def] print("lidar stream start") lidar_store = TimedSensorReplay(f"{self.dir_name}/lidar") # type: ignore[var-annotated] return lidar_store.stream(**self.replay_config) # type: ignore[arg-type] - @functools.cache + @simple_mcache def odom_stream(self): # type: ignore[no-untyped-def] print("odom stream start") odom_store = TimedSensorReplay(f"{self.dir_name}/odom") # type: ignore[var-annotated] return odom_store.stream(**self.replay_config) # type: ignore[arg-type] # we don't have raw video stream in the data set - @functools.cache + @simple_mcache def video_stream(self): # type: ignore[no-untyped-def] print("video stream start") video_store = TimedSensorReplay(f"{self.dir_name}/video") # type: ignore[var-annotated] diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index c5366cb460..6f27909de3 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import platform + from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped] from dimos.agents2.agent import llm_agent @@ -26,6 +28,7 @@ from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport +from dimos.mapping.voxels import voxel_mapper from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.sensor_msgs import Image from dimos.navigation.bt_navigator.navigator import ( @@ -50,35 +53,50 @@ from dimos.utils.monitoring import utilization from dimos.web.websocket_vis.websocket_vis_module import websocket_vis -basic = ( - autoconnect( - go2_connection(), - mapper(voxel_size=0.5, global_publish_interval=2.5), - astar_planner(), - holonomic_local_planner(), - behavior_tree_navigator(), - wavefront_frontier_explorer(), - websocket_vis(), - foxglove_bridge( - shm_channels=[ - "/go2/color_image#sensor_msgs.Image", - ] +# Mac has some issue with high bandwidth UDP +# +# so we use pSHMTransport for color_image +# (Could we adress this on the system config layer? Is this fixable on mac?) +mac = autoconnect( + foxglove_bridge( + shm_channels=[ + "/color_image#sensor_msgs.Image", + ] + ), +).transports( + { + ("color_image", Image): pSHMTransport( + "color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE ), - ) - .global_config(n_dask_workers=4, robot_model="unitree_go2") - .transports( - # These are kept the same so that we don't have to change foxglove configs. - # Although we probably should. - { - ("color_image", Image): pSHMTransport( - "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE - ), - ("camera_pose", PoseStamped): LCMTransport("/go2/camera_pose", PoseStamped), - ("camera_info", CameraInfo): LCMTransport("/go2/camera_info", CameraInfo), - } - ) + } ) + +linux = autoconnect(foxglove_bridge()) + +basic = autoconnect( + linux if platform.system() == "Linux" else mac, + websocket_vis(), + go2_connection(), + mapper(voxel_size=0.5, global_publish_interval=2.5), + astar_planner(), + holonomic_local_planner(), + behavior_tree_navigator(), + wavefront_frontier_explorer(), +).global_config(n_dask_workers=4, robot_model="unitree_go2") + + +newmapper = autoconnect( + linux if platform.system() == "Linux" else mac, + go2_connection(), + # these values are defaults but leaving here for clarity + # + # no publish interval - publishes immediately on each lidar frame + # voxel size same as input + voxel_mapper(voxel_size=0.05, publish_interval=0), +).global_config(n_dask_workers=4, robot_model="unitree_go2") + + standard = autoconnect( basic, spatial_memory(), @@ -110,19 +128,19 @@ standard_with_jpeglcm = standard.transports( { - ("color_image", Image): JpegLcmTransport("/go2/color_image", Image), + ("color_image", Image): JpegLcmTransport("/color_image", Image), } ) standard_with_jpegshm = autoconnect( standard.transports( { - ("color_image", Image): JpegShmTransport("/go2/color_image", quality=75), + ("color_image", Image): JpegShmTransport("/color_image", quality=75), } ), foxglove_bridge( jpeg_shm_channels=[ - "/go2/color_image#sensor_msgs.Image", + "/color_image#sensor_msgs.Image", ] ), ) diff --git a/dimos/utils/decorators/__init__.py b/dimos/utils/decorators/__init__.py index ee17260c20..79623922a0 100644 --- a/dimos/utils/decorators/__init__.py +++ b/dimos/utils/decorators/__init__.py @@ -1,12 +1,14 @@ """Decorators and accumulators for rate limiting and other utilities.""" from .accumulators import Accumulator, LatestAccumulator, RollingAverageAccumulator -from .decorators import limit, retry +from .decorators import CachedMethod, limit, retry, simple_mcache __all__ = [ "Accumulator", + "CachedMethod", "LatestAccumulator", "RollingAverageAccumulator", "limit", "retry", + "simple_mcache", ] diff --git a/dimos/utils/decorators/decorators.py b/dimos/utils/decorators/decorators.py index f41b21c598..5bf1b561b6 100644 --- a/dimos/utils/decorators/decorators.py +++ b/dimos/utils/decorators/decorators.py @@ -16,9 +16,20 @@ from functools import wraps import threading import time +from typing import Any, Protocol, TypeVar from .accumulators import Accumulator, LatestAccumulator +_CacheResult_co = TypeVar("_CacheResult_co", covariant=True) +_CacheReturn = TypeVar("_CacheReturn") + + +class CachedMethod(Protocol[_CacheResult_co]): + """Protocol for methods decorated with simple_mcache.""" + + def __call__(self) -> _CacheResult_co: ... + def invalidate_cache(self, instance: Any) -> None: ... + def limit(max_freq: float, accumulator: Accumulator | None = None): # type: ignore[no-untyped-def, type-arg] """ @@ -127,8 +138,6 @@ def simple_mcache(method: Callable) -> Callable: # type: ignore[type-arg] def getter(self): # type: ignore[no-untyped-def] # Get or create the lock for this instance if not hasattr(self, lock_name): - # This is a one-time operation, race condition here is acceptable - # as worst case we create multiple locks but only one gets stored setattr(self, lock_name, threading.Lock()) lock = getattr(self, lock_name) @@ -142,6 +151,18 @@ def getter(self): # type: ignore[no-untyped-def] setattr(self, attr_name, method(self)) return getattr(self, attr_name) + def invalidate_cache(instance: Any) -> None: + """Clear the cached value for the given instance.""" + if not hasattr(instance, lock_name): + return + + lock = getattr(instance, lock_name) + with lock: + if hasattr(instance, attr_name): + delattr(instance, attr_name) + + getter.invalidate_cache = invalidate_cache # type: ignore[attr-defined] + return getter diff --git a/dimos/utils/decorators/test_decorators.py b/dimos/utils/decorators/test_decorators.py index fdad670454..f48dfa7839 100644 --- a/dimos/utils/decorators/test_decorators.py +++ b/dimos/utils/decorators/test_decorators.py @@ -16,7 +16,7 @@ import pytest -from dimos.utils.decorators import RollingAverageAccumulator, limit, retry +from dimos.utils.decorators import RollingAverageAccumulator, limit, retry, simple_mcache def test_limit() -> None: @@ -260,3 +260,59 @@ def static_method(attempts_list, fail_times: int = 1) -> str: result = obj2.instance_method() assert result == "instance success with value 100" assert len(obj2.instance_attempts) == 3 + + +def test_simple_mcache() -> None: + """Test simple_mcache decorator caches and can be invalidated.""" + call_count = 0 + + class Counter: + @simple_mcache + def expensive(self) -> int: + nonlocal call_count + call_count += 1 + return call_count + + obj = Counter() + + # First call computes + assert obj.expensive() == 1 + assert call_count == 1 + + # Second call returns cached + assert obj.expensive() == 1 + assert call_count == 1 + + # Invalidate and call again + obj.expensive.invalidate_cache(obj) + assert obj.expensive() == 2 + assert call_count == 2 + + # Cached again + assert obj.expensive() == 2 + assert call_count == 2 + + +def test_simple_mcache_separate_instances() -> None: + """Test that simple_mcache caches per instance.""" + call_count = 0 + + class Counter: + @simple_mcache + def expensive(self) -> int: + nonlocal call_count + call_count += 1 + return call_count + + obj1 = Counter() + obj2 = Counter() + + assert obj1.expensive() == 1 + assert obj2.expensive() == 2 # separate cache + assert obj1.expensive() == 1 # still cached + assert call_count == 2 + + # Invalidating one doesn't affect the other + obj1.expensive.invalidate_cache(obj1) + assert obj1.expensive() == 3 + assert obj2.expensive() == 2 # still cached diff --git a/dimos/utils/metrics.py b/dimos/utils/metrics.py new file mode 100644 index 0000000000..733399952b --- /dev/null +++ b/dimos/utils/metrics.py @@ -0,0 +1,52 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable +import functools +import time +from typing import Any, TypeVar, cast + +from dimos_lcm.std_msgs import Float32 # type: ignore[import-untyped] + +from dimos.core import DimosCluster, In, LCMTransport, Module, Out, Transport, rpc + +F = TypeVar("F", bound=Callable[..., Any]) + + +def timed( + transport: Callable[[F], Transport[Float32]] | Transport[Float32] | None = None, +) -> Callable[[F], F]: + def timed_decorator(func: F) -> F: + t: Transport[Float32] + if transport is None: + t = LCMTransport(f"/metrics/{func.__name__}", Float32) + elif callable(transport): + t = transport(func) + else: + t = transport + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + start = time.perf_counter() + result = func(*args, **kwargs) + elapsed = time.perf_counter() - start + + msg = Float32() + msg.data = elapsed * 1000 # ms + t.publish(msg) + return result + + return cast("F", wrapper) + + return timed_decorator diff --git a/dimos/utils/testing/__init__.py b/dimos/utils/testing/__init__.py new file mode 100644 index 0000000000..ffb640de39 --- /dev/null +++ b/dimos/utils/testing/__init__.py @@ -0,0 +1,11 @@ +from dimos.utils.testing.moment import Moment, OutputMoment, SensorMoment +from dimos.utils.testing.replay import SensorReplay, TimedSensorReplay, TimedSensorStorage + +__all__ = [ + "Moment", + "OutputMoment", + "SensorMoment", + "SensorReplay", + "TimedSensorReplay", + "TimedSensorStorage", +] diff --git a/dimos/utils/testing/moment.py b/dimos/utils/testing/moment.py new file mode 100644 index 0000000000..2389256d49 --- /dev/null +++ b/dimos/utils/testing/moment.py @@ -0,0 +1,99 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Generic, TypeVar + +from dimos.core.resource import Resource +from dimos.utils.testing.replay import TimedSensorReplay + +if TYPE_CHECKING: + from dimos.core import Transport + +T = TypeVar("T") + + +class SensorMoment(Generic[T], Resource): + value: T | None = None + + def __init__(self, name: str, transport: Transport[T]) -> None: + self.replay: TimedSensorReplay[T] = TimedSensorReplay(name) + self.transport = transport + + def seek(self, timestamp: float) -> None: + self.value = self.replay.find_closest_seek(timestamp) + + def publish(self) -> None: + if self.value is not None: + self.transport.publish(self.value) + + def start(self) -> None: + pass + + def stop(self) -> None: + self.transport.stop() + + +class OutputMoment(Generic[T], Resource): + value: T | None = None + transport: Transport[T] + + def __init__(self, transport: Transport[T]): + self.transport = transport + + def set(self, value: T) -> None: + self.value = value + + def publish(self) -> None: + if self.value is not None: + self.transport.publish(self.value) + + def start(self) -> None: + pass + + def stop(self) -> None: + self.transport.stop() + + +class Moment(Resource): + def moments( + self, *classes: type[SensorMoment[Any]] | type[OutputMoment[Any]] + ) -> list[SensorMoment[Any] | OutputMoment[Any]]: + moments: list[SensorMoment[Any] | OutputMoment[Any]] = [] + for attr_name in dir(self): + attr_value = getattr(self, attr_name) + if isinstance(attr_value, classes): + moments.append(attr_value) # type: ignore[arg-type] + return moments + + def seekable_moments(self) -> list[SensorMoment[Any]]: + return [m for m in self.moments(SensorMoment) if isinstance(m, SensorMoment)] + + def publishable_moments(self) -> list[SensorMoment[Any] | OutputMoment[Any]]: + return self.moments(OutputMoment, SensorMoment) + + def seek(self, timestamp: float) -> None: + for moment in self.seekable_moments(): + moment.seek(timestamp) + + def publish(self) -> None: + for moment in self.publishable_moments(): + moment.publish() + + def start(self) -> None: ... + + def stop(self) -> None: + for moment in self.publishable_moments(): + moment.stop() diff --git a/dimos/utils/testing.py b/dimos/utils/testing/replay.py similarity index 100% rename from dimos/utils/testing.py rename to dimos/utils/testing/replay.py diff --git a/dimos/utils/testing/test_moment.py b/dimos/utils/testing/test_moment.py new file mode 100644 index 0000000000..3f49c11767 --- /dev/null +++ b/dimos/utils/testing/test_moment.py @@ -0,0 +1,75 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time + +from dimos.core import LCMTransport +from dimos.msgs.geometry_msgs import PoseStamped, Transform +from dimos.msgs.sensor_msgs import CameraInfo, Image, PointCloud2 +from dimos.protocol.tf import TF +from dimos.robot.unitree.connection import go2 +from dimos.utils.data import get_data +from dimos.utils.testing.moment import Moment, SensorMoment + +data_dir = get_data("unitree_go2_office_walk2") + + +class Go2Moment(Moment): + lidar: SensorMoment[PointCloud2] + video: SensorMoment[Image] + odom: SensorMoment[PoseStamped] + + def __init__(self) -> None: + self.lidar = SensorMoment(f"{data_dir}/lidar", LCMTransport("/lidar", PointCloud2)) + self.video = SensorMoment(f"{data_dir}/video", LCMTransport("/color_image", Image)) + self.odom = SensorMoment(f"{data_dir}/odom", LCMTransport("/odom", PoseStamped)) + + @property + def transforms(self) -> list[Transform]: + if self.odom.value is None: + return [] + + # we just make sure to change timestamps so that we can jump + # back and forth through time and foxglove doesn't get confused + odom = self.odom.value + odom.ts = time.time() + return go2.GO2Connection._odom_to_tf(odom) + + def publish(self) -> None: + t = TF() + t.publish(*self.transforms) + t.stop() + + camera_info = go2._camera_info_static() + camera_info.ts = time.time() + camera_info_transport: LCMTransport[CameraInfo] = LCMTransport("/camera_info", CameraInfo) + camera_info_transport.publish(camera_info) + camera_info_transport.stop() + + super().publish() + + +def test_moment_seek_and_publish() -> None: + moment = Go2Moment() + + # Seek to 5 seconds + moment.seek(5.0) + + # Check that frames were loaded + assert moment.lidar.value is not None + assert moment.video.value is not None + assert moment.odom.value is not None + + # Publish all frames + moment.publish() + moment.stop() diff --git a/dimos/utils/test_testing.py b/dimos/utils/testing/test_replay.py similarity index 89% rename from dimos/utils/test_testing.py rename to dimos/utils/testing/test_replay.py index 3684031170..ad3cb9064d 100644 --- a/dimos/utils/test_testing.py +++ b/dimos/utils/testing/test_replay.py @@ -18,13 +18,13 @@ from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.odometry import Odometry -from dimos.utils import testing from dimos.utils.data import get_data +from dimos.utils.testing import replay def test_sensor_replay() -> None: counter = 0 - for message in testing.SensorReplay(name="office_lidar").iterate(): + for message in replay.SensorReplay(name="office_lidar").iterate(): counter += 1 assert isinstance(message, dict) assert counter == 500 @@ -32,7 +32,7 @@ def test_sensor_replay() -> None: def test_sensor_replay_cast() -> None: counter = 0 - for message in testing.SensorReplay( + for message in replay.SensorReplay( name="office_lidar", autocast=LidarMessage.from_msg ).iterate(): counter += 1 @@ -42,7 +42,7 @@ def test_sensor_replay_cast() -> None: def test_timed_sensor_replay() -> None: get_data("unitree_office_walk") - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) itermsgs = [] for msg in odom_store.iterate(): @@ -68,7 +68,7 @@ def test_timed_sensor_replay() -> None: def test_iterate_ts_no_seek() -> None: """Test iterate_ts without seek (start_timestamp=None)""" - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) # Test without seek ts_msgs = [] @@ -86,7 +86,7 @@ def test_iterate_ts_no_seek() -> None: def test_iterate_ts_with_from_timestamp() -> None: """Test iterate_ts with from_timestamp (absolute timestamp)""" - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) # First get all messages to find a good seek point all_msgs = [] @@ -114,7 +114,7 @@ def test_iterate_ts_with_from_timestamp() -> None: def test_iterate_ts_with_relative_seek() -> None: """Test iterate_ts with seek (relative seconds after first timestamp)""" - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) # Get first few messages to understand timing all_msgs = [] @@ -143,7 +143,7 @@ def test_iterate_ts_with_relative_seek() -> None: def test_stream_with_seek() -> None: """Test stream method with seek parameters""" - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) # Test stream with relative seek msgs_with_seek = [] @@ -169,7 +169,7 @@ def test_stream_with_seek() -> None: def test_duration_with_loop() -> None: """Test duration parameter with looping in TimedSensorReplay""" - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) # Collect timestamps from a small duration window collected_ts = [] @@ -204,7 +204,7 @@ def test_first_methods() -> None: """Test first() and first_timestamp() methods""" # Test SensorReplay.first() - lidar_replay = testing.SensorReplay("office_lidar", autocast=LidarMessage.from_msg) + lidar_replay = replay.SensorReplay("office_lidar", autocast=LidarMessage.from_msg) print("first file", lidar_replay.files[0]) # Verify the first file ends with 000.pickle using regex @@ -224,7 +224,7 @@ def test_first_methods() -> None: assert abs(first_msg.ts - first_from_iterate.ts) < 1.0 # Within 1 second tolerance # Test TimedSensorReplay.first_timestamp() - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) first_ts = odom_store.first_timestamp() assert first_ts is not None assert isinstance(first_ts, float) @@ -241,7 +241,7 @@ def test_first_methods() -> None: def test_find_closest() -> None: """Test find_closest method in TimedSensorReplay""" - odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + odom_store = replay.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) # Get some reference timestamps timestamps = []