diff --git a/dimos/robot/unitree_webrtc/unitree_b1/README.md b/dimos/robot/unitree_webrtc/unitree_b1/README.md index 040a0a6da9..8616fc286a 100644 --- a/dimos/robot/unitree_webrtc/unitree_b1/README.md +++ b/dimos/robot/unitree_webrtc/unitree_b1/README.md @@ -167,6 +167,25 @@ External Machine (Client) B1 Robot (Server) └─────────────────────┘ └──────────────────┘ ``` +## Setting up ROS Navigation stack with Unitree B1 + +### Setup external Wireless USB Adapter on onboard hardware +This is because the onboard hardware (mini PC, jetson, etc.) needs to connect to both the B1 wifi AP network to send cmd_vel messages over UDP, as well as the network running dimensional + + +Plug in wireless adapter +```bash +nmcli device status +nmcli device wifi list ifname *DEVICE_NAME* +# Connect to b1 network +nmcli device wifi connect "Unitree_B1-251" password "00000000" ifname *DEVICE_NAME* +# Verify connection +nmcli connection show --active +``` + +### *TODO: add more docs* + + ## Troubleshooting ### Cannot connect to B1 diff --git a/dimos/robot/unitree_webrtc/unitree_b1/b1_command.py b/dimos/robot/unitree_webrtc/unitree_b1/b1_command.py index ac56978e72..ab547dade2 100644 --- a/dimos/robot/unitree_webrtc/unitree_b1/b1_command.py +++ b/dimos/robot/unitree_webrtc/unitree_b1/b1_command.py @@ -51,22 +51,27 @@ def from_twist(cls, twist, mode: int = 2): Returns: B1Command configured for the given Twist """ + # Max velocities from ROS needed to clamp to joystick ranges properly + MAX_LINEAR_VEL = 1.0 # m/s + MAX_ANGULAR_VEL = 2.0 # rad/s + if mode == 2: # WALK mode - velocity control return cls( - lx=-twist.angular.z, # ROS yaw → B1 turn (negated for correct direction) - ly=twist.linear.x, # ROS forward → B1 forward - rx=-twist.linear.y, # ROS lateral → B1 strafe (negated for correct direction) + # Scale and clamp to joystick range [-1, 1] + lx=max(-1.0, min(1.0, -twist.angular.z / MAX_ANGULAR_VEL)), + ly=max(-1.0, min(1.0, twist.linear.x / MAX_LINEAR_VEL)), + rx=max(-1.0, min(1.0, -twist.linear.y / MAX_LINEAR_VEL)), ry=0.0, # No pitch control in walk mode mode=mode, ) elif mode == 1: # STAND mode - body pose control # Map Twist pose controls to B1 joystick axes - # G1 cpp server maps: ly→bodyHeight, lx→euler[2], rx→euler[0], ry→euler[1] + # Already in normalized units, just clamp to [-1, 1] return cls( - lx=-twist.angular.z, # ROS yaw → B1 yaw (euler[2]) - ly=twist.linear.z, # ROS height → B1 bodyHeight - rx=-twist.angular.x, # ROS roll → B1 roll (euler[0]) - ry=twist.angular.y, # ROS pitch → B1 pitch (euler[1]) + lx=max(-1.0, min(1.0, -twist.angular.z)), # ROS yaw → B1 yaw + ly=max(-1.0, min(1.0, twist.linear.z)), # ROS height → B1 bodyHeight + rx=max(-1.0, min(1.0, -twist.angular.x)), # ROS roll → B1 roll + ry=max(-1.0, min(1.0, twist.angular.y)), # ROS pitch → B1 pitch mode=mode, ) else: diff --git a/dimos/robot/unitree_webrtc/unitree_b1/connection.py b/dimos/robot/unitree_webrtc/unitree_b1/connection.py index 9cbcd2f7d7..657ad8f6e4 100644 --- a/dimos/robot/unitree_webrtc/unitree_b1/connection.py +++ b/dimos/robot/unitree_webrtc/unitree_b1/connection.py @@ -17,16 +17,31 @@ """B1 Connection Module that accepts standard Twist commands and converts to UDP packets.""" +import logging import socket import threading import time from typing import Optional -from dimos.core import In, Module, rpc -from dimos.msgs.geometry_msgs import Twist +from dimos.core import In, Out, Module, rpc +from dimos.msgs.geometry_msgs import Twist, TwistStamped, PoseStamped +from dimos.msgs.nav_msgs.Odometry import Odometry from dimos.msgs.std_msgs import Int32 +from dimos.utils.logging_config import setup_logger from .b1_command import B1Command +# Setup logger with DEBUG level for troubleshooting +logger = setup_logger("dimos.robot.unitree_webrtc.unitree_b1.connection", level=logging.DEBUG) + + +class RobotMode: + """Constants for B1 robot modes.""" + + IDLE = 0 + STAND = 1 + WALK = 2 + RECOVERY = 6 + class B1ConnectionModule(Module): """UDP connection module for B1 robot with standard Twist interface. @@ -35,9 +50,11 @@ class B1ConnectionModule(Module): internally converts to B1Command format, and sends UDP packets at 50Hz. """ - # Module inputs - cmd_vel: In[Twist] = None # Standard velocity commands + cmd_vel: In[TwistStamped] = None # Timestamped velocity commands from ROS mode_cmd: In[Int32] = None # Mode changes + odom_in: In[Odometry] = None # External odometry from ROS SLAM/lidar + + odom_pose: Out[PoseStamped] = None # Converted pose for internal use def __init__( self, ip: str = "192.168.12.1", port: int = 9090, test_mode: bool = False, *args, **kwargs @@ -54,16 +71,19 @@ def __init__( self.ip = ip self.port = port self.test_mode = test_mode - self.current_mode = 0 # Start in IDLE mode for safety - # Internal state as B1Command - self._current_cmd = B1Command(mode=0) + self.current_mode = RobotMode.IDLE # Start in IDLE mode + self._current_cmd = B1Command(mode=RobotMode.IDLE) + self.cmd_lock = threading.Lock() # Thread lock for _current_cmd access # Thread control self.running = False self.send_thread = None self.socket = None self.packet_count = 0 self.last_command_time = time.time() - self.command_timeout = 0.1 # 100ms timeout matching C++ server + self.command_timeout = 0.2 # 200ms safety timeout + self.watchdog_thread = None + self.watchdog_running = False + self.timeout_active = False @rpc def start(self): @@ -72,40 +92,54 @@ def start(self): # Setup UDP socket (unless in test mode) if not self.test_mode: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - print(f"B1 Connection started - UDP to {self.ip}:{self.port} at 50Hz") + logger.info(f"B1 Connection started - UDP to {self.ip}:{self.port} at 50Hz") else: - print(f"[TEST MODE] B1 Connection started - would send to {self.ip}:{self.port}") + logger.info(f"[TEST MODE] B1 Connection started - would send to {self.ip}:{self.port}") # Subscribe to input streams if self.cmd_vel: - self.cmd_vel.subscribe(self.handle_twist) + self.cmd_vel.subscribe(self.handle_twist_stamped) if self.mode_cmd: self.mode_cmd.subscribe(self.handle_mode) + if self.odom_in: + self.odom_in.subscribe(self._publish_odom_pose) - # Start 50Hz sending thread + # Start threads self.running = True + self.watchdog_running = True + + # Start 50Hz sending thread self.send_thread = threading.Thread(target=self._send_loop, daemon=True) self.send_thread.start() + # Start watchdog thread + self.watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True) + self.watchdog_thread.start() + return True @rpc def stop(self): """Stop the connection and send stop commands.""" - self.set_mode(0) # IDLE - self._current_cmd = B1Command(mode=0) # Zero all velocities + self.set_mode(RobotMode.IDLE) # IDLE + with self.cmd_lock: + self._current_cmd = B1Command(mode=RobotMode.IDLE) # Zero all velocities # Send multiple stop packets if not self.test_mode and self.socket: - stop_cmd = B1Command(mode=0) + stop_cmd = B1Command(mode=RobotMode.IDLE) for _ in range(5): data = stop_cmd.to_bytes() self.socket.sendto(data, (self.ip, self.port)) time.sleep(0.02) self.running = False + self.watchdog_running = False + if self.send_thread: self.send_thread.join(timeout=0.5) + if self.watchdog_thread: + self.watchdog_thread.join(timeout=0.5) if self.socket: self.socket.close() @@ -113,126 +147,206 @@ def stop(self): return True - def handle_twist(self, twist: Twist): - """Handle standard Twist message and convert to B1Command. + def handle_twist_stamped(self, twist_stamped: TwistStamped): + """Handle timestamped Twist message and convert to B1Command. This is called automatically when messages arrive on cmd_vel input. """ + # Extract Twist from TwistStamped + twist = Twist(linear=twist_stamped.linear, angular=twist_stamped.angular) + + logger.debug( + f"Received cmd_vel: linear=({twist.linear.x:.3f}, {twist.linear.y:.3f}, {twist.linear.z:.3f}), angular=({twist.angular.x:.3f}, {twist.angular.y:.3f}, {twist.angular.z:.3f})" + ) + + # In STAND mode, all twist values control body pose, not movement + # W/S: height (linear.z), A/D: yaw (angular.z), J/L: roll (angular.x), I/K: pitch (angular.y) + if self.current_mode == RobotMode.STAND: + # In STAND mode, don't auto-switch since all inputs are valid body pose controls + has_movement = False + else: + # In other modes, consider linear x/y and angular.z as movement + has_movement = ( + abs(twist.linear.x) > 0.01 + or abs(twist.linear.y) > 0.01 + or abs(twist.angular.z) > 0.01 + ) + + if has_movement and self.current_mode not in (RobotMode.STAND, RobotMode.WALK): + logger.info("Auto-switching to WALK mode for ROS control") + self.set_mode(RobotMode.WALK) + elif not has_movement and self.current_mode == RobotMode.WALK: + logger.info("Auto-switching to IDLE mode (zero velocities)") + self.set_mode(RobotMode.IDLE) + if self.test_mode: - print( - f"[TEST] Received Twist: linear=({twist.linear.x:.2f}, {twist.linear.y:.2f}), angular.z={twist.angular.z:.2f}" + logger.info( + f"[TEST] Received TwistStamped: linear=({twist.linear.x:.2f}, {twist.linear.y:.2f}), angular.z={twist.angular.z:.2f}" ) - # Convert Twist to B1Command - self._current_cmd = B1Command.from_twist(twist, self.current_mode) + + with self.cmd_lock: + self._current_cmd = B1Command.from_twist(twist, self.current_mode) + + logger.debug(f"Converted to B1Command: {self._current_cmd}") + self.last_command_time = time.time() + self.timeout_active = False # Reset timeout state since we got a new command def handle_mode(self, mode_msg: Int32): """Handle mode change message. This is called automatically when messages arrive on mode_cmd input. """ + logger.debug(f"Received mode change: {mode_msg.data}") if self.test_mode: - print(f"[TEST] Received mode change: {mode_msg.data}") + logger.info(f"[TEST] Received mode change: {mode_msg.data}") self.set_mode(mode_msg.data) @rpc def set_mode(self, mode: int): """Set robot mode (0=idle, 1=stand, 2=walk, 6=recovery).""" self.current_mode = mode - self._current_cmd.mode = mode - - # Clear velocities when not in walk mode - if mode != 2: - self._current_cmd.lx = 0.0 - self._current_cmd.ly = 0.0 - self._current_cmd.rx = 0.0 - self._current_cmd.ry = 0.0 - + with self.cmd_lock: + self._current_cmd.mode = mode + + # Clear velocities when not in walk mode + if mode != RobotMode.WALK: + self._current_cmd.lx = 0.0 + self._current_cmd.ly = 0.0 + self._current_cmd.rx = 0.0 + self._current_cmd.ry = 0.0 + + mode_names = { + RobotMode.IDLE: "IDLE", + RobotMode.STAND: "STAND", + RobotMode.WALK: "WALK", + RobotMode.RECOVERY: "RECOVERY", + } + logger.info(f"Mode changed to: {mode_names.get(mode, mode)}") if self.test_mode: - mode_names = {0: "IDLE", 1: "STAND", 2: "WALK", 6: "RECOVERY"} - print(f"[TEST] Mode changed to: {mode_names.get(mode, mode)}") + logger.info(f"[TEST] Mode changed to: {mode_names.get(mode, mode)}") return True def _send_loop(self): - """Continuously send current command at 50Hz with safety timeout.""" - timeout_warned = False + """Continuously send current command at 50Hz. + The watchdog thread handles timeout and zeroing commands, so this loop + just sends whatever is in self._current_cmd at 50Hz. + """ while self.running: try: - # Safety check: If no command received recently, send zeros - time_since_last_cmd = time.time() - self.last_command_time - - if time_since_last_cmd > self.command_timeout: - # Command is stale - send zero velocities for safety - if not timeout_warned: - if self.test_mode: - print( - f"[TEST] Command timeout ({time_since_last_cmd:.1f}s) - sending zeros" - ) - timeout_warned = True - - # Create safe idle command - safe_cmd = B1Command(mode=self.current_mode) - safe_cmd.lx = 0.0 - safe_cmd.ly = 0.0 - safe_cmd.rx = 0.0 - safe_cmd.ry = 0.0 - cmd_to_send = safe_cmd - else: - # Send command if fresh - if timeout_warned: - if self.test_mode: - print("[TEST] Commands resumed - control restored") - timeout_warned = False + # Watchdog handles timeout, we just send current command + with self.cmd_lock: cmd_to_send = self._current_cmd + # Log status every second (50 packets) + if self.packet_count % 50 == 0: + logger.info( + f"Sending B1 commands at 50Hz | Mode: {self.current_mode} | Count: {self.packet_count}" + ) + if not self.test_mode: + logger.debug(f"Current B1Command: {self._current_cmd}") + data = cmd_to_send.to_bytes() + hex_str = " ".join(f"{b:02x}" for b in data) + logger.debug(f"UDP packet ({len(data)} bytes): {hex_str}") + if self.socket: data = cmd_to_send.to_bytes() self.socket.sendto(data, (self.ip, self.port)) self.packet_count += 1 - # Maintain 50Hz rate (20ms between packets) + # 50Hz rate (20ms between packets) time.sleep(0.020) except Exception as e: if self.running: - print(f"Send error: {e}") + logger.error(f"Send error: {e}") + + def _publish_odom_pose(self, msg: Odometry): + """Convert and publish odometry as PoseStamped. + + This matches G1's approach of receiving external odometry. + """ + if self.odom_pose: + pose_stamped = PoseStamped( + ts=msg.ts, + frame_id=msg.frame_id, + position=msg.pose.pose.position, + orientation=msg.pose.pose.orientation, + ) + self.odom_pose.publish(pose_stamped) + + def _watchdog_loop(self): + """Single watchdog thread that monitors command freshness.""" + while self.watchdog_running: + try: + time_since_last_cmd = time.time() - self.last_command_time + + if time_since_last_cmd > self.command_timeout: + if not self.timeout_active: + # First time detecting timeout + logger.warning( + f"Watchdog timeout ({time_since_last_cmd:.1f}s) - zeroing commands" + ) + if self.test_mode: + logger.info("[TEST] Watchdog timeout - zeroing commands") + + with self.cmd_lock: + self._current_cmd.lx = 0.0 + self._current_cmd.ly = 0.0 + self._current_cmd.rx = 0.0 + self._current_cmd.ry = 0.0 + + self.timeout_active = True + else: + if self.timeout_active: + logger.info("Watchdog: Commands resumed - control restored") + if self.test_mode: + logger.info("[TEST] Watchdog: Commands resumed") + self.timeout_active = False + + # Check every 50ms + time.sleep(0.05) + + except Exception as e: + if self.watchdog_running: + logger.error(f"Watchdog error: {e}") @rpc def idle(self): """Set robot to idle mode.""" - self.set_mode(0) + self.set_mode(RobotMode.IDLE) return True @rpc def pose(self): """Set robot to stand/pose mode for reaching ground objects with manipulator.""" - self.set_mode(1) + self.set_mode(RobotMode.STAND) return True @rpc def walk(self): """Set robot to walk mode.""" - self.set_mode(2) + self.set_mode(RobotMode.WALK) return True @rpc def recovery(self): """Set robot to recovery mode.""" - self.set_mode(6) + self.set_mode(RobotMode.RECOVERY) return True @rpc - def move(self, twist: Twist, duration: float = 0.0): - """Direct RPC method for sending Twist commands. + def move(self, twist_stamped: TwistStamped, duration: float = 0.0): + """Direct RPC method for sending TwistStamped commands. Args: - twist: Velocity command + twist_stamped: Timestamped velocity command duration: Not used, kept for compatibility """ - self.handle_twist(twist) + self.handle_twist_stamped(twist_stamped) return True def cleanup(self): @@ -257,18 +371,20 @@ def _send_loop(self): # Show timeout transitions if is_timeout and not timeout_warned: - print(f"[TEST] Command timeout! Sending zeros after {time_since_last_cmd:.1f}s") + logger.info( + f"[TEST] Command timeout! Sending zeros after {time_since_last_cmd:.1f}s" + ) timeout_warned = True elif not is_timeout and timeout_warned: - print("[TEST] Commands resumed - control restored") + logger.info("[TEST] Commands resumed - control restored") timeout_warned = False # Print current state every 0.5 seconds if self.packet_count % 25 == 0: if is_timeout: - print(f"[TEST] B1Cmd[ZEROS] (timeout) | Count: {self.packet_count}") + logger.info(f"[TEST] B1Cmd[ZEROS] (timeout) | Count: {self.packet_count}") else: - print(f"[TEST] {self._current_cmd} | Count: {self.packet_count}") + logger.info(f"[TEST] {self._current_cmd} | Count: {self.packet_count}") self.packet_count += 1 time.sleep(0.020) diff --git a/dimos/robot/unitree_webrtc/unitree_b1/joystick_module.py b/dimos/robot/unitree_webrtc/unitree_b1/joystick_module.py index e8857e31e2..34fb5d79c8 100644 --- a/dimos/robot/unitree_webrtc/unitree_b1/joystick_module.py +++ b/dimos/robot/unitree_webrtc/unitree_b1/joystick_module.py @@ -23,19 +23,20 @@ # Force X11 driver to avoid OpenGL threading issues os.environ["SDL_VIDEODRIVER"] = "x11" +import time from dimos.core import Module, Out, rpc -from dimos.msgs.geometry_msgs import Twist, Vector3 +from dimos.msgs.geometry_msgs import Twist, TwistStamped, Vector3 from dimos.msgs.std_msgs import Int32 class JoystickModule(Module): """Pygame-based joystick control module for B1 testing. - Outputs standard Twist messages on /cmd_vel and mode changes on /b1/mode. + Outputs timestamped Twist messages on /cmd_vel and mode changes on /b1/mode. This allows testing the same interface that navigation will use. """ - twist_out: Out[Twist] = None # Standard velocity commands + twist_out: Out[TwistStamped] = None # Timestamped velocity commands mode_out: Out[Int32] = None # Mode changes def __init__(self, *args, **kwargs): @@ -119,7 +120,13 @@ def _pygame_loop(self): stop_twist = Twist() stop_twist.linear = Vector3(0, 0, 0) stop_twist.angular = Vector3(0, 0, 0) - self.twist_out.publish(stop_twist) + stop_twist_stamped = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=stop_twist.linear, + angular=stop_twist.angular, + ) + self.twist_out.publish(stop_twist_stamped) print("EMERGENCY STOP!") elif event.key == pygame.K_ESCAPE: # ESC still quits for development convenience @@ -178,8 +185,10 @@ def _pygame_loop(self): if pygame.K_k in self.keys_held: twist.angular.y = -1.0 # Pitch backward - # Always publish twist at 50Hz (matching working client behavior) - self.twist_out.publish(twist) + twist_stamped = TwistStamped( + ts=time.time(), frame_id="base_link", linear=twist.linear, angular=twist.angular + ) + self.twist_out.publish(twist_stamped) # Update pygame display self._update_display(twist) @@ -253,7 +262,13 @@ def stop(self): self.running = False # Send stop command stop_twist = Twist() - self.twist_out.publish(stop_twist) + stop_twist_stamped = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=stop_twist.linear, + angular=stop_twist.angular, + ) + self.twist_out.publish(stop_twist_stamped) return True def cleanup(self): diff --git a/dimos/robot/unitree_webrtc/unitree_b1/test_connection.py b/dimos/robot/unitree_webrtc/unitree_b1/test_connection.py new file mode 100644 index 0000000000..a5cfe7976c --- /dev/null +++ b/dimos/robot/unitree_webrtc/unitree_b1/test_connection.py @@ -0,0 +1,413 @@ +#!/usr/bin/env python3 +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Copyright 2025 Dimensional Inc. + +"""Comprehensive tests for Unitree B1 connection module Timer implementation.""" + +import time +import threading + +from .connection import TestB1ConnectionModule +from dimos.msgs.geometry_msgs import TwistStamped, Vector3 +from dimos.msgs.std_msgs.Int32 import Int32 + + +class TestB1Connection: + """Test suite for B1 connection module with Timer implementation.""" + + def test_watchdog_actually_zeros_commands(self): + """Test that watchdog thread zeros commands after timeout.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Send a forward command + twist_stamped = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(1.0, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist_stamped) + + # Verify command is set + assert conn._current_cmd.ly == 1.0 + assert conn._current_cmd.mode == 2 + assert not conn.timeout_active + + # Wait for watchdog timeout (200ms + buffer) + time.sleep(0.3) + + # Verify commands were zeroed by watchdog + assert conn._current_cmd.ly == 0.0 + assert conn._current_cmd.lx == 0.0 + assert conn._current_cmd.rx == 0.0 + assert conn._current_cmd.ry == 0.0 + assert conn._current_cmd.mode == 2 # Mode maintained + assert conn.timeout_active + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_watchdog_resets_on_new_command(self): + """Test that watchdog timeout resets when new command arrives.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Send first command + twist1 = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(1.0, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist1) + assert conn._current_cmd.ly == 1.0 + + # Wait 150ms (not enough to trigger timeout) + time.sleep(0.15) + + # Send second command before timeout + twist2 = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(0.5, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist2) + + # Command should be updated and no timeout + assert conn._current_cmd.ly == 0.5 + assert not conn.timeout_active + + # Wait another 150ms (total 300ms from second command) + time.sleep(0.15) + # Should still not timeout since we reset the timer + assert not conn.timeout_active + assert conn._current_cmd.ly == 0.5 + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_watchdog_thread_efficiency(self): + """Test that watchdog uses only one thread regardless of command rate.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Count threads before sending commands + initial_thread_count = threading.active_count() + + # Send many commands rapidly (would create many Timer threads in old implementation) + for i in range(50): + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(i * 0.01, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + time.sleep(0.01) # 100Hz command rate + + # Thread count should be same (no new threads created) + final_thread_count = threading.active_count() + assert final_thread_count == initial_thread_count, "No new threads should be created" + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_watchdog_with_send_loop_blocking(self): + """Test that watchdog still works if send loop blocks.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + + # Mock the send loop to simulate blocking + original_send_loop = conn._send_loop + block_event = threading.Event() + + def blocking_send_loop(): + # Block immediately + block_event.wait() + # Then run normally + original_send_loop() + + conn._send_loop = blocking_send_loop + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Send command + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(1.0, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + assert conn._current_cmd.ly == 1.0 + + # Wait for watchdog timeout + time.sleep(0.3) + + # Watchdog should have zeroed commands despite blocked send loop + assert conn._current_cmd.ly == 0.0 + assert conn.timeout_active + + # Unblock send loop + block_event.set() + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_continuous_commands_prevent_timeout(self): + """Test that continuous commands prevent watchdog timeout.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Send commands continuously for 500ms (should prevent timeout) + start = time.time() + commands_sent = 0 + while time.time() - start < 0.5: + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(0.5, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + commands_sent += 1 + time.sleep(0.05) # 50ms between commands (well under 200ms timeout) + + # Should never timeout + assert not conn.timeout_active, "Should not timeout with continuous commands" + assert conn._current_cmd.ly == 0.5, "Commands should still be active" + assert commands_sent >= 9, f"Should send at least 9 commands in 500ms, sent {commands_sent}" + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_watchdog_timing_accuracy(self): + """Test that watchdog zeros commands at approximately 200ms.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Send command and record time + start_time = time.time() + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(1.0, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + + # Wait for timeout checking periodically + timeout_time = None + while time.time() - start_time < 0.5: + if conn.timeout_active: + timeout_time = time.time() + break + time.sleep(0.01) + + assert timeout_time is not None, "Watchdog should timeout within 500ms" + + # Check timing (should be close to 200ms + up to 50ms watchdog interval) + elapsed = timeout_time - start_time + print(f"\nWatchdog timeout occurred at exactly {elapsed:.3f} seconds") + assert 0.19 <= elapsed <= 0.26, f"Watchdog timed out at {elapsed:.3f}s, expected ~0.2-0.25s" + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_mode_changes_with_watchdog(self): + """Test that mode changes work correctly with watchdog.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Send walk command + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(1.0, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + assert conn.current_mode == 2 + assert conn._current_cmd.ly == 1.0 + + # Wait for timeout first + time.sleep(0.25) + assert conn.timeout_active + assert conn._current_cmd.ly == 0.0 # Watchdog zeroed it + + # Now change mode to STAND + mode_msg = Int32() + mode_msg.data = 1 # STAND + conn.handle_mode(mode_msg) + assert conn.current_mode == 1 + assert conn._current_cmd.mode == 1 + # timeout_active stays true since we didn't send new movement commands + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_watchdog_stops_movement_when_commands_stop(self): + """Verify watchdog zeros commands when packets stop being sent.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Simulate sending movement commands for a while + for i in range(5): + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(1.0, 0, 0), + angular=Vector3(0, 0, 0.5), # Forward and turning + ) + conn.handle_twist_stamped(twist) + time.sleep(0.05) # Send at 20Hz + + # Verify robot is moving + assert conn._current_cmd.ly == 1.0 + assert conn._current_cmd.lx == -0.25 # angular.z * 0.5 -> lx (for turning) + assert conn.current_mode == 2 # WALK mode + assert not conn.timeout_active + + # Wait for watchdog to detect timeout (200ms + buffer) + time.sleep(0.3) + + assert conn.timeout_active, "Watchdog should have detected timeout" + assert conn._current_cmd.ly == 0.0, "Forward velocity should be zeroed" + assert conn._current_cmd.lx == 0.0, "Lateral velocity should be zeroed" + assert conn._current_cmd.rx == 0.0, "Rotation X should be zeroed" + assert conn._current_cmd.ry == 0.0, "Rotation Y should be zeroed" + assert conn.current_mode == 2, "Mode should stay as WALK" + + # Verify recovery works - send new command + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(0.5, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + + # Give watchdog time to detect recovery + time.sleep(0.1) + + assert not conn.timeout_active, "Should recover from timeout" + assert conn._current_cmd.ly == 0.5, "Should accept new commands" + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) + + def test_rapid_command_thread_safety(self): + """Test thread safety with rapid commands from multiple threads.""" + conn = TestB1ConnectionModule(ip="127.0.0.1", port=9090) + conn.running = True + conn.watchdog_running = True + conn.send_thread = threading.Thread(target=conn._send_loop, daemon=True) + conn.send_thread.start() + conn.watchdog_thread = threading.Thread(target=conn._watchdog_loop, daemon=True) + conn.watchdog_thread.start() + + # Count initial threads + initial_threads = threading.active_count() + + # Send commands from multiple threads rapidly + def send_commands(thread_id): + for i in range(10): + twist = TwistStamped( + ts=time.time(), + frame_id="base_link", + linear=Vector3(thread_id * 0.1, 0, 0), + angular=Vector3(0, 0, 0), + ) + conn.handle_twist_stamped(twist) + time.sleep(0.01) + + threads = [] + for i in range(3): + t = threading.Thread(target=send_commands, args=(i,)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # Thread count should only increase by the 3 sender threads we created + # No additional Timer threads should be created + final_threads = threading.active_count() + assert final_threads <= initial_threads, "No extra threads should be created by watchdog" + + # Commands should still work correctly + assert conn._current_cmd.ly >= 0, "Last command should be set" + assert not conn.timeout_active, "Should not be in timeout with recent commands" + + conn.running = False + conn.watchdog_running = False + conn.send_thread.join(timeout=0.5) + conn.watchdog_thread.join(timeout=0.5) diff --git a/dimos/robot/unitree_webrtc/unitree_b1/unitree_b1.py b/dimos/robot/unitree_webrtc/unitree_b1/unitree_b1.py index a81d75cc2f..77cf3ce19c 100644 --- a/dimos/robot/unitree_webrtc/unitree_b1/unitree_b1.py +++ b/dimos/robot/unitree_webrtc/unitree_b1/unitree_b1.py @@ -25,10 +25,13 @@ from typing import Optional from dimos import core -from dimos.msgs.geometry_msgs import Twist +from dimos.msgs.geometry_msgs import Twist, TwistStamped, PoseStamped +from dimos.msgs.nav_msgs.Odometry import Odometry +from dimos.msgs.tf2_msgs.TFMessage import TFMessage from dimos.msgs.std_msgs import Int32 from dimos.protocol.pubsub.lcmpubsub import LCM from dimos.robot.robot import Robot +from dimos.robot.ros_bridge import ROSBridge, BridgeDirection from dimos.robot.unitree_webrtc.unitree_b1.connection import ( B1ConnectionModule, TestB1ConnectionModule, @@ -37,6 +40,19 @@ from dimos.types.robot_capabilities import RobotCapability from dimos.utils.logging_config import setup_logger +# Handle ROS imports for environments where ROS is not available like CI +try: + from geometry_msgs.msg import TwistStamped as ROSTwistStamped + from nav_msgs.msg import Odometry as ROSOdometry + from tf2_msgs.msg import TFMessage as ROSTFMessage + + ROS_AVAILABLE = True +except ImportError: + ROSTwistStamped = None + ROSOdometry = None + ROSTFMessage = None + ROS_AVAILABLE = False + logger = setup_logger("dimos.robot.unitree_webrtc.unitree_b1", level=logging.INFO) @@ -56,6 +72,7 @@ def __init__( output_dir: str = None, skill_library: Optional[SkillLibrary] = None, enable_joystick: bool = False, + enable_ros_bridge: bool = True, test_mode: bool = False, ): """Initialize the B1 robot. @@ -66,6 +83,7 @@ def __init__( output_dir: Directory for saving outputs skill_library: Skill library instance (optional) enable_joystick: Enable pygame joystick control module + enable_ros_bridge: Enable ROS bridge for external control test_mode: Test mode - print commands instead of sending UDP """ super().__init__() @@ -73,10 +91,12 @@ def __init__( self.port = port self.output_dir = output_dir or os.path.join(os.getcwd(), "assets", "output") self.enable_joystick = enable_joystick + self.enable_ros_bridge = enable_ros_bridge self.test_mode = test_mode self.capabilities = [RobotCapability.LOCOMOTION] self.connection = None self.joystick = None + self.ros_bridge = None os.makedirs(self.output_dir, exist_ok=True) logger.info(f"Robot outputs will be saved to: {self.output_dir}") @@ -93,16 +113,18 @@ def start(self): else: self.connection = self.dimos.deploy(B1ConnectionModule, self.ip, self.port) - # Configure LCM transports for connection - self.connection.cmd_vel.transport = core.LCMTransport("/cmd_vel", Twist) + # Configure LCM transports for connection (matching G1 pattern) + self.connection.cmd_vel.transport = core.LCMTransport("/cmd_vel", TwistStamped) self.connection.mode_cmd.transport = core.LCMTransport("/b1/mode", Int32) + self.connection.odom_in.transport = core.LCMTransport("/state_estimation", Odometry) + self.connection.odom_pose.transport = core.LCMTransport("/odom", PoseStamped) # Deploy joystick move_vel control if self.enable_joystick: from dimos.robot.unitree_webrtc.unitree_b1.joystick_module import JoystickModule self.joystick = self.dimos.deploy(JoystickModule) - self.joystick.twist_out.transport = core.LCMTransport("/cmd_vel", Twist) + self.joystick.twist_out.transport = core.LCMTransport("/cmd_vel", TwistStamped) self.joystick.mode_out.transport = core.LCMTransport("/b1/mode", Int32) logger.info("Joystick module deployed - pygame window will open") @@ -113,20 +135,47 @@ def start(self): if self.joystick: self.joystick.start() + # Deploy ROS bridge if enabled (matching G1 pattern) + if self.enable_ros_bridge: + self._deploy_ros_bridge() + logger.info(f"UnitreeB1 initialized - UDP control to {self.ip}:{self.port}") if self.enable_joystick: logger.info("Pygame joystick module enabled for testing") + if self.enable_ros_bridge: + logger.info("ROS bridge enabled for external control") + + def _deploy_ros_bridge(self): + """Deploy and configure ROS bridge (matching G1 implementation).""" + self.ros_bridge = ROSBridge("b1_ros_bridge") + + # Add /cmd_vel topic from ROS to DIMOS + self.ros_bridge.add_topic( + "/cmd_vel", TwistStamped, ROSTwistStamped, direction=BridgeDirection.ROS_TO_DIMOS + ) + + # Add /state_estimation topic from ROS to DIMOS (external odometry) + self.ros_bridge.add_topic( + "/state_estimation", Odometry, ROSOdometry, direction=BridgeDirection.ROS_TO_DIMOS + ) + + # Add /tf topic from ROS to DIMOS + self.ros_bridge.add_topic( + "/tf", TFMessage, ROSTFMessage, direction=BridgeDirection.ROS_TO_DIMOS + ) + + logger.info("ROS bridge deployed: /cmd_vel, /state_estimation, /tf (ROS → DIMOS)") # Robot control methods (standard interface) - def move(self, twist: Twist, duration: float = 0.0): - """Send movement command to robot using standard Twist. + def move(self, twist_stamped: TwistStamped, duration: float = 0.0): + """Send movement command to robot using timestamped Twist. Args: - twist: Twist message with linear and angular velocities + twist_stamped: TwistStamped message with linear and angular velocities duration: How long to move (not used for B1) """ if self.connection: - self.connection.move(twist, duration) + self.connection.move(twist_stamped, duration) def stop(self): """Stop all robot movement.""" @@ -151,22 +200,34 @@ def idle(self): self.connection.idle() logger.info("B1 switched to IDLE mode") - def cleanup(self): - """Clean up robot resources.""" - logger.info("Cleaning up B1 robot...") + def shutdown(self): + """Shutdown the robot and clean up resources.""" + logger.info("Shutting down UnitreeB1...") # Stop robot movement self.stop() - # Clean up modules + # Shutdown ROS bridge if it exists + if self.ros_bridge is not None: + try: + self.ros_bridge.shutdown() + logger.info("ROS bridge shut down successfully") + except Exception as e: + logger.error(f"Error shutting down ROS bridge: {e}") + + # Clean up connection module if self.connection: self.connection.cleanup() - logger.info("B1 cleanup complete") + logger.info("UnitreeB1 shutdown complete") + + def cleanup(self): + """Clean up robot resources (calls shutdown for consistency).""" + self.shutdown() def __del__(self): """Destructor to ensure cleanup.""" - self.cleanup() + self.shutdown() def main(): @@ -177,6 +238,10 @@ def main(): parser.add_argument("--ip", default="192.168.12.1", help="Robot IP address") parser.add_argument("--port", type=int, default=9090, help="UDP port") parser.add_argument("--joystick", action="store_true", help="Enable pygame joystick control") + parser.add_argument("--ros-bridge", action="store_true", default=True, help="Enable ROS bridge") + parser.add_argument( + "--no-ros-bridge", dest="ros_bridge", action="store_false", help="Disable ROS bridge" + ) parser.add_argument("--output-dir", help="Output directory for logs/data") parser.add_argument( "--test", action="store_true", help="Test mode - print commands instead of UDP" @@ -189,6 +254,7 @@ def main(): port=args.port, output_dir=args.output_dir, enable_joystick=args.joystick, + enable_ros_bridge=args.ros_bridge, test_mode=args.test, ) @@ -216,7 +282,10 @@ def main(): # Manual control example print("\nB1 Robot ready for commands") print("Use robot.idle(), robot.stand(), robot.walk() to change modes") - print("Use robot.move(Twist(...)) to send velocity commands") + if args.ros_bridge: + print("ROS bridge active - listening for /cmd_vel and /state_estimation") + else: + print("Use robot.move(TwistStamped(...)) to send velocity commands") print("Press Ctrl+C to exit\n") import time