Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dimos-dev",
"image": "ghcr.io/dimensionalos/ros-dev:dev",
"image": "ghcr.io/dimensionalos/dev:dev",
"customizations": {
"vscode": {
"extensions": [
Expand Down
6 changes: 6 additions & 0 deletions dimos/msgs/geometry_msgs/PoseStamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,9 @@ def lcm_decode(cls, data: bytes | BinaryIO) -> PoseStamped:
lcm_msg.pose.orientation.w,
], # noqa: E501,
)

def __str__(self) -> str:
return (
f"PoseStamped(pos=[{self.x:.3f}, {self.y:.3f}, {self.z:.3f}], "
f"euler=[{self.roll:.3f}, {self.pitch:.3f}, {self.yaw:.3f}])"
)
55 changes: 55 additions & 0 deletions dimos/msgs/geometry_msgs/test_PoseStamped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pickle
import time

from dimos.msgs.geometry_msgs import PoseStamped


def test_lcm_encode_decode():
"""Test encoding and decoding of Pose to/from binary LCM format."""

pose_source = PoseStamped(
ts=time.time(),
position=(1.0, 2.0, 3.0),
orientation=(0.1, 0.2, 0.3, 0.9),
)
binary_msg = pose_source.lcm_encode()
pose_dest = PoseStamped.lcm_decode(binary_msg)

assert isinstance(pose_dest, PoseStamped)
assert pose_dest is not pose_source

print(pose_source.position)
print(pose_source.orientation)

print(pose_dest.position)
print(pose_dest.orientation)
assert pose_dest == pose_source


def test_pickle_encode_decode():
"""Test encoding and decoding of PoseStamped to/from binary LCM format."""

pose_source = PoseStamped(
ts=time.time(),
position=(1.0, 2.0, 3.0),
orientation=(0.1, 0.2, 0.3, 0.9),
)
binary_msg = pickle.dumps(pose_source)
pose_dest = pickle.loads(binary_msg)
assert isinstance(pose_dest, PoseStamped)
assert pose_dest is not pose_source
assert pose_dest == pose_source
10 changes: 6 additions & 4 deletions dimos/robot/global_planner/algo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import heapq
from typing import Optional, Tuple
import math
from collections import deque
from dimos.types.path import Path
from dimos.types.vector import VectorLike, Vector
from typing import Optional, Tuple

from dimos.msgs.geometry_msgs import Vector3 as Vector
from dimos.msgs.geometry_msgs.Vector3 import VectorLike
from dimos.types.costmap import Costmap
from dimos.types.path import Path


def find_nearest_free_cell(
Expand Down
89 changes: 66 additions & 23 deletions dimos/robot/local_planner/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,27 @@

import math
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional
import traceback
from typing import Callable, Optional

import reactivex as rx
from plum import dispatch
from reactivex import operators as ops

from dimos.core import In, Module, Out, rpc
from dimos.msgs.geometry_msgs import PoseStamped, Vector3
from dimos.robot.unitree_webrtc.type.odometry import Odometry

# from dimos.robot.local_planner.local_planner import LocalPlanner
from dimos.types.costmap import Costmap
from dimos.types.path import Path
from dimos.types.pose import Pose
from dimos.types.vector import Vector, VectorLike, to_vector
from dimos.utils.logging_config import setup_logger
from dimos.utils.threadpool import get_scheduler

logger = setup_logger("dimos.robot.unitree.global_planner")
logger = setup_logger("dimos.robot.unitree.local_planner")


def transform_to_robot_frame(global_vector: Vector, robot_position: Pose) -> Vector:
def transform_to_robot_frame(global_vector: Vector3, robot_position: PoseStamped) -> Vector:
"""Transform a global coordinate vector to robot-relative coordinates.

Args:
Expand All @@ -47,21 +45,22 @@ def transform_to_robot_frame(global_vector: Vector, robot_position: Pose) -> Vec
Vector in robot coordinates where X is forward/backward, Y is left/right
"""
# Get the robot's yaw angle (rotation around Z-axis)
robot_yaw = robot_position.rot.z
robot_yaw = robot_position.yaw

# Create rotation matrix to transform from global to robot frame
# We need to rotate the coordinate system by -robot_yaw to get robot-relative coordinates
cos_yaw = math.cos(-robot_yaw)
sin_yaw = math.sin(-robot_yaw)

print(cos_yaw, sin_yaw, global_vector)
# Apply 2D rotation transformation
# This transforms a global direction vector into the robot's coordinate frame
# In robot frame: X=forward/backward, Y=left/right
# In global frame: X=east/west, Y=north/south
robot_x = global_vector.x * cos_yaw - global_vector.y * sin_yaw # Forward/backward
robot_y = global_vector.x * sin_yaw + global_vector.y * cos_yaw # Left/right

return Vector(-robot_x, robot_y, 0)
return Vector3(-robot_x, robot_y, 0)


class SimplePlanner(Module):
Expand All @@ -71,7 +70,7 @@ class SimplePlanner(Module):

get_costmap: Callable[[], Costmap]

latest_odom: PoseStamped = None
latest_odom: Optional[PoseStamped] = None

goal: Optional[Vector] = None
speed: float = 0.3
Expand All @@ -83,24 +82,59 @@ def __init__(
Module.__init__(self)
self.get_costmap = get_costmap

def get_move_stream(self, frequency: float = 40.0) -> rx.Observable:
def get_move_stream(self, frequency: float = 20.0) -> rx.Observable:
return rx.interval(1.0 / frequency, scheduler=get_scheduler()).pipe(
# do we have a goal?
ops.filter(lambda _: self.goal is not None),
# For testing: make robot move left/right instead of rotating
ops.map(lambda _: self._test_translational_movement()),
self.frequency_spy("movement_test"),
# do we have odometry data?
ops.filter(lambda _: self.latest_odom is not None),
# transform goal to robot frame with error handling
ops.map(lambda _: self._safe_transform_goal()),
# filter out None results (errors)
ops.filter(lambda result: result is not None),
ops.map(self._calculate_rotation_to_target),
# filter out None results from calc_move
ops.filter(lambda result: result is not None),
)

def _safe_transform_goal(self) -> Optional[Vector]:
"""Safely transform goal to robot frame with error handling."""
try:
if self.goal is None or self.latest_odom is None:
return None

# Convert PoseStamped to Pose for transform function
# Pose constructor takes (pos, rot) where pos and rot are VectorLike

return transform_to_robot_frame(self.goal, self.latest_odom)
except Exception as e:
logger.error(f"Error transforming goal to robot frame: {e}")
print(traceback.format_exc())
return None

@rpc
def start(self):
print(self.path.connection, self.path.transport)
self.path.subscribe(self.set_goal)
self.movecmd.publish(Vector3(1, 2, 3))
self.movecmd.publish(Vector3(1, 2, 3))
self.movecmd.publish(Vector3(1, 2, 3))
self.movecmd.publish(Vector3(1, 2, 3))

def setodom(odom: Odometry):
def setodom(odom: PoseStamped):
self.latest_odom = odom

def pubmove(move: Vector3):
print("PUBMOVE", move, "\n\n")
# print(self.movecmd)
try:
self.movecmd.publish(move)
except Exception as e:
print(traceback.format_exc())
print(e)

self.odom.subscribe(setodom)
self.get_move_stream(frequency=20.0).subscribe(self.movecmd.publish)
self.get_move_stream(frequency=20.0).subscribe(pubmove)

@dispatch
def set_goal(self, goal: Path, stop_event=None, goal_theta=None) -> bool:
Expand All @@ -114,14 +148,14 @@ def set_goal(self, goal: VectorLike, stop_event=None, goal_theta=None) -> bool:
logger.info(f"Setting goal: {self.goal}")
return True

def calc_move(self, direction: Vector) -> Vector:
def calc_move(self, direction: Vector) -> Optional[Vector]:
"""Calculate the movement vector based on the direction to the goal.

Args:
direction: Direction vector towards the goal

Returns:
Movement vector scaled by speed
Movement vector scaled by speed, or None if error occurs
"""
try:
# Normalize the direction vector and scale by speed
Expand All @@ -130,7 +164,8 @@ def calc_move(self, direction: Vector) -> Vector:
print("CALC MOVE", direction, normalized_direction, move_vector)
return move_vector
except Exception as e:
print("Error calculating move vector:", e)
logger.error(f"Error calculating move vector: {e}")
return None

def spy(self, name: str):
def spyfun(x):
Expand Down Expand Up @@ -184,11 +219,11 @@ def _test_translational_movement(self) -> Vector:

if phase < 0.5:
# First half: move LEFT (positive X according to our documentation)
movement = Vector3(0.2, 0, 0) # Move left at 0.2 m/s
movement = Vector(0.2, 0, 0) # Move left at 0.2 m/s
direction = "LEFT (positive X)"
else:
# Second half: move RIGHT (negative X according to our documentation)
movement = Vector3(-0.2, 0, 0) # Move right at 0.2 m/s
movement = Vector(-0.2, 0, 0) # Move right at 0.2 m/s
direction = "RIGHT (negative X)"

print("=== LEFT-RIGHT MOVEMENT TEST ===")
Expand All @@ -207,11 +242,15 @@ def _calculate_rotation_to_target(self, direction_to_goal: Vector) -> Vector:
Returns:
Vector with (x=0, y=0, z=angular_velocity) for rotation only
"""
if self.latest_odom is None:
logger.warning("No odometry data available for rotation calculation")
return Vector(0, 0, 0)

# Calculate the desired yaw angle to face the target
desired_yaw = math.atan2(direction_to_goal.y, direction_to_goal.x)

# Get current robot yaw
current_yaw = self.latest_odom.orientation.z
current_yaw = self.latest_odom.yaw

# Calculate the yaw error using a more robust method to avoid oscillation
yaw_error = math.atan2(
Expand Down Expand Up @@ -247,13 +286,17 @@ def _calculate_rotation_to_target(self, direction_to_goal: Vector) -> Vector:

# Return movement command: no translation (x=0, y=0), only rotation (z=angular_velocity)
# Try flipping the sign in case the rotation convention is opposite
return Vector(0, 0, -angular_velocity)
return Vector3(0, 0, -angular_velocity)

def _debug_direction(self, name: str, direction: Vector) -> Vector:
"""Debug helper to log direction information"""
robot_pos = self.latest_odom
if robot_pos is None:
print(f"DEBUG {name}: direction={direction}, robot_pos=None, goal={self.goal}")
return direction

print(
f"DEBUG {name}: direction={direction}, robot_pos={robot_pos.position.to_2d()}, robot_yaw={math.degrees(robot_pos.rot.z):.1f}°, goal={self.goal}"
f"DEBUG {name}: direction={direction}, robot_pos={robot_pos.position.to_2d()}, robot_yaw={math.degrees(robot_pos.orientation.z):.1f}°, goal={self.goal}"
)
return direction

Expand Down
7 changes: 2 additions & 5 deletions dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ async def run(ip):

# This enables LCM transport
# Ensures system multicast, udp sizes are auto-adjusted if needed
# TODO: this doesn't seem to work atm and LCMTransport instantiation can fail
pubsub.lcm.autoconf()

connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage)
Expand All @@ -171,11 +170,10 @@ async def run(ip):
global_planner.path.transport = core.pLCMTransport("/global_path")

local_planner.path.connect(global_planner.path)

local_planner.odom.connect(connection.odom)

local_planner.movecmd.transport = core.LCMTransport("/move", Vector3)
local_planner.movecmd.connect(connection.movecmd)
connection.movecmd.connect(local_planner.movecmd)

ctrl = dimos.deploy(ControlModule)

Expand Down Expand Up @@ -207,13 +205,12 @@ async def run(ip):
print(colors.green("starting foxglove bridge"))
foxglove_bridge.start()

# uncomment to move the bot
print(colors.green("starting ctrl"))
ctrl.start()

print(colors.red("READY"))

await asyncio.sleep(10)
await asyncio.sleep(2)
print("querying system")
print(mapper.costmap())
while True:
Expand Down
8 changes: 0 additions & 8 deletions dimos/robot/unitree_webrtc/type/lidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ def from_msg(cls: "LidarMessage", raw_message: RawLidarMsg) -> "LidarMessage":
raw_msg=raw_message,
)

def to_pointcloud2(self) -> PointCloud2:
"""Convert to PointCloud2 message format."""
return PointCloud2(
pointcloud=self.pointcloud,
frame_id=self.frame_id,
ts=self.ts,
)

def __repr__(self):
return f"LidarMessage(ts={to_human_readable(self.ts)}, origin={self.origin}, resolution={self.resolution}, {self.pointcloud})"

Expand Down
18 changes: 0 additions & 18 deletions dimos/robot/unitree_webrtc/type/test_lidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,3 @@ def test_init():
assert isinstance(raw_frame, dict)
frame = LidarMessage.from_msg(raw_frame)
assert isinstance(frame, LidarMessage)
data = frame.to_pointcloud2().lcm_encode()
assert len(data) > 0
assert isinstance(data, bytes)


@pytest.mark.tool
def test_publish():
lcm = LCM()
lcm.start()

topic = Topic(topic="/lidar", lcm_type=PointCloud2)
lidar = SensorReplay("office_lidar", autocast=LidarMessage.from_msg)

while True:
for frame in lidar.iterate():
print(frame)
lcm.publish(topic, frame.to_pointcloud2())
time.sleep(0.1)
Loading