diff --git a/dimos/robot/global_planner/planner.py b/dimos/robot/global_planner/planner.py index f4b5d17bc6..75a780d7cd 100644 --- a/dimos/robot/global_planner/planner.py +++ b/dimos/robot/global_planner/planner.py @@ -44,6 +44,7 @@ def set_goal( if not path: logger.warning("No path found to the goal.") return False + print("pathing success", path) return self.set_local_nav(path, stop_event=stop_event, goal_theta=goal_theta) @@ -58,7 +59,7 @@ class AstarPlanner(Planner): def plan(self, goal: VectorLike) -> Path: goal = to_vector(goal).to_2d() pos = self.get_robot_pos().to_2d() - costmap = self.get_costmap().smudge(preserve_unknown=False) + costmap = self.get_costmap().smudge() # self.vis("costmap", costmap) self.vis("target", goal) diff --git a/dimos/robot/unitree_webrtc/type/costmap.py b/dimos/robot/unitree_webrtc/type/costmap.py deleted file mode 100644 index 711ae8c0e1..0000000000 --- a/dimos/robot/unitree_webrtc/type/costmap.py +++ /dev/null @@ -1,366 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import pickle -import numpy as np -from typing import Optional -from scipy import ndimage -from dimos.types.vector import Vector, VectorLike, x, y, to_vector -import open3d as o3d -from matplotlib import cm # any matplotlib colormap - -DTYPE2STR = { - np.float32: "f32", - np.float64: "f64", - np.int32: "i32", - np.int8: "i8", -} - -STR2DTYPE = {v: k for k, v in DTYPE2STR.items()} - - -def encode_ndarray(arr: np.ndarray, compress: bool = False): - arr_c = np.ascontiguousarray(arr) - payload = arr_c.tobytes() - b64 = base64.b64encode(payload).decode("ascii") - - return { - "type": "grid", - "shape": arr_c.shape, - "dtype": DTYPE2STR[arr_c.dtype.type], - "data": b64, - } - - -class Costmap: - """Class to hold ROS OccupancyGrid data.""" - - def __init__( - self, - grid: np.ndarray, - origin: VectorLike, - resolution: float = 0.05, - ): - """Initialize Costmap with its core attributes.""" - self.grid = grid - self.resolution = resolution - self.origin = to_vector(origin).to_2d() - self.width = self.grid.shape[1] - self.height = self.grid.shape[0] - - def serialize(self) -> dict: - """Serialize the Costmap instance to a dictionary.""" - return { - "type": "costmap", - "grid": encode_ndarray(self.grid), - "origin": self.origin.serialize(), - "resolution": self.resolution, - } - - def save_pickle(self, pickle_path: str): - """Save costmap to a pickle file. - - Args: - pickle_path: Path to save the pickle file - """ - with open(pickle_path, "wb") as f: - pickle.dump(self, f) - - @classmethod - def create_empty( - cls, width: int = 100, height: int = 100, resolution: float = 0.1 - ) -> "Costmap": - """Create an empty costmap with specified dimensions.""" - return cls( - grid=np.zeros((height, width), dtype=np.int8), - resolution=resolution, - origin=(0.0, 0.0), - ) - - def world_to_grid(self, point: VectorLike) -> Vector: - """Convert world coordinates to grid coordinates. - - Args: - point: A vector-like object containing X,Y coordinates - - Returns: - Vector containing grid_x and grid_y coordinates - """ - return (to_vector(point) - self.origin) / self.resolution - - def grid_to_world(self, grid_point: VectorLike) -> Vector: - return to_vector(grid_point) * self.resolution + self.origin - - def is_occupied(self, point: VectorLike, threshold: int = 50) -> bool: - """Check if a position in world coordinates is occupied. - - Args: - point: Vector-like object containing X,Y coordinates - threshold: Cost threshold above which a cell is considered occupied (0-100) - - Returns: - True if position is occupied or out of bounds, False otherwise - """ - grid_pos = self.world_to_grid(point) - - if 0 <= grid_pos.x < self.width and 0 <= grid_pos.y < self.height: - # Consider unknown (-1) as unoccupied for navigation purposes - # Convert to int coordinates for grid indexing - grid_y, grid_x = int(grid_pos.y), int(grid_pos.x) - value = self.grid[grid_y, grid_x] - return bool(value > 0 and value >= threshold) - return True # Consider out-of-bounds as occupied - - def get_value(self, point: VectorLike) -> Optional[int]: - grid_pos = self.world_to_grid(point) - - if 0 <= grid_pos.x < self.width and 0 <= grid_pos.y < self.height: - grid_y, grid_x = int(grid_pos.y), int(grid_pos.x) - return int(self.grid[grid_y, grid_x]) - return None - - def set_value(self, point: VectorLike, value: int = 0) -> bool: - grid_pos = self.world_to_grid(point) - - if 0 <= grid_pos.x < self.width and 0 <= grid_pos.y < self.height: - grid_y, grid_x = int(grid_pos.y), int(grid_pos.x) - self.grid[grid_y, grid_x] = value - return True - return False - - def smudge( - self, - kernel_size: int = 3, - iterations: int = 20, - decay_factor: float = 0.9, - threshold: int = 90, - preserve_unknown: bool = False, - ) -> "Costmap": - """ - Creates a new costmap with expanded obstacles (smudged). - - Args: - kernel_size: Size of the convolution kernel for dilation (must be odd) - iterations: Number of dilation iterations - decay_factor: Factor to reduce cost as distance increases (0.0-1.0) - threshold: Minimum cost value to consider as an obstacle for expansion - preserve_unknown: Whether to keep unknown (-1) cells as unknown - - Returns: - A new Costmap instance with expanded obstacles - """ - # Make sure kernel size is odd - if kernel_size % 2 == 0: - kernel_size += 1 - - # Create a copy of the grid for processing - grid_copy = self.grid.copy() - - # Create a mask of unknown cells if needed - unknown_mask = None - if preserve_unknown: - unknown_mask = grid_copy == -1 - # Temporarily replace unknown cells with 0 for processing - # This allows smudging to go over unknown areas - grid_copy[unknown_mask] = 0 - - # Create a mask of cells that are above the threshold - obstacle_mask = grid_copy >= threshold - - # Create a binary map of obstacles - binary_map = obstacle_mask.astype(np.uint8) * 100 - - # Create a circular kernel for dilation (instead of square) - y, x = np.ogrid[ - -kernel_size // 2 : kernel_size // 2 + 1, - -kernel_size // 2 : kernel_size // 2 + 1, - ] - kernel = (x * x + y * y <= (kernel_size // 2) * (kernel_size // 2)).astype(np.uint8) - - # Create distance map using dilation - # Each iteration adds one 'ring' of cells around obstacles - dilated_map = binary_map.copy() - - # Store each layer of dilation with decreasing values - layers = [] - - # First layer is the original obstacle cells - layers.append(binary_map.copy()) - - for i in range(iterations): - # Dilate the binary map - dilated = ndimage.binary_dilation( - dilated_map > 0, structure=kernel, iterations=1 - ).astype(np.uint8) - - # Calculate the new layer (cells that were just added in this iteration) - new_layer = (dilated - (dilated_map > 0).astype(np.uint8)) * 100 - - # Apply decay factor based on distance from obstacle - new_layer = new_layer * (decay_factor ** (i + 1)) - - # Add to layers list - layers.append(new_layer) - - # Update dilated map for next iteration - dilated_map = dilated * 100 - - # Combine all layers to create a distance-based cost map - smudged_map = np.zeros_like(grid_copy) - for layer in layers: - # For each cell, keep the maximum value across all layers - smudged_map = np.maximum(smudged_map, layer) - - # Preserve original obstacles - smudged_map[obstacle_mask] = grid_copy[obstacle_mask] - - # When preserve_unknown is true, restore all original unknown cells - # This overlays unknown cells on top of the smudged map - if preserve_unknown and unknown_mask is not None: - smudged_map[unknown_mask] = -1 - - # Ensure cost values are in valid range (0-100) except for unknown (-1) - if preserve_unknown and unknown_mask is not None: - valid_cells = ~unknown_mask - smudged_map[valid_cells] = np.clip(smudged_map[valid_cells], 0, 100) - else: - smudged_map = np.clip(smudged_map, 0, 100) - - # Create a new costmap with the smudged grid - return Costmap( - grid=smudged_map.astype(np.int8), - resolution=self.resolution, - origin=self.origin, - ) - - @property - def total_cells(self) -> int: - return self.width * self.height - - @property - def occupied_cells(self) -> int: - return np.sum(self.grid >= 0.1) - - @property - def unknown_cells(self) -> int: - return np.sum(self.grid == -1) - - @property - def free_cells(self) -> int: - return self.total_cells - self.occupied_cells - self.unknown_cells - - @property - def free_percent(self) -> float: - return (self.free_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 - - @property - def occupied_percent(self) -> float: - return (self.occupied_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 - - @property - def unknown_percent(self) -> float: - return (self.unknown_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 - - def __str__(self) -> str: - """ - Create a string representation of the Costmap. - - Returns: - A formatted string with key costmap information - """ - - cell_info = [ - "▦ Costmap", - f"{self.width}x{self.height}", - f"({self.width * self.resolution:.1f}x{self.height * self.resolution:.1f}m @", - f"{1 / self.resolution:.0f}cm res)", - f"Origin: ({x(self.origin):.2f}, {y(self.origin):.2f})", - f"▣ {self.occupied_percent:.1f}%", - f"□ {self.free_percent:.1f}%", - f"◌ {self.unknown_percent:.1f}%", - ] - - return " ".join(cell_info) - - @property - def o3d_geometry(self): - return self.pointcloud - - @property - def pointcloud(self, *, res: float = 0.25, origin=(0.0, 0.0), show_unknown: bool = False): - """ - Visualise a 2-D costmap (int8, −1…100) as an Open3D PointCloud. - - • −1 → ‘unknown’ (optionally drawn as mid-grey, or skipped) - • 0 → free - • 1-99→ graduated cost (turbo colour-ramp) - • 100 → lethal / obstacle (red end of ramp) - - Parameters - ---------- - res : float - Cell size in metres. - origin : (float, float) - World-space coord of costmap [row0,col0] centre. - show_unknown : bool - If true, draw unknown cells in grey; otherwise omit them. - """ - cost = np.asarray(self.grid, dtype=np.int16) - if cost.ndim != 2: - raise ValueError("cost map must be 2-D (H×W)") - - H, W = cost.shape - ys, xs = np.mgrid[0:H, 0:W] - - # ---------- flatten & mask -------------------------------------------------- - xs = xs.ravel() - ys = ys.ravel() - vals = cost.ravel() - - unknown_mask = vals == -1 - if not show_unknown: - keep = ~unknown_mask - xs, ys, vals = xs[keep], ys[keep], vals[keep] - - # ---------- 3-D points ------------------------------------------------------ - xyz = np.column_stack( - ( - (xs + 0.5) * res + origin[0], # X - (ys + 0.5) * res + origin[1], # Y - np.zeros_like(xs, dtype=np.float32), # Z = 0 - ) - ) - - # ---------- colours --------------------------------------------------------- - rgb = np.empty((len(vals), 3), dtype=np.float32) - - if show_unknown: - # mid-grey for unknown - rgb[unknown_mask[~unknown_mask if not show_unknown else slice(None)]] = ( - 0.4, - 0.4, - 0.4, - ) - - # normalise valid costs: 0…100 → 0…1 - norm = np.clip(vals.astype(np.float32), 0, 100) / 100.0 - rgb_valid = cm.turbo(norm)[:, :3] # type: ignore[attr-defined] # strip alpha - rgb[:] = rgb_valid # unknown already set if needed - - pcd = o3d.geometry.PointCloud() - pcd.points = o3d.utility.Vector3dVector(xyz) - pcd.colors = o3d.utility.Vector3dVector(rgb) - - return pcd diff --git a/dimos/robot/unitree_webrtc/type/lidar.py b/dimos/robot/unitree_webrtc/type/lidar.py index c885889997..29ccab4555 100644 --- a/dimos/robot/unitree_webrtc/type/lidar.py +++ b/dimos/robot/unitree_webrtc/type/lidar.py @@ -15,6 +15,7 @@ from dimos.robot.unitree_webrtc.testing.helpers import color from datetime import datetime from dimos.robot.unitree_webrtc.type.timeseries import Timestamped, to_datetime, to_human_readable +from dimos.types.costmap import Costmap, pointcloud_to_costmap from dimos.types.vector import Vector from dataclasses import dataclass, field from typing import List, TypedDict @@ -54,6 +55,7 @@ class LidarMessage(Timestamped): resolution: float pointcloud: o3d.geometry.PointCloud raw_msg: RawLidarMsg = field(repr=False, default=None) + _costmap: Costmap = field(init=False, repr=False, default=None) @classmethod def from_msg(cls, raw_message: RawLidarMsg) -> "LidarMessage": @@ -153,3 +155,10 @@ def get_color(color_choice): # Looks like we'll be displaying so might as well? self.estimate_normals() return self + + def costmap(self) -> Costmap: + if not self._costmap: + grid, origin_xy = pointcloud_to_costmap(self.pointcloud, resolution=self.resolution) + self._costmap = Costmap(grid=grid, origin=[*origin_xy, 0.0], resolution=self.resolution) + + return self._costmap diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index d354b47396..09518433c3 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -18,7 +18,7 @@ from typing import Tuple, Optional from dimos.robot.unitree_webrtc.type.lidar import LidarMessage -from dimos.robot.unitree_webrtc.type.costmap import Costmap +from dimos.types.costmap import Costmap, pointcloud_to_costmap from reactivex.observable import Observable import reactivex.operators as ops @@ -117,68 +117,3 @@ def _inflate_lethal(costmap: np.ndarray, radius: int, lethal_val: int = 100) -> out = costmap.copy() out[dilated] = lethal_val return out - - -def pointcloud_to_costmap( - pcd: o3d.geometry.PointCloud, - *, - resolution: float = 0.05, - ground_z: float = 0.0, - obs_min_height: float = 0.15, - max_height: Optional[float] = 0.5, - inflate_radius_m: Optional[float] = None, - default_unknown: int = -1, - cost_free: int = 0, - cost_lethal: int = 100, -) -> Tuple[np.ndarray, np.ndarray]: - """Rasterise *pcd* into a 2-D int8 cost-map with optional obstacle inflation. - - Grid origin is **aligned** to the `resolution` lattice so that when - `resolution == voxel_size` every voxel centroid lands squarely inside a cell - (no alternating blank lines). - """ - - pts = np.asarray(pcd.points, dtype=np.float32) - if pts.size == 0: - return np.full((1, 1), default_unknown, np.int8), np.zeros(2, np.float32) - - # 0. Ceiling filter -------------------------------------------------------- - if max_height is not None: - pts = pts[pts[:, 2] <= max_height] - if pts.size == 0: - return np.full((1, 1), default_unknown, np.int8), np.zeros(2, np.float32) - - # 1. Bounding box & aligned origin --------------------------------------- - xy_min = pts[:, :2].min(axis=0) - xy_max = pts[:, :2].max(axis=0) - - # Align origin to the resolution grid (anchor = 0,0) - origin = np.floor(xy_min / resolution) * resolution - - # Grid dimensions (inclusive) ------------------------------------------- - Nx, Ny = (np.ceil((xy_max - origin) / resolution).astype(int) + 1).tolist() - - # 2. Bin points ------------------------------------------------------------ - idx_xy = np.floor((pts[:, :2] - origin) / resolution).astype(np.int32) - np.clip(idx_xy[:, 0], 0, Nx - 1, out=idx_xy[:, 0]) - np.clip(idx_xy[:, 1], 0, Ny - 1, out=idx_xy[:, 1]) - - lin = idx_xy[:, 1] * Nx + idx_xy[:, 0] - z_max = np.full(Nx * Ny, -np.inf, np.float32) - np.maximum.at(z_max, lin, pts[:, 2]) - z_max = z_max.reshape(Ny, Nx) - - # 3. Cost rules ----------------------------------------------------------- - costmap = np.full_like(z_max, default_unknown, np.int8) - known = z_max != -np.inf - costmap[known] = cost_free - - lethal = z_max >= (ground_z + obs_min_height) - costmap[lethal] = cost_lethal - - # 4. Optional inflation ---------------------------------------------------- - if inflate_radius_m and inflate_radius_m > 0: - cells = int(np.ceil(inflate_radius_m / resolution)) - costmap = _inflate_lethal(costmap, cells, lethal_val=cost_lethal) - - return costmap, origin.astype(np.float32) diff --git a/dimos/robot/unitree_webrtc/type/odometry.py b/dimos/robot/unitree_webrtc/type/odometry.py index 07ccdd8304..a2df73184f 100644 --- a/dimos/robot/unitree_webrtc/type/odometry.py +++ b/dimos/robot/unitree_webrtc/type/odometry.py @@ -12,12 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TypedDict, Literal +import math from datetime import datetime -from dataclasses import dataclass -from dimos.types.vector import Vector +from typing import Literal, TypedDict + +from dimos.robot.unitree_webrtc.type.timeseries import ( + EpochLike, + to_datetime, + to_human_readable, +) from dimos.types.position import Position -from dimos.robot.unitree_webrtc.type.timeseries import Timestamped, to_human_readable +from dimos.types.vector import VectorLike raw_odometry_msg_sample = { "type": "msg", @@ -71,18 +76,41 @@ class RawOdometryMessage(TypedDict): data: OdometryData -@dataclass -class Odometry(Timestamped, Position): - ts: datetime +class Odometry(Position): + def __init__(self, pos: VectorLike, rot: VectorLike, ts: EpochLike): + super().__init__(pos, rot) + self.ts = to_datetime(ts) if ts else datetime.now() + + @staticmethod + def quaternion_to_yaw(x: float, y: float, z: float, w: float) -> float: + """Convert quaternion to yaw angle (rotation around z-axis) in radians.""" + # Calculate yaw (rotation around z-axis) + siny_cosp = 2 * (w * z + x * y) + cosy_cosp = 1 - 2 * (y * y + z * z) + yaw = math.atan2(siny_cosp, cosy_cosp) + return yaw @classmethod def from_msg(cls, msg: RawOdometryMessage) -> "Odometry": pose = msg["data"]["pose"] orientation = pose["orientation"] position = pose["position"] - pos = Vector(position.get("x"), position.get("y"), position.get("z")) - rot = Vector(orientation.get("x"), orientation.get("y"), orientation.get("z")) - return cls(pos=pos, rot=rot, ts=msg["data"]["header"]["stamp"]) + + # Extract position + pos = [position.get("x"), position.get("y"), position.get("z")] + + # Extract quaternion components + qx = orientation.get("x") + qy = orientation.get("y") + qz = orientation.get("z") + qw = orientation.get("w") + + # Convert quaternion to yaw angle and store in rot.z + # Keep x,y as quaternion components for now, but z becomes the actual yaw angle + yaw_radians = cls.quaternion_to_yaw(qx, qy, qz, qw) + rot = [qx, qy, yaw_radians] + + return cls(pos, rot, msg["data"]["header"]["stamp"]) def __repr__(self) -> str: - return f"Odom ts({to_human_readable(self.ts)}) pos({self.pos}), rot({self.rot})" + return f"Odom ts({to_human_readable(self.ts)}) pos({self.pos}), rot({self.rot}) yaw({math.degrees(self.rot.z):.1f}°)" diff --git a/dimos/robot/unitree_webrtc/type/test_map.py b/dimos/robot/unitree_webrtc/type/test_map.py index 44b85d5d68..0e1c5059d9 100644 --- a/dimos/robot/unitree_webrtc/type/test_map.py +++ b/dimos/robot/unitree_webrtc/type/test_map.py @@ -56,10 +56,13 @@ def test_robot_vis(): def test_robot_mapping(): - lidar_stream = SensorReplay("office_lidar", autocast=lambda x: LidarMessage.from_msg(x)) + lidar_stream = SensorReplay("office_lidar", autocast=LidarMessage.from_msg) map = Map(voxel_size=0.5) - map.consume(lidar_stream.stream(rate_hz=100.0)).run() + # this will block until map has consumed the whole stream + map.consume(lidar_stream.stream()).run() + + # we investigate built map costmap = map.costmap assert costmap.grid.shape == (404, 276) diff --git a/dimos/robot/unitree_webrtc/type/test_odometry.py b/dimos/robot/unitree_webrtc/type/test_odometry.py index 3685278166..74fa512177 100644 --- a/dimos/robot/unitree_webrtc/type/test_odometry.py +++ b/dimos/robot/unitree_webrtc/type/test_odometry.py @@ -12,13 +12,99 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + +from functools import reduce +import math +from operator import sub, add +import os +import threading +from typing import Iterable, Optional +import reactivex.operators as ops + import pytest -from dimos.robot.unitree_webrtc.type.odometry import Odometry -from dimos.robot.unitree_webrtc.testing.multimock import Multimock +from dotenv import load_dotenv + +from dimos.robot.unitree_webrtc.type.odometry import Odometry, RawOdometryMessage +from dimos.robot.unitree_webrtc.unitree_go2 import UnitreeGo2 +from dimos.utils.testing import SensorReplay, SensorStorage + +_EXPECTED_TOTAL_RAD = -4.05212 + + +def test_dataset_size() -> None: + """Ensure the replay contains the expected number of messages.""" + assert sum(1 for _ in SensorReplay(name="raw_odometry_rotate_walk").iterate()) == 179 + + +def test_odometry_conversion_and_count() -> None: + """Each replay entry converts to :class:`Odometry` and count is correct.""" + for raw in SensorReplay(name="raw_odometry_rotate_walk").iterate(): + odom = Odometry.from_msg(raw) + assert isinstance(raw, dict) + assert isinstance(odom, Odometry) + + +def test_last_yaw_value() -> None: + """Verify yaw of the final message (regression guard).""" + last_msg = SensorReplay(name="raw_odometry_rotate_walk").stream().pipe(ops.last()).run() + + assert last_msg is not None, "Replay is empty" + assert last_msg["data"]["pose"]["orientation"] == { + "x": 0.01077, + "y": 0.008505, + "z": 0.499171, + "w": -0.866395, + } + + +def test_total_rotation_travel_iterate() -> None: + total_rad = 0.0 + prev_yaw: Optional[float] = None + + for odom in SensorReplay(name="raw_odometry_rotate_walk", autocast=Odometry.from_msg).iterate(): + yaw = odom.rot.z + if prev_yaw is not None: + diff = yaw - prev_yaw + total_rad += diff + prev_yaw = yaw + + assert total_rad == pytest.approx(_EXPECTED_TOTAL_RAD, abs=0.001) + + +def test_total_rotation_travel_rxpy() -> None: + total_rad = ( + SensorReplay(name="raw_odometry_rotate_walk", autocast=Odometry.from_msg) + .stream() + .pipe( + ops.map(lambda odom: odom.rot.z), + ops.pairwise(), # [1,2,3,4] -> [[1,2], [2,3], [3,4]] + ops.starmap(sub), # [sub(1,2), sub(2,3), sub(3,4)] + ops.reduce(add), + ) + .run() + ) + + assert total_rad == pytest.approx(4.05, abs=0.01) + + +# data collection tool +@pytest.mark.tool +def test_store_odometry_stream() -> None: + load_dotenv() + + robot = UnitreeGo2(ip=os.getenv("ROBOT_IP"), mode="ai") + robot.standup() + + storage = SensorStorage("raw_odometry_rotate_walk") + storage.save_stream(robot.raw_odom_stream()) + shutdown = threading.Event() -@pytest.mark.needsdata -def test_odometry_time(): - (timestamp, odom_raw) = Multimock("athens_odom").load_one(33) - print("RAW MSG", odom_raw) - print(Odometry.from_msg(odom_raw)) + try: + while not shutdown.wait(0.1): + pass + except KeyboardInterrupt: + shutdown.set() + finally: + robot.liedown() diff --git a/dimos/robot/unitree_webrtc/type/timeseries.py b/dimos/robot/unitree_webrtc/type/timeseries.py index 3694ff3850..bec7c4c701 100644 --- a/dimos/robot/unitree_webrtc/type/timeseries.py +++ b/dimos/robot/unitree_webrtc/type/timeseries.py @@ -54,8 +54,10 @@ def to_datetime(ts: EpochLike, tz: timezone = None) -> datetime: class Timestamped(ABC): """Abstract class for an event with a timestamp.""" - def __init__(self, timestamp: EpochLike): - self.ts = to_datetime(timestamp) + ts: datetime + + def __init__(self, ts: EpochLike): + self.ts = to_datetime(ts) class TEvent(Timestamped, Generic[PAYLOAD]): diff --git a/dimos/types/costmap.py b/dimos/types/costmap.py index 177733ff03..b349af7fd5 100644 --- a/dimos/types/costmap.py +++ b/dimos/types/costmap.py @@ -19,6 +19,7 @@ from scipy import ndimage from nav_msgs.msg import OccupancyGrid from dimos.types.vector import Vector, VectorLike, x, y, to_vector +import open3d as o3d DTYPE2STR = { np.float32: "f32", @@ -49,8 +50,8 @@ class Costmap: def __init__( self, grid: np.ndarray, - origin_theta: float, origin: VectorLike, + origin_theta: float = 0, resolution: float = 0.05, ): """Initialize Costmap with its core attributes.""" @@ -184,8 +185,8 @@ def set_value(self, point: VectorLike, value: int = 0) -> bool: def smudge( self, - kernel_size: int = 6, - iterations: int = 20, + kernel_size: int = 7, + iterations: int = 25, decay_factor: float = 0.9, threshold: int = 90, preserve_unknown: bool = False, @@ -288,6 +289,34 @@ def smudge( origin_theta=self.origin_theta, ) + @property + def total_cells(self) -> int: + return self.width * self.height + + @property + def occupied_cells(self) -> int: + return np.sum(self.grid >= 0.1) + + @property + def unknown_cells(self) -> int: + return np.sum(self.grid == -1) + + @property + def free_cells(self) -> int: + return self.total_cells - self.occupied_cells - self.unknown_cells + + @property + def free_percent(self) -> float: + return (self.free_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 + + @property + def occupied_percent(self) -> float: + return (self.occupied_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 + + @property + def unknown_percent(self) -> float: + return (self.unknown_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 + def __str__(self) -> str: """ Create a string representation of the Costmap. @@ -295,16 +324,6 @@ def __str__(self) -> str: Returns: A formatted string with key costmap information """ - # Calculate occupancy statistics - total_cells = self.width * self.height - occupied_cells = np.sum(self.grid >= 50) - unknown_cells = np.sum(self.grid == -1) - free_cells = total_cells - occupied_cells - unknown_cells - - # Calculate percentages - occupied_percent = (occupied_cells / total_cells) * 100 - unknown_percent = (unknown_cells / total_cells) * 100 - free_percent = (free_cells / total_cells) * 100 cell_info = [ "▦ Costmap", @@ -312,14 +331,97 @@ def __str__(self) -> str: f"({self.width * self.resolution:.1f}x{self.height * self.resolution:.1f}m @", f"{1 / self.resolution:.0f}cm res)", f"Origin: ({x(self.origin):.2f}, {y(self.origin):.2f})", - f"▣ {occupied_percent:.1f}%", - f"□ {free_percent:.1f}%", - f"◌ {unknown_percent:.1f}%", + f"▣ {self.occupied_percent:.1f}%", + f"□ {self.free_percent:.1f}%", + f"◌ {self.unknown_percent:.1f}%", ] return " ".join(cell_info) +def _inflate_lethal(costmap: np.ndarray, radius: int, lethal_val: int = 100) -> np.ndarray: + """Return *costmap* with lethal cells dilated by *radius* grid steps (circular).""" + if radius <= 0 or not np.any(costmap == lethal_val): + return costmap + + mask = costmap == lethal_val + dilated = mask.copy() + for dy in range(-radius, radius + 1): + for dx in range(-radius, radius + 1): + if dx * dx + dy * dy > radius * radius or (dx == 0 and dy == 0): + continue + dilated |= np.roll(mask, shift=(dy, dx), axis=(0, 1)) + + out = costmap.copy() + out[dilated] = lethal_val + return out + + +def pointcloud_to_costmap( + pcd: o3d.geometry.PointCloud, + *, + resolution: float = 0.05, + ground_z: float = 0.0, + obs_min_height: float = 0.15, + max_height: Optional[float] = 0.5, + inflate_radius_m: Optional[float] = None, + default_unknown: int = -1, + cost_free: int = 0, + cost_lethal: int = 100, +) -> tuple[np.ndarray, np.ndarray]: + """Rasterise *pcd* into a 2-D int8 cost-map with optional obstacle inflation. + + Grid origin is **aligned** to the `resolution` lattice so that when + `resolution == voxel_size` every voxel centroid lands squarely inside a cell + (no alternating blank lines). + """ + + pts = np.asarray(pcd.points, dtype=np.float32) + if pts.size == 0: + return np.full((1, 1), default_unknown, np.int8), np.zeros(2, np.float32) + + # 0. Ceiling filter -------------------------------------------------------- + if max_height is not None: + pts = pts[pts[:, 2] <= max_height] + if pts.size == 0: + return np.full((1, 1), default_unknown, np.int8), np.zeros(2, np.float32) + + # 1. Bounding box & aligned origin --------------------------------------- + xy_min = pts[:, :2].min(axis=0) + xy_max = pts[:, :2].max(axis=0) + + # Align origin to the resolution grid (anchor = 0,0) + origin = np.floor(xy_min / resolution) * resolution + + # Grid dimensions (inclusive) ------------------------------------------- + Nx, Ny = (np.ceil((xy_max - origin) / resolution).astype(int) + 1).tolist() + + # 2. Bin points ------------------------------------------------------------ + idx_xy = np.floor((pts[:, :2] - origin) / resolution).astype(np.int32) + np.clip(idx_xy[:, 0], 0, Nx - 1, out=idx_xy[:, 0]) + np.clip(idx_xy[:, 1], 0, Ny - 1, out=idx_xy[:, 1]) + + lin = idx_xy[:, 1] * Nx + idx_xy[:, 0] + z_max = np.full(Nx * Ny, -np.inf, np.float32) + np.maximum.at(z_max, lin, pts[:, 2]) + z_max = z_max.reshape(Ny, Nx) + + # 3. Cost rules ----------------------------------------------------------- + costmap = np.full_like(z_max, default_unknown, np.int8) + known = z_max != -np.inf + costmap[known] = cost_free + + lethal = z_max >= (ground_z + obs_min_height) + costmap[lethal] = cost_lethal + + # 4. Optional inflation ---------------------------------------------------- + if inflate_radius_m and inflate_radius_m > 0: + cells = int(np.ceil(inflate_radius_m / resolution)) + costmap = _inflate_lethal(costmap, cells, lethal_val=cost_lethal) + + return costmap, origin.astype(np.float32) + + if __name__ == "__main__": costmap = Costmap.from_pickle("costmapMsg.pickle") print(costmap) diff --git a/dimos/types/position.py b/dimos/types/position.py index 37515d3990..69af640fed 100644 --- a/dimos/types/position.py +++ b/dimos/types/position.py @@ -15,6 +15,7 @@ from typing import TypeVar, Union, Sequence import numpy as np from plum import dispatch +import math from dimos.types.vector import Vector, to_vector, to_numpy, VectorLike @@ -24,6 +25,12 @@ PositionLike = Union["Position", VectorLike, Sequence[VectorLike]] +def yaw_to_matrix(yaw: float) -> np.ndarray: + """2-D yaw (about Z) to a 3×3 rotation matrix.""" + c, s = math.cos(yaw), math.sin(yaw) + return np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + + class Position(Vector): """A position in 3D space, consisting of a position vector and a rotation vector. @@ -59,6 +66,17 @@ def serialize(self): """Serialize the position to a dictionary.""" return {"type": "position", "pos": self.to_list(), "rot": self.rot.to_list()} + def vector_to(self, target: Vector) -> Vector: + direction = target - self.pos.to_2d() + + cos_y = math.cos(-self.yaw) + sin_y = math.sin(-self.yaw) + + x = cos_y * direction.x - sin_y * direction.y + y = sin_y * direction.x + cos_y * direction.y + + return Vector(x, y) + def __eq__(self, other) -> bool: """Check if two positions are equal using numpy's allclose for floating point comparison.""" if not isinstance(other, Position): @@ -93,6 +111,11 @@ def __add__(self: T, other) -> T: # For other types, just use Vector's addition return Position(super().__add__(other), self.rot) + @property + def yaw(self) -> float: + """Get the yaw (rotation around Z-axis) from the rotation vector.""" + return self.rot.z + def __sub__(self: T, other) -> T: """Override Vector's __sub__ to handle Position objects specially. diff --git a/dimos/types/test_position.py b/dimos/types/test_position.py index c2dec9bf5f..bf4e4cf4c8 100644 --- a/dimos/types/test_position.py +++ b/dimos/types/test_position.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import numpy as np - +import math from dimos.types.position import Position, to_position from dimos.types.vector import Vector @@ -315,3 +315,9 @@ def test_to_position_with_sequence(): assert pos3.rot.x == 16.0 assert pos3.rot.y == 17.0 assert pos3.rot.z == 18.0 + + +def test_vector_transform(): + robot_position = Position(Vector(4.0, 2.0, 0.5), Vector(0.0, 0.0, math.pi / 2)) + target = Vector(1.0, 3.0, 0.0) + print(robot_position.vector_to(target)) diff --git a/dimos/types/test_vector.py b/dimos/types/test_vector.py index b448620145..6a93d37afd 100644 --- a/dimos/types/test_vector.py +++ b/dimos/types/test_vector.py @@ -351,3 +351,34 @@ def test_vector_bool_conversion(): pass # Expected path else: assert False, "Non-zero vector should be True in boolean context" + + +def test_vector_add(): + """Test vector addition operator.""" + v1 = Vector(1.0, 2.0, 3.0) + v2 = Vector(4.0, 5.0, 6.0) + + # Using __add__ method + v_add = v1.__add__(v2) + assert v_add.x == 5.0 + assert v_add.y == 7.0 + assert v_add.z == 9.0 + + # Using + operator + v_add_op = v1 + v2 + assert v_add_op.x == 5.0 + assert v_add_op.y == 7.0 + assert v_add_op.z == 9.0 + + # Adding zero vector should return original vector + v_zero = Vector.zeros(3) + assert (v1 + v_zero) == v1 + + +def test_vector_add_dim_mismatch(): + """Test vector addition operator.""" + v1 = Vector(1.0, 2.0) + v2 = Vector(4.0, 5.0, 6.0) + + # Using + operator + v_add_op = v1 + v2 diff --git a/dimos/types/vector.py b/dimos/types/vector.py index 8d3ae1ef91..9b01c6a26a 100644 --- a/dimos/types/vector.py +++ b/dimos/types/vector.py @@ -107,10 +107,6 @@ def serialize(self) -> Tuple: """Serialize the vector to a tuple.""" return {"type": "vector", "c": self._data.tolist()} - def __len__(self) -> int: - """Return the dimension of the vector.""" - return len(self._data) - def __eq__(self, other) -> bool: """Check if two vectors are equal using numpy's allclose for floating point comparison.""" if not isinstance(other, Vector): @@ -119,15 +115,19 @@ def __eq__(self, other) -> bool: return False return np.allclose(self._data, other._data) - def __add__(self: T, other) -> T: - if isinstance(other, Vector): - return self.__class__(self._data + other._data) - return self.__class__(self._data + np.array(other, dtype=float)) + def __add__(self: T, other: VectorLike) -> T: + other = to_vector(other) + if self.dim != other.dim: + max_dim = max(self.dim, other.dim) + return self.pad(max_dim) + other.pad(max_dim) + return self.__class__(self._data + other._data) - def __sub__(self: T, other) -> T: - if isinstance(other, Vector): - return self.__class__(self._data - other._data) - return self.__class__(self._data - np.array(other, dtype=float)) + def __sub__(self: T, other: VectorLike) -> T: + other = to_vector(other) + if self.dim != other.dim: + max_dim = max(self.dim, other.dim) + return self.pad(max_dim) - other.pad(max_dim) + return self.__class__(self._data - other._data) def __mul__(self: T, scalar: float) -> T: return self.__class__(self._data * scalar) @@ -141,26 +141,21 @@ def __truediv__(self: T, scalar: float) -> T: def __neg__(self: T) -> T: return self.__class__(-self._data) - def dot(self, other) -> float: + def dot(self, other: VectorLike) -> float: """Compute dot product.""" - if isinstance(other, Vector): - return float(np.dot(self._data, other._data)) - return float(np.dot(self._data, np.array(other, dtype=float))) + other = to_vector(other) + return float(np.dot(self._data, other._data)) - def cross(self: T, other) -> T: + def cross(self: T, other: VectorLike) -> T: """Compute cross product (3D vectors only).""" if self.dim != 3: raise ValueError("Cross product is only defined for 3D vectors") - if isinstance(other, Vector): - other_data = other._data - else: - other_data = np.array(other, dtype=float) - - if len(other_data) != 3: + other = to_vector(other) + if other.dim != 3: raise ValueError("Cross product requires two 3D vectors") - return self.__class__(np.cross(self._data, other_data)) + return self.__class__(np.cross(self._data, other._data)) def length(self) -> float: """Compute the Euclidean length (magnitude) of the vector.""" @@ -181,51 +176,52 @@ def to_2d(self: T) -> T: """Convert a vector to a 2D vector by taking only the x and y components.""" return self.__class__(self._data[:2]) - def distance(self, other) -> float: + def pad(self: T, dim: int) -> T: + """Pad a vector with zeros to reach the specified dimension. + + If vector already has dimension >= dim, it is returned unchanged. + """ + if self.dim >= dim: + return self + + padded = np.zeros(dim, dtype=float) + padded[: len(self._data)] = self._data + return self.__class__(padded) + + def distance(self, other: VectorLike) -> float: """Compute Euclidean distance to another vector.""" - if isinstance(other, Vector): - return float(np.linalg.norm(self._data - other._data)) - return float(np.linalg.norm(self._data - np.array(other, dtype=float))) + other = to_vector(other) + return float(np.linalg.norm(self._data - other._data)) - def distance_squared(self, other) -> float: + def distance_squared(self, other: VectorLike) -> float: """Compute squared Euclidean distance to another vector (faster than distance()).""" - if isinstance(other, Vector): - diff = self._data - other._data - else: - diff = self._data - np.array(other, dtype=float) + other = to_vector(other) + diff = self._data - other._data return float(np.sum(diff * diff)) - def angle(self, other) -> float: + def angle(self, other: VectorLike) -> float: """Compute the angle (in radians) between this vector and another.""" - if self.length() < 1e-10 or (isinstance(other, Vector) and other.length() < 1e-10): + other = to_vector(other) + if self.length() < 1e-10 or other.length() < 1e-10: return 0.0 - if isinstance(other, Vector): - other_data = other._data - else: - other_data = np.array(other, dtype=float) - cos_angle = np.clip( - np.dot(self._data, other_data) - / (np.linalg.norm(self._data) * np.linalg.norm(other_data)), + np.dot(self._data, other._data) + / (np.linalg.norm(self._data) * np.linalg.norm(other._data)), -1.0, 1.0, ) return float(np.arccos(cos_angle)) - def project(self: T, onto) -> T: + def project(self: T, onto: VectorLike) -> T: """Project this vector onto another vector.""" - if isinstance(onto, Vector): - onto_data = onto._data - else: - onto_data = np.array(onto, dtype=float) - - onto_length_sq = np.sum(onto_data * onto_data) + onto = to_vector(onto) + onto_length_sq = np.sum(onto._data * onto._data) if onto_length_sq < 1e-10: return self.__class__(np.zeros_like(self._data)) - scalar_projection = np.dot(self._data, onto_data) / onto_length_sq - return self.__class__(scalar_projection * onto_data) + scalar_projection = np.dot(self._data, onto._data) / onto_length_sq + return self.__class__(scalar_projection * onto._data) # this is here to test ros_observable_topic # doesn't happen irl afaik that we want a vector from ros message diff --git a/dimos/utils/reactive.py b/dimos/utils/reactive.py index 82933e7f10..3318eef2ec 100644 --- a/dimos/utils/reactive.py +++ b/dimos/utils/reactive.py @@ -161,3 +161,11 @@ def _on_msg(value: T): return Disposable(lambda: stop(_on_msg)) return rx.create(_subscribe) + + +def spy(name: str): + def spyfun(x): + print(f"SPY {name}:", x) + return x + + return ops.map(spyfun) diff --git a/dimos/utils/test_testing.py b/dimos/utils/test_testing.py index 579ec13670..788acd0d67 100644 --- a/dimos/utils/test_testing.py +++ b/dimos/utils/test_testing.py @@ -15,6 +15,8 @@ import hashlib import os import subprocess + +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.utils import testing from dimos.robot.unitree_webrtc.type.lidar import LidarMessage @@ -136,7 +138,7 @@ def test_sensor_replay(): def test_sensor_replay_cast(): counter = 0 for message in testing.SensorReplay( - name="office_lidar", autocast=lambda x: LidarMessage.from_msg(x) + name="office_lidar", autocast=LidarMessage.from_msg ).iterate(): counter += 1 assert isinstance(message, LidarMessage) diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 97a4652ff5..b64fcad397 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -178,7 +178,6 @@ class SensorReplay(Generic[T]): def __init__(self, name: str, autocast: Optional[Callable[[Any], T]] = None): self.root_dir = testData(name) self.autocast = autocast - self.cnt = 0 def load(self, *names: Union[int, str]) -> Union[T, Any, list[T], list[Any]]: if len(names) == 1: @@ -204,7 +203,10 @@ def iterate(self) -> Iterator[Union[T, Any]]: for file_path in sorted(glob.glob(pattern)): yield self.load_one(file_path) - def stream(self, rate_hz: float = 10.0) -> Observable[Union[T, Any]]: + def stream(self, rate_hz: Optional[float] = None) -> Observable[Union[T, Any]]: + if rate_hz is None: + return from_iterable(self.iterate()) + sleep_time = 1.0 / rate_hz return from_iterable(self.iterate()).pipe( @@ -212,27 +214,65 @@ def stream(self, rate_hz: float = 10.0) -> Observable[Union[T, Any]]: ops.map(lambda x: x[0] if isinstance(x, tuple) else x), ) + +class SensorStorage(Generic[T]): + """Generic sensor data storage utility. + + Creates a directory in the test data directory and stores pickled sensor data. + + Args: + name: The name of the storage directory + autocast: Optional function that takes data and returns a processed result before storage. + """ + + def __init__(self, name: str, autocast: Optional[Callable[[T], Any]] = None): + self.name = name + self.autocast = autocast + self.cnt = 0 + + # Create storage directory in the data dir + self.root_dir = _get_data_dir() / name + + # Check if directory exists and is not empty + if self.root_dir.exists(): + existing_files = list(self.root_dir.glob("*.pickle")) + if existing_files: + raise RuntimeError( + f"Storage directory '{name}' already exists and contains {len(existing_files)} files. " + f"Please use a different name or clean the directory first." + ) + else: + # Create the directory + self.root_dir.mkdir(parents=True, exist_ok=True) + def save_stream(self, observable: Observable[Union[T, Any]]) -> Observable[int]: + """Save an observable stream of sensor data to pickle files.""" return observable.pipe(ops.map(lambda frame: self.save_one(frame))) def save(self, *frames) -> int: - [self.save_one(frame) for frame in frames] + """Save one or more frames to pickle files.""" + for frame in frames: + self.save_one(frame) return self.cnt def save_one(self, frame) -> int: - file_name = f"/{self.cnt:03d}.pickle" - full_path = self.root_dir + file_name - - self.cnt += 1 + """Save a single frame to a pickle file.""" + file_name = f"{self.cnt:03d}.pickle" + full_path = self.root_dir / file_name - if os.path.isfile(full_path): - raise Exception(f"file {full_path} exists") + if full_path.exists(): + raise RuntimeError(f"File {full_path} already exists") + # Apply autocast if provided + data_to_save = frame + if self.autocast: + data_to_save = self.autocast(frame) # Convert to raw message if frame has a raw_msg attribute - if hasattr(frame, "raw_msg"): - frame = frame.raw_msg + elif hasattr(frame, "raw_msg"): + data_to_save = frame.raw_msg with open(full_path, "wb") as f: - pickle.dump(frame, f) + pickle.dump(data_to_save, f) + self.cnt += 1 return self.cnt diff --git a/dimos/web/websocket_vis/server.py b/dimos/web/websocket_vis/server.py index d5bcd85733..cfb404fadd 100644 --- a/dimos/web/websocket_vis/server.py +++ b/dimos/web/websocket_vis/server.py @@ -105,7 +105,7 @@ async def update_state(new_data): class WebsocketVis: - def __init__(self, port=7778, use_reload=False, msg_handler=None): + def __init__(self, port=7779, use_reload=False, msg_handler=None): self.port = port self.server = None self.server_thread = None diff --git a/docs/testing_stream_reply.md b/docs/testing_stream_reply.md new file mode 100644 index 0000000000..90f9f38b1d --- /dev/null +++ b/docs/testing_stream_reply.md @@ -0,0 +1,172 @@ +# Sensor Replay & Storage Toolkit + +A lightweight framework for **recording, storing, and replaying binary data streams for automated tests**. It keeps your repository small (data lives in Git LFS) while giving you Python‑first ergonomics for working with RxPY streams, point‑clouds, videos, command logs—anything you can pickle. + +--- + +## 1 At a Glance + +| Need | One liner | +| ------------------------------ | ------------------------------------------------------------- | +| **Iterate over every message** | `SensorReplay("raw_odometry_rotate_walk").iterate(print)` | +| **RxPY stream for piping** | `SensorReplay("raw_odometry_rotate_walk").stream().pipe(...)` | +| **Throttle replay rate** | `SensorReplay("raw_odometry_rotate_walk").stream(rate_hz=10)` | +| **Raw path to a blob/dir** | `path = testData("raw_odometry_rotate_walk")` | +| **Store a new stream** | see [`SensorStorage`](#5-storing-new-streams) | + +> If the requested blob is missing locally, it is transparently downloaded from Git LFS, extracted to `tests/data//`, and cached for subsequent runs. + +--- + +## 2 Goals + +* **Zero setup for CI & collaborators** – data is fetched on demand. +* **No repo bloat** – binaries live in Git LFS; the working tree stays trim. +* **Symmetric API** – `SensorReplay` ↔︎ `SensorStorage`; same name, different direction. +* **Format agnostic** – replay *anything* you can pickle (protobuf, numpy, JPEG, …). +* **Data type agnostic** – with testData("raw_odometry_rotate_walk") you get a Path object back, can be a raw video file, whole codebase, ML model etc + + +--- + +## 3 Replaying Data + +### 3.1 Iterating Messages + +```python +from sensor_tools import SensorReplay + +# Print every stored Odometry message +SensorReplay(name="raw_odometry_rotate_walk").iterate(print) +``` + +### 3.2 RxPY Streaming + +```python +from rx import operators as ops +from operator import sub, add +from sensor_tools import SensorReplay, Odometry + +# Compute total yaw rotation (radians) + +total_rad = ( + SensorReplay("raw_odometry_rotate_walk", autocast=Odometry.from_msg) + .stream() + .pipe( + ops.map(lambda odom: odom.rot.z), + ops.pairwise(), # [1,2,3,4] -> [[1,2], [2,3], [3,4]] + ops.starmap(sub), # [sub(1,2), sub(2,3), sub(3,4)] + ops.reduce(add), + ) + .run() +) + +assert total_rad == pytest.approx(4.05, abs=0.01) +``` + +### 3.3 Lidar Mapping Example (200MB blob) + +```python +from sensor_tools import SensorReplay, LidarMessage +from mapping import Map + +lidar_stream = SensorReplay("office_lidar", autocast=LidarMessage.from_msg) +map_ = Map(voxel_size=0.5) + +# Blocks until the stream is consumed +map_.consume(lidar_stream.stream()).run() + +assert map_.costmap.grid.shape == (404, 276) +``` + +--- + +## 4 Low Level Access + +If you want complete control, call **`testData(name)`** to get a `Path` to the extracted file or directory — no pickling assumptions: + +```python +absolute_path: Path = testData("some_name") +``` + +Do whatever you like: open a video file, load a model checkpoint, etc. + +--- + +## 5 Storing New Streams + +1. **Write a test marked `@pytest.mark.tool`** so CI skips it by default. +2. Use `SensorStorage` to persist the stream into `tests/data//*.pickle`. + +```python +@pytest.mark.tool +def test_store_odometry_stream(): + load_dotenv() + + robot = UnitreeGo2(ip=os.getenv("ROBOT_IP"), mode="ai") + robot.standup() + + storage = SensorStorage("raw_odometry_rotate_walk2") + storage.save_stream(robot.raw_odom_stream()) # ← records until interrupted + + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + robot.liedown() +``` + +### 5.1 Behind the Scenes + +* Any new file/dir under `tests/data/` is treated as a **data blob**. +* `./bin/lfs_push` compresses it into `tests/data/.lfs/.tar.gz` *and* uploads it to Git LFS. +* Only the `.lfs/` archive is committed; raw binaries remain `.gitignored`. + +--- + +## 6 Storing Arbitrary Binary Data + +Just copy to `tests/data/whatever` +* `./bin/lfs_push` compresses it into `tests/data/.lfs/.tar.gz` *and* uploads it to Git LFS. + +--- + +## 7 Developer Workflow Checklist + +1. **Drop new data** into `tests/data/`. +2. Run `./bin/lfs_push` (or let the pre commit hook nag you). +3. Commit the resulting `tests/data/.lfs/.tar.gz`. +4. Open PR — reviewers pull only what they need, when they need it. + +### 7.1 Pre commit Setup (optional but recommended) + +```sh +sudo apt install pre-commit +pre-commit install # inside repo root +``` + +Now each commit checks formatting, linting, *and* whether you forgot to push new blobs: + +``` +$ echo test > tests/data/foo.txt +$ git add tests/data/foo.txt && git commit -m "demo" +LFS data ......................................................... Failed +✗ New test data detected at /tests/data: + foo.txt +Either delete or run ./bin/lfs_push +``` + +--- + +## 8 Future Work + +- A replay rate that mirrors the **original message timestamps** can be implemented downstream (e.g., an RxPY operator) +- Likely this same system should be used for production binary data delivery as well (Models etc) + +--- + +## 9 Existing Examples + +* `dimos/robot/unitree_webrtc/type/test_odometry.py` +* `dimos/robot/unitree_webrtc/type/test_map.py` + diff --git a/flake.lock b/flake.lock deleted file mode 100644 index faa26f56c7..0000000000 --- a/flake.lock +++ /dev/null @@ -1,61 +0,0 @@ -{ - "nodes": { - "flake-utils": { - "inputs": { - "systems": "systems" - }, - "locked": { - "lastModified": 1731533236, - "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", - "owner": "numtide", - "repo": "flake-utils", - "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", - "type": "github" - }, - "original": { - "owner": "numtide", - "repo": "flake-utils", - "type": "github" - } - }, - "nixpkgs": { - "locked": { - "lastModified": 1745526057, - "narHash": "sha256-ITSpPDwvLBZBnPRS2bUcHY3gZSwis/uTe255QgMtTLA=", - "owner": "NixOS", - "repo": "nixpkgs", - "rev": "f771eb401a46846c1aebd20552521b233dd7e18b", - "type": "github" - }, - "original": { - "owner": "NixOS", - "ref": "nixos-unstable", - "repo": "nixpkgs", - "type": "github" - } - }, - "root": { - "inputs": { - "flake-utils": "flake-utils", - "nixpkgs": "nixpkgs" - } - }, - "systems": { - "locked": { - "lastModified": 1681028828, - "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", - "owner": "nix-systems", - "repo": "default", - "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", - "type": "github" - }, - "original": { - "owner": "nix-systems", - "repo": "default", - "type": "github" - } - } - }, - "root": "root", - "version": 7 -} diff --git a/flake.nix b/flake.nix deleted file mode 100644 index 60751b156f..0000000000 --- a/flake.nix +++ /dev/null @@ -1,95 +0,0 @@ -{ - description = "Project dev environment as Nix shell + DockerTools layered image"; - - inputs = { - nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; - flake-utils.url = "github:numtide/flake-utils"; - }; - - outputs = { self, nixpkgs, flake-utils, ... }: - flake-utils.lib.eachDefaultSystem (system: - let - pkgs = import nixpkgs { inherit system; }; - - # ------------------------------------------------------------ - # 1. Shared package list (tool-chain + project deps) - # ------------------------------------------------------------ - devPackages = with pkgs; [ - ### Core shell & utils - bashInteractive coreutils gh - stdenv.cc.cc.lib - - ### Python + static analysis - python312 python312Packages.pip python312Packages.setuptools - python312Packages.virtualenv ruff mypy pre-commit - - ### Runtime deps - python312Packages.pyaudio portaudio ffmpeg_6 ffmpeg_6.dev - - ### Graphics / X11 stack - libGL libGLU mesa glfw - xorg.libX11 xorg.libXi xorg.libXext xorg.libXrandr xorg.libXinerama - xorg.libXcursor xorg.libXfixes xorg.libXrender xorg.libXdamage - xorg.libXcomposite xorg.libxcb xorg.libXScrnSaver xorg.libXxf86vm - - udev SDL2 SDL2.dev zlib - - ### GTK / OpenCV helpers - glib gtk3 gdk-pixbuf gobject-introspection - - ### Open3D & build-time - eigen cmake ninja jsoncpp libjpeg libpng - ]; - - # ------------------------------------------------------------ - # 2. Host interactive shell → `nix develop` - # ------------------------------------------------------------ - devShell = pkgs.mkShell { - packages = devPackages; - shellHook = '' - export LD_LIBRARY_PATH="${pkgs.lib.makeLibraryPath [ - pkgs.stdenv.cc.cc.lib pkgs.libGL pkgs.libGLU pkgs.mesa pkgs.glfw - pkgs.xorg.libX11 pkgs.xorg.libXi pkgs.xorg.libXext pkgs.xorg.libXrandr - pkgs.xorg.libXinerama pkgs.xorg.libXcursor pkgs.xorg.libXfixes - pkgs.xorg.libXrender pkgs.xorg.libXdamage pkgs.xorg.libXcomposite - pkgs.xorg.libxcb pkgs.xorg.libXScrnSaver pkgs.xorg.libXxf86vm - pkgs.udev pkgs.portaudio pkgs.SDL2.dev pkgs.zlib pkgs.glib pkgs.gtk3 - pkgs.gdk-pixbuf pkgs.gobject-introspection]}:$LD_LIBRARY_PATH" - - export DISPLAY=:0 - - PROJECT_ROOT=$(git rev-parse --show-toplevel 2>/dev/null || echo "$PWD") - if [ -f "$PROJECT_ROOT/env/bin/activate" ]; then - . "$PROJECT_ROOT/env/bin/activate" - fi - - [ -f "$PROJECT_ROOT/motd" ] && cat "$PROJECT_ROOT/motd" - [ -f "$PROJECT_ROOT/.pre-commit-config.yaml" ] && pre-commit install --install-hooks - ''; - }; - - # ------------------------------------------------------------ - # 3. Closure copied into the OCI image rootfs - # ------------------------------------------------------------ - imageRoot = pkgs.buildEnv { - name = "dimos-image-root"; - paths = devPackages; - pathsToLink = [ "/bin" ]; - }; - - in { - ## Local dev shell - devShells.default = devShell; - - ## Layered docker image with DockerTools - packages.devcontainer = pkgs.dockerTools.buildLayeredImage { - name = "dimensionalos/dimos-dev"; - tag = "latest"; - contents = [ imageRoot ]; - config = { - WorkingDir = "/workspace"; - Cmd = [ "bash" ]; - }; - }; - }); -} diff --git a/tests/data/.lfs/raw_odometry_rotate_walk.tar.gz b/tests/data/.lfs/raw_odometry_rotate_walk.tar.gz new file mode 100644 index 0000000000..ce8bb1d2b0 --- /dev/null +++ b/tests/data/.lfs/raw_odometry_rotate_walk.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:396345f0cd7a94bb9d85540d4bbce01b027618972f83e713e4550abf1d6ec445 +size 15685