diff --git a/assets/foxglove_unitree_lcm_dashboard.json b/assets/foxglove_unitree_lcm_dashboard.json index 932ee65fd5..df4e2715bc 100644 --- a/assets/foxglove_unitree_lcm_dashboard.json +++ b/assets/foxglove_unitree_lcm_dashboard.json @@ -21,18 +21,42 @@ ], "order": 1, "size": 30, - "divisions": 30 + "divisions": 30, + "color": "#248eff57" + }, + "ff758451-8c06-4419-a995-e93c825eb8be": { + "visible": true, + "frameLocked": true, + "label": "Grid", + "instanceId": "ff758451-8c06-4419-a995-e93c825eb8be", + "layerId": "foxglove.Grid", + "frameId": "base_link", + "size": 3, + "divisions": 3, + "lineWidth": 1.5, + "color": "#24fff4ff", + "position": [ + 0, + 0, + 0 + ], + "rotation": [ + 0, + 0, + 0 + ], + "order": 2 } }, "cameraState": { - "perspective": true, - "distance": 22.16066481982733, - "phi": 34.816360162458444, - "thetaOffset": 110.38709006815255, + "perspective": false, + "distance": 25.847108697365048, + "phi": 32.532756465990374, + "thetaOffset": -179.288640038416, "targetOffset": [ - 0.6543165797799305, - -5.069343424147507, - 5.603018248159094e-16 + 1.620731759058286, + -2.9069622235988986, + -0.09942375087215619 ], "target": [ 0, @@ -53,7 +77,10 @@ "scene": { "enableStats": true, "ignoreColladaUpAxis": false, - "syncCamera": false + "syncCamera": false, + "transforms": { + "visible": true + } }, "transforms": {}, "topics": { @@ -63,12 +90,13 @@ "colorField": "z", "colorMode": "colormap", "colorMap": "turbo", - "pointShape": "square", + "pointShape": "circle", "pointSize": 10, "explicitAlpha": 1, - "minValue": -0.1, "decayTime": 0, - "cubeSize": 0.06 + "cubeSize": 0.1, + "minValue": -0.3, + "cubeOutline": false }, "/odom": { "visible": true, @@ -83,16 +111,67 @@ "colorMode": "colormap", "colorMap": "turbo", "pointSize": 10, - "minValue": -0.1, "decayTime": 0, - "pointShape": "square", + "pointShape": "cube", "cubeOutline": false, - "cubeSize": 0.05, + "cubeSize": 0.08, "gradient": [ "#06011dff", "#d1e2e2ff" ], - "stixelsEnabled": false + "stixelsEnabled": false, + "explicitAlpha": 1, + "minValue": -0.2 + }, + "/global_path": { + "visible": true, + "type": "line", + "arrowScale": [ + 1, + 0.15, + 0.15 + ], + "lineWidth": 0.132, + "gradient": [ + "#6bff7cff", + "#0081ffff" + ] + }, + "/global_target": { + "visible": true + }, + "/pt": { + "visible": false + }, + "/global_costmap": { + "visible": true, + "maxColor": "#8d3939ff", + "frameLocked": false, + "unknownColor": "#80808000", + "colorMode": "custom", + "alpha": 0.517, + "minColor": "#1e00ff00" + }, + "/global_gradient": { + "visible": true, + "maxColor": "#690066ff", + "unknownColor": "#30b89a00", + "minColor": "#00000000", + "colorMode": "custom", + "alpha": 0.3662, + "frameLocked": false, + "drawBehind": false + }, + "/global_cost_field": { + "visible": false, + "maxColor": "#ff0000ff", + "unknownColor": "#80808000" + }, + "/global_passable": { + "visible": false, + "maxColor": "#ffffff00", + "minColor": "#ff0000ff", + "unknownColor": "#80808000" } }, "publish": { @@ -105,7 +184,8 @@ "poseEstimateThetaDeviation": 0.26179939 }, "imageMode": {}, - "foxglovePanelTitle": "test" + "foxglovePanelTitle": "test", + "followTf": "world" }, "Image!3mnp456": { "cameraState": { @@ -155,14 +235,6 @@ }, "foxglovePanelTitle": "/video" }, - "RawMessages!os6rgs": { - "diffEnabled": true, - "diffMethod": "previous message", - "diffTopicPath": "/lidar", - "showFullMessageForDiff": false, - "topicPath": "/odom", - "fontSize": 12 - }, "Plot!a1gj37": { "paths": [ { @@ -206,16 +278,11 @@ "first": "3D!18i6zy7", "second": { "first": "Image!3mnp456", - "second": { - "first": "RawMessages!os6rgs", - "second": "Plot!a1gj37", - "direction": "row", - "splitPercentage": 21.133036282622545 - }, + "second": "Plot!a1gj37", "direction": "column", - "splitPercentage": 45.83732057416268 + "splitPercentage": 28.030303030303028 }, "direction": "row", - "splitPercentage": 61.01509144891709 + "splitPercentage": 69.43271928754422 } } diff --git a/data/.lfs/lcm_msgs.tar.gz b/data/.lfs/lcm_msgs.tar.gz new file mode 100644 index 0000000000..2b2f28c252 --- /dev/null +++ b/data/.lfs/lcm_msgs.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:245395d0c3e200fcfcea8de5de217f645362b145b200c81abc3862e0afc1aa7e +size 327201 diff --git a/dimos/msgs/nav_msgs/OccupancyGrid.py b/dimos/msgs/nav_msgs/OccupancyGrid.py new file mode 100644 index 0000000000..e66da51e89 --- /dev/null +++ b/dimos/msgs/nav_msgs/OccupancyGrid.py @@ -0,0 +1,623 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import math +from enum import IntEnum +from typing import TYPE_CHECKING, BinaryIO, Optional + +import numpy as np +from dimos_lcm.nav_msgs import MapMetaData +from dimos_lcm.nav_msgs import OccupancyGrid as LCMOccupancyGrid +from scipy import ndimage + +from dimos.msgs.geometry_msgs import Pose, Vector3, VectorLike +from dimos.types.timestamped import Timestamped + +if TYPE_CHECKING: + from dimos.msgs.sensor_msgs import PointCloud2 + + +class CostValues(IntEnum): + """Standard cost values for occupancy grid cells. + + These values follow the ROS nav_msgs/OccupancyGrid convention: + - 0: Free space + - 1-99: Occupied space with varying cost levels + - 100: Lethal obstacle (definitely occupied) + - -1: Unknown space + """ + + UNKNOWN = -1 # Unknown space + FREE = 0 # Free space + OCCUPIED = 100 # Occupied/lethal space + + +class OccupancyGrid(Timestamped): + """ + Convenience wrapper for nav_msgs/OccupancyGrid with numpy array support. + """ + + msg_name = "nav_msgs.OccupancyGrid" + + # Attributes + ts: float + frame_id: str + info: MapMetaData + grid: np.ndarray + + def __init__( + self, + grid: Optional[np.ndarray] = None, + width: Optional[int] = None, + height: Optional[int] = None, + resolution: float = 0.05, + origin: Optional[Pose] = None, + frame_id: str = "world", + ts: float = 0.0, + ) -> None: + """Initialize OccupancyGrid. + + Args: + grid: 2D numpy array of int8 values (height x width) + width: Width in cells (used if grid is None) + height: Height in cells (used if grid is None) + resolution: Grid resolution in meters/cell + origin: Origin pose of the grid + frame_id: Reference frame + ts: Timestamp (defaults to current time if 0) + """ + import time + + self.frame_id = frame_id + self.ts = ts if ts != 0 else time.time() + + if grid is not None: + # Initialize from numpy array + if grid.ndim != 2: + raise ValueError("Grid must be a 2D array") + height, width = grid.shape + self.info = MapMetaData( + map_load_time=self._to_lcm_time(), + resolution=resolution, + width=width, + height=height, + origin=origin or Pose(), + ) + self.grid = grid.astype(np.int8) + elif width is not None and height is not None: + # Initialize with dimensions + self.info = MapMetaData( + map_load_time=self._to_lcm_time(), + resolution=resolution, + width=width, + height=height, + origin=origin or Pose(), + ) + self.grid = np.full((height, width), -1, dtype=np.int8) + else: + # Initialize empty + self.info = MapMetaData(map_load_time=self._to_lcm_time()) + self.grid = np.array([], dtype=np.int8) + + def _to_lcm_time(self): + """Convert timestamp to LCM Time.""" + from dimos_lcm.std_msgs import Time as LCMTime + + s = int(self.ts) + return LCMTime(sec=s, nsec=int((self.ts - s) * 1_000_000_000)) + + @property + def width(self) -> int: + """Width of the grid in cells.""" + return self.info.width + + @property + def height(self) -> int: + """Height of the grid in cells.""" + return self.info.height + + @property + def resolution(self) -> float: + """Grid resolution in meters/cell.""" + return self.info.resolution + + @property + def origin(self) -> Pose: + """Origin pose of the grid.""" + return self.info.origin + + @property + def total_cells(self) -> int: + """Total number of cells in the grid.""" + return self.width * self.height + + @property + def occupied_cells(self) -> int: + """Number of occupied cells (value >= 1).""" + return int(np.sum(self.grid >= 1)) + + @property + def free_cells(self) -> int: + """Number of free cells (value == 0).""" + return int(np.sum(self.grid == 0)) + + @property + def unknown_cells(self) -> int: + """Number of unknown cells (value == -1).""" + return int(np.sum(self.grid == -1)) + + @property + def occupied_percent(self) -> float: + """Percentage of cells that are occupied.""" + return (self.occupied_cells / self.total_cells * 100) if self.total_cells > 0 else 0.0 + + @property + def free_percent(self) -> float: + """Percentage of cells that are free.""" + return (self.free_cells / self.total_cells * 100) if self.total_cells > 0 else 0.0 + + @property + def unknown_percent(self) -> float: + """Percentage of cells that are unknown.""" + return (self.unknown_cells / self.total_cells * 100) if self.total_cells > 0 else 0.0 + + def inflate(self, radius: float, cost_scaling_factor: float = 0.0) -> "OccupancyGrid": + """Inflate obstacles by a given radius (vectorized). + Args: + radius: Inflation radius in meters + cost_scaling_factor: Factor for decay (0.0 = no decay, binary inflation) + Returns: + New OccupancyGrid with inflated obstacles + """ + # Convert radius to grid cells + cell_radius = int(np.ceil(radius / self.resolution)) + + # Get grid as numpy array + grid_array = self.grid + + # Create square kernel for binary inflation + kernel_size = 2 * cell_radius + 1 + kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8) + + # Find occupied cells + occupied_mask = grid_array >= CostValues.OCCUPIED + + if cost_scaling_factor == 0.0: + # Binary inflation + inflated = ndimage.binary_dilation(occupied_mask, structure=kernel) + result_grid = grid_array.copy() + result_grid[inflated] = CostValues.OCCUPIED + else: + # Distance-based inflation with decay + # Create distance transform from occupied cells + distance_field = ndimage.distance_transform_edt(~occupied_mask) + + # Apply exponential decay based on distance + cost_field = CostValues.OCCUPIED * np.exp(-cost_scaling_factor * distance_field) + + # Combine with original grid, keeping higher values + result_grid = np.maximum(grid_array, cost_field).astype(np.int8) + + # Ensure occupied cells remain at max value + result_grid[occupied_mask] = CostValues.OCCUPIED + + # Create new OccupancyGrid with inflated data using numpy constructor + return OccupancyGrid( + grid=result_grid, + resolution=self.resolution, + origin=self.origin, + frame_id=self.frame_id, + ts=self.ts, + ) + + def world_to_grid(self, point: VectorLike) -> Vector3: + """Convert world coordinates to grid coordinates. + + Args: + point: A vector-like object containing X,Y coordinates + + Returns: + Vector3 with grid coordinates + """ + positionVector = Vector3(point) + # Get origin position + ox = self.origin.position.x + oy = self.origin.position.y + + # Convert to grid coordinates (simplified, assuming no rotation) + grid_x = (positionVector.x - ox) / self.resolution + grid_y = (positionVector.y - oy) / self.resolution + + return Vector3(grid_x, grid_y, 0.0) + + def grid_to_world(self, grid_point: VectorLike) -> Vector3: + """Convert grid coordinates to world coordinates. + + Args: + grid_point: Vector-like object containing grid coordinates + + Returns: + World position as Vector3 + """ + gridVector = Vector3(grid_point) + # Get origin position + ox = self.origin.position.x + oy = self.origin.position.y + + # Convert to world (simplified, no rotation) + x = ox + gridVector.x * self.resolution + y = oy + gridVector.y * self.resolution + + return Vector3(x, y, 0.0) + + def __str__(self) -> str: + """Create a concise string representation.""" + origin_pos = self.origin.position + + parts = [ + f"▦ OccupancyGrid[{self.frame_id}]", + f"{self.width}x{self.height}", + f"({self.width * self.resolution:.1f}x{self.height * self.resolution:.1f}m @", + f"{1 / self.resolution:.0f}cm res)", + f"Origin: ({origin_pos.x:.2f}, {origin_pos.y:.2f})", + f"▣ {self.occupied_percent:.1f}%", + f"□ {self.free_percent:.1f}%", + f"◌ {self.unknown_percent:.1f}%", + ] + + return " ".join(parts) + + def __repr__(self) -> str: + """Create a detailed representation.""" + return ( + f"OccupancyGrid(width={self.width}, height={self.height}, " + f"resolution={self.resolution}, frame_id='{self.frame_id}', " + f"occupied={self.occupied_cells}, free={self.free_cells}, " + f"unknown={self.unknown_cells})" + ) + + def lcm_encode(self) -> bytes: + """Encode OccupancyGrid to LCM bytes.""" + # Create LCM message + lcm_msg = LCMOccupancyGrid() + + # Build header on demand + s = int(self.ts) + lcm_msg.header.stamp.sec = s + lcm_msg.header.stamp.nsec = int((self.ts - s) * 1_000_000_000) + lcm_msg.header.frame_id = self.frame_id + + # Copy map metadata + lcm_msg.info = self.info + + # Convert numpy array to flat data list + if self.grid.size > 0: + flat_data = self.grid.flatten() + lcm_msg.data_length = len(flat_data) + lcm_msg.data = flat_data.tolist() + else: + lcm_msg.data_length = 0 + lcm_msg.data = [] + + return lcm_msg.lcm_encode() + + @classmethod + def lcm_decode(cls, data: bytes | BinaryIO) -> "OccupancyGrid": + """Decode LCM bytes to OccupancyGrid.""" + lcm_msg = LCMOccupancyGrid.lcm_decode(data) + + # Extract timestamp and frame_id from header + ts = lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000) + frame_id = lcm_msg.header.frame_id + + # Extract grid data + if lcm_msg.data and lcm_msg.info.width > 0 and lcm_msg.info.height > 0: + grid = np.array(lcm_msg.data, dtype=np.int8).reshape( + (lcm_msg.info.height, lcm_msg.info.width) + ) + else: + grid = np.array([], dtype=np.int8) + + # Create new instance + instance = cls( + grid=grid, + resolution=lcm_msg.info.resolution, + origin=lcm_msg.info.origin, + frame_id=frame_id, + ts=ts, + ) + instance.info = lcm_msg.info + return instance + + @classmethod + def from_pointcloud( + cls, + cloud: "PointCloud2", + resolution: float = 0.05, + min_height: float = 0.1, + max_height: float = 2.0, + frame_id: Optional[str] = None, + mark_free_radius: float = 0.0, + ) -> "OccupancyGrid": + """Create an OccupancyGrid from a PointCloud2 message. + + Args: + cloud: PointCloud2 message containing 3D points + resolution: Grid resolution in meters/cell (default: 0.05) + min_height: Minimum height threshold for including points (default: 0.1) + max_height: Maximum height threshold for including points (default: 2.0) + frame_id: Reference frame for the grid (default: uses cloud's frame_id) + mark_free_radius: Radius in meters around obstacles to mark as free space (default: 0.0) + If 0, only immediate neighbors are marked free. + Set to preserve unknown areas for exploration. + + Returns: + OccupancyGrid with occupied cells where points were projected + """ + # Import here to avoid circular dependency + from dimos.msgs.sensor_msgs import PointCloud2 + + # Get points as numpy array + points = cloud.as_numpy() + + if len(points) == 0: + # Return empty grid + return cls( + width=1, height=1, resolution=resolution, frame_id=frame_id or cloud.frame_id + ) + + # Filter points by height + height_mask = (points[:, 2] >= min_height) & (points[:, 2] <= max_height) + filtered_points = points[height_mask] + + if len(filtered_points) == 0: + # No points in height range + return cls( + width=1, height=1, resolution=resolution, frame_id=frame_id or cloud.frame_id + ) + + # Find bounds of the point cloud in X-Y plane + min_x = np.min(filtered_points[:, 0]) + max_x = np.max(filtered_points[:, 0]) + min_y = np.min(filtered_points[:, 1]) + max_y = np.max(filtered_points[:, 1]) + + # Add some padding around the bounds + padding = 1.0 # 1 meter padding + min_x -= padding + max_x += padding + min_y -= padding + max_y += padding + + # Calculate grid dimensions + width = int(np.ceil((max_x - min_x) / resolution)) + height = int(np.ceil((max_y - min_y) / resolution)) + + # Create origin pose (bottom-left corner of the grid) + origin = Pose() + origin.position.x = min_x + origin.position.y = min_y + origin.position.z = 0.0 + origin.orientation.w = 1.0 # No rotation + + # Initialize grid (all unknown) + grid = np.full((height, width), -1, dtype=np.int8) + + # Convert points to grid indices + grid_x = ((filtered_points[:, 0] - min_x) / resolution).astype(np.int32) + grid_y = ((filtered_points[:, 1] - min_y) / resolution).astype(np.int32) + + # Clip indices to grid bounds + grid_x = np.clip(grid_x, 0, width - 1) + grid_y = np.clip(grid_y, 0, height - 1) + + # Mark cells as occupied + grid[grid_y, grid_x] = 100 # Lethal obstacle + + # Mark free space around obstacles based on mark_free_radius + if mark_free_radius > 0: + # Mark a specified radius around occupied cells as free + from scipy.ndimage import binary_dilation + + occupied_mask = grid == 100 + free_radius_cells = int(np.ceil(mark_free_radius / resolution)) + + # Create circular kernel + y, x = np.ogrid[ + -free_radius_cells : free_radius_cells + 1, + -free_radius_cells : free_radius_cells + 1, + ] + kernel = x**2 + y**2 <= free_radius_cells**2 + + known_area = binary_dilation(occupied_mask, structure=kernel, iterations=1) + # Mark non-occupied cells in the known area as free + grid[known_area & (grid != 100)] = 0 + else: + # Default: only mark immediate neighbors as free to preserve unknown + from scipy.ndimage import binary_dilation + + occupied_mask = grid == 100 + # Use a small 3x3 kernel to only mark immediate neighbors + structure = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]]) + immediate_neighbors = binary_dilation(occupied_mask, structure=structure, iterations=1) + + # Mark only immediate neighbors as free (not the occupied cells themselves) + grid[immediate_neighbors & (grid != 100)] = 0 + + # Create and return OccupancyGrid + # Get timestamp from cloud if available + ts = cloud.ts if hasattr(cloud, "ts") and cloud.ts is not None else 0.0 + + occupancy_grid = cls( + grid=grid, + resolution=resolution, + origin=origin, + frame_id=frame_id or cloud.frame_id, + ts=ts, + ) + + return occupancy_grid + + def gradient(self, obstacle_threshold: int = 50, max_distance: float = 2.0) -> "OccupancyGrid": + """Create a gradient OccupancyGrid for path planning. + + Creates a gradient where free space has value 0 and values increase near obstacles. + This can be used as a cost map for path planning algorithms like A*. + + Args: + obstacle_threshold: Cell values >= this are considered obstacles (default: 50) + max_distance: Maximum distance to compute gradient in meters (default: 5.0) + + Returns: + New OccupancyGrid with gradient values: + - -1: Unknown cells far from obstacles (beyond max_distance) + - 0: Free space far from obstacles + - 1-99: Increasing cost as you approach obstacles + - 100: At obstacles + + Note: Unknown cells within max_distance of obstacles will have gradient + values assigned, allowing path planning through unknown areas. + """ + + # Remember which cells are unknown + unknown_mask = self.grid == -1 + + # Create a working grid where unknown cells are treated as free for distance calculation + working_grid = self.grid.copy() + working_grid[unknown_mask] = 0 # Treat unknown as free for gradient computation + + # Create binary obstacle map from working grid + # Consider cells >= threshold as obstacles (1), everything else as free (0) + obstacle_map = (working_grid >= obstacle_threshold).astype(np.float32) + + # Compute distance transform (distance to nearest obstacle in cells) + distance_cells = ndimage.distance_transform_edt(1 - obstacle_map) + + # Convert to meters and clip to max distance + distance_meters = np.clip(distance_cells * self.resolution, 0, max_distance) + + # Invert and scale to 0-100 range + # Far from obstacles (max_distance) -> 0 + # At obstacles (0 distance) -> 100 + gradient_values = (1 - distance_meters / max_distance) * 100 + + # Ensure obstacles are exactly 100 + gradient_values[obstacle_map > 0] = 100 + + # Convert to int8 for OccupancyGrid + gradient_data = gradient_values.astype(np.int8) + + # Only preserve unknown cells that are beyond max_distance from any obstacle + # This allows gradient to spread through unknown areas near obstacles + far_unknown_mask = unknown_mask & (distance_meters >= max_distance) + gradient_data[far_unknown_mask] = -1 + + # Create new OccupancyGrid with gradient + gradient_grid = OccupancyGrid( + grid=gradient_data, + resolution=self.resolution, + origin=self.origin, + frame_id=self.frame_id, + ts=self.ts, + ) + + return gradient_grid + + def filter_above(self, threshold: int) -> "OccupancyGrid": + """Create a new OccupancyGrid with only values above threshold. + + Args: + threshold: Keep cells with values > threshold + + Returns: + New OccupancyGrid where: + - Cells > threshold: kept as-is + - Cells <= threshold: set to -1 (unknown) + - Unknown cells (-1): preserved + """ + new_grid = self.grid.copy() + + # Create mask for cells to filter (not unknown and <= threshold) + filter_mask = (new_grid != -1) & (new_grid <= threshold) + + # Set filtered cells to unknown + new_grid[filter_mask] = -1 + + # Create new OccupancyGrid + filtered = OccupancyGrid( + new_grid, + resolution=self.resolution, + origin=self.origin, + frame_id=self.frame_id, + ts=self.ts, + ) + + return filtered + + def filter_below(self, threshold: int) -> "OccupancyGrid": + """Create a new OccupancyGrid with only values below threshold. + + Args: + threshold: Keep cells with values < threshold + + Returns: + New OccupancyGrid where: + - Cells < threshold: kept as-is + - Cells >= threshold: set to -1 (unknown) + - Unknown cells (-1): preserved + """ + new_grid = self.grid.copy() + + # Create mask for cells to filter (not unknown and >= threshold) + filter_mask = (new_grid != -1) & (new_grid >= threshold) + + # Set filtered cells to unknown + new_grid[filter_mask] = -1 + + # Create new OccupancyGrid + filtered = OccupancyGrid( + new_grid, + resolution=self.resolution, + origin=self.origin, + frame_id=self.frame_id, + ts=self.ts, + ) + + return filtered + + def max(self) -> "OccupancyGrid": + """Create a new OccupancyGrid with all non-unknown cells set to maximum value. + + Returns: + New OccupancyGrid where: + - All non-unknown cells: set to CostValues.OCCUPIED (100) + - Unknown cells: preserved as CostValues.UNKNOWN (-1) + """ + new_grid = self.grid.copy() + + # Set all non-unknown cells to max + new_grid[new_grid != CostValues.UNKNOWN] = CostValues.OCCUPIED + + # Create new OccupancyGrid + maxed = OccupancyGrid( + new_grid, + resolution=self.resolution, + origin=self.origin, + frame_id=self.frame_id, + ts=self.ts, + ) + + return maxed diff --git a/dimos/msgs/nav_msgs/Path.py b/dimos/msgs/nav_msgs/Path.py new file mode 100644 index 0000000000..8fca1cf25f --- /dev/null +++ b/dimos/msgs/nav_msgs/Path.py @@ -0,0 +1,189 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import struct +import time +from io import BytesIO +from typing import BinaryIO, TypeAlias + +from dimos_lcm.geometry_msgs import Point as LCMPoint +from dimos_lcm.geometry_msgs import Pose as LCMPose +from dimos_lcm.geometry_msgs import PoseStamped as LCMPoseStamped +from dimos_lcm.geometry_msgs import Quaternion as LCMQuaternion +from dimos_lcm.nav_msgs import Path as LCMPath +from dimos_lcm.std_msgs import Header as LCMHeader +from dimos_lcm.std_msgs import Time as LCMTime + +from dimos.msgs.geometry_msgs.Pose import Pose +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion, QuaternionConvertable +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Vector3 import Vector3, VectorConvertable +from dimos.types.timestamped import Timestamped + + +def sec_nsec(ts): + s = int(ts) + return [s, int((ts - s) * 1_000_000_000)] + + +class Path(Timestamped): + msg_name = "nav_msgs.Path" + ts: float + frame_id: str + poses: list[PoseStamped] + + def __init__( + self, + ts: float = 0.0, + frame_id: str = "world", + poses: list[PoseStamped] | None = None, + **kwargs, + ) -> None: + self.frame_id = frame_id + self.ts = ts if ts != 0 else time.time() + self.poses = poses if poses is not None else [] + + def __len__(self) -> int: + """Return the number of poses in the path.""" + return len(self.poses) + + def __bool__(self) -> bool: + """Return True if path has poses.""" + return len(self.poses) > 0 + + def head(self) -> PoseStamped | None: + """Return the first pose in the path, or None if empty.""" + return self.poses[0] if self.poses else None + + def last(self) -> PoseStamped | None: + """Return the last pose in the path, or None if empty.""" + return self.poses[-1] if self.poses else None + + def tail(self) -> "Path": + """Return a new Path with all poses except the first.""" + return Path(ts=self.ts, frame_id=self.frame_id, poses=self.poses[1:] if self.poses else []) + + def push(self, pose: PoseStamped) -> "Path": + """Return a new Path with the pose appended (immutable).""" + return Path(ts=self.ts, frame_id=self.frame_id, poses=self.poses + [pose]) + + def push_mut(self, pose: PoseStamped) -> None: + """Append a pose to this path (mutable).""" + self.poses.append(pose) + + def lcm_encode(self) -> bytes: + """Encode Path to LCM bytes.""" + lcm_msg = LCMPath() + + # Set poses + lcm_msg.poses_length = len(self.poses) + lcm_poses = [] # Build list separately to avoid LCM library reuse issues + for pose in self.poses: + lcm_pose = LCMPoseStamped() + # Create new pose objects to avoid LCM library reuse bug + lcm_pose.pose = LCMPose() + lcm_pose.pose.position = LCMPoint() + lcm_pose.pose.orientation = LCMQuaternion() + + # Set the pose geometry data + lcm_pose.pose.position.x = pose.x + lcm_pose.pose.position.y = pose.y + lcm_pose.pose.position.z = pose.z + lcm_pose.pose.orientation.x = pose.orientation.x + lcm_pose.pose.orientation.y = pose.orientation.y + lcm_pose.pose.orientation.z = pose.orientation.z + lcm_pose.pose.orientation.w = pose.orientation.w + + # Create new header to avoid reuse + lcm_pose.header = LCMHeader() + lcm_pose.header.stamp = LCMTime() + + # Set the header with pose timestamp but path's frame_id + [lcm_pose.header.stamp.sec, lcm_pose.header.stamp.nsec] = sec_nsec(pose.ts) + lcm_pose.header.frame_id = self.frame_id # All poses use path's frame_id + lcm_poses.append(lcm_pose) + lcm_msg.poses = lcm_poses + + # Set header with path's own timestamp + [lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec] = sec_nsec(self.ts) + lcm_msg.header.frame_id = self.frame_id + + return lcm_msg.lcm_encode() + + @classmethod + def lcm_decode(cls, data: bytes | BinaryIO) -> "Path": + """Decode LCM bytes to Path.""" + lcm_msg = LCMPath.lcm_decode(data) + + # Decode header + header_ts = lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000) + frame_id = lcm_msg.header.frame_id + + # Decode poses - all use the path's frame_id + poses = [] + for lcm_pose in lcm_msg.poses: + pose = PoseStamped( + ts=lcm_pose.header.stamp.sec + (lcm_pose.header.stamp.nsec / 1_000_000_000), + frame_id=frame_id, # Use path's frame_id for all poses + position=[ + lcm_pose.pose.position.x, + lcm_pose.pose.position.y, + lcm_pose.pose.position.z, + ], + orientation=[ + lcm_pose.pose.orientation.x, + lcm_pose.pose.orientation.y, + lcm_pose.pose.orientation.z, + lcm_pose.pose.orientation.w, + ], + ) + poses.append(pose) + + # Use header timestamp for the path + return cls(ts=header_ts, frame_id=frame_id, poses=poses) + + def __str__(self) -> str: + """String representation of Path.""" + return f"Path(frame_id='{self.frame_id}', poses={len(self.poses)})" + + def __getitem__(self, index: int | slice) -> PoseStamped | list[PoseStamped]: + """Allow indexing and slicing of poses.""" + return self.poses[index] + + def __iter__(self): + """Allow iteration over poses.""" + return iter(self.poses) + + def slice(self, start: int, end: int | None = None) -> "Path": + """Return a new Path with a slice of poses.""" + return Path(ts=self.ts, frame_id=self.frame_id, poses=self.poses[start:end]) + + def extend(self, other: "Path") -> "Path": + """Return a new Path with poses from both paths (immutable).""" + return Path(ts=self.ts, frame_id=self.frame_id, poses=self.poses + other.poses) + + def extend_mut(self, other: "Path") -> None: + """Extend this path with poses from another path (mutable).""" + self.poses.extend(other.poses) + + def reverse(self) -> "Path": + """Return a new Path with poses in reverse order.""" + return Path(ts=self.ts, frame_id=self.frame_id, poses=list(reversed(self.poses))) + + def clear(self) -> None: + """Clear all poses from this path (mutable).""" + self.poses.clear() diff --git a/dimos/msgs/nav_msgs/__init__.py b/dimos/msgs/nav_msgs/__init__.py new file mode 100644 index 0000000000..3e4241daa0 --- /dev/null +++ b/dimos/msgs/nav_msgs/__init__.py @@ -0,0 +1,4 @@ +from dimos.msgs.nav_msgs.OccupancyGrid import CostValues, MapMetaData, OccupancyGrid +from dimos.msgs.nav_msgs.Path import Path + +__all__ = ["Path", "OccupancyGrid", "MapMetaData", "CostValues"] diff --git a/dimos/msgs/nav_msgs/test_OccupancyGrid.py b/dimos/msgs/nav_msgs/test_OccupancyGrid.py new file mode 100644 index 0000000000..94467726c7 --- /dev/null +++ b/dimos/msgs/nav_msgs/test_OccupancyGrid.py @@ -0,0 +1,471 @@ +#!/usr/bin/env python3 +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the OccupancyGrid convenience class.""" + +import pickle + +import numpy as np +import pytest + +from dimos.msgs.geometry_msgs import Pose +from dimos.msgs.nav_msgs import OccupancyGrid +from dimos.msgs.sensor_msgs import PointCloud2 +from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.utils.testing import get_data + + +def test_empty_grid(): + """Test creating an empty grid.""" + grid = OccupancyGrid() + assert grid.width == 0 + assert grid.height == 0 + assert grid.grid.shape == (0,) + assert grid.total_cells == 0 + assert grid.frame_id == "world" + + +def test_grid_with_dimensions(): + """Test creating a grid with specified dimensions.""" + grid = OccupancyGrid(width=10, height=10, resolution=0.1, frame_id="map") + assert grid.width == 10 + assert grid.height == 10 + assert grid.resolution == 0.1 + assert grid.frame_id == "map" + assert grid.grid.shape == (10, 10) + assert np.all(grid.grid == -1) # All unknown + assert grid.unknown_cells == 100 + assert grid.unknown_percent == 100.0 + + +def test_grid_from_numpy_array(): + """Test creating a grid from a numpy array.""" + data = np.zeros((20, 30), dtype=np.int8) + data[5:10, 10:20] = 100 # Add some obstacles + data[15:18, 5:8] = -1 # Add unknown area + + origin = Pose(1.0, 2.0, 0.0) + grid = OccupancyGrid(grid=data, resolution=0.05, origin=origin, frame_id="odom") + + assert grid.width == 30 + assert grid.height == 20 + assert grid.resolution == 0.05 + assert grid.frame_id == "odom" + assert grid.origin.position.x == 1.0 + assert grid.origin.position.y == 2.0 + assert grid.grid.shape == (20, 30) + + # Check cell counts + assert grid.occupied_cells == 50 # 5x10 obstacle area + assert grid.free_cells == 541 # Total - occupied - unknown + assert grid.unknown_cells == 9 # 3x3 unknown area + + # Check percentages (approximately) + assert abs(grid.occupied_percent - 8.33) < 0.1 + assert abs(grid.free_percent - 90.17) < 0.1 + assert abs(grid.unknown_percent - 1.5) < 0.1 + + +def test_world_grid_coordinate_conversion(): + """Test converting between world and grid coordinates.""" + data = np.zeros((20, 30), dtype=np.int8) + origin = Pose(1.0, 2.0, 0.0) + grid = OccupancyGrid(grid=data, resolution=0.05, origin=origin, frame_id="odom") + + # Test world to grid + grid_pos = grid.world_to_grid((2.5, 3.0)) + assert int(grid_pos.x) == 30 + assert int(grid_pos.y) == 20 + + # Test grid to world + world_pos = grid.grid_to_world((10, 5)) + assert world_pos.x == 1.5 + assert world_pos.y == 2.25 + + +def test_lcm_encode_decode(): + """Test LCM encoding and decoding.""" + data = np.zeros((20, 30), dtype=np.int8) + data[5:10, 10:20] = 100 # Add some obstacles + data[15:18, 5:8] = -1 # Add unknown area + origin = Pose(1.0, 2.0, 0.0) + grid = OccupancyGrid(grid=data, resolution=0.05, origin=origin, frame_id="odom") + + # Set a specific value for testing + # Convert world coordinates to grid indices + grid_pos = grid.world_to_grid((1.5, 2.25)) + grid.grid[int(grid_pos.y), int(grid_pos.x)] = 50 + + # Encode + lcm_data = grid.lcm_encode() + assert isinstance(lcm_data, bytes) + assert len(lcm_data) > 0 + + # Decode + decoded = OccupancyGrid.lcm_decode(lcm_data) + + # Check that data matches exactly (grid arrays should be identical) + assert np.array_equal(grid.grid, decoded.grid) + assert grid.width == decoded.width + assert grid.height == decoded.height + assert abs(grid.resolution - decoded.resolution) < 1e-6 # Use approximate equality for floats + assert abs(grid.origin.position.x - decoded.origin.position.x) < 1e-6 + assert abs(grid.origin.position.y - decoded.origin.position.y) < 1e-6 + assert grid.frame_id == decoded.frame_id + + # Check that the actual grid data was preserved (don't rely on float conversions) + assert decoded.grid[5, 10] == 50 # Value we set should be preserved in grid + + +def test_string_representation(): + """Test string representations.""" + grid = OccupancyGrid(width=10, height=10, resolution=0.1, frame_id="map") + + # Test __str__ + str_repr = str(grid) + assert "OccupancyGrid[map]" in str_repr + assert "10x10" in str_repr + assert "1.0x1.0m" in str_repr + assert "10cm res" in str_repr + + # Test __repr__ + repr_str = repr(grid) + assert "OccupancyGrid(" in repr_str + assert "width=10" in repr_str + assert "height=10" in repr_str + assert "resolution=0.1" in repr_str + + +def test_grid_property_sync(): + """Test that the grid property works correctly.""" + grid = OccupancyGrid(width=5, height=5, resolution=0.1, frame_id="map") + + # Modify via numpy array + grid.grid[2, 3] = 100 + assert grid.grid[2, 3] == 100 + + # Check that we can access grid values + grid.grid[0, 0] = 50 + assert grid.grid[0, 0] == 50 + + +def test_invalid_grid_dimensions(): + """Test handling of invalid grid dimensions.""" + # Test with non-2D array + with pytest.raises(ValueError, match="Grid must be a 2D array"): + OccupancyGrid(grid=np.zeros(10), resolution=0.1) + + +def test_from_pointcloud(): + """Test creating OccupancyGrid from PointCloud2.""" + file_path = get_data("lcm_msgs") / "sensor_msgs/PointCloud2.pickle" + with open(file_path, "rb") as f: + lcm_msg = pickle.loads(f.read()) + + pointcloud = PointCloud2.lcm_decode(lcm_msg) + + # Convert pointcloud to occupancy grid + occupancygrid = OccupancyGrid.from_pointcloud( + pointcloud, resolution=0.05, min_height=0.1, max_height=2.0 + ) + # Apply inflation separately if needed + occupancygrid = occupancygrid.inflate(0.1) + + # Check that grid was created with reasonable properties + assert occupancygrid.width > 0 + assert occupancygrid.height > 0 + assert occupancygrid.resolution == 0.05 + assert occupancygrid.frame_id == pointcloud.frame_id + assert occupancygrid.occupied_cells > 0 # Should have some occupied cells + + +def test_gradient(): + """Test converting occupancy grid to gradient field.""" + # Create a small test grid with an obstacle in the middle + data = np.zeros((10, 10), dtype=np.int8) + data[4:6, 4:6] = 100 # 2x2 obstacle in center + + grid = OccupancyGrid(grid=data, resolution=0.1) # 0.1m per cell + + # Convert to gradient + gradient_grid = grid.gradient(obstacle_threshold=50, max_distance=1.0) + + # Check that we get an OccupancyGrid back + assert isinstance(gradient_grid, OccupancyGrid) + assert gradient_grid.grid.shape == (10, 10) + assert gradient_grid.resolution == grid.resolution + assert gradient_grid.frame_id == grid.frame_id + + # Obstacle cells should have value 100 + assert gradient_grid.grid[4, 4] == 100 + assert gradient_grid.grid[5, 5] == 100 + + # Adjacent cells should have high values (near obstacles) + assert gradient_grid.grid[3, 4] > 85 # Very close to obstacle + assert gradient_grid.grid[4, 3] > 85 # Very close to obstacle + + # Cells at moderate distance should have moderate values + assert 30 < gradient_grid.grid[0, 0] < 60 # Corner is ~0.57m away + + # Check that gradient decreases with distance + assert gradient_grid.grid[3, 4] > gradient_grid.grid[2, 4] # Closer is higher + assert gradient_grid.grid[2, 4] > gradient_grid.grid[0, 4] # Further is lower + + # Test with unknown cells + data_with_unknown = data.copy() + data_with_unknown[0:2, 0:2] = -1 # Add unknown area (close to obstacle) + data_with_unknown[8:10, 8:10] = -1 # Add unknown area (far from obstacle) + + grid_with_unknown = OccupancyGrid(data_with_unknown, resolution=0.1) + gradient_with_unknown = grid_with_unknown.gradient(max_distance=1.0) # 1m max distance + + # Unknown cells close to obstacles should get gradient values + assert gradient_with_unknown.grid[0, 0] != -1 # Should have gradient + assert gradient_with_unknown.grid[1, 1] != -1 # Should have gradient + + # Unknown cells far from obstacles (beyond max_distance) should remain unknown + # The far corner (8,8) is ~0.57m from nearest obstacle, within 1m threshold + # So it will get a gradient value, not remain unknown + assert gradient_with_unknown.unknown_cells < 8 # Some unknowns converted to gradient + + +def test_filter_above(): + """Test filtering cells above threshold.""" + # Create test grid with various values + data = np.array( + [[-1, 0, 20, 50], [10, 30, 60, 80], [40, 70, 90, 100], [-1, 15, 25, -1]], dtype=np.int8 + ) + + grid = OccupancyGrid(grid=data, resolution=0.1) + + # Filter to keep only values > 50 + filtered = grid.filter_above(50) + + # Check that values > 50 are preserved + assert filtered.grid[1, 2] == 60 + assert filtered.grid[1, 3] == 80 + assert filtered.grid[2, 1] == 70 + assert filtered.grid[2, 2] == 90 + assert filtered.grid[2, 3] == 100 + + # Check that values <= 50 are set to -1 (unknown) + assert filtered.grid[0, 1] == -1 # was 0 + assert filtered.grid[0, 2] == -1 # was 20 + assert filtered.grid[0, 3] == -1 # was 50 + assert filtered.grid[1, 0] == -1 # was 10 + assert filtered.grid[1, 1] == -1 # was 30 + assert filtered.grid[2, 0] == -1 # was 40 + + # Check that unknown cells are preserved + assert filtered.grid[0, 0] == -1 + assert filtered.grid[3, 0] == -1 + assert filtered.grid[3, 3] == -1 + + # Check dimensions and metadata preserved + assert filtered.width == grid.width + assert filtered.height == grid.height + assert filtered.resolution == grid.resolution + assert filtered.frame_id == grid.frame_id + + +def test_filter_below(): + """Test filtering cells below threshold.""" + # Create test grid with various values + data = np.array( + [[-1, 0, 20, 50], [10, 30, 60, 80], [40, 70, 90, 100], [-1, 15, 25, -1]], dtype=np.int8 + ) + + grid = OccupancyGrid(grid=data, resolution=0.1) + + # Filter to keep only values < 50 + filtered = grid.filter_below(50) + + # Check that values < 50 are preserved + assert filtered.grid[0, 1] == 0 + assert filtered.grid[0, 2] == 20 + assert filtered.grid[1, 0] == 10 + assert filtered.grid[1, 1] == 30 + assert filtered.grid[2, 0] == 40 + assert filtered.grid[3, 1] == 15 + assert filtered.grid[3, 2] == 25 + + # Check that values >= 50 are set to -1 (unknown) + assert filtered.grid[0, 3] == -1 # was 50 + assert filtered.grid[1, 2] == -1 # was 60 + assert filtered.grid[1, 3] == -1 # was 80 + assert filtered.grid[2, 1] == -1 # was 70 + assert filtered.grid[2, 2] == -1 # was 90 + assert filtered.grid[2, 3] == -1 # was 100 + + # Check that unknown cells are preserved + assert filtered.grid[0, 0] == -1 + assert filtered.grid[3, 0] == -1 + assert filtered.grid[3, 3] == -1 + + # Check dimensions and metadata preserved + assert filtered.width == grid.width + assert filtered.height == grid.height + assert filtered.resolution == grid.resolution + assert filtered.frame_id == grid.frame_id + + +def test_max(): + """Test setting all non-unknown cells to maximum.""" + # Create test grid with various values + data = np.array( + [[-1, 0, 20, 50], [10, 30, 60, 80], [40, 70, 90, 100], [-1, 15, 25, -1]], dtype=np.int8 + ) + + grid = OccupancyGrid(grid=data, resolution=0.1) + + # Apply max + maxed = grid.max() + + # Check that all non-unknown cells are set to 100 + assert maxed.grid[0, 1] == 100 # was 0 + assert maxed.grid[0, 2] == 100 # was 20 + assert maxed.grid[0, 3] == 100 # was 50 + assert maxed.grid[1, 0] == 100 # was 10 + assert maxed.grid[1, 1] == 100 # was 30 + assert maxed.grid[1, 2] == 100 # was 60 + assert maxed.grid[1, 3] == 100 # was 80 + assert maxed.grid[2, 0] == 100 # was 40 + assert maxed.grid[2, 1] == 100 # was 70 + assert maxed.grid[2, 2] == 100 # was 90 + assert maxed.grid[2, 3] == 100 # was 100 (already max) + assert maxed.grid[3, 1] == 100 # was 15 + assert maxed.grid[3, 2] == 100 # was 25 + + # Check that unknown cells are preserved + assert maxed.grid[0, 0] == -1 + assert maxed.grid[3, 0] == -1 + assert maxed.grid[3, 3] == -1 + + # Check dimensions and metadata preserved + assert maxed.width == grid.width + assert maxed.height == grid.height + assert maxed.resolution == grid.resolution + assert maxed.frame_id == grid.frame_id + + # Verify statistics + assert maxed.unknown_cells == 3 # Same as original + assert maxed.occupied_cells == 13 # All non-unknown cells + assert maxed.free_cells == 0 # No free cells + + +@pytest.mark.lcm +def test_lcm_broadcast(): + """Test broadcasting OccupancyGrid and gradient over LCM.""" + file_path = get_data("lcm_msgs") / "sensor_msgs/PointCloud2.pickle" + with open(file_path, "rb") as f: + lcm_msg = pickle.loads(f.read()) + + pointcloud = PointCloud2.lcm_decode(lcm_msg) + + # Create occupancy grid from pointcloud + occupancygrid = OccupancyGrid.from_pointcloud( + pointcloud, resolution=0.05, min_height=0.1, max_height=2.0 + ) + # Apply inflation separately if needed + occupancygrid = occupancygrid.inflate(0.1) + + # Create gradient field with larger max_distance for better visualization + gradient_grid = occupancygrid.gradient(obstacle_threshold=70, max_distance=2.0) + + # Debug: Print actual values to see the difference + print("\n=== DEBUG: Comparing grids ===") + print(f"Original grid unique values: {np.unique(occupancygrid.grid)}") + print(f"Gradient grid unique values: {np.unique(gradient_grid.grid)}") + + # Find an area with occupied cells to show the difference + occupied_indices = np.argwhere(occupancygrid.grid == 100) + if len(occupied_indices) > 0: + # Pick a point near an occupied cell + idx = len(occupied_indices) // 2 # Middle occupied cell + sample_y, sample_x = occupied_indices[idx] + sample_size = 15 + + # Ensure we don't go out of bounds + y_start = max(0, sample_y - sample_size // 2) + y_end = min(occupancygrid.height, y_start + sample_size) + x_start = max(0, sample_x - sample_size // 2) + x_end = min(occupancygrid.width, x_start + sample_size) + + print(f"\nSample area around occupied cell ({sample_x}, {sample_y}):") + print("Original occupancy grid:") + print(occupancygrid.grid[y_start:y_end, x_start:x_end]) + print("\nGradient grid (same area):") + print(gradient_grid.grid[y_start:y_end, x_start:x_end]) + else: + print("\nNo occupied cells found for sampling") + + # Check statistics + print(f"\nOriginal grid stats:") + print(f" Occupied (100): {np.sum(occupancygrid.grid == 100)} cells") + print(f" Inflated (99): {np.sum(occupancygrid.grid == 99)} cells") + print(f" Free (0): {np.sum(occupancygrid.grid == 0)} cells") + print(f" Unknown (-1): {np.sum(occupancygrid.grid == -1)} cells") + + print(f"\nGradient grid stats:") + print(f" Max gradient (100): {np.sum(gradient_grid.grid == 100)} cells") + print( + f" High gradient (80-99): {np.sum((gradient_grid.grid >= 80) & (gradient_grid.grid < 100))} cells" + ) + print( + f" Medium gradient (40-79): {np.sum((gradient_grid.grid >= 40) & (gradient_grid.grid < 80))} cells" + ) + print( + f" Low gradient (1-39): {np.sum((gradient_grid.grid >= 1) & (gradient_grid.grid < 40))} cells" + ) + print(f" Zero gradient (0): {np.sum(gradient_grid.grid == 0)} cells") + print(f" Unknown (-1): {np.sum(gradient_grid.grid == -1)} cells") + + # # Save debug images + # import matplotlib.pyplot as plt + + # fig, axes = plt.subplots(1, 2, figsize=(12, 5)) + + # # Original + # ax = axes[0] + # im1 = ax.imshow(occupancygrid.grid, origin="lower", cmap="gray_r", vmin=-1, vmax=100) + # ax.set_title(f"Original Occupancy Grid\n{occupancygrid}") + # plt.colorbar(im1, ax=ax) + + # # Gradient + # ax = axes[1] + # im2 = ax.imshow(gradient_grid.grid, origin="lower", cmap="hot", vmin=-1, vmax=100) + # ax.set_title(f"Gradient Grid\n{gradient_grid}") + # plt.colorbar(im2, ax=ax) + + # plt.tight_layout() + # plt.savefig("lcm_debug_grids.png", dpi=150) + # print("\nSaved debug visualization to lcm_debug_grids.png") + # plt.close() + + # Broadcast all the data + lcm = LCM() + lcm.start() + lcm.publish(Topic("/global_map", PointCloud2), pointcloud) + lcm.publish(Topic("/global_costmap", OccupancyGrid), occupancygrid) + lcm.publish(Topic("/global_gradient", OccupancyGrid), gradient_grid) + + print(f"\nPublished to LCM:") + print(f" /global_map: PointCloud2 with {len(pointcloud)} points") + print(f" /global_costmap: {occupancygrid}") + print(f" /global_gradient: {gradient_grid}") + print(f"\nGradient info:") + print(f" Values: 0 (free far from obstacles) -> 100 (at obstacles)") + print(f" Unknown cells: {gradient_grid.unknown_cells} (preserved as -1)") + print(f" Max distance for gradient: 5.0 meters") diff --git a/dimos/msgs/nav_msgs/test_Path.py b/dimos/msgs/nav_msgs/test_Path.py new file mode 100644 index 0000000000..0a34245448 --- /dev/null +++ b/dimos/msgs/nav_msgs/test_Path.py @@ -0,0 +1,290 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import pytest + +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.nav_msgs.Path import Path + + +def create_test_pose(x: float, y: float, z: float, frame_id: str = "map") -> PoseStamped: + """Helper to create a test PoseStamped.""" + return PoseStamped( + frame_id=frame_id, + position=[x, y, z], + orientation=Quaternion(0, 0, 0, 1), # Identity quaternion + ) + + +def test_init_empty(): + """Test creating an empty path.""" + path = Path(frame_id="map") + assert path.frame_id == "map" + assert len(path) == 0 + assert not path # Should be falsy when empty + assert path.poses == [] + + +def test_init_with_poses(): + """Test creating a path with initial poses.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(frame_id="map", poses=poses) + assert len(path) == 3 + assert bool(path) # Should be truthy when has poses + assert path.poses == poses + + +def test_head(): + """Test getting the first pose.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(poses=poses) + assert path.head() == poses[0] + + # Test empty path + empty_path = Path() + assert empty_path.head() is None + + +def test_last(): + """Test getting the last pose.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(poses=poses) + assert path.last() == poses[-1] + + # Test empty path + empty_path = Path() + assert empty_path.last() is None + + +def test_tail(): + """Test getting all poses except the first.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(poses=poses) + tail = path.tail() + assert len(tail) == 2 + assert tail.poses == poses[1:] + assert tail.frame_id == path.frame_id + + # Test single element path + single_path = Path(poses=[poses[0]]) + assert len(single_path.tail()) == 0 + + # Test empty path + empty_path = Path() + assert len(empty_path.tail()) == 0 + + +def test_push_immutable(): + """Test immutable push operation.""" + path = Path(frame_id="map") + pose1 = create_test_pose(1, 1, 0) + pose2 = create_test_pose(2, 2, 0) + + # Push should return new path + path2 = path.push(pose1) + assert len(path) == 0 # Original unchanged + assert len(path2) == 1 + assert path2.poses[0] == pose1 + + # Chain pushes + path3 = path2.push(pose2) + assert len(path2) == 1 # Previous unchanged + assert len(path3) == 2 + assert path3.poses == [pose1, pose2] + + +def test_push_mutable(): + """Test mutable push operation.""" + path = Path(frame_id="map") + pose1 = create_test_pose(1, 1, 0) + pose2 = create_test_pose(2, 2, 0) + + # Push should modify in place + path.push_mut(pose1) + assert len(path) == 1 + assert path.poses[0] == pose1 + + path.push_mut(pose2) + assert len(path) == 2 + assert path.poses == [pose1, pose2] + + +def test_indexing(): + """Test indexing and slicing.""" + poses = [create_test_pose(i, i, 0) for i in range(5)] + path = Path(poses=poses) + + # Single index + assert path[0] == poses[0] + assert path[-1] == poses[-1] + + # Slicing + assert path[1:3] == poses[1:3] + assert path[:2] == poses[:2] + assert path[3:] == poses[3:] + + +def test_iteration(): + """Test iterating over poses.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(poses=poses) + + collected = [] + for pose in path: + collected.append(pose) + assert collected == poses + + +def test_slice_method(): + """Test slice method.""" + poses = [create_test_pose(i, i, 0) for i in range(5)] + path = Path(frame_id="map", poses=poses) + + sliced = path.slice(1, 4) + assert len(sliced) == 3 + assert sliced.poses == poses[1:4] + assert sliced.frame_id == "map" + + # Test open-ended slice + sliced2 = path.slice(2) + assert sliced2.poses == poses[2:] + + +def test_extend_immutable(): + """Test immutable extend operation.""" + poses1 = [create_test_pose(i, i, 0) for i in range(2)] + poses2 = [create_test_pose(i + 2, i + 2, 0) for i in range(2)] + + path1 = Path(frame_id="map", poses=poses1) + path2 = Path(frame_id="odom", poses=poses2) + + extended = path1.extend(path2) + assert len(path1) == 2 # Original unchanged + assert len(extended) == 4 + assert extended.poses == poses1 + poses2 + assert extended.frame_id == "map" # Keeps first path's frame + + +def test_extend_mutable(): + """Test mutable extend operation.""" + poses1 = [create_test_pose(i, i, 0) for i in range(2)] + poses2 = [create_test_pose(i + 2, i + 2, 0) for i in range(2)] + + path1 = Path(frame_id="map", poses=poses1.copy()) # Use copy to avoid modifying original + path2 = Path(frame_id="odom", poses=poses2) + + path1.extend_mut(path2) + assert len(path1) == 4 + # Check poses are the same as concatenation + for i, (p1, p2) in enumerate(zip(path1.poses, poses1 + poses2)): + assert p1.x == p2.x + assert p1.y == p2.y + assert p1.z == p2.z + + +def test_reverse(): + """Test reverse operation.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(poses=poses) + + reversed_path = path.reverse() + assert len(path) == 3 # Original unchanged + assert reversed_path.poses == list(reversed(poses)) + + +def test_clear(): + """Test clear operation.""" + poses = [create_test_pose(i, i, 0) for i in range(3)] + path = Path(poses=poses) + + path.clear() + assert len(path) == 0 + assert path.poses == [] + + +def test_lcm_encode_decode(): + """Test encoding and decoding of Path to/from binary LCM format.""" + # Create path with poses + # Use timestamps that can be represented exactly in float64 + path_ts = 1234567890.5 + poses = [ + PoseStamped( + ts=1234567890.0 + i * 0.1, # Use simpler timestamps + frame_id=f"frame_{i}", + position=[i * 1.5, i * 2.5, i * 3.5], + orientation=(0.1 * i, 0.2 * i, 0.3 * i, 0.9), + ) + for i in range(3) + ] + + path_source = Path(ts=path_ts, frame_id="world", poses=poses) + + # Encode to binary + binary_msg = path_source.lcm_encode() + + # Decode from binary + path_dest = Path.lcm_decode(binary_msg) + + assert isinstance(path_dest, Path) + assert path_dest is not path_source + + # Check header + assert path_dest.frame_id == path_source.frame_id + # Path timestamp should be preserved + assert abs(path_dest.ts - path_source.ts) < 1e-6 # Microsecond precision + + # Check poses + assert len(path_dest.poses) == len(path_source.poses) + + for orig, decoded in zip(path_source.poses, path_dest.poses): + # Check pose timestamps + assert abs(decoded.ts - orig.ts) < 1e-6 + # All poses should have the path's frame_id + assert decoded.frame_id == path_dest.frame_id + + # Check position + assert decoded.x == orig.x + assert decoded.y == orig.y + assert decoded.z == orig.z + + # Check orientation + assert decoded.orientation.x == orig.orientation.x + assert decoded.orientation.y == orig.orientation.y + assert decoded.orientation.z == orig.orientation.z + assert decoded.orientation.w == orig.orientation.w + + +def test_lcm_encode_decode_empty(): + """Test encoding and decoding of empty Path.""" + path_source = Path(frame_id="base_link") + + binary_msg = path_source.lcm_encode() + path_dest = Path.lcm_decode(binary_msg) + + assert isinstance(path_dest, Path) + assert path_dest.frame_id == path_source.frame_id + assert len(path_dest.poses) == 0 + + +def test_str_representation(): + """Test string representation.""" + path = Path(frame_id="map") + assert str(path) == "Path(frame_id='map', poses=0)" + + path.push_mut(create_test_pose(1, 1, 0)) + path.push_mut(create_test_pose(2, 2, 0)) + assert str(path) == "Path(frame_id='map', poses=2)" diff --git a/dimos/msgs/std_msgs/Header.py b/dimos/msgs/std_msgs/Header.py index 642b3d36c0..fd708be2f3 100644 --- a/dimos/msgs/std_msgs/Header.py +++ b/dimos/msgs/std_msgs/Header.py @@ -22,6 +22,12 @@ from dimos_lcm.std_msgs import Time as LCMTime from plum import dispatch +# Import the actual LCM header type that's returned from decoding +try: + from lcm_msgs.std_msgs.Header import Header as DecodedLCMHeader +except ImportError: + DecodedLCMHeader = None + class Header(LCMHeader): msg_name = "std_msgs.Header" @@ -64,6 +70,15 @@ def __init__(self, header: LCMHeader) -> None: """Initialize from another Header (copy constructor).""" super().__init__(seq=header.seq, stamp=header.stamp, frame_id=header.frame_id) + @dispatch + def __init__(self, header: object) -> None: + """Initialize from a decoded LCM header object.""" + # Handle the case where we get an lcm_msgs.std_msgs.Header.Header object + if hasattr(header, "seq") and hasattr(header, "stamp") and hasattr(header, "frame_id"): + super().__init__(seq=header.seq, stamp=header.stamp, frame_id=header.frame_id) + else: + raise ValueError(f"Cannot create Header from {type(header)}") + @classmethod def now(cls, frame_id: str = "", seq: int = 1) -> Header: """Create a Header with current timestamp.""" diff --git a/dimos/robot/global_planner/algo.py b/dimos/robot/global_planner/algo.py index 45aa483e07..893efa4de9 100644 --- a/dimos/robot/global_planner/algo.py +++ b/dimos/robot/global_planner/algo.py @@ -17,17 +17,19 @@ from collections import deque from typing import Optional, Tuple +from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, VectorLike from dimos.msgs.geometry_msgs import Vector3 as Vector -from dimos.msgs.geometry_msgs import VectorLike -from dimos.types.costmap import Costmap -from dimos.types.path import Path +from dimos.msgs.nav_msgs import OccupancyGrid, Path from dimos.utils.logging_config import setup_logger logger = setup_logger("dimos.robot.unitree.global_planner.astar") def find_nearest_free_cell( - costmap: Costmap, position: VectorLike, cost_threshold: int = 90, max_search_radius: int = 20 + costmap: OccupancyGrid, + position: VectorLike, + cost_threshold: int = 90, + max_search_radius: int = 20, ) -> Tuple[int, int]: """ Find the nearest unoccupied cell in the costmap using BFS. @@ -97,7 +99,7 @@ def find_nearest_free_cell( def astar( - costmap: Costmap, + costmap: OccupancyGrid, goal: VectorLike, start: VectorLike = (0.0, 0.0), cost_threshold: int = 90, @@ -210,12 +212,23 @@ def heuristic(x1, y1, x2, y2): waypoints = [] while current in parents: world_point = costmap.grid_to_world(current) - waypoints.append(world_point) + # Create PoseStamped with identity quaternion (no orientation) + pose = PoseStamped( + frame_id="world", + position=[world_point.x, world_point.y, 0.0], + orientation=Quaternion(0, 0, 0, 1), # Identity quaternion + ) + waypoints.append(pose) current = parents[current] # Add the start position start_world_point = costmap.grid_to_world(start_tuple) - waypoints.append(start_world_point) + start_pose = PoseStamped( + frame_id="world", + position=[start_world_point.x, start_world_point.y, 0.0], + orientation=Quaternion(0, 0, 0, 1), + ) + waypoints.append(start_pose) # Reverse the path (start to goal) waypoints.reverse() @@ -223,15 +236,29 @@ def heuristic(x1, y1, x2, y2): # Add the goal position if it's not already included goal_point = costmap.grid_to_world(goal_tuple) - if not waypoints or waypoints[-1].distance(goal_point) > 1e-5: - waypoints.append(goal_point) + if ( + not waypoints + or (waypoints[-1].x - goal_point.x) ** 2 + (waypoints[-1].y - goal_point.y) ** 2 + > 1e-10 + ): + goal_pose = PoseStamped( + frame_id="world", + position=[goal_point.x, goal_point.y, 0.0], + orientation=Quaternion(0, 0, 0, 1), + ) + waypoints.append(goal_pose) # If we adjusted the goal, add the original goal as the final point if adjusted_goal != original_goal and goal_valid: original_goal_point = costmap.grid_to_world(original_goal) - waypoints.append(original_goal_point) + original_goal_pose = PoseStamped( + frame_id="world", + position=[original_goal_point.x, original_goal_point.y, 0.0], + orientation=Quaternion(0, 0, 0, 1), + ) + waypoints.append(original_goal_pose) - return Path(waypoints) + return Path(frame_id="world", poses=waypoints) # Add current node to closed set closed_set.add(current) diff --git a/dimos/robot/global_planner/planner.py b/dimos/robot/global_planner/planner.py index 3b08c783e0..95b0d060e5 100644 --- a/dimos/robot/global_planner/planner.py +++ b/dimos/robot/global_planner/planner.py @@ -18,17 +18,82 @@ from typing import Callable, Optional from dimos.core import In, Module, Out, rpc -from dimos.msgs.geometry_msgs import Pose, PoseLike, PoseStamped, Vector3, to_pose +from dimos.msgs.geometry_msgs import Pose, PoseLike, PoseStamped, Vector3, VectorLike, to_pose +from dimos.msgs.nav_msgs import OccupancyGrid, Path from dimos.robot.global_planner.algo import astar -from dimos.types.costmap import Costmap -from dimos.types.path import Path -from dimos.types.vector import Vector, VectorLike, to_vector from dimos.utils.logging_config import setup_logger from dimos.web.websocket_vis.helpers import Visualizable logger = setup_logger("dimos.robot.unitree.global_planner") +def resample_path(path: Path, spacing: float) -> Path: + """Resample a path to have approximately uniform spacing between poses. + + Args: + path: The original Path + spacing: Desired distance between consecutive poses + + Returns: + A new Path with resampled poses + """ + if len(path) < 2 or spacing <= 0: + return path + + resampled = [] + resampled.append(path.poses[0]) + + accumulated_distance = 0.0 + + for i in range(1, len(path.poses)): + current = path.poses[i] + prev = path.poses[i - 1] + + # Calculate segment distance + dx = current.x - prev.x + dy = current.y - prev.y + segment_length = (dx**2 + dy**2) ** 0.5 + + if segment_length < 1e-10: + continue + + # Direction vector + dir_x = dx / segment_length + dir_y = dy / segment_length + + # Add points along this segment + while accumulated_distance + segment_length >= spacing: + # Distance along segment for next point + dist_along = spacing - accumulated_distance + if dist_along < 0: + break + + # Create new pose + new_x = prev.x + dir_x * dist_along + new_y = prev.y + dir_y * dist_along + new_pose = PoseStamped( + frame_id=path.frame_id, + position=[new_x, new_y, 0.0], + orientation=prev.orientation, # Keep same orientation + ) + resampled.append(new_pose) + + # Update for next iteration + accumulated_distance = 0 + segment_length -= dist_along + prev = new_pose + + accumulated_distance += segment_length + + # Add last pose if not already there + if len(path.poses) > 1: + last = path.poses[-1] + if not resampled or (resampled[-1].x != last.x or resampled[-1].y != last.y): + resampled.append(last) + + return Path(frame_id=path.frame_id, poses=resampled) + + @dataclass class Planner(Visualizable, Module): target: In[PoseStamped] = None @@ -57,7 +122,7 @@ class AstarPlanner(Planner): target: In[Vector3] = None path: Out[Path] = None - get_costmap: Callable[[], Costmap] + get_costmap: Callable[[], OccupancyGrid] get_robot_pos: Callable[[], Vector3] set_local_nav: Callable[[Path, Optional[threading.Event], Optional[float]], bool] = None @@ -65,7 +130,7 @@ class AstarPlanner(Planner): def __init__( self, - get_costmap: Callable[[], Costmap], + get_costmap: Callable[[], OccupancyGrid], get_robot_pos: Callable[[], Vector3], set_local_nav: Callable[[Path, Optional[threading.Event], Optional[float]], bool] = None, ): @@ -82,20 +147,17 @@ def plan(self, goallike: PoseLike) -> Path: goal = to_pose(goallike) logger.info(f"planning path to goal {goal}") pos = self.get_robot_pos() - print("current pos", pos) - costmap = self.get_costmap() - - print("current costmap", costmap) + costmap = self.get_costmap().gradient() self.vis("target", goal) path = astar(costmap, goal.position, pos) if path: - path = path.resample(0.1) - self.vis("a*", path) + path = resample_path(path, 0.1) self.path.publish(path) if hasattr(self, "set_local_nav") and self.set_local_nav: self.set_local_nav(path) + logger.warning(f"Path found: {path}") return path logger.warning("No path found to the goal.") diff --git a/dimos/robot/local_planner/simple.py b/dimos/robot/local_planner/simple.py deleted file mode 100644 index 4b771f3124..0000000000 --- a/dimos/robot/local_planner/simple.py +++ /dev/null @@ -1,312 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import math -import time -import traceback -from typing import Callable, Optional - -import reactivex as rx -from plum import dispatch -from reactivex import operators as ops - -from dimos.core import In, Module, Out, rpc - -# from dimos.robot.local_planner.local_planner import LocalPlanner -from dimos.msgs.geometry_msgs import ( - Pose, - PoseLike, - PoseStamped, - Transform, - Twist, - Vector3, - VectorLike, - to_pose, -) -from dimos.msgs.tf2_msgs import TFMessage -from dimos.types.costmap import Costmap -from dimos.types.path import Path -from dimos.utils.logging_config import setup_logger -from dimos.utils.threadpool import get_scheduler - -logger = setup_logger("dimos.robot.unitree.local_planner") - - -def vector_to_twist(vector: Vector3) -> Twist: - """Convert a Vector3 to a Twist message.""" - twist = Twist() - twist.linear.x = vector.x - twist.linear.y = vector.y - twist.linear.z = 0.0 - twist.angular.x = 0.0 - twist.angular.y = 0.0 - twist.angular.z = vector.z - return twist - - -class SimplePlanner(Module): - path: In[Path] = None - odom: In[PoseStamped] = None - movecmd: Out[Twist] = None - - tf: Out[Transform] = None - - get_costmap: Callable[[], Costmap] - - latest_odom: Optional[PoseStamped] = None - - goal: Optional[PoseStamped] = None - speed: float = 0.3 - - def __init__( - self, - get_costmap: Callable[[], Costmap], - ): - Module.__init__(self) - self.get_costmap = get_costmap - - def _odom_to_tf(self, odom: PoseStamped) -> Transform: - """Convert Odometry to Transform.""" - return Transform( - translation=odom.position, - rotation=odom.orientation, - frame_id="world", - child_frame_id="base_link", - ts=odom.ts, - ) - - def transform_to_robot_frame( - self, global_target: Vector3, global_robot_position: PoseStamped - ) -> Vector3: - tf1 = self._odom_to_tf(global_robot_position) - tf2 = global_robot_position.find_transform(global_target) - tf2.child_frame_id = "target" - tf2.frame_id = "base_link" - self.tf.publish(TFMessage(tf1, tf2)) - return tf2.translation - - def get_move_stream(self, frequency: float = 20.0) -> rx.Observable: - return rx.interval(1.0 / frequency, scheduler=get_scheduler()).pipe( - # do we have a goal? - ops.filter(lambda _: self.goal is not None), - # do we have odometry data? - ops.filter(lambda _: self.latest_odom is not None), - # transform goal to robot frame with error handling - ops.map(lambda _: self._safe_transform_goal()), - # filter out None results (errors) - ops.filter(lambda result: result is not None), - ops.map(self._calculate_rotation_to_target), - # filter out None results from calc_move - ops.filter(lambda result: result is not None), - ) - - def _safe_transform_goal(self) -> Optional[Vector3]: - """Safely transform goal to robot frame with error handling.""" - try: - return self.transform_to_robot_frame(self.goal, self.latest_odom) - except Exception as e: - logger.error(f"Error transforming goal to robot frame: {e}") - print(traceback.format_exc()) - return None - - @rpc - def start(self): - print(self.path.connection, self.path.transport) - self.path.subscribe(self.set_goal) - - def setodom(odom: PoseStamped): - self.latest_odom = odom - - def pubmove(move: Vector3): - print("PUBMOVE", move, "\n\n") - # print(self.movecmd) - try: - self.movecmd.publish(move) - except Exception as e: - print(traceback.format_exc()) - print(e) - - self.odom.subscribe(setodom) - self.get_move_stream(frequency=20.0).subscribe(pubmove) - - @dispatch - def set_goal(self, goal: Path, stop_event=None, goal_theta=None) -> bool: - self.goal = to_pose(goal.last()) - logger.info(f"Setting goal: {self.goal}") - return True - - @dispatch - def set_goal(self, goal: VectorLike, stop_event=None, goal_theta=None) -> bool: - self.goal = to_pose(goal.last()) - logger.info(f"Setting goal: {self.goal}") - return True - - def calc_move(self, direction: Vector3) -> Optional[Vector3]: - """Calculate the movement vector based on the direction to the goal. - - Args: - direction: Direction vector towards the goal - - Returns: - Movement vector scaled by speed, or None if error occurs - """ - try: - # Normalize the direction vector and scale by speed - normalized_direction = direction.normalize() - move_vector = normalized_direction * self.speed - print("CALC MOVE", direction, normalized_direction, move_vector) - return move_vector - except Exception as e: - logger.error(f"Error calculating move vector: {e}") - return None - - def spy(self, name: str): - def spyfun(x): - print(f"SPY {name}:", x) - return x - - return ops.map(spyfun) - - def frequency_spy(self, name: str, window_size: int = 10): - """Create a frequency spy that logs message rate over a sliding window. - - Args: - name: Name for the spy output - window_size: Number of messages to average frequency over - """ - timestamps = [] - - def freq_spy_fun(x): - current_time = time.time() - timestamps.append(current_time) - print(x) - # Keep only the last window_size timestamps - if len(timestamps) > window_size: - timestamps.pop(0) - - # Calculate frequency if we have enough samples - if len(timestamps) >= 2: - time_span = timestamps[-1] - timestamps[0] - if time_span > 0: - frequency = (len(timestamps) - 1) / time_span - print(f"FREQ SPY {name}: {frequency:.2f} Hz ({len(timestamps)} samples)") - else: - print(f"FREQ SPY {name}: calculating... ({len(timestamps)} samples)") - else: - print(f"FREQ SPY {name}: warming up... ({len(timestamps)} samples)") - - return x - - return ops.map(freq_spy_fun) - - def _test_translational_movement(self) -> Vector3: - """Test translational movement by alternating left and right movement. - - Returns: - Vector with (x=0, y=left/right, z=0) for testing left-right movement - """ - # Use time to alternate between left and right movement every 3 seconds - current_time = time.time() - cycle_time = 6.0 # 6 second cycle (3 seconds each direction) - phase = (current_time % cycle_time) / cycle_time - - if phase < 0.5: - # First half: move LEFT (positive X according to our documentation) - movement = Vector3(0.2, 0, 0) # Move left at 0.2 m/s - direction = "LEFT (positive X)" - else: - # Second half: move RIGHT (negative X according to our documentation) - movement = Vector3(-0.2, 0, 0) # Move right at 0.2 m/s - direction = "RIGHT (negative X)" - - print("=== LEFT-RIGHT MOVEMENT TEST ===") - print(f"Phase: {phase:.2f}, Direction: {direction}") - print(f"Sending movement command: {movement}") - print(f"Expected: Robot should move {direction.split()[0]} relative to its body") - print("===================================") - return movement - - def _calculate_rotation_to_target(self, direction_to_goal: Vector3) -> Vector3: - """Calculate the rotation needed for the robot to face the target. - - Args: - direction_to_goal: Vector pointing from robot position to goal in global coordinates - - Returns: - Vector with (x=0, y=0, z=angular_velocity) for rotation only - """ - if self.latest_odom is None: - logger.warning("No odometry data available for rotation calculation") - return Vector3(0, 0, 0) - - # Calculate the desired yaw angle to face the target - desired_yaw = math.atan2(direction_to_goal.y, direction_to_goal.x) - - # Get current robot yaw - current_yaw = self.latest_odom.yaw - - # Calculate the yaw error using a more robust method to avoid oscillation - yaw_error = math.atan2( - math.sin(desired_yaw - current_yaw), math.cos(desired_yaw - current_yaw) - ) - - print( - f"DEBUG: direction_to_goal={direction_to_goal}, desired_yaw={math.degrees(desired_yaw):.1f}°, current_yaw={math.degrees(current_yaw):.1f}°" - ) - print( - f"DEBUG: yaw_error={math.degrees(yaw_error):.1f}°, abs_error={abs(yaw_error):.3f}, tolerance=0.1" - ) - - # Calculate angular velocity (proportional control) - max_angular_speed = 0.15 # rad/s - raw_angular_velocity = yaw_error * 2.0 - angular_velocity = max(-max_angular_speed, min(max_angular_speed, raw_angular_velocity)) - - print( - f"DEBUG: raw_ang_vel={raw_angular_velocity:.3f}, clamped_ang_vel={angular_velocity:.3f}" - ) - - # Stop rotating if we're close enough to the target angle - if abs(yaw_error) < 0.1: # ~5.7 degrees tolerance - print("DEBUG: Within tolerance - stopping rotation") - angular_velocity = 0.0 - else: - print("DEBUG: Outside tolerance - continuing rotation") - - print( - f"Rotation control: current_yaw={math.degrees(current_yaw):.1f}°, desired_yaw={math.degrees(desired_yaw):.1f}°, error={math.degrees(yaw_error):.1f}°, ang_vel={angular_velocity:.3f}" - ) - - # Return movement command: no translation (x=0, y=0), only rotation (z=angular_velocity) - # Try flipping the sign in case the rotation convention is opposite - return Vector3(0, 0, -angular_velocity) - - def _debug_direction(self, name: str, direction: Vector3) -> Vector3: - """Debug helper to log direction information""" - robot_pos = self.latest_odom - if robot_pos is None: - print(f"DEBUG {name}: direction={direction}, robot_pos=None, goal={self.goal}") - return direction - - print( - f"DEBUG {name}: direction={direction}, robot_pos={robot_pos.position.to_2d()}, robot_yaw={math.degrees(robot_pos.orientation.z):.1f}°, goal={self.goal}" - ) - return direction - - def _debug_robot_command(self, robot_cmd: Vector3) -> Vector3: - """Debug helper to log robot command information""" - print( - f"DEBUG robot_command: x={robot_cmd.x:.3f}, y={robot_cmd.y:.3f} (forward/backward, left/right)" - ) - return robot_cmd diff --git a/dimos/robot/unitree_webrtc/multiprocess/test_actors.py b/dimos/robot/unitree_webrtc/multiprocess/test_actors.py index c6679d3262..a346859afb 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/test_actors.py +++ b/dimos/robot/unitree_webrtc/multiprocess/test_actors.py @@ -25,7 +25,6 @@ from dimos.msgs.sensor_msgs import Image from dimos.protocol import pubsub from dimos.robot.global_planner import AstarPlanner -from dimos.robot.local_planner.simple import SimplePlanner from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.map import Map from dimos.robot.unitree_webrtc.type.map import Map as Mapper diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_navonly.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_navonly.py new file mode 100644 index 0000000000..3ee42305b2 --- /dev/null +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_navonly.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import functools +import logging +import os +import threading +import time +import warnings +from typing import Callable, Optional + +from reactivex import Observable +from reactivex import operators as ops + +import dimos.core.colors as colors +from dimos import core +from dimos.core import In, Module, Out, rpc +from dimos.msgs.geometry_msgs import Pose, PoseStamped, Transform, Vector3 +from dimos.msgs.nav_msgs import OccupancyGrid, Path +from dimos.msgs.sensor_msgs import Image +from dimos.perception.spatial_perception import SpatialMemory +from dimos.protocol import pubsub +from dimos.protocol.tf import TF +from dimos.robot.foxglove_bridge import FoxgloveBridge +from dimos.robot.frontier_exploration.wavefront_frontier_goal_selector import ( + WavefrontFrontierExplorer, +) +from dimos.robot.global_planner import AstarPlanner +from dimos.robot.local_planner.vfh_local_planner import VFHPurePursuitPlanner +from dimos.robot.unitree_webrtc.connection import UnitreeWebRTCConnection, VideoMessage +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.robot.unitree_webrtc.type.map import Map +from dimos.robot.unitree_webrtc.type.odometry import Odometry +from dimos.types.costmap import Costmap +from dimos.types.vector import Vector +from dimos.utils.data import get_data +from dimos.utils.logging_config import setup_logger +from dimos.utils.reactive import getter_streaming +from dimos.utils.testing import TimedSensorReplay + +logger = setup_logger("dimos.robot.unitree_webrtc.multiprocess.unitree_go2", level=logging.INFO) + +# Suppress verbose loggers +logging.getLogger("aiortc.codecs.h264").setLevel(logging.ERROR) +logging.getLogger("lcm_foxglove_bridge").setLevel(logging.ERROR) +logging.getLogger("websockets.server").setLevel(logging.ERROR) +logging.getLogger("FoxgloveServer").setLevel(logging.ERROR) +logging.getLogger("asyncio").setLevel(logging.ERROR) +logging.getLogger("root").setLevel(logging.WARNING) + +# Suppress warnings +warnings.filterwarnings("ignore", message="coroutine.*was never awaited") +warnings.filterwarnings("ignore", message="H264Decoder.*failed to decode") + + +# can be swapped in for UnitreeWebRTCConnection +class FakeRTC(UnitreeWebRTCConnection): + def __init__(self, *args, **kwargs): + # ensures we download msgs from lfs store + data = get_data("unitree_office_walk") + + def connect(self): ... + + def standup(self): + print("standup supressed") + + def liedown(self): + print("liedown supressed") + + @functools.cache + def lidar_stream(self): + print("lidar stream start") + lidar_store = TimedSensorReplay("unitree_office_walk/lidar", autocast=LidarMessage.from_msg) + return lidar_store.stream() + + @functools.cache + def odom_stream(self): + print("odom stream start") + odom_store = TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + return odom_store.stream() + + @functools.cache + def video_stream(self): + print("video stream start") + video_store = TimedSensorReplay("unitree_office_walk/video", autocast=Image.from_numpy) + return video_store.stream() + + def move(self, vector: Vector): + ... + # print("move supressed", vector) + + +class ConnectionModule(FakeRTC, Module): + movecmd: In[Vector3] = None + odom: Out[Vector3] = None + lidar: Out[LidarMessage] = None + video: Out[VideoMessage] = None + ip: str + + _odom: Callable[[], Odometry] + _lidar: Callable[[], LidarMessage] + + @rpc + def move(self, vector: Vector3): + super().move(vector) + + def __init__(self, ip: str, *args, **kwargs): + self.ip = ip + self.tf = TF() + Module.__init__(self, *args, **kwargs) + + @rpc + def start(self): + # Initialize the parent WebRTC connection + super().__init__(self.ip) + # Connect sensor streams to LCM outputs + self.lidar_stream().subscribe(self.lidar.publish) + self.odom_stream().subscribe(self.odom.publish) + # self.video_stream().subscribe(self.video.publish) + self.tf_stream().subscribe(self.tf.publish) + + # Connect LCM input to robot movement commands + self.movecmd.subscribe(self.move) + + # Set up streaming getters for latest sensor data + self._odom = getter_streaming(self.odom_stream()) + self._lidar = getter_streaming(self.lidar_stream()) + + @rpc + def get_local_costmap(self) -> Costmap: + return self._lidar().costmap() + + @rpc + def get_odom(self) -> Odometry: + return self._odom() + + @rpc + def get_pos(self) -> Vector: + return self._odom().position + + +class ControlModule(Module): + plancmd: Out[Pose] = None + + @rpc + def start(self): + def plancmd(): + while True: + time.sleep(0.5) + print(colors.red("requesting global plan")) + self.plancmd.publish( + PoseStamped( + ts=time.time(), + position=(0, 0, 0), + orientation=(0, 0, 0, 1), + ) + ) + + thread = threading.Thread(target=plancmd, daemon=True) + thread.start() + + +class UnitreeGo2Light: + ip: str + + def __init__(self, ip: str): + self.ip = ip + + def start(self): + dimos = core.start(4) + + connection = dimos.deploy(ConnectionModule, self.ip) + connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) + connection.odom.transport = core.LCMTransport("/odom", PoseStamped) + connection.video.transport = core.LCMTransport("/video", Image) + connection.movecmd.transport = core.LCMTransport("/mov", Vector3) + + mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=2.5) + + mapper.global_map.transport = core.LCMTransport("/global_map", LidarMessage) + mapper.global_costmap.transport = core.LCMTransport("/global_costmap", OccupancyGrid) + + mapper.lidar.connect(connection.lidar) + + global_planner = dimos.deploy( + AstarPlanner, + get_costmap=mapper.costmap, + get_robot_pos=connection.get_pos, + set_local_nav=print, + ) + + ctrl = dimos.deploy(ControlModule) + + ctrl.plancmd.transport = core.LCMTransport("/global_target", PoseStamped) + global_planner.path.transport = core.LCMTransport("/global_path", Path) + global_planner.target.connect(ctrl.plancmd) + foxglove_bridge = FoxgloveBridge() + + connection.start() + mapper.start() + global_planner.start() + foxglove_bridge.start() + ctrl.start() + + +if __name__ == "__main__": + import os + + ip = os.getenv("ROBOT_IP") + pubsub.lcm.autoconf() + robot = UnitreeGo2Light(ip) + robot.start() + + while True: + time.sleep(1) diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index 898bd473b5..46d311b039 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from dataclasses import dataclass from typing import Optional, Tuple -import time + import numpy as np import open3d as o3d import reactivex.operators as ops @@ -22,13 +23,16 @@ from reactivex.observable import Observable from dimos.core import In, Module, Out, rpc +from dimos.msgs.nav_msgs import OccupancyGrid +from dimos.msgs.sensor_msgs import PointCloud2 from dimos.robot.unitree_webrtc.type.lidar import LidarMessage -from dimos.types.costmap import Costmap, pointcloud_to_costmap class Map(Module): lidar: In[LidarMessage] = None global_map: Out[LidarMessage] = None + global_costmap: Out[OccupancyGrid] = None + pointcloud: o3d.geometry.PointCloud = o3d.geometry.PointCloud() def __init__( @@ -47,11 +51,33 @@ def __init__( def start(self): self.lidar.subscribe(self.add_frame) - if self.global_publish_interval is not None: - interval(self.global_publish_interval).subscribe( - lambda _: self.global_map.publish(self.to_lidar_message()) + def publish(_): + self.global_map.publish(self.to_lidar_message()) + + # temporary, not sure if it belogs in mapper + # used only for visualizations, not for any algo + occupancygrid = ( + OccupancyGrid.from_pointcloud( + self.to_lidar_message(), + resolution=0.05, + min_height=0.1, + max_height=2.0, + ) + .inflate(0.1) + .gradient() ) + self.global_costmap.publish(occupancygrid) + + if self.global_publish_interval is not None: + interval(self.global_publish_interval).subscribe(publish) + + def to_PointCloud2(self) -> PointCloud2: + return PointCloud2( + pointcloud=self.pointcloud, + ts=time.time(), + ) + def to_lidar_message(self) -> LidarMessage: return LidarMessage( pointcloud=self.pointcloud, @@ -75,16 +101,8 @@ def o3d_geometry(self) -> o3d.geometry.PointCloud: return self.pointcloud @rpc - def costmap(self) -> Costmap: - """Return a fully inflated cost-map in a `Costmap` wrapper.""" - inflate_radius_m = 0.5 * self.voxel_size if self.voxel_size > self.cost_resolution else 0.0 - grid, origin_xy = pointcloud_to_costmap( - self.pointcloud, - resolution=self.cost_resolution, - inflate_radius_m=inflate_radius_m, - ) - - return Costmap(grid=grid, origin=[*origin_xy, 0.0], resolution=self.cost_resolution) + def costmap(self) -> OccupancyGrid: + return OccupancyGrid.from_pointcloud(self.to_PointCloud2()) def splice_sphere( diff --git a/dimos/robot/unitree_webrtc/type/test_map.py b/dimos/robot/unitree_webrtc/type/test_map.py index d705bb965b..9e3141768d 100644 --- a/dimos/robot/unitree_webrtc/type/test_map.py +++ b/dimos/robot/unitree_webrtc/type/test_map.py @@ -65,16 +65,8 @@ def test_robot_mapping(): # we investigate built map costmap = map.costmap() - assert costmap.grid.shape == (404, 276) + assert costmap.grid.shape == (442, 314) - assert 70 <= costmap.unknown_percent <= 80, ( - f"Unknown percent {costmap.unknown_percent} is not within the range 70-80" - ) - - assert 5 < costmap.free_percent < 10, ( - f"Free percent {costmap.free_percent} is not within the range 5-10" - ) - - assert 8 < costmap.occupied_percent < 15, ( - f"Occupied percent {costmap.occupied_percent} is not within the range 8-15" - ) + assert 70 <= costmap.unknown_percent <= 95 + assert 4 < costmap.free_percent < 10 + assert 1 < costmap.occupied_percent < 15 diff --git a/pyproject.toml b/pyproject.toml index 731301a74a..fa0c73cbce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,8 +94,7 @@ dependencies = [ "dask[complete]==2025.5.1", # LCM / DimOS utilities - "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@b15f8c65db6ed04173f9b35fd70287453c3625f7" - # "tf-lcm-py @ git+https://github.com/dimensionalOS/dimos-lcm.git@9befcc1a75f43fb1110d468c182dd19a549eaa36#subdirectory=tf_lcm/python" + "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@ba3445d16be75a7ade6fb2a516b39a3e44319d5c" ]