diff --git a/dimos/msgs/foxglove_msgs/Arrow.py b/dimos/msgs/foxglove_msgs/Arrow.py new file mode 100644 index 0000000000..c2087071b1 --- /dev/null +++ b/dimos/msgs/foxglove_msgs/Arrow.py @@ -0,0 +1,111 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import asdict, dataclass +from typing import TYPE_CHECKING, Optional, TypedDict + +import numpy as np +from dimos_lcm.foxglove_msgs import ArrowPrimitive, Color, LinePrimitive +from dimos_lcm.geometry_msgs import Point, Vector3 + +if TYPE_CHECKING: + from dimos.msgs.geometry_msgs.Pose import Pose + from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped + from dimos.msgs.geometry_msgs.Transform import Transform + from dimos.msgs.geometry_msgs.Twist import Twist + + +class ArrowConfig(TypedDict, total=False): + shaft_diameter: float + head_diameter: float + head_length_ratio: float + head_length: Optional[float] + color: tuple[float, float, float, float] + + +default_config: ArrowConfig = { + "shaft_diameter": 0.02, + "head_diameter": 2.0, + "head_length_ratio": 1.0, + "head_length": None, + "color": (1.0, 0.0, 0.0, 1.0), +} + + +class Arrow(ArrowPrimitive): + @classmethod + def from_transform( + cls, + transform: "Transform", + pose: "Pose | PoseStamped", + arrow_config: ArrowConfig = {}, + ): + config = {**default_config, **arrow_config} + + # Apply transform to pose to get end position + transformed_pose = pose @ transform + + # Calculate arrow vector using Vector3 operations + arrow_vec = transformed_pose.position - pose.position + length = arrow_vec.length() + + # Calculate head length + head_length = config["head_length"] + if head_length is None: + head_length = config["head_diameter"] * config["head_length_ratio"] + + # Create arrow geometry + arrow = cls() + + # Set the pose (start position and orientation) + arrow.pose = pose + + # Set arrow properties using the actual ArrowPrimitive fields + arrow.shaft_length = length + arrow.shaft_diameter = config["shaft_diameter"] + arrow.head_length = head_length + arrow.head_diameter = config["head_diameter"] + + # Set color + arrow.color = Color( + r=config["color"][0], g=config["color"][1], b=config["color"][2], a=config["color"][3] + ) + + return arrow + + def lcm_encode(self) -> bytes: + """Encode Arrow to LCM binary format.""" + return self.encode() + + @classmethod + def lcm_decode(cls, data: bytes): + """Decode Arrow from LCM binary format. + + Note: This returns an Arrow instance, not ArrowPrimitive. + """ + # First decode as ArrowPrimitive + arrow_primitive = ArrowPrimitive.decode(data) + + # Create a new Arrow instance and copy all fields + arrow = cls() + arrow.pose = arrow_primitive.pose + arrow.shaft_length = arrow_primitive.shaft_length + arrow.shaft_diameter = arrow_primitive.shaft_diameter + arrow.head_length = arrow_primitive.head_length + arrow.head_diameter = arrow_primitive.head_diameter + arrow.color = arrow_primitive.color + + return arrow diff --git a/dimos/msgs/foxglove_msgs/__init__.py b/dimos/msgs/foxglove_msgs/__init__.py new file mode 100644 index 0000000000..638a3480f2 --- /dev/null +++ b/dimos/msgs/foxglove_msgs/__init__.py @@ -0,0 +1 @@ +from dimos.msgs.foxglove_msgs.Arrow import Arrow diff --git a/dimos/msgs/foxglove_msgs/test_Arrow.py b/dimos/msgs/foxglove_msgs/test_Arrow.py new file mode 100644 index 0000000000..511ce5a01c --- /dev/null +++ b/dimos/msgs/foxglove_msgs/test_Arrow.py @@ -0,0 +1,252 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, TypedDict + +import numpy as np + +from dimos.msgs.foxglove_msgs.Arrow import Arrow +from dimos.msgs.geometry_msgs import Pose, PoseStamped, Quaternion, Transform, Vector3 + + +class ArrowConfigDict(TypedDict, total=False): + shaft_diameter: float + head_diameter: float + head_length_ratio: float + head_length: Optional[float] + color: tuple[float, float, float, float] + + +def test_arrow_from_transform_basic(): + """Test basic arrow creation from pose and transform.""" + # Create a pose at origin + pose = Pose(1.0, 2.0, 3.0) + + # Create a transform that moves 2 units in x direction + transform = Transform(translation=Vector3(2.0, 0.0, 0.0)) + + # Create arrow + arrow = Arrow.from_transform(transform, pose) + + # Check that arrow pose matches input pose + assert arrow.pose.position.x == 1.0 + assert arrow.pose.position.y == 2.0 + assert arrow.pose.position.z == 3.0 + + # Check that shaft length matches the transform magnitude + expected_length = 2.0 # magnitude of Vector3(2.0, 0.0, 0.0) + assert np.isclose(arrow.shaft_length, expected_length, atol=1e-10) + + # Check default configuration values + assert arrow.shaft_diameter == 0.02 + assert arrow.head_diameter == 2.0 + assert arrow.color.r == 1.0 + assert arrow.color.g == 0.0 + assert arrow.color.b == 0.0 + assert arrow.color.a == 1.0 + + +def test_arrow_from_transform_with_config(): + """Test arrow creation with custom configuration.""" + pose = Pose(0.0, 0.0, 0.0) + transform = Transform(translation=Vector3(1.0, 1.0, 0.0)) + + # Custom configuration + config = { + "shaft_diameter": 0.05, + "head_diameter": 1.5, + "color": (0.0, 1.0, 0.0, 0.8), # Green with transparency + } + + arrow = Arrow.from_transform(transform, pose, config) + + # Check custom values were applied + assert arrow.shaft_diameter == 0.05 + assert arrow.head_diameter == 1.5 + assert arrow.color.r == 0.0 + assert arrow.color.g == 1.0 + assert arrow.color.b == 0.0 + assert arrow.color.a == 0.8 + + # Check shaft length matches transform magnitude + expected_length = np.sqrt(2.0) # magnitude of Vector3(1.0, 1.0, 0.0) + assert np.isclose(arrow.shaft_length, expected_length, atol=1e-10) + + +def test_arrow_from_transform_zero_length(): + """Test arrow creation with zero-length transform.""" + pose = Pose(5.0, 5.0, 5.0) + + # Zero transform (no movement) - identity transform + transform = Transform() + + arrow = Arrow.from_transform( + transform, + pose, + ) + + # Arrow should have zero length + assert arrow.shaft_length == 0.0 + + # Pose should be preserved + assert arrow.pose.position.x == 5.0 + assert arrow.pose.position.y == 5.0 + assert arrow.pose.position.z == 5.0 + + +def test_arrow_head_length_calculation(): + """Test head length calculation with and without explicit setting.""" + pose = Pose() + transform = Transform(translation=Vector3(1.0, 0.0, 0.0)) + + # Test with default head length (should be head_diameter * head_length_ratio) + arrow1 = Arrow.from_transform( + transform, + pose, + ) + expected_head_length = 2.0 * 1.0 # head_diameter * head_length_ratio + assert arrow1.head_length == expected_head_length + + # Test with explicit head length + config = {"head_length": 0.5} + arrow2 = Arrow.from_transform(transform, pose, config) + assert arrow2.head_length == 0.5 + + # Test with custom ratio + config = {"head_length_ratio": 2.0} + arrow3 = Arrow.from_transform(transform, pose, config) + expected_head_length = 2.0 * 2.0 # head_diameter * custom_ratio + assert arrow3.head_length == expected_head_length + + +def test_arrow_3d_transform(): + """Test arrow with 3D translation vector.""" + pose = Pose(1.0, 1.0, 1.0) + transform = Transform(translation=Vector3(2.0, 3.0, 6.0)) # magnitude = 7.0 + + arrow = Arrow.from_transform(transform, pose) + + expected_length = 7.0 # sqrt(2^2 + 3^2 + 6^2) + assert np.isclose(arrow.shaft_length, expected_length, atol=1e-10) + + # Verify the arrow starts at the original pose + assert arrow.pose.position.x == 1.0 + assert arrow.pose.position.y == 1.0 + assert arrow.pose.position.z == 1.0 + + +def test_arrow_lcm_encode_decode(): + """Test LCM encoding and decoding of Arrow.""" + # Create an arrow using from_transform + pose = Pose(1.0, 2.0, 3.0, 0.0, 0.0, 0.707107, 0.707107) # 90 deg around Z + transform = Transform(translation=Vector3(2.0, 1.0, 0.5)) + config = { + "shaft_diameter": 0.03, + "head_diameter": 1.8, + "head_length": 0.4, + "color": (0.2, 0.8, 0.3, 0.9), + } + + arrow_source = Arrow.from_transform(transform, pose, config) + + # Encode to binary + binary_msg = arrow_source.lcm_encode() + + # Decode from binary + arrow_dest = Arrow.lcm_decode(binary_msg) + + # Verify it's a new instance of Arrow (not ArrowPrimitive) + assert isinstance(arrow_dest, Arrow) + assert arrow_dest is not arrow_source + + # Verify all fields match + assert np.isclose(arrow_dest.pose.position.x, arrow_source.pose.position.x, atol=1e-10) + assert np.isclose(arrow_dest.pose.position.y, arrow_source.pose.position.y, atol=1e-10) + assert np.isclose(arrow_dest.pose.position.z, arrow_source.pose.position.z, atol=1e-10) + assert np.isclose(arrow_dest.pose.orientation.x, arrow_source.pose.orientation.x, atol=1e-10) + assert np.isclose(arrow_dest.pose.orientation.y, arrow_source.pose.orientation.y, atol=1e-10) + assert np.isclose(arrow_dest.pose.orientation.z, arrow_source.pose.orientation.z, atol=1e-10) + assert np.isclose(arrow_dest.pose.orientation.w, arrow_source.pose.orientation.w, atol=1e-10) + + assert np.isclose(arrow_dest.shaft_length, arrow_source.shaft_length, atol=1e-10) + assert np.isclose(arrow_dest.shaft_diameter, arrow_source.shaft_diameter, atol=1e-10) + assert np.isclose(arrow_dest.head_length, arrow_source.head_length, atol=1e-10) + assert np.isclose(arrow_dest.head_diameter, arrow_source.head_diameter, atol=1e-10) + + assert np.isclose(arrow_dest.color.r, arrow_source.color.r, atol=1e-10) + assert np.isclose(arrow_dest.color.g, arrow_source.color.g, atol=1e-10) + assert np.isclose(arrow_dest.color.b, arrow_source.color.b, atol=1e-10) + assert np.isclose(arrow_dest.color.a, arrow_source.color.a, atol=1e-10) + + +def test_arrow_from_transform_with_posestamped(): + """Test arrow creation from PoseStamped and transform.""" + # Create a PoseStamped + pose_stamped = PoseStamped( + ts=1234567890.123, + frame_id="base_link", + position=Vector3(2.0, 3.0, 4.0), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ) + + # Create a transform + transform = Transform( + translation=Vector3(3.0, 0.0, 0.0), rotation=Quaternion(0.0, 0.0, 0.0, 1.0) + ) + + # Create arrow + arrow = Arrow.from_transform(transform, pose_stamped) + + # Check that arrow pose matches input pose_stamped + assert arrow.pose.position.x == 2.0 + assert arrow.pose.position.y == 3.0 + assert arrow.pose.position.z == 4.0 + + # Check that shaft length matches the transform magnitude + expected_length = 3.0 # magnitude of Vector3(3.0, 0.0, 0.0) + assert np.isclose(arrow.shaft_length, expected_length, atol=1e-10) + + # Verify the arrow properties are set correctly + assert arrow.shaft_diameter == 0.02 + assert arrow.head_diameter == 2.0 + assert arrow.color.r == 1.0 + assert arrow.color.g == 0.0 + assert arrow.color.b == 0.0 + assert arrow.color.a == 1.0 + + +def test_arrow_complex_transform_with_rotation(): + """Test arrow with transform that includes both translation and rotation.""" + # Create a pose at origin facing forward + pose = Pose(0.0, 0.0, 0.0) + + # Create a transform with translation and 45 degree rotation around Z + angle = np.pi / 4 # 45 degrees + transform = Transform( + translation=Vector3(2.0, 2.0, 0.0), + rotation=Quaternion(0.0, 0.0, np.sin(angle / 2), np.cos(angle / 2)), + ) + + # Create arrow + arrow = Arrow.from_transform(transform, pose) + + # The arrow vector should be from pose position to transformed position + # Since pose is at origin, the transformed position is just the translation + expected_length = np.sqrt(8.0) # magnitude of Vector3(2.0, 2.0, 0.0) + assert np.isclose(arrow.shaft_length, expected_length, atol=1e-10) + + # Arrow should start at the original pose + assert arrow.pose.position.x == 0.0 + assert arrow.pose.position.y == 0.0 + assert arrow.pose.position.z == 0.0 diff --git a/dimos/msgs/geometry_msgs/Pose.py b/dimos/msgs/geometry_msgs/Pose.py index 74b534fefa..0a32210be0 100644 --- a/dimos/msgs/geometry_msgs/Pose.py +++ b/dimos/msgs/geometry_msgs/Pose.py @@ -20,15 +20,18 @@ from typing import BinaryIO, TypeAlias from dimos_lcm.geometry_msgs import Pose as LCMPose +from dimos_lcm.geometry_msgs import Transform as LCMTransform from plum import dispatch from dimos.msgs.geometry_msgs.Quaternion import Quaternion, QuaternionConvertable +from dimos.msgs.geometry_msgs.Transform import Transform from dimos.msgs.geometry_msgs.Vector3 import Vector3, VectorConvertable # Types that can be converted to/from Pose PoseConvertable: TypeAlias = ( tuple[VectorConvertable, QuaternionConvertable] | LCMPose + | Vector3 | dict[str, VectorConvertable | QuaternionConvertable] ) @@ -156,7 +159,8 @@ def __repr__(self) -> str: def __str__(self) -> str: return ( f"Pose(pos=[{self.x:.3f}, {self.y:.3f}, {self.z:.3f}], " - f"euler=[{self.roll:.3f}, {self.pitch:.3f}, {self.yaw:.3f}])" + f"euler=[{self.roll:.3f}, {self.pitch:.3f}, {self.yaw:.3f}]), " + f"quaternion=[{self.orientation}])" ) def __eq__(self, other) -> bool: @@ -165,15 +169,86 @@ def __eq__(self, other) -> bool: return False return self.position == other.position and self.orientation == other.orientation + def __matmul__(self, transform: LCMTransform | Transform) -> Pose: + return self + transform + + def find_transform(self, other: PoseConvertable) -> LCMTransform: + other_pose = to_pose(other) if not isinstance(other, Pose) else other + + inv_orientation = self.orientation.conjugate() + + pos_diff = other_pose.position - self.position + + local_translation = inv_orientation.rotate_vector(pos_diff) + + relative_rotation = inv_orientation * other_pose.orientation + + transform = LCMTransform() + transform.translation = local_translation + transform.rotation = relative_rotation + + return transform + + def __add__(self, other: "Pose" | PoseConvertable | LCMTransform | Transform) -> "Pose": + """Compose two poses or apply a transform (transform composition). + + The operation self + other represents applying transformation 'other' + in the coordinate frame defined by 'self'. This is equivalent to: + - First apply transformation 'self' (from world to self's frame) + - Then apply transformation 'other' (from self's frame to other's frame) + + This matches ROS tf convention where: + T_world_to_other = T_world_to_self * T_self_to_other + + Args: + other: The pose or transform to compose with this one + + Returns: + A new Pose representing the composed transformation + + Example: + robot_pose = Pose(1, 0, 0) # Robot at (1,0,0) facing forward + object_in_robot = Pose(2, 0, 0) # Object 2m in front of robot + object_in_world = robot_pose + object_in_robot # Object at (3,0,0) in world + + # Or with a Transform: + transform = Transform() + transform.translation = Vector3(2, 0, 0) + transform.rotation = Quaternion(0, 0, 0, 1) + new_pose = pose + transform + """ + # Handle Transform objects + if isinstance(other, (LCMTransform, Transform)): + # Convert Transform to Pose using its translation and rotation + other_position = Vector3(other.translation) + other_orientation = Quaternion(other.rotation) + elif isinstance(other, Pose): + other_position = other.position + other_orientation = other.orientation + else: + # Convert to Pose if it's a convertible type + other_pose = Pose(other) + other_position = other_pose.position + other_orientation = other_pose.orientation + + # Compose orientations: self.orientation * other.orientation + new_orientation = self.orientation * other_orientation + + # Transform other's position by self's orientation, then add to self's position + rotated_position = self.orientation.rotate_vector(other_position) + new_position = self.position + rotated_position + + return Pose(new_position, new_orientation) + @dispatch -def to_pose(value: "Pose") -> Pose: +def to_pose(value: "Pose") -> "Pose": """Pass through Pose objects.""" return value @dispatch -def to_pose(value: PoseConvertable | Pose) -> Pose: +def to_pose(value: PoseConvertable) -> Pose: """Convert a pose-compatible value to a Pose object.""" return Pose(value) diff --git a/dimos/msgs/geometry_msgs/Quaternion.py b/dimos/msgs/geometry_msgs/Quaternion.py index ccb3328510..2a7e398379 100644 --- a/dimos/msgs/geometry_msgs/Quaternion.py +++ b/dimos/msgs/geometry_msgs/Quaternion.py @@ -165,3 +165,73 @@ def __eq__(self, other) -> bool: if not isinstance(other, Quaternion): return False return self.x == other.x and self.y == other.y and self.z == other.z and self.w == other.w + + def __mul__(self, other: "Quaternion") -> "Quaternion": + """Multiply two quaternions (Hamilton product). + + The result represents the composition of rotations: + q1 * q2 represents rotating by q2 first, then by q1. + """ + if not isinstance(other, Quaternion): + raise TypeError(f"Cannot multiply Quaternion with {type(other)}") + + # Hamilton product formula + w = self.w * other.w - self.x * other.x - self.y * other.y - self.z * other.z + x = self.w * other.x + self.x * other.w + self.y * other.z - self.z * other.y + y = self.w * other.y - self.x * other.z + self.y * other.w + self.z * other.x + z = self.w * other.z + self.x * other.y - self.y * other.x + self.z * other.w + + return Quaternion(x, y, z, w) + + def conjugate(self) -> Quaternion: + """Return the conjugate of the quaternion. + + For unit quaternions, the conjugate represents the inverse rotation. + """ + return Quaternion(-self.x, -self.y, -self.z, self.w) + + def inverse(self) -> Quaternion: + """Return the inverse of the quaternion. + + For unit quaternions, this is equivalent to the conjugate. + For non-unit quaternions, this is conjugate / norm^2. + """ + norm_sq = self.x**2 + self.y**2 + self.z**2 + self.w**2 + if norm_sq == 0: + raise ZeroDivisionError("Cannot invert zero quaternion") + + # For unit quaternions (norm_sq ≈ 1), this simplifies to conjugate + if np.isclose(norm_sq, 1.0): + return self.conjugate() + + # For non-unit quaternions + conj = self.conjugate() + return Quaternion(conj.x / norm_sq, conj.y / norm_sq, conj.z / norm_sq, conj.w / norm_sq) + + def normalize(self) -> Quaternion: + """Return a normalized (unit) quaternion.""" + norm = np.sqrt(self.x**2 + self.y**2 + self.z**2 + self.w**2) + if norm == 0: + raise ZeroDivisionError("Cannot normalize zero quaternion") + return Quaternion(self.x / norm, self.y / norm, self.z / norm, self.w / norm) + + def rotate_vector(self, vector: Vector3) -> Vector3: + """Rotate a 3D vector by this quaternion. + + Args: + vector: The vector to rotate + + Returns: + The rotated vector + """ + # For unit quaternions, conjugate equals inverse, so we use conjugate for efficiency + # The rotation formula is: q * v * q^* where q^* is the conjugate + + # Convert vector to pure quaternion (w=0) + v_quat = Quaternion(vector.x, vector.y, vector.z, 0) + + # Apply rotation: q * v * q^* (conjugate for unit quaternions) + rotated = self * v_quat * self.conjugate() + + # Extract vector components + return Vector3(rotated.x, rotated.y, rotated.z) diff --git a/dimos/msgs/geometry_msgs/Transform.py b/dimos/msgs/geometry_msgs/Transform.py new file mode 100644 index 0000000000..a4a75993fb --- /dev/null +++ b/dimos/msgs/geometry_msgs/Transform.py @@ -0,0 +1,111 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import struct +from io import BytesIO +from typing import BinaryIO + +from dimos_lcm.geometry_msgs import Transform as LCMTransform +from plum import dispatch + +from .Quaternion import Quaternion +from .Vector3 import Vector3 + + +class Transform(LCMTransform): + translation: Vector3 + rotation: Quaternion + msg_name = "geometry_msgs.Transform" + + @dispatch + def __init__(self) -> None: + """Initialize an identity transform.""" + self.translation = Vector3() + self.rotation = Quaternion() + + @dispatch + def __init__(self, translation: Vector3) -> None: + """Initialize a transform with only translation (identity rotation).""" + self.translation = translation + self.rotation = Quaternion() + + @dispatch + def __init__(self, rotation: Quaternion) -> None: + """Initialize a transform with only rotation (zero translation).""" + self.translation = Vector3() + self.rotation = rotation + + @dispatch + def __init__(self, translation: Vector3, rotation: Quaternion) -> None: + """Initialize a transform from translation and rotation.""" + self.translation = translation + self.rotation = rotation + + @dispatch + def __init__(self, transform: "Transform") -> None: + """Initialize from another Transform (copy constructor).""" + self.translation = Vector3(transform.translation) + self.rotation = Quaternion(transform.rotation) + + @dispatch + def __init__(self, lcm_transform: LCMTransform) -> None: + """Initialize from an LCM Transform.""" + self.translation = Vector3(lcm_transform.translation) + self.rotation = Quaternion(lcm_transform.rotation) + + @dispatch + def __init__(self, **kwargs): + """Handle keyword arguments for LCM compatibility.""" + # Get values with defaults and let dispatch handle type conversion + translation = kwargs.get("translation", Vector3()) + rotation = kwargs.get("rotation", Quaternion()) + + # Call the appropriate positional init - dispatch will handle the types + self.__init__(translation, rotation) + + @classmethod + def lcm_decode(cls, data: bytes | BinaryIO): + if not hasattr(data, "read"): + data = BytesIO(data) + if data.read(8) != cls._get_packed_fingerprint(): + raise ValueError("Decode error") + return cls._lcm_decode_one(data) + + @classmethod + def _lcm_decode_one(cls, buf): + translation = Vector3._lcm_decode_one(buf) + rotation = Quaternion._lcm_decode_one(buf) + return cls(translation=translation, rotation=rotation) + + def lcm_encode(self) -> bytes: + return super().encode() + + def __repr__(self) -> str: + return f"Transform(translation={self.translation!r}, rotation={self.rotation!r})" + + def __str__(self) -> str: + return f"Transform:\n Translation: {self.translation}\n Rotation: {self.rotation}" + + def __eq__(self, other) -> bool: + """Check if two transforms are equal.""" + if not isinstance(other, Transform): + return False + return self.translation == other.translation and self.rotation == other.rotation + + @classmethod + def identity(cls) -> Transform: + """Create an identity transform.""" + return cls() diff --git a/dimos/msgs/geometry_msgs/Twist.py b/dimos/msgs/geometry_msgs/Twist.py new file mode 100644 index 0000000000..2d67d8c765 --- /dev/null +++ b/dimos/msgs/geometry_msgs/Twist.py @@ -0,0 +1,119 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import struct +from io import BytesIO +from typing import BinaryIO + +from dimos_lcm.geometry_msgs import Twist as LCMTwist +from plum import dispatch + +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3, VectorLike + + +class Twist(LCMTwist): + linear: Vector3 + angular: Vector3 + msg_name = "geometry_msgs.Twist" + + @dispatch + def __init__(self) -> None: + """Initialize a zero twist (no linear or angular velocity).""" + self.linear = Vector3() + self.angular = Vector3() + + @dispatch + def __init__(self, linear: VectorLike, angular: VectorLike) -> None: + """Initialize a twist from linear and angular velocities.""" + + self.linear = Vector3(linear) + self.angular = Vector3(angular) + + @dispatch + def __init__(self, linear: VectorLike, angular: Quaternion) -> None: + """Initialize a twist from linear velocity and angular as quaternion (converted to euler).""" + self.linear = Vector3(linear) + self.angular = angular.to_euler() + + @dispatch + def __init__(self, twist: "Twist") -> None: + """Initialize from another Twist (copy constructor).""" + self.linear = Vector3(twist.linear) + self.angular = Vector3(twist.angular) + + @dispatch + def __init__(self, lcm_twist: LCMTwist) -> None: + """Initialize from an LCM Twist.""" + self.linear = Vector3(lcm_twist.linear) + self.angular = Vector3(lcm_twist.angular) + + @dispatch + def __init__(self, **kwargs): + """Handle keyword arguments for LCM compatibility.""" + linear = kwargs.get("linear", Vector3()) + angular = kwargs.get("angular", Vector3()) + + self.__init__(linear, angular) + + @classmethod + def lcm_decode(cls, data: bytes | BinaryIO): + if not hasattr(data, "read"): + data = BytesIO(data) + if data.read(8) != cls._get_packed_fingerprint(): + raise ValueError("Decode error") + return cls._lcm_decode_one(data) + + @classmethod + def _lcm_decode_one(cls, buf): + linear = Vector3._lcm_decode_one(buf) + angular = Vector3._lcm_decode_one(buf) + return cls(linear=linear, angular=angular) + + def lcm_encode(self) -> bytes: + return super().encode() + + def __repr__(self) -> str: + return f"Twist(linear={self.linear!r}, angular={self.angular!r})" + + def __str__(self) -> str: + return f"Twist:\n Linear: {self.linear}\n Angular: {self.angular}" + + def __eq__(self, other) -> bool: + """Check if two twists are equal.""" + if not isinstance(other, Twist): + return False + return self.linear == other.linear and self.angular == other.angular + + @classmethod + def zero(cls) -> Twist: + """Create a zero twist (no motion).""" + return cls() + + def is_zero(self) -> bool: + """Check if this is a zero twist (no linear or angular velocity).""" + return self.linear.is_zero() and self.angular.is_zero() + + def __bool__(self) -> bool: + """Boolean conversion for Twist. + + A Twist is considered False if it's a zero twist (no motion), + and True otherwise. + + Returns: + False if twist is zero, True otherwise + """ + return not self.is_zero() diff --git a/dimos/msgs/geometry_msgs/__init__.py b/dimos/msgs/geometry_msgs/__init__.py index 2af44a7ff5..86d25bb843 100644 --- a/dimos/msgs/geometry_msgs/__init__.py +++ b/dimos/msgs/geometry_msgs/__init__.py @@ -1,4 +1,6 @@ -from dimos.msgs.geometry_msgs.Pose import Pose +from dimos.msgs.geometry_msgs.Pose import Pose, PoseLike, to_pose from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion -from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.geometry_msgs.Vector3 import Vector3, VectorLike diff --git a/dimos/msgs/geometry_msgs/test_Pose.py b/dimos/msgs/geometry_msgs/test_Pose.py index 590a17549c..ad103e8255 100644 --- a/dimos/msgs/geometry_msgs/test_Pose.py +++ b/dimos/msgs/geometry_msgs/test_Pose.py @@ -44,6 +44,29 @@ def test_pose_default_init(): assert pose.z == 0.0 +def test_pose_pose_init(): + """Test initialization with position coordinates only (identity orientation).""" + pose_data = Pose(1.0, 2.0, 3.0) + + pose = to_pose(pose_data) + + # Position should be as specified + assert pose.position.x == 1.0 + assert pose.position.y == 2.0 + assert pose.position.z == 3.0 + + # Orientation should be identity quaternion + assert pose.orientation.x == 0.0 + assert pose.orientation.y == 0.0 + assert pose.orientation.z == 0.0 + assert pose.orientation.w == 1.0 + + # Test convenience properties + assert pose.x == 1.0 + assert pose.y == 2.0 + assert pose.z == 3.0 + + def test_pose_position_init(): """Test initialization with position coordinates only (identity orientation).""" pose = Pose(1.0, 2.0, 3.0) @@ -553,3 +576,171 @@ def encodepass(): import timeit print(f"{timeit.timeit(encodepass, number=1000)} ms per cycle") + + +def test_pose_addition_translation_only(): + """Test pose addition with translation only (identity rotations).""" + # Two poses with only translations + pose1 = Pose(1.0, 2.0, 3.0) # First translation + pose2 = Pose(4.0, 5.0, 6.0) # Second translation + + # Adding should combine translations + result = pose1 + pose2 + + assert result.position.x == 5.0 # 1 + 4 + assert result.position.y == 7.0 # 2 + 5 + assert result.position.z == 9.0 # 3 + 6 + + # Orientation should remain identity + assert result.orientation.x == 0.0 + assert result.orientation.y == 0.0 + assert result.orientation.z == 0.0 + assert result.orientation.w == 1.0 + + +def test_pose_addition_with_rotation(): + """Test pose addition with rotation applied to translation.""" + # First pose: at origin, rotated 90 degrees around Z (yaw) + # 90 degree rotation quaternion around Z: (0, 0, sin(pi/4), cos(pi/4)) + angle = np.pi / 2 # 90 degrees + pose1 = Pose(0.0, 0.0, 0.0, 0.0, 0.0, np.sin(angle / 2), np.cos(angle / 2)) + + # Second pose: 1 unit forward (along X in its frame) + pose2 = Pose(1.0, 0.0, 0.0) + + # After rotation, the forward direction should be along Y + result = pose1 + pose2 + + # Position should be rotated + assert np.isclose(result.position.x, 0.0, atol=1e-10) + assert np.isclose(result.position.y, 1.0, atol=1e-10) + assert np.isclose(result.position.z, 0.0, atol=1e-10) + + # Orientation should be same as pose1 (pose2 has identity rotation) + assert np.isclose(result.orientation.x, 0.0, atol=1e-10) + assert np.isclose(result.orientation.y, 0.0, atol=1e-10) + assert np.isclose(result.orientation.z, np.sin(angle / 2), atol=1e-10) + assert np.isclose(result.orientation.w, np.cos(angle / 2), atol=1e-10) + + +def test_pose_addition_rotation_composition(): + """Test that rotations are properly composed.""" + # First pose: 45 degrees around Z + angle1 = np.pi / 4 # 45 degrees + pose1 = Pose(0.0, 0.0, 0.0, 0.0, 0.0, np.sin(angle1 / 2), np.cos(angle1 / 2)) + + # Second pose: another 45 degrees around Z + angle2 = np.pi / 4 # 45 degrees + pose2 = Pose(0.0, 0.0, 0.0, 0.0, 0.0, np.sin(angle2 / 2), np.cos(angle2 / 2)) + + # Result should be 90 degrees around Z + result = pose1 + pose2 + + # Check final angle is 90 degrees + expected_angle = angle1 + angle2 # 90 degrees + expected_qz = np.sin(expected_angle / 2) + expected_qw = np.cos(expected_angle / 2) + + assert np.isclose(result.orientation.z, expected_qz, atol=1e-10) + assert np.isclose(result.orientation.w, expected_qw, atol=1e-10) + + +def test_pose_addition_full_transform(): + """Test full pose composition with translation and rotation.""" + # Robot pose: at (2, 1, 0), facing 90 degrees left (positive yaw) + robot_yaw = np.pi / 2 # 90 degrees + robot_pose = Pose(2.0, 1.0, 0.0, 0.0, 0.0, np.sin(robot_yaw / 2), np.cos(robot_yaw / 2)) + + # Object in robot frame: 3 units forward, 1 unit right + object_in_robot = Pose(3.0, -1.0, 0.0) + + # Compose to get object in world frame + object_in_world = robot_pose + object_in_robot + + # Robot is facing left (90 degrees), so: + # - Robot's forward (X) is world's negative Y + # - Robot's right (negative Y) is world's X + # So object should be at: robot_pos + rotated_offset + # rotated_offset: (3, -1) rotated 90° CCW = (1, 3) + assert np.isclose(object_in_world.position.x, 3.0, atol=1e-10) # 2 + 1 + assert np.isclose(object_in_world.position.y, 4.0, atol=1e-10) # 1 + 3 + assert np.isclose(object_in_world.position.z, 0.0, atol=1e-10) + + # Orientation should match robot's orientation (object has no rotation) + assert np.isclose(object_in_world.yaw, robot_yaw, atol=1e-10) + + +def test_pose_addition_chain(): + """Test chaining multiple pose additions.""" + # Create a chain of transformations + pose1 = Pose(1.0, 0.0, 0.0) # Move 1 unit in X + pose2 = Pose(0.0, 1.0, 0.0) # Move 1 unit in Y (relative to pose1) + pose3 = Pose(0.0, 0.0, 1.0) # Move 1 unit in Z (relative to pose1+pose2) + + # Chain them together + result = pose1 + pose2 + pose3 + + # Should accumulate all translations + assert result.position.x == 1.0 + assert result.position.y == 1.0 + assert result.position.z == 1.0 + + +def test_pose_addition_with_convertible(): + """Test pose addition with convertible types.""" + pose1 = Pose(1.0, 2.0, 3.0) + + # Add with tuple + pose_tuple = ([4.0, 5.0, 6.0], [0.0, 0.0, 0.0, 1.0]) + result1 = pose1 + pose_tuple + assert result1.position.x == 5.0 + assert result1.position.y == 7.0 + assert result1.position.z == 9.0 + + # Add with dict + pose_dict = {"position": [1.0, 0.0, 0.0], "orientation": [0.0, 0.0, 0.0, 1.0]} + result2 = pose1 + pose_dict + assert result2.position.x == 2.0 + assert result2.position.y == 2.0 + assert result2.position.z == 3.0 + + +def test_pose_identity_addition(): + """Test that adding identity pose leaves pose unchanged.""" + pose = Pose(1.0, 2.0, 3.0, 0.1, 0.2, 0.3, 0.9) + identity = Pose() # Identity pose at origin + + result = pose + identity + + # Should be unchanged + assert result.position.x == pose.position.x + assert result.position.y == pose.position.y + assert result.position.z == pose.position.z + assert result.orientation.x == pose.orientation.x + assert result.orientation.y == pose.orientation.y + assert result.orientation.z == pose.orientation.z + assert result.orientation.w == pose.orientation.w + + +def test_pose_addition_3d_rotation(): + """Test pose addition with 3D rotations.""" + # First pose: rotated around X axis (roll) + roll = np.pi / 4 # 45 degrees + pose1 = Pose(1.0, 0.0, 0.0, np.sin(roll / 2), 0.0, 0.0, np.cos(roll / 2)) + + # Second pose: movement along Y and Z in local frame + pose2 = Pose(0.0, 1.0, 1.0) + + # Compose transformations + result = pose1 + pose2 + + # The Y and Z movement should be rotated around X + # After 45° rotation around X: + # - Local Y -> world Y * cos(45°) - Z * sin(45°) + # - Local Z -> world Y * sin(45°) + Z * cos(45°) + cos45 = np.cos(roll) + sin45 = np.sin(roll) + + assert np.isclose(result.position.x, 1.0, atol=1e-10) # X unchanged + assert np.isclose(result.position.y, cos45 - sin45, atol=1e-10) + assert np.isclose(result.position.z, sin45 + cos45, atol=1e-10) diff --git a/dimos/msgs/geometry_msgs/test_Quaternion.py b/dimos/msgs/geometry_msgs/test_Quaternion.py index ab049f809f..18f9e2c5ab 100644 --- a/dimos/msgs/geometry_msgs/test_Quaternion.py +++ b/dimos/msgs/geometry_msgs/test_Quaternion.py @@ -208,3 +208,180 @@ def test_lcm_encode_decode(): assert isinstance(q_dest, Quaternion) assert q_dest is not q_source assert q_dest == q_source + + +def test_quaternion_multiplication(): + """Test quaternion multiplication (Hamilton product).""" + # Test identity multiplication + q1 = Quaternion(0.5, 0.5, 0.5, 0.5) + identity = Quaternion(0, 0, 0, 1) + + result = q1 * identity + assert np.allclose([result.x, result.y, result.z, result.w], [q1.x, q1.y, q1.z, q1.w]) + + # Test multiplication order matters (non-commutative) + q2 = Quaternion(0.1, 0.2, 0.3, 0.4) + q3 = Quaternion(0.4, 0.3, 0.2, 0.1) + + result1 = q2 * q3 + result2 = q3 * q2 + + # Results should be different + assert not np.allclose( + [result1.x, result1.y, result1.z, result1.w], [result2.x, result2.y, result2.z, result2.w] + ) + + # Test specific multiplication case + # 90 degree rotations around Z axis + angle = np.pi / 2 + q_90z = Quaternion(0, 0, np.sin(angle / 2), np.cos(angle / 2)) + + # Two 90 degree rotations should give 180 degrees + result = q_90z * q_90z + expected_angle = np.pi + assert np.isclose(result.x, 0, atol=1e-10) + assert np.isclose(result.y, 0, atol=1e-10) + assert np.isclose(result.z, np.sin(expected_angle / 2), atol=1e-10) + assert np.isclose(result.w, np.cos(expected_angle / 2), atol=1e-10) + + +def test_quaternion_conjugate(): + """Test quaternion conjugate.""" + q = Quaternion(0.1, 0.2, 0.3, 0.4) + conj = q.conjugate() + + # Conjugate should negate x, y, z but keep w + assert conj.x == -q.x + assert conj.y == -q.y + assert conj.z == -q.z + assert conj.w == q.w + + # Test that q * q^* gives a real quaternion (x=y=z=0) + result = q * conj + assert np.isclose(result.x, 0, atol=1e-10) + assert np.isclose(result.y, 0, atol=1e-10) + assert np.isclose(result.z, 0, atol=1e-10) + # w should be the squared norm + expected_w = q.x**2 + q.y**2 + q.z**2 + q.w**2 + assert np.isclose(result.w, expected_w, atol=1e-10) + + +def test_quaternion_inverse(): + """Test quaternion inverse.""" + # Test with unit quaternion + q_unit = Quaternion(0, 0, 0, 1).normalize() # Already normalized but being explicit + inv = q_unit.inverse() + + # For unit quaternion, inverse equals conjugate + conj = q_unit.conjugate() + assert np.allclose([inv.x, inv.y, inv.z, inv.w], [conj.x, conj.y, conj.z, conj.w]) + + # Test that q * q^-1 = identity + q = Quaternion(0.5, 0.5, 0.5, 0.5) + inv = q.inverse() + result = q * inv + + assert np.isclose(result.x, 0, atol=1e-10) + assert np.isclose(result.y, 0, atol=1e-10) + assert np.isclose(result.z, 0, atol=1e-10) + assert np.isclose(result.w, 1, atol=1e-10) + + # Test inverse of non-unit quaternion + q_non_unit = Quaternion(2, 0, 0, 0) # Non-unit quaternion + inv = q_non_unit.inverse() + result = q_non_unit * inv + + assert np.isclose(result.x, 0, atol=1e-10) + assert np.isclose(result.y, 0, atol=1e-10) + assert np.isclose(result.z, 0, atol=1e-10) + assert np.isclose(result.w, 1, atol=1e-10) + + +def test_quaternion_normalize(): + """Test quaternion normalization.""" + # Test non-unit quaternion + q = Quaternion(1, 2, 3, 4) + q_norm = q.normalize() + + # Check that magnitude is 1 + magnitude = np.sqrt(q_norm.x**2 + q_norm.y**2 + q_norm.z**2 + q_norm.w**2) + assert np.isclose(magnitude, 1.0, atol=1e-10) + + # Check that direction is preserved + scale = np.sqrt(q.x**2 + q.y**2 + q.z**2 + q.w**2) + assert np.isclose(q_norm.x, q.x / scale, atol=1e-10) + assert np.isclose(q_norm.y, q.y / scale, atol=1e-10) + assert np.isclose(q_norm.z, q.z / scale, atol=1e-10) + assert np.isclose(q_norm.w, q.w / scale, atol=1e-10) + + +def test_quaternion_rotate_vector(): + """Test rotating vectors with quaternions.""" + from dimos.msgs.geometry_msgs.Vector3 import Vector3 + + # Test rotation of unit vectors + # 90 degree rotation around Z axis + angle = np.pi / 2 + q_rot = Quaternion(0, 0, np.sin(angle / 2), np.cos(angle / 2)) + + # Rotate X unit vector + v_x = Vector3(1, 0, 0) + v_rotated = q_rot.rotate_vector(v_x) + + # Should now point along Y axis + assert np.isclose(v_rotated.x, 0, atol=1e-10) + assert np.isclose(v_rotated.y, 1, atol=1e-10) + assert np.isclose(v_rotated.z, 0, atol=1e-10) + + # Rotate Y unit vector + v_y = Vector3(0, 1, 0) + v_rotated = q_rot.rotate_vector(v_y) + + # Should now point along negative X axis + assert np.isclose(v_rotated.x, -1, atol=1e-10) + assert np.isclose(v_rotated.y, 0, atol=1e-10) + assert np.isclose(v_rotated.z, 0, atol=1e-10) + + # Test that Z vector is unchanged (rotation axis) + v_z = Vector3(0, 0, 1) + v_rotated = q_rot.rotate_vector(v_z) + + assert np.isclose(v_rotated.x, 0, atol=1e-10) + assert np.isclose(v_rotated.y, 0, atol=1e-10) + assert np.isclose(v_rotated.z, 1, atol=1e-10) + + # Test identity rotation + q_identity = Quaternion(0, 0, 0, 1) + v = Vector3(1, 2, 3) + v_rotated = q_identity.rotate_vector(v) + + assert np.isclose(v_rotated.x, v.x, atol=1e-10) + assert np.isclose(v_rotated.y, v.y, atol=1e-10) + assert np.isclose(v_rotated.z, v.z, atol=1e-10) + + +def test_quaternion_inverse_zero(): + """Test that inverting zero quaternion raises error.""" + q_zero = Quaternion(0, 0, 0, 0) + + with pytest.raises(ZeroDivisionError, match="Cannot invert zero quaternion"): + q_zero.inverse() + + +def test_quaternion_normalize_zero(): + """Test that normalizing zero quaternion raises error.""" + q_zero = Quaternion(0, 0, 0, 0) + + with pytest.raises(ZeroDivisionError, match="Cannot normalize zero quaternion"): + q_zero.normalize() + + +def test_quaternion_multiplication_type_error(): + """Test that multiplying quaternion with non-quaternion raises error.""" + q = Quaternion(1, 0, 0, 0) + + with pytest.raises(TypeError, match="Cannot multiply Quaternion with"): + q * 5.0 + + with pytest.raises(TypeError, match="Cannot multiply Quaternion with"): + q * [1, 2, 3, 4] diff --git a/dimos/msgs/geometry_msgs/test_Transform.py b/dimos/msgs/geometry_msgs/test_Transform.py new file mode 100644 index 0000000000..64f8c6f8b1 --- /dev/null +++ b/dimos/msgs/geometry_msgs/test_Transform.py @@ -0,0 +1,270 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pytest +from dimos_lcm.geometry_msgs import Transform as LCMTransform + +from dimos.msgs.geometry_msgs import Pose, Quaternion, Transform, Vector3 + + +def test_transform_initialization(): + # Test default initialization (identity transform) + tf = Transform() + assert tf.translation.x == 0.0 + assert tf.translation.y == 0.0 + assert tf.translation.z == 0.0 + assert tf.rotation.x == 0.0 + assert tf.rotation.y == 0.0 + assert tf.rotation.z == 0.0 + assert tf.rotation.w == 1.0 + + # Test initialization with Vector3 and Quaternion + trans = Vector3(1.0, 2.0, 3.0) + rot = Quaternion(0.0, 0.0, 0.707107, 0.707107) # 90 degrees around Z + tf2 = Transform(trans, rot) + assert tf2.translation == trans + assert tf2.rotation == rot + + # Test copy constructor + tf3 = Transform(tf2) + assert tf3.translation == tf2.translation + assert tf3.rotation == tf2.rotation + assert tf3 == tf2 + # Ensure it's a deep copy + tf3.translation.x = 10.0 + assert tf2.translation.x == 1.0 + + # Test initialization from LCM Transform + lcm_tf = LCMTransform() + lcm_tf.translation = Vector3(4.0, 5.0, 6.0) + lcm_tf.rotation = Quaternion(0.0, 0.707107, 0.0, 0.707107) # 90 degrees around Y + tf4 = Transform(lcm_tf) + assert tf4.translation.x == 4.0 + assert tf4.translation.y == 5.0 + assert tf4.translation.z == 6.0 + assert np.isclose(tf4.rotation.x, 0.0) + assert np.isclose(tf4.rotation.y, 0.707107) + assert np.isclose(tf4.rotation.z, 0.0) + assert np.isclose(tf4.rotation.w, 0.707107) + + # Test initialization with only translation + tf5 = Transform(Vector3(7.0, 8.0, 9.0)) + assert tf5.translation.x == 7.0 + assert tf5.translation.y == 8.0 + assert tf5.translation.z == 9.0 + assert tf5.rotation.w == 1.0 # Identity rotation + + # Test initialization with only rotation + tf6 = Transform(Quaternion(0.0, 0.0, 0.0, 1.0)) + assert tf6.translation.is_zero() # Zero translation + assert tf6.rotation.w == 1.0 + + # Test keyword argument initialization + tf7 = Transform(translation=Vector3(1, 2, 3), rotation=Quaternion()) + assert tf7.translation == Vector3(1, 2, 3) + assert tf7.rotation == Quaternion() + + # Test keyword with only translation + tf8 = Transform(translation=Vector3(4, 5, 6)) + assert tf8.translation == Vector3(4, 5, 6) + assert tf8.rotation.w == 1.0 + + # Test keyword with only rotation + tf9 = Transform(rotation=Quaternion(0, 0, 1, 0)) + assert tf9.translation.is_zero() + assert tf9.rotation == Quaternion(0, 0, 1, 0) + + +def test_transform_identity(): + # Test identity class method + tf = Transform.identity() + assert tf.translation.is_zero() + assert tf.rotation.x == 0.0 + assert tf.rotation.y == 0.0 + assert tf.rotation.z == 0.0 + assert tf.rotation.w == 1.0 + + # Identity should equal default constructor + assert tf == Transform() + + +def test_transform_equality(): + tf1 = Transform(Vector3(1, 2, 3), Quaternion(0, 0, 0, 1)) + tf2 = Transform(Vector3(1, 2, 3), Quaternion(0, 0, 0, 1)) + tf3 = Transform(Vector3(1, 2, 4), Quaternion(0, 0, 0, 1)) # Different z + tf4 = Transform(Vector3(1, 2, 3), Quaternion(0, 0, 1, 0)) # Different rotation + + assert tf1 == tf2 + assert tf1 != tf3 + assert tf1 != tf4 + assert tf1 != "not a transform" + + +def test_transform_string_representations(): + tf = Transform(Vector3(1.5, -2.0, 3.14), Quaternion(0, 0, 0.707107, 0.707107)) + + # Test repr + repr_str = repr(tf) + assert "Transform" in repr_str + assert "translation=" in repr_str + assert "rotation=" in repr_str + assert "1.5" in repr_str + + # Test str + str_str = str(tf) + assert "Transform:" in str_str + assert "Translation:" in str_str + assert "Rotation:" in str_str + + +def test_pose_add_transform(): + initial_pose = Pose(1.0, 0.0, 0.0) + + # 90 degree rotation around Z axis + angle = np.pi / 2 + transform = Transform( + translation=Vector3(2.0, 1.0, 0.0), + rotation=Quaternion(0.0, 0.0, np.sin(angle / 2), np.cos(angle / 2)), + ) + + transformed_pose = initial_pose @ transform + + # - Translation (2, 1, 0) is added directly to position (1, 0, 0) + # - Result position: (3, 1, 0) + assert np.isclose(transformed_pose.position.x, 3.0, atol=1e-10) + assert np.isclose(transformed_pose.position.y, 1.0, atol=1e-10) + assert np.isclose(transformed_pose.position.z, 0.0, atol=1e-10) + + # Rotation should be 90 degrees around Z + assert np.isclose(transformed_pose.orientation.x, 0.0, atol=1e-10) + assert np.isclose(transformed_pose.orientation.y, 0.0, atol=1e-10) + assert np.isclose(transformed_pose.orientation.z, np.sin(angle / 2), atol=1e-10) + assert np.isclose(transformed_pose.orientation.w, np.cos(angle / 2), atol=1e-10) + + found_tf = initial_pose.find_transform(transformed_pose) + + assert found_tf.translation == transform.translation + assert found_tf.rotation == transform.rotation + assert found_tf.translation.x == transform.translation.x + assert found_tf.translation.y == transform.translation.y + assert found_tf.translation.z == transform.translation.z + + assert found_tf.rotation.x == transform.rotation.x + assert found_tf.rotation.y == transform.rotation.y + assert found_tf.rotation.z == transform.rotation.z + assert found_tf.rotation.w == transform.rotation.w + + print(found_tf.rotation, found_tf.translation) + + +def test_pose_add_transform_with_rotation(): + # Create a pose at (0, 0, 0) rotated 90 degrees around Z + angle = np.pi / 2 + initial_pose = Pose(0.0, 0.0, 0.0, 0.0, 0.0, np.sin(angle / 2), np.cos(angle / 2)) + + # Add 45 degree rotation to transform1 + rotation_angle = np.pi / 4 # 45 degrees + transform1 = Transform( + translation=Vector3(1.0, 0.0, 0.0), + rotation=Quaternion( + 0.0, 0.0, np.sin(rotation_angle / 2), np.cos(rotation_angle / 2) + ), # 45° around Z + ) + + transform2 = Transform( + translation=Vector3(0.0, 1.0, 1.0), + rotation=Quaternion(0.0, 0.0, 0.0, 1.0), # No rotation + ) + + transformed_pose1 = initial_pose @ transform1 + transformed_pose2 = initial_pose @ transform1 @ transform2 + + # Test transformed_pose1: initial_pose + transform1 + # Since the pose is rotated 90° (facing +Y), moving forward (local X) + # means moving in the +Y direction in world frame + assert np.isclose(transformed_pose1.position.x, 0.0, atol=1e-10) + assert np.isclose(transformed_pose1.position.y, 1.0, atol=1e-10) + assert np.isclose(transformed_pose1.position.z, 0.0, atol=1e-10) + + # Orientation should be 90° + 45° = 135° around Z + total_angle1 = angle + rotation_angle # 135 degrees + assert np.isclose(transformed_pose1.orientation.x, 0.0, atol=1e-10) + assert np.isclose(transformed_pose1.orientation.y, 0.0, atol=1e-10) + assert np.isclose(transformed_pose1.orientation.z, np.sin(total_angle1 / 2), atol=1e-10) + assert np.isclose(transformed_pose1.orientation.w, np.cos(total_angle1 / 2), atol=1e-10) + + # Test transformed_pose2: initial_pose + transform1 + transform2 + # Starting from (0, 0, 0) facing 90°: + # + # - Apply transform1: move 1 forward (along +Y) → (0, 1, 0), now facing 135° + # + # - Apply transform2: move 1 in local Y and 1 up + # At 135°, local Y points at 225° (135° + 90°) + # + # x += cos(225°) = -√2/2, y += sin(225°) = -√2/2 + sqrt2_2 = np.sqrt(2) / 2 + expected_x = 0.0 - sqrt2_2 # 0 - √2/2 ≈ -0.707 + expected_y = 1.0 - sqrt2_2 # 1 - √2/2 ≈ 0.293 + expected_z = 1.0 # 0 + 1 + + assert np.isclose(transformed_pose2.position.x, expected_x, atol=1e-10) + assert np.isclose(transformed_pose2.position.y, expected_y, atol=1e-10) + assert np.isclose(transformed_pose2.position.z, expected_z, atol=1e-10) + + # Orientation should be 135° (only transform1 has rotation) + total_angle2 = total_angle1 # 135 degrees (transform2 has no rotation) + assert np.isclose(transformed_pose2.orientation.x, 0.0, atol=1e-10) + assert np.isclose(transformed_pose2.orientation.y, 0.0, atol=1e-10) + assert np.isclose(transformed_pose2.orientation.z, np.sin(total_angle2 / 2), atol=1e-10) + assert np.isclose(transformed_pose2.orientation.w, np.cos(total_angle2 / 2), atol=1e-10) + + +def test_pose_add_lcm_transform(): + """Test that Pose works with both LCMTransform and Transform types.""" + initial_pose = Pose(1.0, 0.0, 0.0) + + # Create an LCMTransform + lcm_transform = LCMTransform() + lcm_transform.translation = Vector3(2.0, 1.0, 0.0) + angle = np.pi / 2 # 90 degrees + lcm_transform.rotation = Quaternion(0.0, 0.0, np.sin(angle / 2), np.cos(angle / 2)) + + # Create a regular Transform with same values + transform = Transform( + translation=Vector3(2.0, 1.0, 0.0), + rotation=Quaternion(0.0, 0.0, np.sin(angle / 2), np.cos(angle / 2)), + ) + + # Test both + and @ operators with LCMTransform + pose_lcm_add = initial_pose + lcm_transform + pose_lcm_matmul = initial_pose @ lcm_transform + + # Test both + and @ operators with Transform + pose_tf_add = initial_pose + transform + pose_tf_matmul = initial_pose @ transform + + # All results should be the same + assert np.isclose(pose_lcm_add.position.x, 3.0, atol=1e-10) + assert np.isclose(pose_lcm_add.position.y, 1.0, atol=1e-10) + assert np.isclose(pose_lcm_add.position.z, 0.0, atol=1e-10) + + # Check that all methods produce the same result + assert pose_lcm_add.position == pose_lcm_matmul.position + assert pose_lcm_add.position == pose_tf_add.position + assert pose_lcm_add.position == pose_tf_matmul.position + + assert pose_lcm_add.orientation == pose_lcm_matmul.orientation + assert pose_lcm_add.orientation == pose_tf_add.orientation + assert pose_lcm_add.orientation == pose_tf_matmul.orientation diff --git a/dimos/msgs/geometry_msgs/test_Twist.py b/dimos/msgs/geometry_msgs/test_Twist.py new file mode 100644 index 0000000000..c5df6d5597 --- /dev/null +++ b/dimos/msgs/geometry_msgs/test_Twist.py @@ -0,0 +1,198 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pytest +from dimos_lcm.geometry_msgs import Twist as LCMTwist + +from dimos.msgs.geometry_msgs import Quaternion, Twist, Vector3 + + +def test_twist_initialization(): + # Test default initialization (zero twist) + tw = Twist() + assert tw.linear.x == 0.0 + assert tw.linear.y == 0.0 + assert tw.linear.z == 0.0 + assert tw.angular.x == 0.0 + assert tw.angular.y == 0.0 + assert tw.angular.z == 0.0 + + # Test initialization with Vector3 linear and angular + lin = Vector3(1.0, 2.0, 3.0) + ang = Vector3(0.1, 0.2, 0.3) + tw2 = Twist(lin, ang) + assert tw2.linear == lin + assert tw2.angular == ang + + # Test copy constructor + tw3 = Twist(tw2) + assert tw3.linear == tw2.linear + assert tw3.angular == tw2.angular + assert tw3 == tw2 + # Ensure it's a deep copy + tw3.linear.x = 10.0 + assert tw2.linear.x == 1.0 + + # Test initialization from LCM Twist + lcm_tw = LCMTwist() + lcm_tw.linear = Vector3(4.0, 5.0, 6.0) + lcm_tw.angular = Vector3(0.4, 0.5, 0.6) + tw4 = Twist(lcm_tw) + assert tw4.linear.x == 4.0 + assert tw4.linear.y == 5.0 + assert tw4.linear.z == 6.0 + assert tw4.angular.x == 0.4 + assert tw4.angular.y == 0.5 + assert tw4.angular.z == 0.6 + + # Test initialization with linear and angular as quaternion + quat = Quaternion(0, 0, 0.707107, 0.707107) # 90 degrees around Z + tw5 = Twist(Vector3(1.0, 2.0, 3.0), quat) + assert tw5.linear == Vector3(1.0, 2.0, 3.0) + # Quaternion should be converted to euler angles + euler = quat.to_euler() + assert np.allclose(tw5.angular.x, euler.x) + assert np.allclose(tw5.angular.y, euler.y) + assert np.allclose(tw5.angular.z, euler.z) + + # Test keyword argument initialization + tw7 = Twist(linear=Vector3(1, 2, 3), angular=Vector3(0.1, 0.2, 0.3)) + assert tw7.linear == Vector3(1, 2, 3) + assert tw7.angular == Vector3(0.1, 0.2, 0.3) + + # Test keyword with only linear + tw8 = Twist(linear=Vector3(4, 5, 6)) + assert tw8.linear == Vector3(4, 5, 6) + assert tw8.angular.is_zero() + + # Test keyword with only angular + tw9 = Twist(angular=Vector3(0.4, 0.5, 0.6)) + assert tw9.linear.is_zero() + assert tw9.angular == Vector3(0.4, 0.5, 0.6) + + # Test keyword with angular as quaternion + tw10 = Twist(angular=Quaternion(0, 0, 0.707107, 0.707107)) + assert tw10.linear.is_zero() + euler = Quaternion(0, 0, 0.707107, 0.707107).to_euler() + assert np.allclose(tw10.angular.x, euler.x) + assert np.allclose(tw10.angular.y, euler.y) + assert np.allclose(tw10.angular.z, euler.z) + + # Test keyword with linear and angular as quaternion + tw11 = Twist(linear=Vector3(1, 0, 0), angular=Quaternion(0, 0, 0, 1)) + assert tw11.linear == Vector3(1, 0, 0) + assert tw11.angular.is_zero() # Identity quaternion -> zero euler angles + + +def test_twist_zero(): + # Test zero class method + tw = Twist.zero() + assert tw.linear.is_zero() + assert tw.angular.is_zero() + assert tw.is_zero() + + # Zero should equal default constructor + assert tw == Twist() + + +def test_twist_equality(): + tw1 = Twist(Vector3(1, 2, 3), Vector3(0.1, 0.2, 0.3)) + tw2 = Twist(Vector3(1, 2, 3), Vector3(0.1, 0.2, 0.3)) + tw3 = Twist(Vector3(1, 2, 4), Vector3(0.1, 0.2, 0.3)) # Different linear z + tw4 = Twist(Vector3(1, 2, 3), Vector3(0.1, 0.2, 0.4)) # Different angular z + + assert tw1 == tw2 + assert tw1 != tw3 + assert tw1 != tw4 + assert tw1 != "not a twist" + + +def test_twist_string_representations(): + tw = Twist(Vector3(1.5, -2.0, 3.14), Vector3(0.1, -0.2, 0.3)) + + # Test repr + repr_str = repr(tw) + assert "Twist" in repr_str + assert "linear=" in repr_str + assert "angular=" in repr_str + assert "1.5" in repr_str + assert "0.1" in repr_str + + # Test str + str_str = str(tw) + assert "Twist:" in str_str + assert "Linear:" in str_str + assert "Angular:" in str_str + + +def test_twist_is_zero(): + # Test zero twist + tw1 = Twist() + assert tw1.is_zero() + + # Test non-zero linear + tw2 = Twist(linear=Vector3(0.1, 0, 0)) + assert not tw2.is_zero() + + # Test non-zero angular + tw3 = Twist(angular=Vector3(0, 0, 0.1)) + assert not tw3.is_zero() + + # Test both non-zero + tw4 = Twist(Vector3(1, 2, 3), Vector3(0.1, 0.2, 0.3)) + assert not tw4.is_zero() + + +def test_twist_bool(): + # Test zero twist is False + tw1 = Twist() + assert not tw1 + + # Test non-zero twist is True + tw2 = Twist(linear=Vector3(1, 0, 0)) + assert tw2 + + tw3 = Twist(angular=Vector3(0, 0, 0.1)) + assert tw3 + + tw4 = Twist(Vector3(1, 2, 3), Vector3(0.1, 0.2, 0.3)) + assert tw4 + + +def test_twist_lcm_encoding(): + # Test encoding and decoding + tw = Twist(Vector3(1.5, 2.5, 3.5), Vector3(0.1, 0.2, 0.3)) + + # Encode + encoded = tw.lcm_encode() + assert isinstance(encoded, bytes) + + # Decode + decoded = Twist.lcm_decode(encoded) + assert decoded.linear == tw.linear + assert decoded.angular == tw.angular + assert decoded == tw + + +def test_twist_with_lists(): + # Test initialization with lists instead of Vector3 + tw1 = Twist(linear=[1, 2, 3], angular=[0.1, 0.2, 0.3]) + assert tw1.linear == Vector3(1, 2, 3) + assert tw1.angular == Vector3(0.1, 0.2, 0.3) + + # Test with numpy arrays + tw2 = Twist(linear=np.array([4, 5, 6]), angular=np.array([0.4, 0.5, 0.6])) + assert tw2.linear == Vector3(4, 5, 6) + assert tw2.angular == Vector3(0.4, 0.5, 0.6) diff --git a/dimos/perception/test_spatial_memory.py b/dimos/perception/test_spatial_memory.py index 9a519fe59c..11af5edb42 100644 --- a/dimos/perception/test_spatial_memory.py +++ b/dimos/perception/test_spatial_memory.py @@ -25,9 +25,9 @@ from reactivex import operators as ops from reactivex.subject import Subject +from dimos.msgs.geometry_msgs import Pose from dimos.perception.spatial_perception import SpatialMemory from dimos.stream.video_provider import VideoProvider -from dimos.types.pose import Pose from dimos.types.vector import Vector @@ -188,8 +188,8 @@ def on_completed(): similarities = [1 - r.get("distance") for r in results] print(f"Similarities: {similarities}") - assert any(d > 0.24 for d in similarities), ( - f"Expected at least one result with similarity > 0.24 for query '{query}'" + assert any(d > 0.22 for d in similarities), ( + f"Expected at least one result with similarity > 0.22 for query '{query}'" ) results = memory.query_by_text(irrelevant_query, limit=2) diff --git a/dimos/robot/frontier_exploration/wavefront_frontier_goal_selector.py b/dimos/robot/frontier_exploration/wavefront_frontier_goal_selector.py index 0e86462090..1aca32fc93 100644 --- a/dimos/robot/frontier_exploration/wavefront_frontier_goal_selector.py +++ b/dimos/robot/frontier_exploration/wavefront_frontier_goal_selector.py @@ -19,17 +19,18 @@ for autonomous navigation using the dimos Costmap and Vector types. """ -from typing import List, Tuple, Optional, Callable +import threading from collections import deque -import numpy as np from dataclasses import dataclass from enum import IntFlag -import threading -from dimos.utils.logging_config import setup_logger +from typing import Callable, List, Optional, Tuple -from dimos.types.costmap import Costmap, CostValues -from dimos.types.vector import Vector +import numpy as np + +from dimos.msgs.geometry_msgs import Vector3 as Vector from dimos.robot.frontier_exploration.utils import smooth_costmap_for_frontiers +from dimos.types.costmap import Costmap, CostValues +from dimos.utils.logging_config import setup_logger logger = setup_logger("dimos.robot.unitree.frontier_exploration") diff --git a/dimos/robot/global_planner/algo.py b/dimos/robot/global_planner/algo.py index fad3cf13f5..45aa483e07 100644 --- a/dimos/robot/global_planner/algo.py +++ b/dimos/robot/global_planner/algo.py @@ -18,9 +18,12 @@ from typing import Optional, Tuple from dimos.msgs.geometry_msgs import Vector3 as Vector -from dimos.msgs.geometry_msgs.Vector3 import VectorLike +from dimos.msgs.geometry_msgs import VectorLike from dimos.types.costmap import Costmap from dimos.types.path import Path +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.robot.unitree.global_planner.astar") def find_nearest_free_cell( @@ -69,7 +72,7 @@ def find_nearest_free_cell( # Check if we've reached the maximum search radius if dist > max_search_radius: - print( + logger.info( f"Could not find free cell within {max_search_radius} cells of ({start_x}, {start_y})" ) return (start_x, start_y) # Return original position if no free cell found @@ -77,7 +80,7 @@ def find_nearest_free_cell( # Check if this cell is valid and free if 0 <= x < costmap.width and 0 <= y < costmap.height: if costmap.grid[y, x] < cost_threshold: - print( + logger.info( f"Found free cell at ({x}, {y}), {dist} cells away from ({start_x}, {start_y})" ) return (x, y) @@ -113,9 +116,11 @@ def astar( Returns: Path object containing waypoints, or None if no path found """ + # Convert world coordinates to grid coordinates directly using vector-like inputs start_vector = costmap.world_to_grid(start) goal_vector = costmap.world_to_grid(goal) + logger.info(f"ASTAR {costmap} {start_vector} -> {goal_vector}") # Store original positions for reference original_start = (int(start_vector.x), int(start_vector.y)) @@ -132,7 +137,7 @@ def astar( start_in_obstacle = costmap.grid[int(start_vector.y), int(start_vector.x)] >= cost_threshold if not start_valid or start_in_obstacle: - print("Start position is out of bounds or in an obstacle, finding nearest free cell") + logger.info("Start position is out of bounds or in an obstacle, finding nearest free cell") adjusted_start = find_nearest_free_cell(costmap, start, cost_threshold) # Update start_vector for later use start_vector = Vector(adjusted_start[0], adjusted_start[1]) @@ -145,7 +150,7 @@ def astar( goal_in_obstacle = costmap.grid[int(goal_vector.y), int(goal_vector.x)] >= cost_threshold if not goal_valid or goal_in_obstacle: - print("Goal position is out of bounds or in an obstacle, finding nearest free cell") + logger.info("Goal position is out of bounds or in an obstacle, finding nearest free cell") adjusted_goal = find_nearest_free_cell(costmap, goal, cost_threshold) # Update goal_vector for later use goal_vector = Vector(adjusted_goal[0], adjusted_goal[1]) diff --git a/dimos/robot/global_planner/planner.py b/dimos/robot/global_planner/planner.py index 9a1cea91b3..e1484e2bc9 100644 --- a/dimos/robot/global_planner/planner.py +++ b/dimos/robot/global_planner/planner.py @@ -18,7 +18,7 @@ from typing import Callable, Optional from dimos.core import In, Module, Out, rpc -from dimos.msgs.geometry_msgs import Vector3 +from dimos.msgs.geometry_msgs import Pose, PoseLike, PoseStamped, Vector3, to_pose from dimos.robot.global_planner.algo import astar from dimos.types.costmap import Costmap from dimos.types.path import Path @@ -31,7 +31,7 @@ @dataclass class Planner(Visualizable, Module): - target: In[Vector3] = None + target: In[PoseStamped] = None path: Out[Path] = None def __init__(self): @@ -78,9 +78,9 @@ def __init__( def start(self): self.target.subscribe(self.plan) - def plan(self, goal: VectorLike) -> Path: - print("planning path to goal", goal) - goal = to_vector(goal).to_2d() + def plan(self, goallike: PoseLike) -> Path: + goal = to_pose(goallike) + logger.info(f"planning path to goal {goal}") pos = self.get_robot_pos() print("current pos", pos) costmap = self.get_costmap() @@ -89,8 +89,7 @@ def plan(self, goal: VectorLike) -> Path: self.vis("target", goal) - print("ASTAR ", costmap, goal, pos) - path = astar(costmap, goal, pos) + path = astar(costmap, goal.position, pos) if path: path = path.resample(0.1) diff --git a/dimos/robot/local_planner/simple.py b/dimos/robot/local_planner/simple.py index ffa9a72867..e9c4dfe545 100644 --- a/dimos/robot/local_planner/simple.py +++ b/dimos/robot/local_planner/simple.py @@ -22,57 +22,50 @@ from reactivex import operators as ops from dimos.core import In, Module, Out, rpc -from dimos.msgs.geometry_msgs import PoseStamped, Vector3 +from dimos.msgs.foxglove_msgs import Arrow # from dimos.robot.local_planner.local_planner import LocalPlanner +from dimos.msgs.geometry_msgs import ( + Pose, + PoseLike, + PoseStamped, + Twist, + Vector3, + VectorLike, + to_pose, +) from dimos.types.costmap import Costmap from dimos.types.path import Path -from dimos.types.vector import Vector, VectorLike, to_vector from dimos.utils.logging_config import setup_logger from dimos.utils.threadpool import get_scheduler logger = setup_logger("dimos.robot.unitree.local_planner") -def transform_to_robot_frame(global_vector: Vector3, robot_position: PoseStamped) -> Vector: - """Transform a global coordinate vector to robot-relative coordinates. - - Args: - global_vector: Vector in global coordinates - robot_position: Robot's position and orientation - - Returns: - Vector in robot coordinates where X is forward/backward, Y is left/right - """ - # Get the robot's yaw angle (rotation around Z-axis) - robot_yaw = robot_position.yaw - - # Create rotation matrix to transform from global to robot frame - # We need to rotate the coordinate system by -robot_yaw to get robot-relative coordinates - cos_yaw = math.cos(-robot_yaw) - sin_yaw = math.sin(-robot_yaw) - - print(cos_yaw, sin_yaw, global_vector) - # Apply 2D rotation transformation - # This transforms a global direction vector into the robot's coordinate frame - # In robot frame: X=forward/backward, Y=left/right - # In global frame: X=east/west, Y=north/south - robot_x = global_vector.x * cos_yaw - global_vector.y * sin_yaw # Forward/backward - robot_y = global_vector.x * sin_yaw + global_vector.y * cos_yaw # Left/right - - return Vector3(-robot_x, robot_y, 0) +def vector_to_twist(vector: Vector3) -> Twist: + """Convert a Vector3 to a Twist message.""" + twist = Twist() + twist.linear.x = vector.x + twist.linear.y = vector.y + twist.linear.z = 0.0 + twist.angular.x = 0.0 + twist.angular.y = 0.0 + twist.angular.z = vector.z + return twist class SimplePlanner(Module): path: In[Path] = None odom: In[PoseStamped] = None - movecmd: Out[Vector3] = None + movecmd: Out[Twist] = None + + arrow: Out[Arrow] = None get_costmap: Callable[[], Costmap] latest_odom: Optional[PoseStamped] = None - goal: Optional[Vector] = None + goal: Optional[PoseStamped] = None speed: float = 0.3 def __init__( @@ -82,6 +75,13 @@ def __init__( Module.__init__(self) self.get_costmap = get_costmap + def transform_to_robot_frame( + self, global_target: Vector3, global_robot_position: PoseStamped + ) -> Vector3: + transform = global_robot_position.find_transform(global_target) + self.arrow.publish(Arrow.from_transform(transform, global_robot_position)) + return transform.translation + def get_move_stream(self, frequency: float = 20.0) -> rx.Observable: return rx.interval(1.0 / frequency, scheduler=get_scheduler()).pipe( # do we have a goal? @@ -95,18 +95,13 @@ def get_move_stream(self, frequency: float = 20.0) -> rx.Observable: ops.map(self._calculate_rotation_to_target), # filter out None results from calc_move ops.filter(lambda result: result is not None), + ops.map(vector_to_twist), ) - def _safe_transform_goal(self) -> Optional[Vector]: + def _safe_transform_goal(self) -> Optional[Vector3]: """Safely transform goal to robot frame with error handling.""" try: - if self.goal is None or self.latest_odom is None: - return None - - # Convert PoseStamped to Pose for transform function - # Pose constructor takes (pos, rot) where pos and rot are VectorLike - - return transform_to_robot_frame(self.goal, self.latest_odom) + return self.transform_to_robot_frame(self.goal, self.latest_odom) except Exception as e: logger.error(f"Error transforming goal to robot frame: {e}") print(traceback.format_exc()) @@ -116,10 +111,6 @@ def _safe_transform_goal(self) -> Optional[Vector]: def start(self): print(self.path.connection, self.path.transport) self.path.subscribe(self.set_goal) - self.movecmd.publish(Vector3(1, 2, 3)) - self.movecmd.publish(Vector3(1, 2, 3)) - self.movecmd.publish(Vector3(1, 2, 3)) - self.movecmd.publish(Vector3(1, 2, 3)) def setodom(odom: PoseStamped): self.latest_odom = odom @@ -138,17 +129,17 @@ def pubmove(move: Vector3): @dispatch def set_goal(self, goal: Path, stop_event=None, goal_theta=None) -> bool: - self.goal = goal.last().to_2d() + self.goal = to_pose(goal.last()) logger.info(f"Setting goal: {self.goal}") return True @dispatch def set_goal(self, goal: VectorLike, stop_event=None, goal_theta=None) -> bool: - self.goal = to_vector(goal).to_2d() + self.goal = to_pose(goal.last()) logger.info(f"Setting goal: {self.goal}") return True - def calc_move(self, direction: Vector) -> Optional[Vector]: + def calc_move(self, direction: Vector3) -> Optional[Vector3]: """Calculate the movement vector based on the direction to the goal. Args: @@ -206,7 +197,7 @@ def freq_spy_fun(x): return ops.map(freq_spy_fun) - def _test_translational_movement(self) -> Vector: + def _test_translational_movement(self) -> Vector3: """Test translational movement by alternating left and right movement. Returns: @@ -219,11 +210,11 @@ def _test_translational_movement(self) -> Vector: if phase < 0.5: # First half: move LEFT (positive X according to our documentation) - movement = Vector(0.2, 0, 0) # Move left at 0.2 m/s + movement = Vector3(0.2, 0, 0) # Move left at 0.2 m/s direction = "LEFT (positive X)" else: # Second half: move RIGHT (negative X according to our documentation) - movement = Vector(-0.2, 0, 0) # Move right at 0.2 m/s + movement = Vector3(-0.2, 0, 0) # Move right at 0.2 m/s direction = "RIGHT (negative X)" print("=== LEFT-RIGHT MOVEMENT TEST ===") @@ -233,7 +224,7 @@ def _test_translational_movement(self) -> Vector: print("===================================") return movement - def _calculate_rotation_to_target(self, direction_to_goal: Vector) -> Vector: + def _calculate_rotation_to_target(self, direction_to_goal: Vector3) -> Vector3: """Calculate the rotation needed for the robot to face the target. Args: @@ -244,7 +235,7 @@ def _calculate_rotation_to_target(self, direction_to_goal: Vector) -> Vector: """ if self.latest_odom is None: logger.warning("No odometry data available for rotation calculation") - return Vector(0, 0, 0) + return Vector3(0, 0, 0) # Calculate the desired yaw angle to face the target desired_yaw = math.atan2(direction_to_goal.y, direction_to_goal.x) @@ -288,7 +279,7 @@ def _calculate_rotation_to_target(self, direction_to_goal: Vector) -> Vector: # Try flipping the sign in case the rotation convention is opposite return Vector3(0, 0, -angular_velocity) - def _debug_direction(self, name: str, direction: Vector) -> Vector: + def _debug_direction(self, name: str, direction: Vector3) -> Vector3: """Debug helper to log direction information""" robot_pos = self.latest_odom if robot_pos is None: @@ -300,7 +291,7 @@ def _debug_direction(self, name: str, direction: Vector) -> Vector: ) return direction - def _debug_robot_command(self, robot_cmd: Vector) -> Vector: + def _debug_robot_command(self, robot_cmd: Vector3) -> Vector3: """Debug helper to log robot command information""" print( f"DEBUG robot_command: x={robot_cmd.x:.3f}, y={robot_cmd.y:.3f} (forward/backward, left/right)" diff --git a/dimos/robot/unitree_webrtc/connection.py b/dimos/robot/unitree_webrtc/connection.py index 29080d1ebb..785939397f 100644 --- a/dimos/robot/unitree_webrtc/connection.py +++ b/dimos/robot/unitree_webrtc/connection.py @@ -35,7 +35,7 @@ from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg from dimos.robot.unitree_webrtc.type.odometry import Odometry -from dimos.types.pose import Pose +from dimos.msgs.geometry_msgs import Pose from dimos.types.vector import Vector from dimos.utils.reactive import backpressure, callback_to_observable diff --git a/dimos/robot/unitree_webrtc/multiprocess/example_usage.py b/dimos/robot/unitree_webrtc/multiprocess/example_usage.py index 6e11998ff5..f8a64b4b0b 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/example_usage.py +++ b/dimos/robot/unitree_webrtc/multiprocess/example_usage.py @@ -19,19 +19,20 @@ import asyncio import os +import threading + +import reactivex as rx +import reactivex.operators as ops + +from dimos.agents.claude_agent import ClaudeAgent +from dimos.perception.object_detection_stream import ObjectDetectionStream from dimos.robot.unitree_webrtc.multiprocess.unitree_go2 import UnitreeGo2Light from dimos.robot.unitree_webrtc.multiprocess.unitree_go2_heavy import UnitreeGo2Heavy -from dimos.utils.reactive import backpressure -from dimos.perception.object_detection_stream import ObjectDetectionStream -from dimos.web.robot_web_interface import RobotWebInterface from dimos.robot.unitree_webrtc.unitree_skills import MyUnitreeSkills -from dimos.skills.skills import SkillLibrary, AbstractRobotSkill +from dimos.skills.skills import AbstractRobotSkill, SkillLibrary +from dimos.stream.audio.pipelines import stt, tts from dimos.utils.reactive import backpressure -import reactivex as rx -import reactivex.operators as ops -from dimos.stream.audio.pipelines import tts, stt -import threading -from dimos.agents.claude_agent import ClaudeAgent +from dimos.web.robot_web_interface import RobotWebInterface async def run_light_robot(): @@ -46,9 +47,9 @@ async def run_light_robot(): print(f"Robot position: {pose['position']}") print(f"Robot rotation: {pose['rotation']}") - from dimos.types.vector import Vector + from dimos.msgs.geometry_msgs import Vector3 - robot.move(Vector(0.5, 0, 0), duration=2.0) + # robot.move(Vector3(0.5, 0, 0), duration=2.0) robot.explore() @@ -146,7 +147,7 @@ async def run_heavy_robot(): if __name__ == "__main__": - use_heavy = True + use_heavy = False if use_heavy: print("Running UnitreeGo2Heavy with GPU modules...") diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py index 9e82d52c9d..0098429606 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py @@ -14,38 +14,39 @@ import asyncio import functools +import logging +import os import threading import time +import warnings from typing import Callable +from reactivex import Observable from reactivex import operators as ops import dimos.core.colors as colors from dimos import core from dimos.core import In, Module, Out, rpc -from dimos.msgs.geometry_msgs import PoseStamped, Vector3 +from dimos.msgs.foxglove_msgs import Arrow +from dimos.msgs.geometry_msgs import Pose, PoseStamped, Twist, Vector3 from dimos.msgs.sensor_msgs import Image from dimos.protocol import pubsub from dimos.robot.foxglove_bridge import FoxgloveBridge +from dimos.robot.frontier_exploration.wavefront_frontier_goal_selector import ( + WavefrontFrontierExplorer, +) from dimos.robot.global_planner import AstarPlanner from dimos.robot.local_planner.vfh_local_planner import VFHPurePursuitPlanner -from dimos.robot.unitree_webrtc.connection import VideoMessage, UnitreeWebRTCConnection +from dimos.robot.unitree_webrtc.connection import UnitreeWebRTCConnection, VideoMessage from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.map import Map from dimos.robot.unitree_webrtc.type.odometry import Odometry from dimos.types.costmap import Costmap from dimos.types.vector import Vector from dimos.utils.data import get_data +from dimos.utils.logging_config import setup_logger from dimos.utils.reactive import getter_streaming from dimos.utils.testing import TimedSensorReplay -from dimos.robot.frontier_exploration.wavefront_frontier_goal_selector import ( - WavefrontFrontierExplorer, -) -import os -import logging -import warnings -from dimos.utils.logging_config import setup_logger -from reactivex import Observable logger = setup_logger("dimos.robot.unitree_webrtc.multiprocess.unitree_go2", level=logging.INFO) @@ -150,14 +151,14 @@ def get_pos(self) -> Vector: class ControlModule(Module): - plancmd: Out[Vector3] = None + plancmd: Out[Pose] = None @rpc def start(self): def plancmd(): time.sleep(4) print(colors.red("requesting global plan")) - self.plancmd.publish(Vector3(0, 0, 0)) + self.plancmd.publish(Pose(0, 0, 0, 0, 0, 0, 1)) thread = threading.Thread(target=plancmd, daemon=True) thread.start() @@ -350,11 +351,12 @@ def get_video_stream(self, fps: int = 30) -> Observable: Observable stream of video frames """ # Import required modules for LCM subscription - from dimos.protocol.pubsub.lcmpubsub import LCM, Topic - from dimos.msgs.sensor_msgs import Image from reactivex import create from reactivex.disposable import Disposable + from dimos.msgs.sensor_msgs import Image + from dimos.protocol.pubsub.lcmpubsub import LCM, Topic + lcm_instance = LCM() lcm_instance.start() diff --git a/dimos/types/__init__.py b/dimos/types/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dimos/types/pose.py b/dimos/types/pose.py deleted file mode 100644 index 455f22c189..0000000000 --- a/dimos/types/pose.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TypeVar, Union, Sequence -import numpy as np -from plum import dispatch -import math - -from dimos.types.vector import Vector, to_vector, to_numpy, VectorLike - - -T = TypeVar("T", bound="Pose") - -PoseLike = Union["Pose", VectorLike, Sequence[VectorLike]] - - -def yaw_to_matrix(yaw: float) -> np.ndarray: - """2-D yaw (about Z) to a 3×3 rotation matrix.""" - c, s = math.cos(yaw), math.sin(yaw) - return np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) - - -class Pose(Vector): - """A pose in 3D space, consisting of a position vector and a rotation vector. - - Pose inherits from Vector and behaves like a vector for the position component. - The rotation vector is stored separately and can be accessed via the rot property. - """ - - _rot: Vector = None - - @dispatch - def __init__(self, pos: VectorLike): - self._data = to_numpy(pos) - self._rot = None - - @dispatch - def __init__(self, pos: VectorLike, rot: VectorLike): - self._data = to_numpy(pos) - self._rot = to_vector(rot) - - def __repr__(self) -> str: - return f"Pose({self.pos.__repr__()}, {self.rot.__repr__()})" - - def __str__(self) -> str: - return self.__repr__() - - def is_zero(self) -> bool: - return super().is_zero() and self.rot.is_zero() - - def __bool__(self) -> bool: - return not self.is_zero() - - def serialize(self): - """Serialize the pose to a dictionary.""" - return {"type": "pose", "pos": self.to_list(), "rot": self.rot.to_list()} - - def vector_to(self, target: Vector) -> Vector: - direction = target - self.pos.to_2d() - - cos_y = math.cos(-self.yaw) - sin_y = math.sin(-self.yaw) - - x = cos_y * direction.x - sin_y * direction.y - y = sin_y * direction.x + cos_y * direction.y - - return Vector(x, y) - - def __eq__(self, other) -> bool: - """Check if two poses are equal using numpy's allclose for floating point comparison.""" - if not isinstance(other, Pose): - return False - return np.allclose(self.pos._data, other.pos._data) and np.allclose( - self.rot._data, other.rot._data - ) - - @property - def rot(self) -> Vector: - if self._rot: - return self._rot - else: - return Vector(0, 0, 0) - - @property - def pos(self) -> Vector: - """Get the position vector (self).""" - return to_vector(self._data) - - def __add__(self: T, other) -> T: - """Override Vector's __add__ to handle Pose objects specially. - - When adding two Pose objects, both position and rotation components are added. - """ - if isinstance(other, Pose): - # Add both position and rotation components - result = super().__add__(other) - result._rot = self.rot + other.rot - return result - else: - # For other types, just use Vector's addition - return Pose(super().__add__(other), self.rot) - - @property - def yaw(self) -> float: - """Get the yaw (rotation around Z-axis) from the rotation vector.""" - return self.rot.z - - def __sub__(self: T, other) -> T: - """Override Vector's __sub__ to handle Pose objects specially. - - When subtracting two Pose objects, both position and rotation components are subtracted. - """ - if isinstance(other, Pose): - # Subtract both position and rotation components - result = super().__sub__(other) - result._rot = self.rot - other.rot - return result - else: - # For other types, just use Vector's subtraction - return super().__sub__(other) - - def __mul__(self: T, scalar: float) -> T: - return Pose(self.pos * scalar, self.rot) - - -@dispatch -def to_pose(pos: Pose) -> Pose: - return pos - - -@dispatch -def to_pose(pos: VectorLike) -> Pose: - return Pose(pos) - - -@dispatch -def to_pose(pos: Sequence[VectorLike]) -> Pose: - return Pose(*pos) diff --git a/dimos/types/test_pose.py b/dimos/types/test_pose.py deleted file mode 100644 index e95133e035..0000000000 --- a/dimos/types/test_pose.py +++ /dev/null @@ -1,323 +0,0 @@ -# Copyright 2025 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import numpy as np -import math -from dimos.types.pose import Pose, to_pose -from dimos.types.vector import Vector - - -def test_pose_default_init(): - """Test that default initialization of Pose() has zero vectors for pos and rot.""" - pose = Pose() - - # Check that pos is a zero vector - assert isinstance(pose.pos, Vector) - assert pose.pos.is_zero() - assert pose.pos.x == 0.0 - assert pose.pos.y == 0.0 - assert pose.pos.z == 0.0 - - # Check that rot is a zero vector - assert isinstance(pose.rot, Vector) - assert pose.rot.is_zero() - assert pose.rot.x == 0.0 - assert pose.rot.y == 0.0 - assert pose.rot.z == 0.0 - - assert pose.is_zero() - - assert not pose - - -def test_pose_vector_init(): - """Test initialization with custom vectors.""" - pos = Vector(1.0, 2.0, 3.0) - rot = Vector(4.0, 5.0, 6.0) - - pose = Pose(pos, rot) - - # Check pos vector - assert pose.pos == pos - assert pose.pos.x == 1.0 - assert pose.pos.y == 2.0 - assert pose.pos.z == 3.0 - - # Check rot vector - assert pose.rot == rot - assert pose.rot.x == 4.0 - assert pose.rot.y == 5.0 - assert pose.rot.z == 6.0 - - # even if pos has the same xyz as pos vector - # it shouldn't accept equality comparisons - # as both are not the same type - assert not pose == pos - - -def test_pose_partial_init(): - """Test initialization with only one custom vector.""" - pos = Vector(1.0, 2.0, 3.0) - assert pos - - # Only specify pos - pose1 = Pose(pos) - assert pose1.pos == pos - assert pose1.pos.x == 1.0 - assert pose1.pos.y == 2.0 - assert pose1.pos.z == 3.0 - assert not pose1.pos.is_zero() - - assert isinstance(pose1.rot, Vector) - assert pose1.rot.is_zero() - assert pose1.rot.x == 0.0 - assert pose1.rot.y == 0.0 - assert pose1.rot.z == 0.0 - - -def test_pose_equality(): - """Test equality comparison between positions.""" - pos1 = Pose(Vector(1.0, 2.0, 3.0), Vector(4.0, 5.0, 6.0)) - pos2 = Pose(Vector(1.0, 2.0, 3.0), Vector(4.0, 5.0, 6.0)) - pos3 = Pose(Vector(1.0, 2.0, 3.0), Vector(7.0, 8.0, 9.0)) - pos4 = Pose(Vector(7.0, 8.0, 9.0), Vector(4.0, 5.0, 6.0)) - - # Same pos and rot values should be equal - assert pos1 == pos2 - - # Different rot values should not be equal - assert pos1 != pos3 - - # Different pos values should not be equal - assert pos1 != pos4 - - # Pose should not equal a vector even if values match - assert pos1 != Vector(1.0, 2.0, 3.0) - - -def test_pose_vector_operations(): - """Test that Pose inherits Vector operations.""" - pos1 = Pose(Vector(1.0, 2.0, 3.0), Vector(4.0, 5.0, 6.0)) - pos2 = Pose(Vector(2.0, 3.0, 4.0), Vector(7.0, 8.0, 9.0)) - - # Addition should work on both position and rotation components - sum_pos = pos1 + pos2 - assert isinstance(sum_pos, Pose) - assert sum_pos.x == 3.0 - assert sum_pos.y == 5.0 - assert sum_pos.z == 7.0 - # Rotation should be added as well - assert sum_pos.rot.x == 11.0 # 4.0 + 7.0 - assert sum_pos.rot.y == 13.0 # 5.0 + 8.0 - assert sum_pos.rot.z == 15.0 # 6.0 + 9.0 - - # Subtraction should work on both position and rotation components - diff_pos = pos2 - pos1 - assert isinstance(diff_pos, Pose) - assert diff_pos.x == 1.0 - assert diff_pos.y == 1.0 - assert diff_pos.z == 1.0 - # Rotation should be subtracted as well - assert diff_pos.rot.x == 3.0 # 7.0 - 4.0 - assert diff_pos.rot.y == 3.0 # 8.0 - 5.0 - assert diff_pos.rot.z == 3.0 # 9.0 - 6.0 - - # Scalar multiplication - scaled_pos = pos1 * 2.0 - assert isinstance(scaled_pos, Pose) - assert scaled_pos.x == 2.0 - assert scaled_pos.y == 4.0 - assert scaled_pos.z == 6.0 - assert scaled_pos.rot == pos1.rot # Rotation not affected by scalar multiplication - - # Adding a Vector to a Pose (only affects position component) - vec = Vector(5.0, 6.0, 7.0) - pos_plus_vec = pos1 + vec - assert isinstance(pos_plus_vec, Pose) - assert pos_plus_vec.x == 6.0 - assert pos_plus_vec.y == 8.0 - assert pos_plus_vec.z == 10.0 - assert pos_plus_vec.rot == pos1.rot # Rotation unchanged - - -def test_pose_serialization(): - """Test pose serialization.""" - pos = Pose(Vector(1.0, 2.0, 3.0), Vector(4.0, 5.0, 6.0)) - serialized = pos.serialize() - - assert serialized["type"] == "pose" - assert serialized["pos"] == [1.0, 2.0, 3.0] - assert serialized["rot"] == [4.0, 5.0, 6.0] - - -def test_pose_initialization_with_arrays(): - """Test initialization with numpy arrays, lists and tuples.""" - # Test with numpy arrays - np_pos = np.array([1.0, 2.0, 3.0]) - np_rot = np.array([4.0, 5.0, 6.0]) - - pos1 = Pose(np_pos, np_rot) - - assert pos1.x == 1.0 - assert pos1.y == 2.0 - assert pos1.z == 3.0 - assert pos1.rot.x == 4.0 - assert pos1.rot.y == 5.0 - assert pos1.rot.z == 6.0 - - # Test with lists - list_pos = [7.0, 8.0, 9.0] - list_rot = [10.0, 11.0, 12.0] - pos2 = Pose(list_pos, list_rot) - - assert pos2.x == 7.0 - assert pos2.y == 8.0 - assert pos2.z == 9.0 - assert pos2.rot.x == 10.0 - assert pos2.rot.y == 11.0 - assert pos2.rot.z == 12.0 - - # Test with tuples - tuple_pos = (13.0, 14.0, 15.0) - tuple_rot = (16.0, 17.0, 18.0) - pos3 = Pose(tuple_pos, tuple_rot) - - assert pos3.x == 13.0 - assert pos3.y == 14.0 - assert pos3.z == 15.0 - assert pos3.rot.x == 16.0 - assert pos3.rot.y == 17.0 - assert pos3.rot.z == 18.0 - - -def test_to_pose_with_pose(): - """Test to_pose with Pose input.""" - # Create a pose - original_pos = Pose(Vector(1.0, 2.0, 3.0), Vector(4.0, 5.0, 6.0)) - - # Convert using to_pose - converted_pos = to_pose(original_pos) - - # Should return the exact same object - assert converted_pos is original_pos - assert converted_pos == original_pos - - # Check values - assert converted_pos.x == 1.0 - assert converted_pos.y == 2.0 - assert converted_pos.z == 3.0 - assert converted_pos.rot.x == 4.0 - assert converted_pos.rot.y == 5.0 - assert converted_pos.rot.z == 6.0 - - -def test_to_pose_with_vector(): - """Test to_pose with Vector input.""" - # Create a vector - vec = Vector(1.0, 2.0, 3.0) - - # Convert using to_pose - pos = to_pose(vec) - - # Should return a Pose with the vector as position and zero rotation - assert isinstance(pos, Pose) - assert pos.pos == vec - assert pos.x == 1.0 - assert pos.y == 2.0 - assert pos.z == 3.0 - - # Rotation should be zero - assert isinstance(pos.rot, Vector) - assert pos.rot.is_zero() - assert pos.rot.x == 0.0 - assert pos.rot.y == 0.0 - assert pos.rot.z == 0.0 - - -def test_to_pose_with_vectorlike(): - """Test to_pose with VectorLike inputs (arrays, lists, tuples).""" - # Test with numpy arrays - np_arr = np.array([1.0, 2.0, 3.0]) - pos1 = to_pose(np_arr) - - assert isinstance(pos1, Pose) - assert pos1.x == 1.0 - assert pos1.y == 2.0 - assert pos1.z == 3.0 - assert pos1.rot.is_zero() - - # Test with lists - list_val = [4.0, 5.0, 6.0] - pos2 = to_pose(list_val) - - assert isinstance(pos2, Pose) - assert pos2.x == 4.0 - assert pos2.y == 5.0 - assert pos2.z == 6.0 - assert pos2.rot.is_zero() - - # Test with tuples - tuple_val = (7.0, 8.0, 9.0) - pos3 = to_pose(tuple_val) - - assert isinstance(pos3, Pose) - assert pos3.x == 7.0 - assert pos3.y == 8.0 - assert pos3.z == 9.0 - assert pos3.rot.is_zero() - - -def test_to_pose_with_sequence(): - """Test to_pose with Sequence of VectorLike inputs.""" - # Test with sequence of two vectors - pos_vec = Vector(1.0, 2.0, 3.0) - rot_vec = Vector(4.0, 5.0, 6.0) - pos1 = to_pose([pos_vec, rot_vec]) - - assert isinstance(pos1, Pose) - assert pos1.pos == pos_vec - assert pos1.rot == rot_vec - assert pos1.x == 1.0 - assert pos1.y == 2.0 - assert pos1.z == 3.0 - assert pos1.rot.x == 4.0 - assert pos1.rot.y == 5.0 - assert pos1.rot.z == 6.0 - - # Test with sequence of lists - pos2 = to_pose([[7.0, 8.0, 9.0], [10.0, 11.0, 12.0]]) - - assert isinstance(pos2, Pose) - assert pos2.x == 7.0 - assert pos2.y == 8.0 - assert pos2.z == 9.0 - assert pos2.rot.x == 10.0 - assert pos2.rot.y == 11.0 - assert pos2.rot.z == 12.0 - - # Test with mixed sequence (tuple and numpy array) - pos3 = to_pose([(13.0, 14.0, 15.0), np.array([16.0, 17.0, 18.0])]) - - assert isinstance(pos3, Pose) - assert pos3.x == 13.0 - assert pos3.y == 14.0 - assert pos3.z == 15.0 - assert pos3.rot.x == 16.0 - assert pos3.rot.y == 17.0 - assert pos3.rot.z == 18.0 - - -def test_vector_transform(): - robot_pose = Pose(Vector(4.0, 2.0, 0.5), Vector(0.0, 0.0, math.pi / 2)) - target = Vector(1.0, 3.0, 0.0) - print(robot_pose.vector_to(target))