diff --git a/dimos/msgs/geometry_msgs/PoseWithCovariance.py b/dimos/msgs/geometry_msgs/PoseWithCovariance.py index 7e856da6dc..03ce7fd081 100644 --- a/dimos/msgs/geometry_msgs/PoseWithCovariance.py +++ b/dimos/msgs/geometry_msgs/PoseWithCovariance.py @@ -14,7 +14,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, TypeAlias +from typing import TYPE_CHECKING, Any, TypeAlias from dimos_lcm.geometry_msgs import ( PoseWithCovariance as LCMPoseWithCovariance, @@ -38,6 +38,7 @@ class PoseWithCovariance(LCMPoseWithCovariance): # type: ignore[misc] pose: Pose + covariance: np.ndarray[tuple[int], np.dtype[np.floating[Any]]] msg_name = "geometry_msgs.PoseWithCovariance" @dispatch diff --git a/dimos/msgs/geometry_msgs/TwistWithCovariance.py b/dimos/msgs/geometry_msgs/TwistWithCovariance.py index 09f234ea84..90ddb94d7c 100644 --- a/dimos/msgs/geometry_msgs/TwistWithCovariance.py +++ b/dimos/msgs/geometry_msgs/TwistWithCovariance.py @@ -14,7 +14,7 @@ from __future__ import annotations -from typing import TypeAlias +from typing import Any, TypeAlias from dimos_lcm.geometry_msgs import ( TwistWithCovariance as LCMTwistWithCovariance, @@ -35,6 +35,7 @@ class TwistWithCovariance(LCMTwistWithCovariance): # type: ignore[misc] twist: Twist + covariance: np.ndarray[tuple[int], np.dtype[np.floating[Any]]] msg_name = "geometry_msgs.TwistWithCovariance" @dispatch diff --git a/dimos/msgs/nav_msgs/Odometry.py b/dimos/msgs/nav_msgs/Odometry.py index e6d0844f46..584bc42418 100644 --- a/dimos/msgs/nav_msgs/Odometry.py +++ b/dimos/msgs/nav_msgs/Odometry.py @@ -15,11 +15,10 @@ from __future__ import annotations import time -from typing import TYPE_CHECKING, TypeAlias +from typing import TYPE_CHECKING from dimos_lcm.nav_msgs import Odometry as LCMOdometry import numpy as np -from plum import dispatch from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.PoseWithCovariance import PoseWithCovariance @@ -28,28 +27,15 @@ from dimos.types.timestamped import Timestamped if TYPE_CHECKING: + from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 -# Types that can be converted to/from Odometry -OdometryConvertable: TypeAlias = ( - LCMOdometry | dict[str, float | str | PoseWithCovariance | TwistWithCovariance | Pose | Twist] -) +class Odometry(Timestamped): + """Odometry message with pose, twist, and frame information.""" -def sec_nsec(ts): # type: ignore[no-untyped-def] - s = int(ts) - return [s, int((ts - s) * 1_000_000_000)] - - -class Odometry(LCMOdometry, Timestamped): # type: ignore[misc] - pose: PoseWithCovariance - twist: TwistWithCovariance msg_name = "nav_msgs.Odometry" - ts: float - frame_id: str - child_frame_id: str - @dispatch def __init__( self, ts: float = 0.0, @@ -58,234 +44,113 @@ def __init__( pose: PoseWithCovariance | Pose | None = None, twist: TwistWithCovariance | Twist | None = None, ) -> None: - """Initialize with timestamp, frame IDs, pose and twist. - - Args: - ts: Timestamp in seconds (defaults to current time if 0) - frame_id: Reference frame ID (e.g., "odom", "map") - child_frame_id: Child frame ID (e.g., "base_link", "base_footprint") - pose: Pose with covariance (or just Pose, covariance will be zero) - twist: Twist with covariance (or just Twist, covariance will be zero) - """ self.ts = ts if ts != 0 else time.time() self.frame_id = frame_id self.child_frame_id = child_frame_id - # Handle pose if pose is None: self.pose = PoseWithCovariance() - elif isinstance(pose, PoseWithCovariance): - self.pose = pose elif isinstance(pose, Pose): self.pose = PoseWithCovariance(pose) else: - self.pose = PoseWithCovariance(Pose(pose)) - - # Handle twist - if twist is None: - self.twist = TwistWithCovariance() - elif isinstance(twist, TwistWithCovariance): - self.twist = twist - elif isinstance(twist, Twist): - self.twist = TwistWithCovariance(twist) - else: - self.twist = TwistWithCovariance(Twist(twist)) - - @dispatch # type: ignore[no-redef] - def __init__(self, odometry: Odometry) -> None: - """Initialize from another Odometry (copy constructor).""" - self.ts = odometry.ts - self.frame_id = odometry.frame_id - self.child_frame_id = odometry.child_frame_id - self.pose = PoseWithCovariance(odometry.pose) - self.twist = TwistWithCovariance(odometry.twist) - - @dispatch # type: ignore[no-redef] - def __init__(self, lcm_odometry: LCMOdometry) -> None: - """Initialize from an LCM Odometry.""" - self.ts = lcm_odometry.header.stamp.sec + (lcm_odometry.header.stamp.nsec / 1_000_000_000) - self.frame_id = lcm_odometry.header.frame_id - self.child_frame_id = lcm_odometry.child_frame_id - self.pose = PoseWithCovariance(lcm_odometry.pose) - self.twist = TwistWithCovariance(lcm_odometry.twist) - - @dispatch # type: ignore[no-redef] - def __init__( - self, - odometry_dict: dict[ - str, float | str | PoseWithCovariance | TwistWithCovariance | Pose | Twist - ], - ) -> None: - """Initialize from a dictionary.""" - self.ts = odometry_dict.get("ts", odometry_dict.get("timestamp", time.time())) - self.frame_id = odometry_dict.get("frame_id", "") - self.child_frame_id = odometry_dict.get("child_frame_id", "") - - # Handle pose - pose = odometry_dict.get("pose") - if pose is None: - self.pose = PoseWithCovariance() - elif isinstance(pose, PoseWithCovariance): self.pose = pose - elif isinstance(pose, Pose): - self.pose = PoseWithCovariance(pose) - else: - self.pose = PoseWithCovariance(Pose(pose)) - # Handle twist - twist = odometry_dict.get("twist") if twist is None: self.twist = TwistWithCovariance() - elif isinstance(twist, TwistWithCovariance): - self.twist = twist elif isinstance(twist, Twist): self.twist = TwistWithCovariance(twist) else: - self.twist = TwistWithCovariance(Twist(twist)) + self.twist = twist + + # -- Convenience properties -- @property def position(self) -> Vector3: - """Get position from pose.""" return self.pose.position @property - def orientation(self): # type: ignore[no-untyped-def] - """Get orientation from pose.""" + def orientation(self) -> Quaternion: return self.pose.orientation @property def linear_velocity(self) -> Vector3: - """Get linear velocity from twist.""" return self.twist.linear @property def angular_velocity(self) -> Vector3: - """Get angular velocity from twist.""" return self.twist.angular @property def x(self) -> float: - """X position.""" return self.pose.x @property def y(self) -> float: - """Y position.""" return self.pose.y @property def z(self) -> float: - """Z position.""" return self.pose.z @property def vx(self) -> float: - """Linear velocity in X.""" return self.twist.linear.x @property def vy(self) -> float: - """Linear velocity in Y.""" return self.twist.linear.y @property def vz(self) -> float: - """Linear velocity in Z.""" return self.twist.linear.z @property def wx(self) -> float: - """Angular velocity around X (roll rate).""" return self.twist.angular.x @property def wy(self) -> float: - """Angular velocity around Y (pitch rate).""" return self.twist.angular.y @property def wz(self) -> float: - """Angular velocity around Z (yaw rate).""" return self.twist.angular.z @property def roll(self) -> float: - """Roll angle in radians.""" return self.pose.roll @property def pitch(self) -> float: - """Pitch angle in radians.""" return self.pose.pitch @property def yaw(self) -> float: - """Yaw angle in radians.""" return self.pose.yaw - def __repr__(self) -> str: - return ( - f"Odometry(ts={self.ts:.6f}, frame_id='{self.frame_id}', " - f"child_frame_id='{self.child_frame_id}', pose={self.pose!r}, twist={self.twist!r})" - ) - - def __str__(self) -> str: - return ( - f"Odometry:\n" - f" Timestamp: {self.ts:.6f}\n" - f" Frame: {self.frame_id} -> {self.child_frame_id}\n" - f" Position: [{self.x:.3f}, {self.y:.3f}, {self.z:.3f}]\n" - f" Orientation: [roll={self.roll:.3f}, pitch={self.pitch:.3f}, yaw={self.yaw:.3f}]\n" - f" Linear Velocity: [{self.vx:.3f}, {self.vy:.3f}, {self.vz:.3f}]\n" - f" Angular Velocity: [{self.wx:.3f}, {self.wy:.3f}, {self.wz:.3f}]" - ) - - def __eq__(self, other) -> bool: # type: ignore[no-untyped-def] - """Check if two Odometry messages are equal.""" - if not isinstance(other, Odometry): - return False - return ( - abs(self.ts - other.ts) < 1e-6 - and self.frame_id == other.frame_id - and self.child_frame_id == other.child_frame_id - and self.pose == other.pose - and self.twist == other.twist - ) + # -- Serialization -- def lcm_encode(self) -> bytes: - """Encode to LCM binary format.""" lcm_msg = LCMOdometry() - # Set header - [lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec] = sec_nsec(self.ts) # type: ignore[no-untyped-call] + lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec = self.ros_timestamp() lcm_msg.header.frame_id = self.frame_id lcm_msg.child_frame_id = self.child_frame_id - # Set pose with covariance lcm_msg.pose.pose = self.pose.pose - if isinstance(self.pose.covariance, np.ndarray): # type: ignore[has-type] - lcm_msg.pose.covariance = self.pose.covariance.tolist() # type: ignore[has-type] - else: - lcm_msg.pose.covariance = list(self.pose.covariance) # type: ignore[has-type] + lcm_msg.pose.covariance = list(np.asarray(self.pose.covariance)) - # Set twist with covariance lcm_msg.twist.twist = self.twist.twist - if isinstance(self.twist.covariance, np.ndarray): # type: ignore[has-type] - lcm_msg.twist.covariance = self.twist.covariance.tolist() # type: ignore[has-type] - else: - lcm_msg.twist.covariance = list(self.twist.covariance) # type: ignore[has-type] + lcm_msg.twist.covariance = list(np.asarray(self.twist.covariance)) return lcm_msg.lcm_encode() # type: ignore[no-any-return] @classmethod def lcm_decode(cls, data: bytes) -> Odometry: - """Decode from LCM binary format.""" lcm_msg = LCMOdometry.lcm_decode(data) - # Extract timestamp ts = lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000) - # Create pose with covariance pose = Pose( position=[ lcm_msg.pose.pose.position.x, @@ -299,9 +164,6 @@ def lcm_decode(cls, data: bytes) -> Odometry: lcm_msg.pose.pose.orientation.w, ], ) - pose_with_cov = PoseWithCovariance(pose, lcm_msg.pose.covariance) - - # Create twist with covariance twist = Twist( linear=[ lcm_msg.twist.twist.linear.x, @@ -314,12 +176,41 @@ def lcm_decode(cls, data: bytes) -> Odometry: lcm_msg.twist.twist.angular.z, ], ) - twist_with_cov = TwistWithCovariance(twist, lcm_msg.twist.covariance) return cls( ts=ts, frame_id=lcm_msg.header.frame_id, child_frame_id=lcm_msg.child_frame_id, - pose=pose_with_cov, - twist=twist_with_cov, + pose=PoseWithCovariance(pose, lcm_msg.pose.covariance), + twist=TwistWithCovariance(twist, lcm_msg.twist.covariance), + ) + + # -- Comparison / display -- + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Odometry): + return False + return ( + abs(self.ts - other.ts) < 1e-6 + and self.frame_id == other.frame_id + and self.child_frame_id == other.child_frame_id + and self.pose == other.pose + and self.twist == other.twist + ) + + def __repr__(self) -> str: + return ( + f"Odometry(ts={self.ts:.6f}, frame_id='{self.frame_id}', " + f"child_frame_id='{self.child_frame_id}', pose={self.pose!r}, twist={self.twist!r})" + ) + + def __str__(self) -> str: + return ( + f"Odometry:\n" + f" Timestamp: {self.ts:.6f}\n" + f" Frame: {self.frame_id} -> {self.child_frame_id}\n" + f" Position: [{self.x:.3f}, {self.y:.3f}, {self.z:.3f}]\n" + f" Orientation: [roll={self.roll:.3f}, pitch={self.pitch:.3f}, yaw={self.yaw:.3f}]\n" + f" Linear Velocity: [{self.vx:.3f}, {self.vy:.3f}, {self.vz:.3f}]\n" + f" Angular Velocity: [{self.wx:.3f}, {self.wy:.3f}, {self.wz:.3f}]" ) diff --git a/dimos/msgs/nav_msgs/test_Odometry.py b/dimos/msgs/nav_msgs/test_Odometry.py index e6eadc51f8..c532aed1ca 100644 --- a/dimos/msgs/nav_msgs/test_Odometry.py +++ b/dimos/msgs/nav_msgs/test_Odometry.py @@ -25,48 +25,34 @@ def test_odometry_default_init() -> None: - """Test default initialization.""" odom = Odometry() - # Should have current timestamp assert odom.ts > 0 assert odom.frame_id == "" assert odom.child_frame_id == "" - # Pose should be at origin with identity orientation assert odom.pose.position.x == 0.0 assert odom.pose.position.y == 0.0 assert odom.pose.position.z == 0.0 assert odom.pose.orientation.w == 1.0 - # Twist should be zero assert odom.twist.linear.x == 0.0 - assert odom.twist.linear.y == 0.0 - assert odom.twist.linear.z == 0.0 assert odom.twist.angular.x == 0.0 - assert odom.twist.angular.y == 0.0 - assert odom.twist.angular.z == 0.0 - # Covariances should be zero assert np.all(odom.pose.covariance == 0.0) assert np.all(odom.twist.covariance == 0.0) def test_odometry_with_frames() -> None: - """Test initialization with frame IDs.""" ts = 1234567890.123456 - frame_id = "odom" - child_frame_id = "base_link" - - odom = Odometry(ts=ts, frame_id=frame_id, child_frame_id=child_frame_id) + odom = Odometry(ts=ts, frame_id="odom", child_frame_id="base_link") assert odom.ts == ts - assert odom.frame_id == frame_id - assert odom.child_frame_id == child_frame_id + assert odom.frame_id == "odom" + assert odom.child_frame_id == "base_link" def test_odometry_with_pose_and_twist() -> None: - """Test initialization with pose and twist.""" pose = Pose(1.0, 2.0, 3.0, 0.1, 0.2, 0.3, 0.9) twist = Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)) @@ -80,7 +66,6 @@ def test_odometry_with_pose_and_twist() -> None: def test_odometry_with_covariances() -> None: - """Test initialization with pose and twist with covariances.""" pose = Pose(1.0, 2.0, 3.0) pose_cov = np.arange(36, dtype=float) pose_with_cov = PoseWithCovariance(pose, pose_cov) @@ -103,88 +88,35 @@ def test_odometry_with_covariances() -> None: assert np.array_equal(odom.twist.covariance, twist_cov) -def test_odometry_copy_constructor() -> None: - """Test copy constructor.""" - original = Odometry( - ts=1000.0, - frame_id="odom", - child_frame_id="base_link", - pose=Pose(1.0, 2.0, 3.0), - twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)), - ) - - copy = Odometry(original) - - assert copy == original - assert copy is not original - assert copy.pose is not original.pose - assert copy.twist is not original.twist - - -def test_odometry_dict_init() -> None: - """Test initialization from dictionary.""" - odom_dict = { - "ts": 1000.0, - "frame_id": "odom", - "child_frame_id": "base_link", - "pose": Pose(1.0, 2.0, 3.0), - "twist": Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)), - } - - odom = Odometry(odom_dict) - - assert odom.ts == 1000.0 - assert odom.frame_id == "odom" - assert odom.child_frame_id == "base_link" - assert odom.pose.position.x == 1.0 - assert odom.twist.linear.x == 0.5 - - def test_odometry_properties() -> None: - """Test convenience properties.""" pose = Pose(1.0, 2.0, 3.0, 0.1, 0.2, 0.3, 0.9) twist = Twist(Vector3(0.5, 0.6, 0.7), Vector3(0.1, 0.2, 0.3)) odom = Odometry(ts=1000.0, frame_id="odom", child_frame_id="base_link", pose=pose, twist=twist) - # Position properties assert odom.x == 1.0 assert odom.y == 2.0 assert odom.z == 3.0 assert odom.position.x == 1.0 - assert odom.position.y == 2.0 - assert odom.position.z == 3.0 - # Orientation properties assert odom.orientation.x == 0.1 - assert odom.orientation.y == 0.2 - assert odom.orientation.z == 0.3 - assert odom.orientation.w == 0.9 - # Velocity properties assert odom.vx == 0.5 assert odom.vy == 0.6 assert odom.vz == 0.7 assert odom.linear_velocity.x == 0.5 - assert odom.linear_velocity.y == 0.6 - assert odom.linear_velocity.z == 0.7 - # Angular velocity properties assert odom.wx == 0.1 assert odom.wy == 0.2 assert odom.wz == 0.3 assert odom.angular_velocity.x == 0.1 - assert odom.angular_velocity.y == 0.2 - assert odom.angular_velocity.z == 0.3 - # Euler angles assert odom.roll == pose.roll assert odom.pitch == pose.pitch assert odom.yaw == pose.yaw def test_odometry_str_repr() -> None: - """Test string representations.""" odom = Odometry( ts=1234567890.123456, frame_id="odom", @@ -193,22 +125,16 @@ def test_odometry_str_repr() -> None: twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)), ) - repr_str = repr(odom) - assert "Odometry" in repr_str - assert "1234567890.123456" in repr_str - assert "odom" in repr_str - assert "base_link" in repr_str + assert "Odometry" in repr(odom) + assert "1234567890.123456" in repr(odom) - str_repr = str(odom) - assert "Odometry" in str_repr - assert "odom -> base_link" in str_repr - assert "1.234" in str_repr - assert "0.500" in str_repr + s = str(odom) + assert "odom -> base_link" in s + assert "1.234" in s def test_odometry_equality() -> None: - """Test equality comparison.""" - odom1 = Odometry( + kwargs = dict( ts=1000.0, frame_id="odom", child_frame_id="base_link", @@ -216,29 +142,12 @@ def test_odometry_equality() -> None: twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)), ) - odom2 = Odometry( - ts=1000.0, - frame_id="odom", - child_frame_id="base_link", - pose=Pose(1.0, 2.0, 3.0), - twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)), - ) - - odom3 = Odometry( - ts=1000.0, - frame_id="odom", - child_frame_id="base_link", - pose=Pose(1.1, 2.0, 3.0), # Different position - twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)), - ) + assert Odometry(**kwargs) == Odometry(**kwargs) + assert Odometry(**kwargs) != Odometry(**{**kwargs, "pose": Pose(1.1, 2.0, 3.0)}) + assert Odometry(**kwargs) != "not an odometry" - assert odom1 == odom2 - assert odom1 != odom3 - assert odom1 != "not an odometry" - -def test_odometry_lcm_encode_decode() -> None: - """Test LCM encoding and decoding.""" +def test_odometry_lcm_roundtrip() -> None: pose = Pose(1.0, 2.0, 3.0, 0.1, 0.2, 0.3, 0.9) pose_cov = np.arange(36, dtype=float) twist = Twist(Vector3(0.5, 0.6, 0.7), Vector3(0.1, 0.2, 0.3)) @@ -252,11 +161,8 @@ def test_odometry_lcm_encode_decode() -> None: twist=TwistWithCovariance(twist, twist_cov), ) - # Encode and decode - binary_msg = source.lcm_encode() - decoded = Odometry.lcm_decode(binary_msg) + decoded = Odometry.lcm_decode(source.lcm_encode()) - # Check values (allowing for timestamp precision loss) assert abs(decoded.ts - source.ts) < 1e-6 assert decoded.frame_id == source.frame_id assert decoded.child_frame_id == source.child_frame_id @@ -265,56 +171,38 @@ def test_odometry_lcm_encode_decode() -> None: def test_odometry_zero_timestamp() -> None: - """Test that zero timestamp gets replaced with current time.""" odom = Odometry(ts=0.0) - - # Should have been replaced with current time assert odom.ts > 0 assert odom.ts <= time.time() def test_odometry_with_just_pose() -> None: - """Test initialization with just a Pose (no covariance).""" - pose = Pose(1.0, 2.0, 3.0) - - odom = Odometry(pose=pose) + odom = Odometry(pose=Pose(1.0, 2.0, 3.0)) assert odom.pose.position.x == 1.0 - assert odom.pose.position.y == 2.0 - assert odom.pose.position.z == 3.0 - assert np.all(odom.pose.covariance == 0.0) # Should have zero covariance - assert np.all(odom.twist.covariance == 0.0) # Twist should also be zero + assert np.all(odom.pose.covariance == 0.0) + assert np.all(odom.twist.covariance == 0.0) def test_odometry_with_just_twist() -> None: - """Test initialization with just a Twist (no covariance).""" - twist = Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1)) - - odom = Odometry(twist=twist) + odom = Odometry(twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.1))) assert odom.twist.linear.x == 0.5 assert odom.twist.angular.z == 0.1 - assert np.all(odom.twist.covariance == 0.0) # Should have zero covariance - assert np.all(odom.pose.covariance == 0.0) # Pose should also be zero + assert np.all(odom.twist.covariance == 0.0) def test_odometry_typical_robot_scenario() -> None: - """Test a typical robot odometry scenario.""" - # Robot moving forward at 0.5 m/s with slight rotation odom = Odometry( ts=1000.0, frame_id="odom", child_frame_id="base_footprint", - pose=Pose(10.0, 5.0, 0.0, 0.0, 0.0, np.sin(0.1), np.cos(0.1)), # 0.2 rad yaw - twist=Twist( - Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.05) - ), # Moving forward, turning slightly + pose=Pose(10.0, 5.0, 0.0, 0.0, 0.0, np.sin(0.1), np.cos(0.1)), + twist=Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.05)), ) - # Check we can access all the typical properties assert odom.x == 10.0 assert odom.y == 5.0 - assert odom.z == 0.0 - assert abs(odom.yaw - 0.2) < 0.01 # Approximately 0.2 radians - assert odom.vx == 0.5 # Forward velocity - assert odom.wz == 0.05 # Yaw rate + assert abs(odom.yaw - 0.2) < 0.01 + assert odom.vx == 0.5 + assert odom.wz == 0.05