Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dimos/perception/test_spatial_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from dimos.perception.spatial_perception import SpatialMemory
from dimos.stream.video_provider import VideoProvider
from dimos.types.position import Position
from dimos.types.pose import Pose
from dimos.types.vector import Vector


Expand Down Expand Up @@ -117,7 +117,7 @@ def process_frame(frame):
nonlocal frame_counter

# Generate a unique position for this frame to ensure minimum distance threshold is met
pos = Position(frame_counter * 0.5, frame_counter * 0.5, 0)
pos = Pose(frame_counter * 0.5, frame_counter * 0.5, 0)
transform = {"position": pos, "timestamp": time.time()}
frame_counter += 1

Expand Down
6 changes: 3 additions & 3 deletions dimos/robot/local_planner/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
# from dimos.robot.local_planner.local_planner import LocalPlanner
from dimos.types.costmap import Costmap
from dimos.types.path import Path
from dimos.types.position import Position
from dimos.types.pose import Pose
from dimos.types.vector import Vector, VectorLike, to_vector
from dimos.utils.logging_config import setup_logger
from dimos.utils.threadpool import get_scheduler

logger = setup_logger("dimos.robot.unitree.global_planner")


def transform_to_robot_frame(global_vector: Vector, robot_position: Position) -> Vector:
def transform_to_robot_frame(global_vector: Vector, robot_position: Pose) -> Vector:
"""Transform a global coordinate vector to robot-relative coordinates.

Args:
Expand Down Expand Up @@ -68,7 +68,7 @@ class SimplePlanner(Module):
movecmd: Out[Vector3] = None

get_costmap: Callable[[], Costmap]
get_robot_pos: Callable[[], Position]
get_robot_pos: Callable[[], Pose]
set_move: Callable[[Vector3], Any]
goal: Optional[Vector] = None
speed: float = 0.3
Expand Down
6 changes: 3 additions & 3 deletions dimos/robot/unitree_webrtc/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from dimos.robot.unitree_webrtc.type.lidar import LidarMessage
from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg
from dimos.robot.unitree_webrtc.type.odometry import Odometry
from dimos.types.position import Position
from dimos.types.pose import Pose
from dimos.types.vector import Vector
from dimos.utils.reactive import backpressure, callback_to_observable

Expand Down Expand Up @@ -166,7 +166,7 @@ def raw_lidar_stream(self) -> Subject[LidarMessage]:
return backpressure(self.unitree_sub_stream(RTC_TOPIC["ULIDAR_ARRAY"]))

@functools.cache
def raw_odom_stream(self) -> Subject[Position]:
def raw_odom_stream(self) -> Subject[Pose]:
return backpressure(self.unitree_sub_stream(RTC_TOPIC["ROBOTODOM"]))

@functools.cache
Expand All @@ -178,7 +178,7 @@ def lidar_stream(self) -> Subject[LidarMessage]:
)

@functools.cache
def odom_stream(self) -> Subject[Position]:
def odom_stream(self) -> Subject[Pose]:
return backpressure(self.raw_odom_stream().pipe(ops.map(Odometry.from_msg)))

@functools.cache
Expand Down
4 changes: 2 additions & 2 deletions dimos/robot/unitree_webrtc/type/odometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ class Orientation(TypedDict):
w: float


class Pose(TypedDict):
class PoseData(TypedDict):
position: RawPosition
orientation: Orientation


class OdometryData(TypedDict):
header: Header
pose: Pose
pose: PoseData


class RawOdometryMessage(TypedDict):
Expand Down
46 changes: 23 additions & 23 deletions dimos/types/position.py → dimos/types/pose.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from dimos.types.vector import Vector, to_vector, to_numpy, VectorLike


T = TypeVar("T", bound="Position")
T = TypeVar("T", bound="Pose")

PositionLike = Union["Position", VectorLike, Sequence[VectorLike]]
PoseLike = Union["Pose", VectorLike, Sequence[VectorLike]]


def yaw_to_matrix(yaw: float) -> np.ndarray:
Expand All @@ -31,10 +31,10 @@ def yaw_to_matrix(yaw: float) -> np.ndarray:
return np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])


class Position(Vector):
"""A position in 3D space, consisting of a position vector and a rotation vector.
class Pose(Vector):
"""A pose in 3D space, consisting of a position vector and a rotation vector.

Position inherits from Vector and behaves like a vector for the position component.
Pose inherits from Vector and behaves like a vector for the position component.
The rotation vector is stored separately and can be accessed via the rot property.
"""

Expand All @@ -51,7 +51,7 @@ def __init__(self, pos: VectorLike, rot: VectorLike):
self._rot = to_vector(rot)

def __repr__(self) -> str:
return f"Position({self.pos.__repr__()}, {self.rot.__repr__()})"
return f"Pose({self.pos.__repr__()}, {self.rot.__repr__()})"

def __str__(self) -> str:
return self.__repr__()
Expand All @@ -63,8 +63,8 @@ def __bool__(self) -> bool:
return not self.is_zero()

def serialize(self):
"""Serialize the position to a dictionary."""
return {"type": "position", "pos": self.to_list(), "rot": self.rot.to_list()}
"""Serialize the pose to a dictionary."""
return {"type": "pose", "pos": self.to_list(), "rot": self.rot.to_list()}

def vector_to(self, target: Vector) -> Vector:
direction = target - self.pos.to_2d()
Expand All @@ -78,8 +78,8 @@ def vector_to(self, target: Vector) -> Vector:
return Vector(x, y)

def __eq__(self, other) -> bool:
"""Check if two positions are equal using numpy's allclose for floating point comparison."""
if not isinstance(other, Position):
"""Check if two poses are equal using numpy's allclose for floating point comparison."""
if not isinstance(other, Pose):
return False
return np.allclose(self.pos._data, other.pos._data) and np.allclose(
self.rot._data, other.rot._data
Expand All @@ -98,30 +98,30 @@ def pos(self) -> Vector:
return to_vector(self._data)

def __add__(self: T, other) -> T:
"""Override Vector's __add__ to handle Position objects specially.
"""Override Vector's __add__ to handle Pose objects specially.

When adding two Position objects, both position and rotation components are added.
When adding two Pose objects, both position and rotation components are added.
"""
if isinstance(other, Position):
if isinstance(other, Pose):
# Add both position and rotation components
result = super().__add__(other)
result._rot = self.rot + other.rot
return result
else:
# For other types, just use Vector's addition
return Position(super().__add__(other), self.rot)
return Pose(super().__add__(other), self.rot)

@property
def yaw(self) -> float:
"""Get the yaw (rotation around Z-axis) from the rotation vector."""
return self.rot.z

def __sub__(self: T, other) -> T:
"""Override Vector's __sub__ to handle Position objects specially.
"""Override Vector's __sub__ to handle Pose objects specially.

When subtracting two Position objects, both position and rotation components are subtracted.
When subtracting two Pose objects, both position and rotation components are subtracted.
"""
if isinstance(other, Position):
if isinstance(other, Pose):
# Subtract both position and rotation components
result = super().__sub__(other)
result._rot = self.rot - other.rot
Expand All @@ -131,19 +131,19 @@ def __sub__(self: T, other) -> T:
return super().__sub__(other)

def __mul__(self: T, scalar: float) -> T:
return Position(self.pos * scalar, self.rot)
return Pose(self.pos * scalar, self.rot)


@dispatch
def to_position(pos: Position) -> Position:
def to_pose(pos: Pose) -> Pose:
return pos


@dispatch
def to_position(pos: VectorLike) -> Position:
return Position(pos)
def to_pose(pos: VectorLike) -> Pose:
return Pose(pos)


@dispatch
def to_position(pos: Sequence[VectorLike]) -> Position:
return Position(*pos)
def to_pose(pos: Sequence[VectorLike]) -> Pose:
return Pose(*pos)
Loading