Skip to content
Closed
45 changes: 41 additions & 4 deletions dimos/msgs/geometry_msgs/Pose.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

from __future__ import annotations

import struct
import traceback
from io import BytesIO
from typing import BinaryIO, TypeAlias
from typing import TypeAlias

from dimos_lcm.geometry_msgs import Pose as LCMPose
from dimos_lcm.geometry_msgs import Transform as LCMTransform
from geometry_msgs.msg import Pose as ROSPose
from geometry_msgs.msg import Point as ROSPoint
from geometry_msgs.msg import Quaternion as ROSQuaternion
from plum import dispatch

from dimos.msgs.geometry_msgs.Quaternion import Quaternion, QuaternionConvertable
Expand Down Expand Up @@ -207,6 +207,43 @@ def __add__(self, other: "Pose" | PoseConvertable | LCMTransform | Transform) ->

return Pose(new_position, new_orientation)

@classmethod
def from_ros_msg(cls, ros_msg: ROSPose) -> "Pose":
"""Create a Pose from a ROS geometry_msgs/Pose message.

Args:
ros_msg: ROS Pose message

Returns:
Pose instance
"""
position = Vector3(ros_msg.position.x, ros_msg.position.y, ros_msg.position.z)
orientation = Quaternion(
ros_msg.orientation.x,
ros_msg.orientation.y,
ros_msg.orientation.z,
ros_msg.orientation.w,
)
return cls(position, orientation)

def to_ros_msg(self) -> ROSPose:
"""Convert to a ROS geometry_msgs/Pose message.

Returns:
ROS Pose message
"""
ros_msg = ROSPose()
ros_msg.position = ROSPoint(
x=float(self.position.x), y=float(self.position.y), z=float(self.position.z)
)
ros_msg.orientation = ROSQuaternion(
x=float(self.orientation.x),
y=float(self.orientation.y),
z=float(self.orientation.z),
w=float(self.orientation.w),
)
return ros_msg


@dispatch
def to_pose(value: "Pose") -> "Pose":
Expand Down
42 changes: 42 additions & 0 deletions dimos/msgs/geometry_msgs/PoseStamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dimos_lcm.geometry_msgs import PoseStamped as LCMPoseStamped
from dimos_lcm.std_msgs import Header as LCMHeader
from dimos_lcm.std_msgs import Time as LCMTime
from geometry_msgs.msg import PoseStamped as ROSPoseStamped
from plum import dispatch

from dimos.msgs.geometry_msgs.Pose import Pose
Expand Down Expand Up @@ -109,3 +110,44 @@ def find_transform(self, other: PoseStamped) -> Transform:
translation=local_translation,
rotation=relative_rotation,
)

@classmethod
def from_ros_msg(cls, ros_msg: ROSPoseStamped) -> "PoseStamped":
"""Create a PoseStamped from a ROS geometry_msgs/PoseStamped message.

Args:
ros_msg: ROS PoseStamped message

Returns:
PoseStamped instance
"""
# Convert timestamp from ROS header
ts = ros_msg.header.stamp.sec + (ros_msg.header.stamp.nanosec / 1_000_000_000)

# Convert pose
pose = Pose.from_ros_msg(ros_msg.pose)

return cls(
ts=ts,
frame_id=ros_msg.header.frame_id,
position=pose.position,
orientation=pose.orientation,
)

def to_ros_msg(self) -> ROSPoseStamped:
"""Convert to a ROS geometry_msgs/PoseStamped message.

Returns:
ROS PoseStamped message
"""
ros_msg = ROSPoseStamped()

# Set header
ros_msg.header.frame_id = self.frame_id
ros_msg.header.stamp.sec = int(self.ts)
ros_msg.header.stamp.nanosec = int((self.ts - int(self.ts)) * 1_000_000_000)

# Set pose
ros_msg.pose = Pose.to_ros_msg(self)

return ros_msg
219 changes: 219 additions & 0 deletions dimos/msgs/geometry_msgs/PoseWithCovariance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TypeAlias

import numpy as np
from dimos_lcm.geometry_msgs import PoseWithCovariance as LCMPoseWithCovariance
from geometry_msgs.msg import PoseWithCovariance as ROSPoseWithCovariance
from plum import dispatch

from dimos.msgs.geometry_msgs.Pose import Pose, PoseConvertable
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3

# Types that can be converted to/from PoseWithCovariance
PoseWithCovarianceConvertable: TypeAlias = (
tuple[PoseConvertable, list[float] | np.ndarray]
| LCMPoseWithCovariance
| dict[str, PoseConvertable | list[float] | np.ndarray]
)


class PoseWithCovariance(LCMPoseWithCovariance):
pose: Pose
msg_name = "geometry_msgs.PoseWithCovariance"

@dispatch
def __init__(self) -> None:
"""Initialize with default pose and zero covariance."""
self.pose = Pose()
self.covariance = np.zeros(36)

@dispatch
def __init__(
self, pose: Pose | PoseConvertable, covariance: list[float] | np.ndarray | None = None
) -> None:
"""Initialize with pose and optional covariance."""
self.pose = Pose(pose) if not isinstance(pose, Pose) else pose
if covariance is None:
self.covariance = np.zeros(36)
else:
self.covariance = np.array(covariance, dtype=float).reshape(36)

@dispatch
def __init__(self, pose_with_cov: PoseWithCovariance) -> None:
"""Initialize from another PoseWithCovariance (copy constructor)."""
self.pose = Pose(pose_with_cov.pose)
self.covariance = np.array(pose_with_cov.covariance).copy()

@dispatch
def __init__(self, lcm_pose_with_cov: LCMPoseWithCovariance) -> None:
"""Initialize from an LCM PoseWithCovariance."""
self.pose = Pose(lcm_pose_with_cov.pose)
self.covariance = np.array(lcm_pose_with_cov.covariance)

@dispatch
def __init__(self, pose_dict: dict[str, PoseConvertable | list[float] | np.ndarray]) -> None:
"""Initialize from a dictionary with 'pose' and 'covariance' keys."""
self.pose = Pose(pose_dict["pose"])
covariance = pose_dict.get("covariance")
if covariance is None:
self.covariance = np.zeros(36)
else:
self.covariance = np.array(covariance, dtype=float).reshape(36)

@dispatch
def __init__(self, pose_tuple: tuple[PoseConvertable, list[float] | np.ndarray]) -> None:
"""Initialize from a tuple of (pose, covariance)."""
self.pose = Pose(pose_tuple[0])
self.covariance = np.array(pose_tuple[1], dtype=float).reshape(36)

def __getattribute__(self, name):
"""Override to ensure covariance is always returned as numpy array."""
if name == "covariance":
cov = object.__getattribute__(self, "covariance")
if not isinstance(cov, np.ndarray):
return np.array(cov, dtype=float)
return cov
return super().__getattribute__(name)

def __setattr__(self, name, value):
"""Override to ensure covariance is stored as numpy array."""
if name == "covariance":
if not isinstance(value, np.ndarray):
value = np.array(value, dtype=float).reshape(36)
super().__setattr__(name, value)

@property
def x(self) -> float:
"""X coordinate of position."""
return self.pose.x

@property
def y(self) -> float:
"""Y coordinate of position."""
return self.pose.y

@property
def z(self) -> float:
"""Z coordinate of position."""
return self.pose.z

@property
def position(self) -> Vector3:
"""Position vector."""
return self.pose.position

@property
def orientation(self) -> Quaternion:
"""Orientation quaternion."""
return self.pose.orientation

@property
def roll(self) -> float:
"""Roll angle in radians."""
return self.pose.roll

@property
def pitch(self) -> float:
"""Pitch angle in radians."""
return self.pose.pitch

@property
def yaw(self) -> float:
"""Yaw angle in radians."""
return self.pose.yaw

@property
def covariance_matrix(self) -> np.ndarray:
"""Get covariance as 6x6 matrix."""
return self.covariance.reshape(6, 6)

@covariance_matrix.setter
def covariance_matrix(self, value: np.ndarray) -> None:
"""Set covariance from 6x6 matrix."""
self.covariance = np.array(value).reshape(36)

def __repr__(self) -> str:
return f"PoseWithCovariance(pose={self.pose!r}, covariance=<{self.covariance.shape[0] if isinstance(self.covariance, np.ndarray) else len(self.covariance)} elements>)"

def __str__(self) -> str:
return (
f"PoseWithCovariance(pos=[{self.x:.3f}, {self.y:.3f}, {self.z:.3f}], "
f"euler=[{self.roll:.3f}, {self.pitch:.3f}, {self.yaw:.3f}], "
f"cov_trace={np.trace(self.covariance_matrix):.3f})"
)

def __eq__(self, other) -> bool:
"""Check if two PoseWithCovariance are equal."""
if not isinstance(other, PoseWithCovariance):
return False
return self.pose == other.pose and np.allclose(self.covariance, other.covariance)

def lcm_encode(self) -> bytes:
"""Encode to LCM binary format."""
lcm_msg = LCMPoseWithCovariance()
lcm_msg.pose = self.pose
# LCM expects list, not numpy array
if isinstance(self.covariance, np.ndarray):
lcm_msg.covariance = self.covariance.tolist()
else:
lcm_msg.covariance = list(self.covariance)
return lcm_msg.lcm_encode()

@classmethod
def lcm_decode(cls, data: bytes) -> "PoseWithCovariance":
"""Decode from LCM binary format."""
lcm_msg = LCMPoseWithCovariance.lcm_decode(data)
pose = Pose(
position=[lcm_msg.pose.position.x, lcm_msg.pose.position.y, lcm_msg.pose.position.z],
orientation=[
lcm_msg.pose.orientation.x,
lcm_msg.pose.orientation.y,
lcm_msg.pose.orientation.z,
lcm_msg.pose.orientation.w,
],
)
return cls(pose, lcm_msg.covariance)

@classmethod
def from_ros_msg(cls, ros_msg: ROSPoseWithCovariance) -> "PoseWithCovariance":
"""Create a PoseWithCovariance from a ROS geometry_msgs/PoseWithCovariance message.

Args:
ros_msg: ROS PoseWithCovariance message

Returns:
PoseWithCovariance instance
"""
pose = Pose.from_ros_msg(ros_msg.pose)
return cls(pose, list(ros_msg.covariance))

def to_ros_msg(self) -> ROSPoseWithCovariance:
"""Convert to a ROS geometry_msgs/PoseWithCovariance message.

Returns:
ROS PoseWithCovariance message
"""
ros_msg = ROSPoseWithCovariance()
ros_msg.pose = self.pose.to_ros_msg()
# ROS expects list, not numpy array
if isinstance(self.covariance, np.ndarray):
ros_msg.covariance = self.covariance.tolist()
else:
ros_msg.covariance = list(self.covariance)
return ros_msg
Loading