Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dimos/msgs/geometry_msgs/PoseWithCovariance.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, TypeAlias
from typing import TYPE_CHECKING, Any, TypeAlias

from dimos_lcm.geometry_msgs import (
PoseWithCovariance as LCMPoseWithCovariance,
Expand All @@ -38,6 +38,7 @@

class PoseWithCovariance(LCMPoseWithCovariance): # type: ignore[misc]
pose: Pose
covariance: np.ndarray[tuple[int], np.dtype[np.floating[Any]]]
msg_name = "geometry_msgs.PoseWithCovariance"

@dispatch
Expand Down
3 changes: 2 additions & 1 deletion dimos/msgs/geometry_msgs/TwistWithCovariance.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations

from typing import TypeAlias
from typing import Any, TypeAlias

from dimos_lcm.geometry_msgs import (
TwistWithCovariance as LCMTwistWithCovariance,
Expand All @@ -35,6 +35,7 @@

class TwistWithCovariance(LCMTwistWithCovariance): # type: ignore[misc]
twist: Twist
covariance: np.ndarray[tuple[int], np.dtype[np.floating[Any]]]
msg_name = "geometry_msgs.TwistWithCovariance"

@dispatch
Expand Down
197 changes: 44 additions & 153 deletions dimos/msgs/nav_msgs/Odometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
from __future__ import annotations

import time
from typing import TYPE_CHECKING, TypeAlias
from typing import TYPE_CHECKING

from dimos_lcm.nav_msgs import Odometry as LCMOdometry
import numpy as np
from plum import dispatch

from dimos.msgs.geometry_msgs.Pose import Pose
from dimos.msgs.geometry_msgs.PoseWithCovariance import PoseWithCovariance
Expand All @@ -28,28 +27,15 @@
from dimos.types.timestamped import Timestamped

if TYPE_CHECKING:
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3

# Types that can be converted to/from Odometry
OdometryConvertable: TypeAlias = (
LCMOdometry | dict[str, float | str | PoseWithCovariance | TwistWithCovariance | Pose | Twist]
)

class Odometry(Timestamped):
"""Odometry message with pose, twist, and frame information."""

def sec_nsec(ts): # type: ignore[no-untyped-def]
s = int(ts)
return [s, int((ts - s) * 1_000_000_000)]


class Odometry(LCMOdometry, Timestamped): # type: ignore[misc]
pose: PoseWithCovariance
twist: TwistWithCovariance
msg_name = "nav_msgs.Odometry"
ts: float
frame_id: str
child_frame_id: str

@dispatch
def __init__(
self,
ts: float = 0.0,
Expand All @@ -58,234 +44,113 @@ def __init__(
pose: PoseWithCovariance | Pose | None = None,
twist: TwistWithCovariance | Twist | None = None,
) -> None:
"""Initialize with timestamp, frame IDs, pose and twist.

Args:
ts: Timestamp in seconds (defaults to current time if 0)
frame_id: Reference frame ID (e.g., "odom", "map")
child_frame_id: Child frame ID (e.g., "base_link", "base_footprint")
pose: Pose with covariance (or just Pose, covariance will be zero)
twist: Twist with covariance (or just Twist, covariance will be zero)
"""
self.ts = ts if ts != 0 else time.time()
self.frame_id = frame_id
self.child_frame_id = child_frame_id

# Handle pose
if pose is None:
self.pose = PoseWithCovariance()
elif isinstance(pose, PoseWithCovariance):
self.pose = pose
elif isinstance(pose, Pose):
self.pose = PoseWithCovariance(pose)
else:
self.pose = PoseWithCovariance(Pose(pose))

# Handle twist
if twist is None:
self.twist = TwistWithCovariance()
elif isinstance(twist, TwistWithCovariance):
self.twist = twist
elif isinstance(twist, Twist):
self.twist = TwistWithCovariance(twist)
else:
self.twist = TwistWithCovariance(Twist(twist))

@dispatch # type: ignore[no-redef]
def __init__(self, odometry: Odometry) -> None:
"""Initialize from another Odometry (copy constructor)."""
self.ts = odometry.ts
self.frame_id = odometry.frame_id
self.child_frame_id = odometry.child_frame_id
self.pose = PoseWithCovariance(odometry.pose)
self.twist = TwistWithCovariance(odometry.twist)

@dispatch # type: ignore[no-redef]
def __init__(self, lcm_odometry: LCMOdometry) -> None:
"""Initialize from an LCM Odometry."""
self.ts = lcm_odometry.header.stamp.sec + (lcm_odometry.header.stamp.nsec / 1_000_000_000)
self.frame_id = lcm_odometry.header.frame_id
self.child_frame_id = lcm_odometry.child_frame_id
self.pose = PoseWithCovariance(lcm_odometry.pose)
self.twist = TwistWithCovariance(lcm_odometry.twist)

@dispatch # type: ignore[no-redef]
def __init__(
self,
odometry_dict: dict[
str, float | str | PoseWithCovariance | TwistWithCovariance | Pose | Twist
],
) -> None:
"""Initialize from a dictionary."""
self.ts = odometry_dict.get("ts", odometry_dict.get("timestamp", time.time()))
self.frame_id = odometry_dict.get("frame_id", "")
self.child_frame_id = odometry_dict.get("child_frame_id", "")

# Handle pose
pose = odometry_dict.get("pose")
if pose is None:
self.pose = PoseWithCovariance()
elif isinstance(pose, PoseWithCovariance):
self.pose = pose
elif isinstance(pose, Pose):
self.pose = PoseWithCovariance(pose)
else:
self.pose = PoseWithCovariance(Pose(pose))

# Handle twist
twist = odometry_dict.get("twist")
if twist is None:
self.twist = TwistWithCovariance()
elif isinstance(twist, TwistWithCovariance):
self.twist = twist
elif isinstance(twist, Twist):
self.twist = TwistWithCovariance(twist)
else:
self.twist = TwistWithCovariance(Twist(twist))
self.twist = twist

# -- Convenience properties --

@property
def position(self) -> Vector3:
"""Get position from pose."""
return self.pose.position

@property
def orientation(self): # type: ignore[no-untyped-def]
"""Get orientation from pose."""
def orientation(self) -> Quaternion:
return self.pose.orientation

@property
def linear_velocity(self) -> Vector3:
"""Get linear velocity from twist."""
return self.twist.linear

@property
def angular_velocity(self) -> Vector3:
"""Get angular velocity from twist."""
return self.twist.angular

@property
def x(self) -> float:
"""X position."""
return self.pose.x

@property
def y(self) -> float:
"""Y position."""
return self.pose.y

@property
def z(self) -> float:
"""Z position."""
return self.pose.z

@property
def vx(self) -> float:
"""Linear velocity in X."""
return self.twist.linear.x

@property
def vy(self) -> float:
"""Linear velocity in Y."""
return self.twist.linear.y

@property
def vz(self) -> float:
"""Linear velocity in Z."""
return self.twist.linear.z

@property
def wx(self) -> float:
"""Angular velocity around X (roll rate)."""
return self.twist.angular.x

@property
def wy(self) -> float:
"""Angular velocity around Y (pitch rate)."""
return self.twist.angular.y

@property
def wz(self) -> float:
"""Angular velocity around Z (yaw rate)."""
return self.twist.angular.z

@property
def roll(self) -> float:
"""Roll angle in radians."""
return self.pose.roll

@property
def pitch(self) -> float:
"""Pitch angle in radians."""
return self.pose.pitch

@property
def yaw(self) -> float:
"""Yaw angle in radians."""
return self.pose.yaw

def __repr__(self) -> str:
return (
f"Odometry(ts={self.ts:.6f}, frame_id='{self.frame_id}', "
f"child_frame_id='{self.child_frame_id}', pose={self.pose!r}, twist={self.twist!r})"
)

def __str__(self) -> str:
return (
f"Odometry:\n"
f" Timestamp: {self.ts:.6f}\n"
f" Frame: {self.frame_id} -> {self.child_frame_id}\n"
f" Position: [{self.x:.3f}, {self.y:.3f}, {self.z:.3f}]\n"
f" Orientation: [roll={self.roll:.3f}, pitch={self.pitch:.3f}, yaw={self.yaw:.3f}]\n"
f" Linear Velocity: [{self.vx:.3f}, {self.vy:.3f}, {self.vz:.3f}]\n"
f" Angular Velocity: [{self.wx:.3f}, {self.wy:.3f}, {self.wz:.3f}]"
)

def __eq__(self, other) -> bool: # type: ignore[no-untyped-def]
"""Check if two Odometry messages are equal."""
if not isinstance(other, Odometry):
return False
return (
abs(self.ts - other.ts) < 1e-6
and self.frame_id == other.frame_id
and self.child_frame_id == other.child_frame_id
and self.pose == other.pose
and self.twist == other.twist
)
# -- Serialization --

def lcm_encode(self) -> bytes:
"""Encode to LCM binary format."""
lcm_msg = LCMOdometry()

# Set header
[lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec] = sec_nsec(self.ts) # type: ignore[no-untyped-call]
lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec = self.ros_timestamp()
lcm_msg.header.frame_id = self.frame_id
lcm_msg.child_frame_id = self.child_frame_id

# Set pose with covariance
lcm_msg.pose.pose = self.pose.pose
if isinstance(self.pose.covariance, np.ndarray): # type: ignore[has-type]
lcm_msg.pose.covariance = self.pose.covariance.tolist() # type: ignore[has-type]
else:
lcm_msg.pose.covariance = list(self.pose.covariance) # type: ignore[has-type]
lcm_msg.pose.covariance = list(np.asarray(self.pose.covariance))

# Set twist with covariance
lcm_msg.twist.twist = self.twist.twist
if isinstance(self.twist.covariance, np.ndarray): # type: ignore[has-type]
lcm_msg.twist.covariance = self.twist.covariance.tolist() # type: ignore[has-type]
else:
lcm_msg.twist.covariance = list(self.twist.covariance) # type: ignore[has-type]
lcm_msg.twist.covariance = list(np.asarray(self.twist.covariance))

return lcm_msg.lcm_encode() # type: ignore[no-any-return]

@classmethod
def lcm_decode(cls, data: bytes) -> Odometry:
"""Decode from LCM binary format."""
lcm_msg = LCMOdometry.lcm_decode(data)

# Extract timestamp
ts = lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000)

# Create pose with covariance
pose = Pose(
position=[
lcm_msg.pose.pose.position.x,
Expand All @@ -299,9 +164,6 @@ def lcm_decode(cls, data: bytes) -> Odometry:
lcm_msg.pose.pose.orientation.w,
],
)
pose_with_cov = PoseWithCovariance(pose, lcm_msg.pose.covariance)

# Create twist with covariance
twist = Twist(
linear=[
lcm_msg.twist.twist.linear.x,
Expand All @@ -314,12 +176,41 @@ def lcm_decode(cls, data: bytes) -> Odometry:
lcm_msg.twist.twist.angular.z,
],
)
twist_with_cov = TwistWithCovariance(twist, lcm_msg.twist.covariance)

return cls(
ts=ts,
frame_id=lcm_msg.header.frame_id,
child_frame_id=lcm_msg.child_frame_id,
pose=pose_with_cov,
twist=twist_with_cov,
pose=PoseWithCovariance(pose, lcm_msg.pose.covariance),
twist=TwistWithCovariance(twist, lcm_msg.twist.covariance),
)

# -- Comparison / display --

def __eq__(self, other: object) -> bool:
if not isinstance(other, Odometry):
return False
return (
abs(self.ts - other.ts) < 1e-6
and self.frame_id == other.frame_id
and self.child_frame_id == other.child_frame_id
and self.pose == other.pose
and self.twist == other.twist
)

def __repr__(self) -> str:
return (
f"Odometry(ts={self.ts:.6f}, frame_id='{self.frame_id}', "
f"child_frame_id='{self.child_frame_id}', pose={self.pose!r}, twist={self.twist!r})"
)

def __str__(self) -> str:
return (
f"Odometry:\n"
f" Timestamp: {self.ts:.6f}\n"
f" Frame: {self.frame_id} -> {self.child_frame_id}\n"
f" Position: [{self.x:.3f}, {self.y:.3f}, {self.z:.3f}]\n"
f" Orientation: [roll={self.roll:.3f}, pitch={self.pitch:.3f}, yaw={self.yaw:.3f}]\n"
f" Linear Velocity: [{self.vx:.3f}, {self.vy:.3f}, {self.vz:.3f}]\n"
f" Angular Velocity: [{self.wx:.3f}, {self.wy:.3f}, {self.wz:.3f}]"
)
Loading