diff --git a/dimos/robot/unitree_webrtc/type/costmap.py b/dimos/robot/unitree_webrtc/type/costmap.py index 814184479e..ba8dba55e3 100644 --- a/dimos/robot/unitree_webrtc/type/costmap.py +++ b/dimos/robot/unitree_webrtc/type/costmap.py @@ -231,6 +231,34 @@ def smudge( origin=self.origin, ) + @property + def total_cells(self) -> int: + return self.width * self.height + + @property + def occupied_cells(self) -> int: + return np.sum(self.grid >= 0.1) + + @property + def unknown_cells(self) -> int: + return np.sum(self.grid == -1) + + @property + def free_cells(self) -> int: + return self.total_cells - self.occupied_cells - self.unknown_cells + + @property + def free_percent(self) -> float: + return (self.free_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 + + @property + def occupied_percent(self) -> float: + return (self.occupied_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 + + @property + def unknown_percent(self) -> float: + return (self.unknown_cells / self.total_cells) * 100 if self.total_cells > 0 else 0.0 + def __str__(self) -> str: """ Create a string representation of the Costmap. @@ -238,16 +266,6 @@ def __str__(self) -> str: Returns: A formatted string with key costmap information """ - # Calculate occupancy statistics - total_cells = self.width * self.height - occupied_cells = np.sum(self.grid >= 0.1) - unknown_cells = np.sum(self.grid == -1) - free_cells = total_cells - occupied_cells - unknown_cells - - # Calculate percentages - occupied_percent = (occupied_cells / total_cells) * 100 - unknown_percent = (unknown_cells / total_cells) * 100 - free_percent = (free_cells / total_cells) * 100 cell_info = [ "▦ Costmap", @@ -255,9 +273,9 @@ def __str__(self) -> str: f"({self.width * self.resolution:.1f}x{self.height * self.resolution:.1f}m @", f"{1 / self.resolution:.0f}cm res)", f"Origin: ({x(self.origin):.2f}, {y(self.origin):.2f})", - f"▣ {occupied_percent:.1f}%", - f"□ {free_percent:.1f}%", - f"◌ {unknown_percent:.1f}%", + f"▣ {self.occupied_percent:.1f}%", + f"□ {self.free_percent:.1f}%", + f"◌ {self.unknown_percent:.1f}%", ] return " ".join(cell_info) diff --git a/dimos/robot/unitree_webrtc/type/test_map.py b/dimos/robot/unitree_webrtc/type/test_map.py index 25deccda00..88c394236c 100644 --- a/dimos/robot/unitree_webrtc/type/test_map.py +++ b/dimos/robot/unitree_webrtc/type/test_map.py @@ -1,8 +1,11 @@ import pytest + +from dimos.robot.unitree_webrtc.testing.helpers import show3d, show3d_stream from dimos.robot.unitree_webrtc.testing.mock import Mock -from dimos.robot.unitree_webrtc.testing.helpers import show3d_stream, show3d +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.robot.unitree_webrtc.type.map import Map, splice_sphere from dimos.utils.reactive import backpressure -from dimos.robot.unitree_webrtc.type.map import splice_sphere, Map +from dimos.utils.testing import SensorReplay @pytest.mark.vis @@ -36,3 +39,25 @@ def test_robot_vis(): clearframe=True, title="gloal dynamic map test", ) + + +def test_robot_mapping(): + lidar_stream = SensorReplay("office_lidar", autocast=lambda x: LidarMessage.from_msg(x)) + map = Map(voxel_size=0.5) + map.consume(lidar_stream.stream(rate_hz=100.0)).run() + + costmap = map.costmap + + assert costmap.grid.shape == (404, 276) + + assert 70 <= costmap.unknown_percent <= 80, ( + f"Unknown percent {costmap.unknown_percent} is not within the range 70-80" + ) + + assert 5 < costmap.free_percent < 10, ( + f"Free percent {costmap.free_percent} is not within the range 5-10" + ) + + assert 8 < costmap.occupied_percent < 15, ( + f"Occupied percent {costmap.occupied_percent} is not within the range 8-15" + ) diff --git a/dimos/robot/unitree_webrtc/type/vector.py b/dimos/robot/unitree_webrtc/type/vector.py index e5fb446884..0343a71118 100644 --- a/dimos/robot/unitree_webrtc/type/vector.py +++ b/dimos/robot/unitree_webrtc/type/vector.py @@ -116,7 +116,6 @@ def __add__(self: T, other: Union["Vector", Iterable[float]]) -> T: def __sub__(self: T, other: Union["Vector", Iterable[float]]) -> T: if isinstance(other, Vector): - print(self, other) return self.__class__(self._data - other._data) return self.__class__(self._data - np.array(other, dtype=float)) @@ -433,142 +432,3 @@ def z(value: VectorLike) -> float: else: arr = to_numpy(value) return float(arr[2]) if len(arr) > 2 else 0.0 - - -if __name__ == "__main__": - # Test vectors in various directions - test_vectors = [ - Vector(1, 0), # Right - Vector(1, 1), # Up-Right - Vector(0, 1), # Up - Vector(-1, 1), # Up-Left - Vector(-1, 0), # Left - Vector(-1, -1), # Down-Left - Vector(0, -1), # Down - Vector(1, -1), # Down-Right - Vector(0.5, 0.5), # Up-Right (shorter) - Vector(-3, 4), # Up-Left (longer) - ] - - for v in test_vectors: - print(str(v)) - - # Test the vector compatibility functions - print("Testing vectortypes.py conversion functions\n") - - # Create test vectors in different formats - vector_obj = Vector(1.0, 2.0, 3.0) - numpy_arr = np.array([4.0, 5.0, 6.0]) - tuple_vec = (7.0, 8.0, 9.0) - list_vec = [10.0, 11.0, 12.0] - - print("Original values:") - print(f"Vector: {vector_obj}") - print(f"NumPy: {numpy_arr}") - print(f"Tuple: {tuple_vec}") - print(f"List: {list_vec}") - print() - - # Test to_numpy - print("to_numpy() conversions:") - print(f"Vector → NumPy: {to_numpy(vector_obj)}") - print(f"NumPy → NumPy: {to_numpy(numpy_arr)}") - print(f"Tuple → NumPy: {to_numpy(tuple_vec)}") - print(f"List → NumPy: {to_numpy(list_vec)}") - print() - - # Test to_vector - print("to_vector() conversions:") - print(f"Vector → Vector: {to_vector(vector_obj)}") - print(f"NumPy → Vector: {to_vector(numpy_arr)}") - print(f"Tuple → Vector: {to_vector(tuple_vec)}") - print(f"List → Vector: {to_vector(list_vec)}") - print() - - # Test to_tuple - print("to_tuple() conversions:") - print(f"Vector → Tuple: {to_tuple(vector_obj)}") - print(f"NumPy → Tuple: {to_tuple(numpy_arr)}") - print(f"Tuple → Tuple: {to_tuple(tuple_vec)}") - print(f"List → Tuple: {to_tuple(list_vec)}") - print() - - # Test to_list - print("to_list() conversions:") - print(f"Vector → List: {to_list(vector_obj)}") - print(f"NumPy → List: {to_list(numpy_arr)}") - print(f"Tuple → List: {to_list(tuple_vec)}") - print(f"List → List: {to_list(list_vec)}") - print() - - # Test component extraction - print("Component extraction:") - print("x() function:") - print(f"x(Vector): {x(vector_obj)}") - print(f"x(NumPy): {x(numpy_arr)}") - print(f"x(Tuple): {x(tuple_vec)}") - print(f"x(List): {x(list_vec)}") - print() - - print("y() function:") - print(f"y(Vector): {y(vector_obj)}") - print(f"y(NumPy): {y(numpy_arr)}") - print(f"y(Tuple): {y(tuple_vec)}") - print(f"y(List): {y(list_vec)}") - print() - - print("z() function:") - print(f"z(Vector): {z(vector_obj)}") - print(f"z(NumPy): {z(numpy_arr)}") - print(f"z(Tuple): {z(tuple_vec)}") - print(f"z(List): {z(list_vec)}") - print() - - # Test dimension checking - print("Dimension checking:") - vec2d = Vector(1.0, 2.0) - vec3d = Vector(1.0, 2.0, 3.0) - arr2d = np.array([1.0, 2.0]) - arr3d = np.array([1.0, 2.0, 3.0]) - - print(f"is_2d(Vector(1,2)): {is_2d(vec2d)}") - print(f"is_2d(Vector(1,2,3)): {is_2d(vec3d)}") - print(f"is_2d(np.array([1,2])): {is_2d(arr2d)}") - print(f"is_2d(np.array([1,2,3])): {is_2d(arr3d)}") - print(f"is_2d((1,2)): {is_2d((1.0, 2.0))}") - print(f"is_2d((1,2,3)): {is_2d((1.0, 2.0, 3.0))}") - print() - - print(f"is_3d(Vector(1,2)): {is_3d(vec2d)}") - print(f"is_3d(Vector(1,2,3)): {is_3d(vec3d)}") - print(f"is_3d(np.array([1,2])): {is_3d(arr2d)}") - print(f"is_3d(np.array([1,2,3])): {is_3d(arr3d)}") - print(f"is_3d((1,2)): {is_3d((1.0, 2.0))}") - print(f"is_3d((1,2,3)): {is_3d((1.0, 2.0, 3.0))}") - print() - - # Test the Protocol interface - print("Testing VectorLike Protocol:") - print(f"isinstance(Vector(1,2), VectorLike): {isinstance(vec2d, VectorLike)}") - print(f"isinstance(np.array([1,2]), VectorLike): {isinstance(arr2d, VectorLike)}") - print(f"isinstance((1,2), VectorLike): {isinstance((1.0, 2.0), VectorLike)}") - print(f"isinstance([1,2], VectorLike): {isinstance([1.0, 2.0], VectorLike)}") - print() - - # Test mixed operations using different vector types - # These functions aren't defined in vectortypes, but demonstrate the concept - def distance(a: VectorLike, b: VectorLike) -> float: - a_np = to_numpy(a) - b_np = to_numpy(b) - diff = a_np - b_np - return float(np.sqrt(np.sum(diff * diff))) - - def midpoint(a: VectorLike, b: VectorLike) -> NDArray[np.float64]: - a_np = to_numpy(a) - b_np = to_numpy(b) - return (a_np + b_np) / 2 - - print("Mixed operations between different vector types:") - print(f"distance(Vector(1,2,3), [4,5,6]): {distance(vec3d, [4.0, 5.0, 6.0])}") - print(f"distance(np.array([1,2,3]), (4,5,6)): {distance(arr3d, (4.0, 5.0, 6.0))}") - print(f"midpoint(Vector(1,2,3), np.array([4,5,6])): {midpoint(vec3d, numpy_arr)}") diff --git a/dimos/utils/test_testing.py b/dimos/utils/test_testing.py index c1c0c10072..796c190c8a 100644 --- a/dimos/utils/test_testing.py +++ b/dimos/utils/test_testing.py @@ -2,6 +2,7 @@ import os import subprocess from dimos.utils import testing +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage def test_pull_file(): @@ -108,3 +109,21 @@ def test_pull_dir(): with file.open("rb") as f: sha256 = hashlib.sha256(f.read()).hexdigest() assert sha256 == expected_hash + + +def test_sensor_replay(): + counter = 0 + for message in testing.SensorReplay(name="office_lidar").iterate(): + counter += 1 + assert isinstance(message, dict) + assert counter == 500 + + +def test_sensor_replay_cast(): + counter = 0 + for message in testing.SensorReplay( + name="office_lidar", autocast=lambda x: LidarMessage.from_msg(x) + ).iterate(): + counter += 1 + assert isinstance(message, LidarMessage) + assert counter == 500 diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 53b9849718..bd9a37d00a 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -1,8 +1,15 @@ import subprocess import tarfile +import glob +import os +import pickle from functools import cache from pathlib import Path -from typing import Union +from typing import Union, Iterator, TypeVar, Generic, Optional, Any, Type, Callable + +from reactivex import operators as ops +from reactivex import interval, from_iterable +from reactivex.observable import Observable def _check_git_lfs_available() -> None: @@ -140,3 +147,78 @@ def testData(filename: Union[str, Path]) -> Path: return file_path return _decompress_archive(_pull_lfs_archive(filename)) + + +T = TypeVar("T") + + +class SensorReplay(Generic[T]): + """Generic sensor data replay utility. + + Args: + name: The name of the test dataset + autocast: Optional function that takes unpickled data and returns a processed result. + For example: lambda data: LidarMessage.from_msg(data) + """ + + def __init__(self, name: str, autocast: Optional[Callable[[Any], T]] = None): + self.root_dir = testData(name) + self.autocast = autocast + self.cnt = 0 + + def load(self, *names: Union[int, str]) -> Union[T, Any, list[T], list[Any]]: + if len(names) == 1: + return self.load_one(names[0]) + return list(map(lambda name: self.load_one(name), names)) + + def load_one(self, name: Union[int, str, Path]) -> Union[T, Any]: + if isinstance(name, int): + full_path = self.root_dir / f"/{name:03d}.pickle" + elif isinstance(name, Path): + full_path = self.root_dir / f"/{name}.pickle" + else: + full_path = name + + with open(full_path, "rb") as f: + data = pickle.load(f) + if self.autocast: + return self.autocast(data) + return data + + def iterate(self) -> Iterator[Union[T, Any]]: + pattern = os.path.join(self.root_dir, "*") + for file_path in sorted(glob.glob(pattern)): + yield self.load_one(file_path) + + def stream(self, rate_hz: float = 10.0) -> Observable[Union[T, Any]]: + sleep_time = 1.0 / rate_hz + + return from_iterable(self.iterate()).pipe( + ops.zip(interval(sleep_time)), + ops.map(lambda x: x[0] if isinstance(x, tuple) else x), + ) + + def save_stream(self, observable: Observable[Union[T, Any]]) -> Observable[int]: + return observable.pipe(ops.map(lambda frame: self.save_one(frame))) + + def save(self, *frames) -> int: + [self.save_one(frame) for frame in frames] + return self.cnt + + def save_one(self, frame) -> int: + file_name = f"/{self.cnt:03d}.pickle" + full_path = self.root_dir + file_name + + self.cnt += 1 + + if os.path.isfile(full_path): + raise Exception(f"file {full_path} exists") + + # Convert to raw message if frame has a raw_msg attribute + if hasattr(frame, "raw_msg"): + frame = frame.raw_msg + + with open(full_path, "wb") as f: + pickle.dump(frame, f) + + return self.cnt