Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions dimos/msgs/geometry_msgs/Pose.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

from __future__ import annotations

import struct
import traceback
from io import BytesIO
from typing import BinaryIO, TypeAlias
from typing import TypeAlias

from dimos_lcm.geometry_msgs import Pose as LCMPose
from dimos_lcm.geometry_msgs import Transform as LCMTransform

try:
from geometry_msgs.msg import Pose as ROSPose
from geometry_msgs.msg import Point as ROSPoint
from geometry_msgs.msg import Quaternion as ROSQuaternion
except ImportError:
ROSPose = None
ROSPoint = None
ROSQuaternion = None

from plum import dispatch

from dimos.msgs.geometry_msgs.Quaternion import Quaternion, QuaternionConvertable
Expand Down Expand Up @@ -207,6 +214,43 @@ def __add__(self, other: "Pose" | PoseConvertable | LCMTransform | Transform) ->

return Pose(new_position, new_orientation)

@classmethod
def from_ros_msg(cls, ros_msg: ROSPose) -> "Pose":
"""Create a Pose from a ROS geometry_msgs/Pose message.

Args:
ros_msg: ROS Pose message

Returns:
Pose instance
"""
position = Vector3(ros_msg.position.x, ros_msg.position.y, ros_msg.position.z)
orientation = Quaternion(
ros_msg.orientation.x,
ros_msg.orientation.y,
ros_msg.orientation.z,
ros_msg.orientation.w,
)
return cls(position, orientation)

def to_ros_msg(self) -> ROSPose:
"""Convert to a ROS geometry_msgs/Pose message.

Returns:
ROS Pose message
"""
ros_msg = ROSPose()
ros_msg.position = ROSPoint(
x=float(self.position.x), y=float(self.position.y), z=float(self.position.z)
)
ros_msg.orientation = ROSQuaternion(
x=float(self.orientation.x),
y=float(self.orientation.y),
z=float(self.orientation.z),
w=float(self.orientation.w),
)
return ros_msg


@dispatch
def to_pose(value: "Pose") -> "Pose":
Expand Down
47 changes: 47 additions & 0 deletions dimos/msgs/geometry_msgs/PoseStamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
from dimos_lcm.geometry_msgs import PoseStamped as LCMPoseStamped
from dimos_lcm.std_msgs import Header as LCMHeader
from dimos_lcm.std_msgs import Time as LCMTime

try:
from geometry_msgs.msg import PoseStamped as ROSPoseStamped
except ImportError:
ROSPoseStamped = None

from plum import dispatch

from dimos.msgs.geometry_msgs.Pose import Pose
Expand Down Expand Up @@ -109,3 +115,44 @@ def find_transform(self, other: PoseStamped) -> Transform:
translation=local_translation,
rotation=relative_rotation,
)

@classmethod
def from_ros_msg(cls, ros_msg: ROSPoseStamped) -> "PoseStamped":
"""Create a PoseStamped from a ROS geometry_msgs/PoseStamped message.

Args:
ros_msg: ROS PoseStamped message

Returns:
PoseStamped instance
"""
# Convert timestamp from ROS header
ts = ros_msg.header.stamp.sec + (ros_msg.header.stamp.nanosec / 1_000_000_000)

# Convert pose
pose = Pose.from_ros_msg(ros_msg.pose)

return cls(
ts=ts,
frame_id=ros_msg.header.frame_id,
position=pose.position,
orientation=pose.orientation,
)

def to_ros_msg(self) -> ROSPoseStamped:
"""Convert to a ROS geometry_msgs/PoseStamped message.

Returns:
ROS PoseStamped message
"""
ros_msg = ROSPoseStamped()

# Set header
ros_msg.header.frame_id = self.frame_id
ros_msg.header.stamp.sec = int(self.ts)
ros_msg.header.stamp.nanosec = int((self.ts - int(self.ts)) * 1_000_000_000)

# Set pose
ros_msg.pose = Pose.to_ros_msg(self)

return ros_msg
225 changes: 225 additions & 0 deletions dimos/msgs/geometry_msgs/PoseWithCovariance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TypeAlias

import numpy as np
from dimos_lcm.geometry_msgs import PoseWithCovariance as LCMPoseWithCovariance
from plum import dispatch

try:
from geometry_msgs.msg import PoseWithCovariance as ROSPoseWithCovariance
except ImportError:
ROSPoseWithCovariance = None

from dimos.msgs.geometry_msgs.Pose import Pose, PoseConvertable
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3

# Types that can be converted to/from PoseWithCovariance
PoseWithCovarianceConvertable: TypeAlias = (
tuple[PoseConvertable, list[float] | np.ndarray]
| LCMPoseWithCovariance
| dict[str, PoseConvertable | list[float] | np.ndarray]
)


class PoseWithCovariance(LCMPoseWithCovariance):
pose: Pose
msg_name = "geometry_msgs.PoseWithCovariance"

@dispatch
def __init__(self) -> None:
"""Initialize with default pose and zero covariance."""
self.pose = Pose()
self.covariance = np.zeros(36)

@dispatch
def __init__(
self, pose: Pose | PoseConvertable, covariance: list[float] | np.ndarray | None = None
) -> None:
"""Initialize with pose and optional covariance."""
self.pose = Pose(pose) if not isinstance(pose, Pose) else pose
if covariance is None:
self.covariance = np.zeros(36)
else:
self.covariance = np.array(covariance, dtype=float).reshape(36)

@dispatch
def __init__(self, pose_with_cov: PoseWithCovariance) -> None:
"""Initialize from another PoseWithCovariance (copy constructor)."""
self.pose = Pose(pose_with_cov.pose)
self.covariance = np.array(pose_with_cov.covariance).copy()

@dispatch
def __init__(self, lcm_pose_with_cov: LCMPoseWithCovariance) -> None:
"""Initialize from an LCM PoseWithCovariance."""
self.pose = Pose(lcm_pose_with_cov.pose)
self.covariance = np.array(lcm_pose_with_cov.covariance)

@dispatch
def __init__(self, pose_dict: dict[str, PoseConvertable | list[float] | np.ndarray]) -> None:
"""Initialize from a dictionary with 'pose' and 'covariance' keys."""
self.pose = Pose(pose_dict["pose"])
covariance = pose_dict.get("covariance")
if covariance is None:
self.covariance = np.zeros(36)
else:
self.covariance = np.array(covariance, dtype=float).reshape(36)

@dispatch
def __init__(self, pose_tuple: tuple[PoseConvertable, list[float] | np.ndarray]) -> None:
"""Initialize from a tuple of (pose, covariance)."""
self.pose = Pose(pose_tuple[0])
self.covariance = np.array(pose_tuple[1], dtype=float).reshape(36)

def __getattribute__(self, name):
"""Override to ensure covariance is always returned as numpy array."""
if name == "covariance":
cov = object.__getattribute__(self, "covariance")
if not isinstance(cov, np.ndarray):
return np.array(cov, dtype=float)
return cov
return super().__getattribute__(name)

def __setattr__(self, name, value):
"""Override to ensure covariance is stored as numpy array."""
if name == "covariance":
if not isinstance(value, np.ndarray):
value = np.array(value, dtype=float).reshape(36)
super().__setattr__(name, value)

@property
def x(self) -> float:
"""X coordinate of position."""
return self.pose.x

@property
def y(self) -> float:
"""Y coordinate of position."""
return self.pose.y

@property
def z(self) -> float:
"""Z coordinate of position."""
return self.pose.z

@property
def position(self) -> Vector3:
"""Position vector."""
return self.pose.position

@property
def orientation(self) -> Quaternion:
"""Orientation quaternion."""
return self.pose.orientation

@property
def roll(self) -> float:
"""Roll angle in radians."""
return self.pose.roll

@property
def pitch(self) -> float:
"""Pitch angle in radians."""
return self.pose.pitch

@property
def yaw(self) -> float:
"""Yaw angle in radians."""
return self.pose.yaw

@property
def covariance_matrix(self) -> np.ndarray:
"""Get covariance as 6x6 matrix."""
return self.covariance.reshape(6, 6)

@covariance_matrix.setter
def covariance_matrix(self, value: np.ndarray) -> None:
"""Set covariance from 6x6 matrix."""
self.covariance = np.array(value).reshape(36)

def __repr__(self) -> str:
return f"PoseWithCovariance(pose={self.pose!r}, covariance=<{self.covariance.shape[0] if isinstance(self.covariance, np.ndarray) else len(self.covariance)} elements>)"

def __str__(self) -> str:
return (
f"PoseWithCovariance(pos=[{self.x:.3f}, {self.y:.3f}, {self.z:.3f}], "
f"euler=[{self.roll:.3f}, {self.pitch:.3f}, {self.yaw:.3f}], "
f"cov_trace={np.trace(self.covariance_matrix):.3f})"
)

def __eq__(self, other) -> bool:
"""Check if two PoseWithCovariance are equal."""
if not isinstance(other, PoseWithCovariance):
return False
return self.pose == other.pose and np.allclose(self.covariance, other.covariance)

def lcm_encode(self) -> bytes:
"""Encode to LCM binary format."""
lcm_msg = LCMPoseWithCovariance()
lcm_msg.pose = self.pose
# LCM expects list, not numpy array
if isinstance(self.covariance, np.ndarray):
lcm_msg.covariance = self.covariance.tolist()
else:
lcm_msg.covariance = list(self.covariance)
return lcm_msg.lcm_encode()

@classmethod
def lcm_decode(cls, data: bytes) -> "PoseWithCovariance":
"""Decode from LCM binary format."""
lcm_msg = LCMPoseWithCovariance.lcm_decode(data)
pose = Pose(
position=[lcm_msg.pose.position.x, lcm_msg.pose.position.y, lcm_msg.pose.position.z],
orientation=[
lcm_msg.pose.orientation.x,
lcm_msg.pose.orientation.y,
lcm_msg.pose.orientation.z,
lcm_msg.pose.orientation.w,
],
)
return cls(pose, lcm_msg.covariance)

@classmethod
def from_ros_msg(cls, ros_msg: ROSPoseWithCovariance) -> "PoseWithCovariance":
"""Create a PoseWithCovariance from a ROS geometry_msgs/PoseWithCovariance message.

Args:
ros_msg: ROS PoseWithCovariance message

Returns:
PoseWithCovariance instance
"""

pose = Pose.from_ros_msg(ros_msg.pose)
return cls(pose, list(ros_msg.covariance))

def to_ros_msg(self) -> ROSPoseWithCovariance:
"""Convert to a ROS geometry_msgs/PoseWithCovariance message.

Returns:
ROS PoseWithCovariance message
"""

ros_msg = ROSPoseWithCovariance()
ros_msg.pose = self.pose.to_ros_msg()
# ROS expects list, not numpy array
if isinstance(self.covariance, np.ndarray):
ros_msg.covariance = self.covariance.tolist()
else:
ros_msg.covariance = list(self.covariance)
return ros_msg
Loading