diff --git a/assets/foxglove_unitree_lcm_dashboard.json b/assets/foxglove_unitree_lcm_dashboard.json new file mode 100644 index 0000000000..932ee65fd5 --- /dev/null +++ b/assets/foxglove_unitree_lcm_dashboard.json @@ -0,0 +1,221 @@ +{ + "configById": { + "3D!18i6zy7": { + "layers": { + "845139cb-26bc-40b3-8161-8ab60af4baf5": { + "visible": true, + "frameLocked": true, + "label": "Grid", + "instanceId": "845139cb-26bc-40b3-8161-8ab60af4baf5", + "layerId": "foxglove.Grid", + "lineWidth": 0.5, + "position": [ + 0, + 0, + 0 + ], + "rotation": [ + 0, + 0, + 0 + ], + "order": 1, + "size": 30, + "divisions": 30 + } + }, + "cameraState": { + "perspective": true, + "distance": 22.16066481982733, + "phi": 34.816360162458444, + "thetaOffset": 110.38709006815255, + "targetOffset": [ + 0.6543165797799305, + -5.069343424147507, + 5.603018248159094e-16 + ], + "target": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": { + "enableStats": true, + "ignoreColladaUpAxis": false, + "syncCamera": false + }, + "transforms": {}, + "topics": { + "/lidar": { + "stixelsEnabled": false, + "visible": true, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointShape": "square", + "pointSize": 10, + "explicitAlpha": 1, + "minValue": -0.1, + "decayTime": 0, + "cubeSize": 0.06 + }, + "/odom": { + "visible": true, + "axisScale": 1 + }, + "/video": { + "visible": false + }, + "/global_map": { + "visible": true, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointSize": 10, + "minValue": -0.1, + "decayTime": 0, + "pointShape": "square", + "cubeOutline": false, + "cubeSize": 0.05, + "gradient": [ + "#06011dff", + "#d1e2e2ff" + ], + "stixelsEnabled": false + } + }, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/estimate", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": {}, + "foxglovePanelTitle": "test" + }, + "Image!3mnp456": { + "cameraState": { + "distance": 20, + "perspective": true, + "phi": 60, + "target": [ + 0, + 0, + 0 + ], + "targetOffset": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "thetaOffset": 45, + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": { + "enableStats": true + }, + "transforms": {}, + "topics": {}, + "layers": {}, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/initialpose", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": { + "imageTopic": "/video", + "colorMode": "gradient" + }, + "foxglovePanelTitle": "/video" + }, + "RawMessages!os6rgs": { + "diffEnabled": true, + "diffMethod": "previous message", + "diffTopicPath": "/lidar", + "showFullMessageForDiff": false, + "topicPath": "/odom", + "fontSize": 12 + }, + "Plot!a1gj37": { + "paths": [ + { + "timestampMethod": "receiveTime", + "value": "/odom.pose.position.y", + "enabled": true, + "color": "#4e98e2" + }, + { + "timestampMethod": "receiveTime", + "value": "/odom.pose.position.x", + "enabled": true, + "color": "#f5774d" + }, + { + "timestampMethod": "receiveTime", + "value": "/odom.pose.position.z", + "enabled": true, + "color": "#f7df71" + } + ], + "showXAxisLabels": true, + "showYAxisLabels": true, + "showLegend": true, + "legendDisplay": "floating", + "showPlotValuesInLegend": false, + "isSynced": true, + "xAxisVal": "timestamp", + "sidebarDimension": 240 + } + }, + "globalVariables": {}, + "userNodes": {}, + "playbackConfig": { + "speed": 1 + }, + "drawerConfig": { + "tracks": [] + }, + "layout": { + "first": "3D!18i6zy7", + "second": { + "first": "Image!3mnp456", + "second": { + "first": "RawMessages!os6rgs", + "second": "Plot!a1gj37", + "direction": "row", + "splitPercentage": 21.133036282622545 + }, + "direction": "column", + "splitPercentage": 45.83732057416268 + }, + "direction": "row", + "splitPercentage": 61.01509144891709 + } +} diff --git a/bin/lcmspy b/bin/lcmspy new file mode 100755 index 0000000000..0efa14fce3 --- /dev/null +++ b/bin/lcmspy @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +# current script dir + ..dimos + + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +python $script_dir/../dimos/utils/cli/lcmspy_cli.py "$@" diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 09011caa03..5df6d4e803 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -4,7 +4,6 @@ import time from typing import Optional -import pytest from dask.distributed import Client, LocalCluster from rich.console import Console @@ -82,14 +81,6 @@ def deploy( return dask_client -@pytest.fixture -def dimos(): - process_count = 3 # we chill - client = start(process_count) - yield client - stop(client) - - def start(n: Optional[int] = None) -> Client: console = Console() if not n: diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index e71036c402..ace435b54b 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -24,10 +24,10 @@ Out, RemoteOut, ZenohTransport, - dimos, pLCMTransport, rpc, start, + stop, ) from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.odometry import Odometry @@ -35,8 +35,14 @@ from dimos.utils.testing import SensorReplay # never delete this line -if dimos: - ... + + +@pytest.fixture +def dimos(): + """Fixture to create a Dimos client for testing.""" + client = start(2) + yield client + stop(client) class RobotClient(Module): diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index bfe5a32481..2a35ccf445 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -48,7 +48,7 @@ class PoseStamped(Pose, Timestamped): @dispatch def __init__(self, *args, ts: float = 0, frame_id: str = "", **kwargs) -> None: self.frame_id = frame_id - self.ts = ts if ts is not 0 else time.time() + self.ts = ts if ts != 0 else time.time() super().__init__(*args, **kwargs) def lcm_encode(self) -> bytes: diff --git a/dimos/protocol/rpc/spec.py b/dimos/protocol/rpc/spec.py index c9c2ca88a9..da329f4f1b 100644 --- a/dimos/protocol/rpc/spec.py +++ b/dimos/protocol/rpc/spec.py @@ -81,7 +81,7 @@ def call(*args, fname=fname): class RPCServer(Protocol): - def serve(self, f: Callable, name: str) -> None: ... + def serve_rpc(self, f: Callable, name: str) -> None: ... class RPC(RPCServer, RPCClient): ... diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 516354642b..5f8c747864 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -18,6 +18,8 @@ import sys import threading import traceback +import os +from functools import cache from dataclasses import dataclass from typing import Any, Callable, Optional, Protocol, runtime_checkable @@ -26,17 +28,29 @@ from dimos.protocol.service.spec import Service +@cache +def check_root() -> bool: + """Return True if the current process is running as root (UID 0).""" + try: + return os.geteuid() == 0 # type: ignore[attr-defined] + except AttributeError: + # Platforms without geteuid (e.g. Windows) – assume non-root. + return False + + def check_multicast() -> list[str]: """Check if multicast configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check if loopback interface has multicast enabled try: result = subprocess.run(["ip", "link", "show", "lo"], capture_output=True, text=True) if "MULTICAST" not in result.stdout: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") except Exception: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") # Check if multicast route exists try: @@ -44,9 +58,9 @@ def check_multicast() -> list[str]: ["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True ) if not result.stdout.strip(): - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") except Exception: - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") return commands_needed @@ -55,22 +69,24 @@ def check_buffers() -> list[str]: """Check if buffer configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check current buffer settings try: result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True) current_max = int(result.stdout.split("=")[1].strip()) if current_max < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") try: result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True) current_default = int(result.stdout.split("=")[1].strip()) if current_default < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") return commands_needed diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index 53d8c7fd12..c5b86cac35 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -23,9 +23,15 @@ autoconf, check_buffers, check_multicast, + check_root, ) +def get_sudo_prefix() -> str: + """Return 'sudo ' if not running as root, empty string if running as root.""" + return "" if check_root() else "sudo " + + def test_check_multicast_all_configured(): """Test check_multicast when system is properly configured.""" with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: @@ -63,7 +69,8 @@ def test_check_multicast_missing_multicast_flag(): ] result = check_multicast() - assert result == ["sudo ifconfig lo multicast"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}ifconfig lo multicast"] def test_check_multicast_missing_route(): @@ -83,7 +90,8 @@ def test_check_multicast_missing_route(): ] result = check_multicast() - assert result == ["sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] def test_check_multicast_all_missing(): @@ -103,9 +111,10 @@ def test_check_multicast_all_missing(): ] result = check_multicast() + sudo = get_sudo_prefix() expected = [ - "sudo ifconfig lo multicast", - "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + f"{sudo}ifconfig lo multicast", + f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", ] assert result == expected @@ -117,9 +126,10 @@ def test_check_multicast_subprocess_exception(): mock_run.side_effect = Exception("Command failed") result = check_multicast() + sudo = get_sudo_prefix() expected = [ - "sudo ifconfig lo multicast", - "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + f"{sudo}ifconfig lo multicast", + f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", ] assert result == expected @@ -151,7 +161,8 @@ def test_check_buffers_low_max_buffer(): ] result = check_buffers() - assert result == ["sudo sysctl -w net.core.rmem_max=2097152"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}sysctl -w net.core.rmem_max=2097152"] def test_check_buffers_low_default_buffer(): @@ -166,7 +177,8 @@ def test_check_buffers_low_default_buffer(): ] result = check_buffers() - assert result == ["sudo sysctl -w net.core.rmem_default=2097152"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}sysctl -w net.core.rmem_default=2097152"] def test_check_buffers_both_low(): @@ -181,9 +193,10 @@ def test_check_buffers_both_low(): ] result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -195,9 +208,10 @@ def test_check_buffers_subprocess_exception(): mock_run.side_effect = Exception("Command failed") result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -212,9 +226,10 @@ def test_check_buffers_parsing_error(): ] result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -267,29 +282,26 @@ def test_autoconf_with_config_needed_success(): # Command execution calls type( "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo ifconfig lo multicast - type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sudo route add... - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo sysctl rmem_max - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo sysctl rmem_default + )(), # ifconfig lo multicast + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # route add... + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_max + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_default ] with patch("builtins.print") as mock_print: autoconf() + sudo = get_sudo_prefix() # Verify the expected print calls expected_calls = [ ("System configuration required. Executing commands...",), - (" Running: sudo ifconfig lo multicast",), + (f" Running: {sudo}ifconfig lo multicast",), (" ✓ Success",), - (" Running: sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), + (f" Running: {sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), (" ✓ Success",), - (" Running: sudo sysctl -w net.core.rmem_max=2097152",), + (f" Running: {sudo}sysctl -w net.core.rmem_max=2097152",), (" ✓ Success",), - (" Running: sudo sysctl -w net.core.rmem_default=2097152",), + (f" Running: {sudo}sysctl -w net.core.rmem_default=2097152",), (" ✓ Success",), ("System configuration completed.",), ] @@ -318,20 +330,11 @@ def test_autoconf_with_command_failures(): # Command execution calls - first succeeds, second fails type( "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo ifconfig lo multicast + )(), # ifconfig lo multicast subprocess.CalledProcessError( 1, - [ - "sudo", - "route", - "add", - "-net", - "224.0.0.0", - "netmask", - "240.0.0.0", - "dev", - "lo", - ], + get_sudo_prefix().split() + + ["route", "add", "-net", "224.0.0.0", "netmask", "240.0.0.0", "dev", "lo"], "Permission denied", "Operation not permitted", ), diff --git a/dimos/robot/global_planner/planner.py b/dimos/robot/global_planner/planner.py index bd717ed959..55eea616a0 100644 --- a/dimos/robot/global_planner/planner.py +++ b/dimos/robot/global_planner/planner.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from typing import Callable, Optional -from dimos.core import In, Module, Out +from dimos.core import In, Module, Out, rpc from dimos.msgs.geometry_msgs import Vector3 from dimos.robot.global_planner.algo import astar from dimos.types.costmap import Costmap @@ -71,7 +71,8 @@ def __init__( self.get_costmap = get_costmap self.get_robot_pos = get_robot_pos - async def start(self): + @rpc + def start(self): self.target.subscribe(self.plan) def plan(self, goal: VectorLike) -> Path: diff --git a/dimos/robot/local_planner/simple.py b/dimos/robot/local_planner/simple.py index 7295909c8c..46abd7c303 100644 --- a/dimos/robot/local_planner/simple.py +++ b/dimos/robot/local_planner/simple.py @@ -15,13 +15,14 @@ import math import time from dataclasses import dataclass -from typing import Callable, Optional +from typing import Any, Callable, Optional import reactivex as rx from plum import dispatch from reactivex import operators as ops -from dimos.core import In, Module, Out +from dimos.core import In, Module, Out, rpc +from dimos.msgs.geometry_msgs import Vector3 # from dimos.robot.local_planner.local_planner import LocalPlanner from dimos.types.costmap import Costmap @@ -64,17 +65,24 @@ def transform_to_robot_frame(global_vector: Vector, robot_position: Position) -> class SimplePlanner(Module): path: In[Path] = None - movecmd: Out[Vector] = None + movecmd: Out[Vector3] = None get_costmap: Callable[[], Costmap] get_robot_pos: Callable[[], Position] + set_move: Callable[[Vector3], Any] goal: Optional[Vector] = None speed: float = 0.3 - def __init__(self, get_costmap: Callable[[], Costmap], get_robot_pos: Callable[[], Vector]): + def __init__( + self, + get_costmap: Callable[[], Costmap], + get_robot_pos: Callable[[], Vector], + set_move: Callable[[Vector3], Any], + ): Module.__init__(self) self.get_costmap = get_costmap self.get_robot_pos = get_robot_pos + self.set_move = set_move def get_move_stream(self, frequency: float = 40.0) -> rx.Observable: return rx.interval(1.0 / frequency, scheduler=get_scheduler()).pipe( @@ -85,9 +93,12 @@ def get_move_stream(self, frequency: float = 40.0) -> rx.Observable: self.frequency_spy("movement_test"), ) - async def start(self): + @rpc + def start(self): self.path.subscribe(self.set_goal) - self.get_move_stream(frequency=20.0).subscribe(self.movecmd.publish) + # weird bug with this + # self.get_move_stream(frequency=20.0).subscribe(self.movecmd.publish) + self.get_move_stream(frequency=20.0).subscribe(self.set_move) @dispatch def set_goal(self, goal: Path, stop_event=None, goal_theta=None) -> bool: @@ -171,11 +182,11 @@ def _test_translational_movement(self) -> Vector: if phase < 0.5: # First half: move LEFT (positive X according to our documentation) - movement = Vector(0.2, 0, 0) # Move left at 0.2 m/s + movement = Vector3(0.2, 0, 0) # Move left at 0.2 m/s direction = "LEFT (positive X)" else: # Second half: move RIGHT (negative X according to our documentation) - movement = Vector(-0.2, 0, 0) # Move right at 0.2 m/s + movement = Vector3(-0.2, 0, 0) # Move right at 0.2 m/s direction = "RIGHT (negative X)" print("=== LEFT-RIGHT MOVEMENT TEST ===") diff --git a/dimos/robot/unitree_webrtc/connection.py b/dimos/robot/unitree_webrtc/connection.py index df8469a98b..711adf640a 100644 --- a/dimos/robot/unitree_webrtc/connection.py +++ b/dimos/robot/unitree_webrtc/connection.py @@ -30,6 +30,7 @@ from reactivex.subject import Subject from dimos.core import In, Module, Out, rpc +from dimos.msgs.sensor_msgs import Image from dimos.robot.connection_interface import ConnectionInterface from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg @@ -132,9 +133,25 @@ async def async_move_duration(): # Generic conversion of unitree subscription to Subject (used for all subs) def unitree_sub_stream(self, topic_name: str): + def subscribe_in_thread(cb): + # Run the subscription in the background thread that has the event loop + def run_subscription(): + self.conn.datachannel.pub_sub.subscribe(topic_name, cb) + + # Use call_soon_threadsafe to run in the background thread + self.loop.call_soon_threadsafe(run_subscription) + + def unsubscribe_in_thread(cb): + # Run the unsubscription in the background thread that has the event loop + def run_unsubscription(): + self.conn.datachannel.pub_sub.unsubscribe(topic_name) + + # Use call_soon_threadsafe to run in the background thread + self.loop.call_soon_threadsafe(run_unsubscription) + return callback_to_observable( - start=lambda cb: self.conn.datachannel.pub_sub.subscribe(topic_name, cb), - stop=lambda: self.conn.datachannel.pub_sub.unsubscribe(topic_name), + start=subscribe_in_thread, + stop=unsubscribe_in_thread, ) # Generic sync API call (we jump into the client thread) @@ -217,15 +234,25 @@ async def accept_track(track: MediaStreamTrack) -> VideoMessage: if stop_event.is_set(): return frame = await track.recv() - subject.on_next(frame.to_ndarray(format="bgr24")) + subject.on_next(Image.from_numpy(frame.to_ndarray(format="bgr24"))) self.conn.video.add_track_callback(accept_track) - self.conn.video.switchVideoChannel(True) + + # Run the video channel switching in the background thread + def switch_video_channel(): + self.conn.video.switchVideoChannel(True) + + self.loop.call_soon_threadsafe(switch_video_channel) def stop(cb): stop_event.set() # Signal the loop to stop self.conn.video.track_callbacks.remove(accept_track) - self.conn.video.switchVideoChannel(False) + + # Run the video channel switching off in the background thread + def switch_video_channel_off(): + self.conn.video.switchVideoChannel(False) + + self.loop.call_soon_threadsafe(switch_video_channel_off) return subject.pipe(ops.finally_action(stop)) @@ -247,6 +274,7 @@ def get_video_stream(self, fps: int = 30) -> Observable[VideoMessage]: if stream is None: print("Warning: Video stream is not available") return stream + except Exception as e: print(f"Error getting video stream: {e}") return None diff --git a/dimos/robot/unitree_webrtc/multiprocess_individual_node.py b/dimos/robot/unitree_webrtc/multiprocess/individual_node_example.py similarity index 100% rename from dimos/robot/unitree_webrtc/multiprocess_individual_node.py rename to dimos/robot/unitree_webrtc/multiprocess/individual_node_example.py diff --git a/dimos/robot/unitree_webrtc/test_actors.py b/dimos/robot/unitree_webrtc/multiprocess/test_actors.py similarity index 100% rename from dimos/robot/unitree_webrtc/test_actors.py rename to dimos/robot/unitree_webrtc/multiprocess/test_actors.py diff --git a/dimos/robot/unitree_webrtc/test_tooling.py b/dimos/robot/unitree_webrtc/multiprocess/test_tooling.py similarity index 100% rename from dimos/robot/unitree_webrtc/test_tooling.py rename to dimos/robot/unitree_webrtc/multiprocess/test_tooling.py diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py similarity index 53% rename from dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py rename to dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py index e2cb812ba0..659eafde0c 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py @@ -15,6 +15,7 @@ import asyncio import contextvars import functools +import threading import time from typing import Callable @@ -44,8 +45,18 @@ # can be swapped in for WebRTCRobot class FakeRTC(WebRTCRobot): + def __init__(self, *args, **kwargs): + # ensures we download msgs from lfs store + data = get_data("unitree_office_walk") + def connect(self): ... + def standup(self): + print("standup supressed") + + def liedown(self): + print("liedown supressed") + @functools.cache def lidar_stream(self): print("lidar stream start") @@ -59,17 +70,21 @@ def odom_stream(self): return odom_store.stream() @functools.cache - def video_stream(self): + def video_stream(self, freq_hz=0.5): print("video stream start") video_store = TimedSensorReplay("unitree_office_walk/video", autocast=Image.from_numpy) - return video_store.stream().pipe(ops.sample(0.5)) + return video_store.stream().pipe(ops.sample(freq_hz)) def move(self, vector: Vector): print("move supressed", vector) +class RealRTC(WebRTCRobot): ... + + +# inherit RealRTC instead of FakeRTC to run the real robot class ConnectionModule(FakeRTC, Module): - movecmd: In[Vector] = None + movecmd: In[Vector3] = None odom: Out[Vector3] = None lidar: Out[LidarMessage] = None video: Out[VideoMessage] = None @@ -78,27 +93,25 @@ class ConnectionModule(FakeRTC, Module): _odom: Callable[[], Odometry] _lidar: Callable[[], LidarMessage] + @rpc + def move(self, vector: Vector3): + super().move(vector) + def __init__(self, ip: str, *args, **kwargs): - Module.__init__(self, *args, **kwargs) self.ip = ip + Module.__init__(self, *args, **kwargs) @rpc def start(self): + super().__init__(self.ip) # ensure that LFS data is available - data = get_data("unitree_office_walk") - # Since TimedSensorReplay is now non-blocking, we can subscribe directly self.lidar_stream().subscribe(self.lidar.publish) self.odom_stream().subscribe(self.odom.publish) self.video_stream().subscribe(self.video.publish) - - print("movecmd sub") - self.movecmd.subscribe(print) - print("sub ok") + self.movecmd.subscribe(self.move) self._odom = getter_streaming(self.odom_stream()) self._lidar = getter_streaming(self.lidar_stream()) - print("ConnectionModule started") - @rpc def get_local_costmap(self) -> Costmap: return self._lidar().costmap() @@ -115,92 +128,92 @@ def get_pos(self) -> Vector: class ControlModule(Module): plancmd: Out[Vector3] = None - async def start(self): - async def plancmd(): - await asyncio.sleep(4) + @rpc + def start(self): + def plancmd(): + time.sleep(4) print(colors.red("requesting global plan")) - self.plancmd.publish(Vector3([0, 0, 0])) + self.plancmd.publish(Vector3([0.750893, -6.017522, 0.307474])) - asyncio.create_task(plancmd()) + thread = threading.Thread(target=plancmd, daemon=True) + thread.start() -class Unitree: - def __init__(self, ip: str): - self.ip = ip - - async def start(self): - dimos = None - if not dimos: - dimos = core.start(4) +async def run(ip): + dimos = core.start(4) + connection = dimos.deploy(ConnectionModule, ip) - connection = dimos.deploy(ConnectionModule, self.ip) + # This enables LCM transport + # Ensures system multicast, udp sizes are auto-adjusted if needed + # TODO: this doesn't seem to work atm and LCMTransport instantiation can fail + pubsub.lcm.autoconf() - # This enables LCM transport - # Ensures system multicast, udp sizes are auto-adjusted if needed - pubsub.lcm.autoconf() + connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) + connection.odom.transport = core.LCMTransport("/odom", Odometry) + connection.video.transport = core.LCMTransport("/video", Image) + connection.movecmd.transport = core.LCMTransport("/move", Vector3) - connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) - connection.odom.transport = core.LCMTransport("/odom", Odometry) - connection.video.transport = core.LCMTransport("/video", Image) - connection.movecmd.transport = core.LCMTransport("/move", Vector3) + mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=2.5) - mapper = dimos.deploy(Map, voxel_size=0.5) + mapper.global_map.transport = core.LCMTransport("/global_map", LidarMessage) - local_planner = dimos.deploy( - SimplePlanner, - get_costmap=connection.get_local_costmap, - get_robot_pos=connection.get_pos, - ) + local_planner = dimos.deploy( + SimplePlanner, + get_costmap=connection.get_local_costmap, + get_robot_pos=connection.get_pos, + set_move=connection.move, + ) - global_planner = dimos.deploy( - AstarPlanner, - get_costmap=mapper.costmap, - get_robot_pos=connection.get_pos, - ) + global_planner = dimos.deploy( + AstarPlanner, + get_costmap=mapper.costmap, + get_robot_pos=connection.get_pos, + ) - global_planner.path.transport = core.pLCMTransport("/global_path") + global_planner.path.transport = core.pLCMTransport("/global_path") - local_planner.path.connect(global_planner.path) - local_planner.movecmd.connect(connection.movecmd) + local_planner.path.connect(global_planner.path) + local_planner.movecmd.connect(connection.movecmd) - ctrl = dimos.deploy(ControlModule) + ctrl = dimos.deploy(ControlModule) - mapper.lidar.connect(connection.lidar) + mapper.lidar.connect(connection.lidar) - ctrl.plancmd.transport = core.LCMTransport("/global_target", Vector3) - global_planner.target.connect(ctrl.plancmd) + ctrl.plancmd.transport = core.LCMTransport("/global_target", Vector3) + global_planner.target.connect(ctrl.plancmd) - # we review the structure - print("\n") - for module in [connection, mapper, local_planner, global_planner, ctrl]: - print(module.io().result(), "\n") + # we review the structure + print("\n") + for module in [connection, mapper, local_planner, global_planner, ctrl]: + print(module.io().result(), "\n") - print(colors.green("starting mapper")) - mapper.start() + print(colors.green("starting mapper")) + mapper.start() - print(colors.green("starting connection")) - connection.start() + print(colors.green("starting connection")) + connection.start() - print(colors.green("local planner start")) - local_planner.start() + print(colors.green("local planner start")) + local_planner.start() - print(colors.green("starting global planner")) - global_planner.start() + print(colors.green("starting global planner")) + global_planner.start() - print(colors.green("starting ctrl")) - ctrl.start() + # uncomment to move the bot + # print(colors.green("starting ctrl")) + # ctrl.start() - print(colors.red("READY")) + print(colors.red("READY")) - await asyncio.sleep(3) - - print("querying system") - print(mapper.costmap()) - # global_planner.dask_receive_msg("target", Vector3([0, 0, 0])).result() - time.sleep(20) + await asyncio.sleep(10) + print("querying system") + print(mapper.costmap()) + while True: + await asyncio.sleep(1) if __name__ == "__main__": - unitree = Unitree("Bla") - asyncio.run(unitree.start()) - time.sleep(30) + import os + + asyncio.run(run(os.getenv("ROBOT_IP"))) + # asyncio.run(run("192.168.9.140")) diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index a9ead5d95d..898bd473b5 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -14,10 +14,11 @@ from dataclasses import dataclass from typing import Optional, Tuple - +import time import numpy as np import open3d as o3d import reactivex.operators as ops +from reactivex import interval from reactivex.observable import Observable from dimos.core import In, Module, Out, rpc @@ -27,17 +28,38 @@ class Map(Module): lidar: In[LidarMessage] = None + global_map: Out[LidarMessage] = None pointcloud: o3d.geometry.PointCloud = o3d.geometry.PointCloud() - def __init__(self, voxel_size: float = 0.05, cost_resolution: float = 0.05, **kwargs): + def __init__( + self, + voxel_size: float = 0.05, + cost_resolution: float = 0.05, + global_publish_interval: Optional[float] = None, + **kwargs, + ): self.voxel_size = voxel_size self.cost_resolution = cost_resolution + self.global_publish_interval = global_publish_interval super().__init__(**kwargs) @rpc def start(self): self.lidar.subscribe(self.add_frame) + if self.global_publish_interval is not None: + interval(self.global_publish_interval).subscribe( + lambda _: self.global_map.publish(self.to_lidar_message()) + ) + + def to_lidar_message(self) -> LidarMessage: + return LidarMessage( + pointcloud=self.pointcloud, + origin=[0.0, 0.0, 0.0], + resolution=self.voxel_size, + ts=time.time(), + ) + @rpc def add_frame(self, frame: LidarMessage) -> "Map": """Voxelise *frame* and splice it into the running map.""" diff --git a/dimos/utils/cli/lcmspy.py b/dimos/utils/cli/lcmspy.py new file mode 100755 index 0000000000..aa5b48b297 --- /dev/null +++ b/dimos/utils/cli/lcmspy.py @@ -0,0 +1,214 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time +from collections import deque +from dataclasses import dataclass +from enum import Enum + +import lcm + +from dimos.protocol.service.lcmservice import LCMConfig, LCMService + + +class BandwidthUnit(Enum): + BP = "B" + KBP = "kB" + MBP = "MB" + GBP = "GB" + + +def human_readable_bytes(bytes_value: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + """Convert bytes to human-readable format with appropriate units""" + if bytes_value >= 1024**3: # GB + return round(bytes_value / (1024**3), round_to), BandwidthUnit.GBP + elif bytes_value >= 1024**2: # MB + return round(bytes_value / (1024**2), round_to), BandwidthUnit.MBP + elif bytes_value >= 1024: # KB + return round(bytes_value / 1024, round_to), BandwidthUnit.KBP + else: + return round(bytes_value, round_to), BandwidthUnit.BP + + +class Topic: + history_window: float = 60.0 + + def __init__(self, name: str, history_window: float = 60.0): + self.name = name + # Store (timestamp, data_size) tuples for statistics + self.message_history = deque() + self.history_window = history_window + # Total traffic accumulator (doesn't get cleaned up) + self.total_traffic_bytes = 0 + + def msg(self, data: bytes): + # print(f"> msg {self.__str__()} {len(data)} bytes") + datalen = len(data) + self.message_history.append((time.time(), datalen)) + self.total_traffic_bytes += datalen + self._cleanup_old_messages() + + def _cleanup_old_messages(self, max_age: float = None): + """Remove messages older than max_age seconds""" + current_time = time.time() + while self.message_history and current_time - self.message_history[0][0] > ( + max_age or self.history_window + ): + self.message_history.popleft() + + def _get_messages_in_window(self, time_window: float): + """Get messages within the specified time window""" + current_time = time.time() + cutoff_time = current_time - time_window + return [(ts, size) for ts, size in self.message_history if ts >= cutoff_time] + + # avg msg freq in the last n seconds + def freq(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + return len(messages) / time_window + + # avg bandwidth in kB/s in the last n seconds + def kbps(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + total_bytes = sum(size for _, size in messages) + total_kbytes = total_bytes / 1000 # Convert bytes to kB + return total_kbytes / time_window + + def kbps_hr(self, time_window: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + """Return human-readable bandwidth with appropriate units""" + kbps_val = self.kbps(time_window) + # Convert kB/s to B/s for human_readable_bytes + bps = kbps_val * 1000 + return human_readable_bytes(bps, round_to) + + # avg msg size in the last n seconds + def size(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + total_size = sum(size for _, size in messages) + return total_size / len(messages) + + def total_traffic(self) -> int: + """Return total traffic passed in bytes since the beginning""" + return self.total_traffic_bytes + + def total_traffic_hr(self) -> tuple[float, BandwidthUnit]: + """Return human-readable total traffic with appropriate units""" + total_bytes = self.total_traffic() + return human_readable_bytes(total_bytes) + + def __str__(self): + return f"topic({self.name})" + + +@dataclass +class LCMSpyConfig(LCMConfig): + topic_history_window: float = 60.0 + + +class LCMSpy(LCMService, Topic): + default_config = LCMSpyConfig + topic = dict[str, Topic] + graph_log_window: float = 1.0 + topic_class: type[Topic] = Topic + + def __init__(self, **kwargs): + super().__init__(**kwargs) + Topic.__init__(self, name="total", history_window=self.config.topic_history_window) + self.topic = {} + self.l = lcm.LCM(self.config.url) if self.config.url else lcm.LCM() + + def start(self): + super().start() + self.l.subscribe("/.*", self.msg) + + def stop(self): + """Stop the LCM spy and clean up resources""" + super().stop() + + def msg(self, topic, data): + Topic.msg(self, data) + + if topic not in self.topic: + print(self.config) + self.topic[topic] = self.topic_class( + topic, history_window=self.config.topic_history_window + ) + self.topic[topic].msg(data) + + +class GraphTopic(Topic): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.freq_history = deque(maxlen=20) + self.bandwidth_history = deque(maxlen=20) + + def update_graphs(self, step_window: float = 1.0): + """Update historical data for graphing""" + freq = self.freq(step_window) + kbps = self.kbps(step_window) + self.freq_history.append(freq) + self.bandwidth_history.append(kbps) + + +@dataclass +class GraphLCMSpyConfig(LCMSpyConfig): + graph_log_window: float = 1.0 + + +class GraphLCMSpy(LCMSpy, GraphTopic): + default_config = GraphLCMSpyConfig + + graph_log_thread: threading.Thread | None = None + graph_log_stop_event: threading.Event = threading.Event() + topic_class: type[Topic] = GraphTopic + + def __init__(self, **kwargs): + super().__init__(**kwargs) + GraphTopic.__init__(self, name="total", history_window=self.config.topic_history_window) + + def start(self): + super().start() + self.graph_log_thread = threading.Thread(target=self.graph_log, daemon=True) + self.graph_log_thread.start() + + def graph_log(self): + while not self.graph_log_stop_event.is_set(): + self.update_graphs(self.config.graph_log_window) # Update global history + for topic in self.topic.values(): + topic.update_graphs(self.config.graph_log_window) + time.sleep(self.config.graph_log_window) + + def stop(self): + """Stop the graph logging and LCM spy""" + self.graph_log_stop_event.set() + if self.graph_log_thread and self.graph_log_thread.is_alive(): + self.graph_log_thread.join(timeout=1.0) + super().stop() + + +if __name__ == "__main__": + lcm_spy = LCMSpy() + lcm_spy.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("LCM Spy stopped.") diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py new file mode 100644 index 0000000000..47bf2b8a56 --- /dev/null +++ b/dimos/utils/cli/lcmspy_cli.py @@ -0,0 +1,132 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import math +import random +import threading +from typing import List + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.color import Color +from textual.containers import Container +from textual.reactive import reactive +from textual.renderables.sparkline import Sparkline as SparklineRenderable +from textual.widgets import DataTable, Footer, Header, Label, Sparkline + +from dimos.utils.cli.lcmspy import GraphLCMSpy +from dimos.utils.cli.lcmspy import GraphTopic as SpyTopic + + +def gradient(max_value: float, value: float) -> str: + ratio = min(value / max_value, 1.0) + green = Color(0, 255, 0) + red = Color(255, 0, 0) + color = green.blend(red, ratio) + + return color.hex + + +def topic_text(topic_name: str) -> Text: + if "#" in topic_name: + parts = topic_name.split("#", 1) + return Text(parts[0], style="white") + Text("#" + parts[1], style="blue") + + if topic_name[:4] == "/rpc": + return Text(topic_name[:4], style="red") + Text(topic_name[4:], style="white") + + return Text(topic_name, style="white") + + +class LCMSpyApp(App): + """A real-time CLI dashboard for LCM traffic statistics using Textual.""" + + CSS = """ + Screen { + layout: vertical; + } + DataTable { + height: 2fr; + width: 1fr; + border: none; + background: black; + } + """ + + refresh_interval: float = 0.5 # seconds + show_command_palette = reactive(True) + + BINDINGS = [ + ("q", "quit"), + ("ctrl+c", "quit"), + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.spy = GraphLCMSpy(autoconf=True, graph_log_window=0.5) + self.table: DataTable | None = None + + def compose(self) -> ComposeResult: + # yield Header() + + self.table = DataTable(zebra_stripes=False, cursor_type=None) + self.table.add_column("Topic") + self.table.add_column("Freq (Hz)") + self.table.add_column("Bandwidth") + self.table.add_column("Total Traffic") + yield self.table + yield Footer() + + def on_mount(self): + self.theme = "flexoki" + self.spy.start() + self.set_interval(self.refresh_interval, self.refresh_table) + + async def on_unmount(self): + self.spy.stop() + + def refresh_table(self): + topics: List[SpyTopic] = list(self.spy.topic.values()) + topics.sort(key=lambda t: t.total_traffic(), reverse=True) + self.table.clear(columns=False) + + for t in topics: + freq = t.freq(5.0) + kbps = t.kbps(5.0) + bw_val, bw_unit = t.kbps_hr(5.0) + total_val, total_unit = t.total_traffic_hr() + + self.table.add_row( + topic_text(t.name), + Text(f"{freq:.1f}", style=gradient(10, freq)), + Text(f"{bw_val} {bw_unit.value}/s", style=gradient(1024 * 3, kbps)), + Text(f"{total_val} {total_unit.value}"), + ) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "web": + import os + + from textual_serve.server import Server + + server = Server(f"python {os.path.abspath(__file__)}") + server.serve() + else: + LCMSpyApp().run() diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py new file mode 100644 index 0000000000..a77bb03d20 --- /dev/null +++ b/dimos/utils/cli/test_lcmspy.py @@ -0,0 +1,226 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import pytest + +from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.protocol.service.lcmservice import autoconf +from dimos.utils.cli.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy + +autoconf() + + +@pytest.mark.lcm +def test_spy_basic(): + lcm = PickleLCM(autoconf=True) + lcm.start() + + lcmspy = LCMSpy() + lcmspy.start() + + video_topic = Topic(topic="/video") + odom_topic = Topic(topic="/odom") + + for i in range(5): + lcm.publish(video_topic, f"video frame {i}") + time.sleep(0.1) + if i % 2 == 0: + lcm.publish(odom_topic, f"odometry data {i / 2}") + + # Wait a bit for messages to be processed + time.sleep(0.5) + + # Test statistics for video topic + video_topic_spy = lcmspy.topic["/video"] + assert video_topic_spy is not None + + # Test frequency (should be around 10 Hz for 5 messages in ~0.5 seconds) + freq = video_topic_spy.freq(1.0) + assert freq > 0 + print(f"Video topic frequency: {freq:.2f} Hz") + + # Test bandwidth + kbps = video_topic_spy.kbps(1.0) + assert kbps > 0 + print(f"Video topic bandwidth: {kbps:.2f} kbps") + + # Test average message size + avg_size = video_topic_spy.size(1.0) + assert avg_size > 0 + print(f"Video topic average message size: {avg_size:.2f} bytes") + + # Test statistics for odom topic + odom_topic_spy = lcmspy.topic["/odom"] + assert odom_topic_spy is not None + + freq = odom_topic_spy.freq(1.0) + assert freq > 0 + print(f"Odom topic frequency: {freq:.2f} Hz") + + kbps = odom_topic_spy.kbps(1.0) + assert kbps > 0 + print(f"Odom topic bandwidth: {kbps:.2f} kbps") + + avg_size = odom_topic_spy.size(1.0) + assert avg_size > 0 + print(f"Odom topic average message size: {avg_size:.2f} bytes") + + print(f"Video topic: {video_topic_spy}") + print(f"Odom topic: {odom_topic_spy}") + + +@pytest.mark.lcm +def test_topic_statistics_direct(): + """Test Topic statistics directly without LCM""" + from dimos.utils.cli.lcmspy import Topic as TopicSpy + + topic = TopicSpy("/test") + + # Add some test messages + test_data = [b"small", b"medium sized message", b"very long message for testing purposes"] + + for i, data in enumerate(test_data): + topic.msg(data) + time.sleep(0.1) # Simulate time passing + + # Test statistics over 1 second window + freq = topic.freq(1.0) + kbps = topic.kbps(1.0) + avg_size = topic.size(1.0) + + assert freq > 0 + assert kbps > 0 + assert avg_size > 0 + + print(f"Direct test - Frequency: {freq:.2f} Hz") + print(f"Direct test - Bandwidth: {kbps:.2f} kbps") + print(f"Direct test - Avg size: {avg_size:.2f} bytes") + + +def test_topic_cleanup(): + """Test that old messages are properly cleaned up""" + from dimos.utils.cli.lcmspy import Topic as TopicSpy + + topic = TopicSpy("/test") + + # Add a message + topic.msg(b"test message") + initial_count = len(topic.message_history) + assert initial_count == 1 + + # Simulate time passing by manually adding old timestamps + old_time = time.time() - 70 # 70 seconds ago + topic.message_history.appendleft((old_time, 10)) + + # Trigger cleanup + topic._cleanup_old_messages(max_age=60.0) + + # Should only have the recent message + assert len(topic.message_history) == 1 + assert topic.message_history[0][0] > time.time() - 10 # Recent message + + +@pytest.mark.lcm +def test_graph_topic_basic(): + """Test GraphTopic basic functionality""" + topic = GraphTopic("/test_graph") + + # Add some messages and update graphs + topic.msg(b"test message") + topic.update_graphs(1.0) + + # Should have history data + assert len(topic.freq_history) == 1 + assert len(topic.bandwidth_history) == 1 + assert topic.freq_history[0] > 0 + assert topic.bandwidth_history[0] > 0 + + +@pytest.mark.lcm +def test_graph_lcmspy_basic(): + """Test GraphLCMSpy basic functionality""" + spy = GraphLCMSpy(graph_log_window=0.1) + spy.start() + time.sleep(0.2) # Wait for thread to start + + # Simulate a message + spy.msg("/test", b"test data") + time.sleep(0.2) # Wait for graph update + + # Should create GraphTopic with history + topic = spy.topic["/test"] + assert isinstance(topic, GraphTopic) + assert len(topic.freq_history) > 0 + assert len(topic.bandwidth_history) > 0 + + spy.stop() + + +@pytest.mark.lcm +def test_lcmspy_global_totals(): + """Test that LCMSpy tracks global totals as a Topic itself""" + spy = LCMSpy() + spy.start() + + # Send messages to different topics + spy.msg("/video", b"video frame data") + spy.msg("/odom", b"odometry data") + spy.msg("/imu", b"imu data") + + # The spy itself should have accumulated all messages + assert len(spy.message_history) == 3 + + # Check global statistics + global_freq = spy.freq(1.0) + global_kbps = spy.kbps(1.0) + global_size = spy.size(1.0) + + assert global_freq > 0 + assert global_kbps > 0 + assert global_size > 0 + + print(f"Global frequency: {global_freq:.2f} Hz") + print(f"Global bandwidth: {spy.kbps_hr(1.0)}") + print(f"Global avg message size: {global_size:.0f} bytes") + + spy.stop() + + +@pytest.mark.lcm +def test_graph_lcmspy_global_totals(): + """Test that GraphLCMSpy tracks global totals with history""" + spy = GraphLCMSpy(graph_log_window=0.1) + spy.start() + time.sleep(0.2) + + # Send messages + spy.msg("/video", b"video frame data") + spy.msg("/odom", b"odometry data") + time.sleep(0.2) # Wait for graph update + + # Update global graphs + spy.update_graphs(1.0) + + # Should have global history + assert len(spy.freq_history) == 1 + assert len(spy.bandwidth_history) == 1 + assert spy.freq_history[0] > 0 + assert spy.bandwidth_history[0] > 0 + + print(f"Global frequency history: {spy.freq_history[0]:.2f} Hz") + print(f"Global bandwidth history: {spy.bandwidth_history[0]:.2f} kB/s") + + spy.stop() diff --git a/dimos/utils/test_testing.py b/dimos/utils/test_testing.py index 092a269862..bf69209617 100644 --- a/dimos/utils/test_testing.py +++ b/dimos/utils/test_testing.py @@ -16,8 +16,12 @@ import os import subprocess +from reactivex import operators as ops +import reactivex as rx from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.robot.unitree_webrtc.type.odometry import Odometry from dimos.utils import testing +from dimos.utils.data import get_data def test_sensor_replay(): @@ -36,3 +40,29 @@ def test_sensor_replay_cast(): counter += 1 assert isinstance(message, LidarMessage) assert counter == 500 + + +def test_timed_sensor_replay(): + data = get_data("unitree_office_walk") + odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + + itermsgs = [] + for msg in odom_store.iterate(): + itermsgs.append(msg) + if len(itermsgs) > 9: + break + + assert len(itermsgs) == 10 + + print("\n") + + timed_msgs = [] + + for msg in odom_store.stream().pipe(ops.take(10), ops.to_list()).run(): + timed_msgs.append(msg) + + assert len(timed_msgs) == 10 + + for i in range(10): + print(itermsgs[i], timed_msgs[i]) + assert itermsgs[i] == timed_msgs[i] diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 8b46991c13..d05688fbbd 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -19,7 +19,16 @@ from pathlib import Path from typing import Any, Callable, Generic, Iterator, Optional, Tuple, TypeVar, Union -from reactivex import concat, empty, from_iterable, interval, just, merge, timer +from reactivex import ( + concat_with_iterable, + empty, + from_iterable, + interval, + just, + merge, + timer, + concat, +) from reactivex import operators as ops from reactivex import timer as rx_timer from reactivex.observable import Observable @@ -169,27 +178,44 @@ def iterate_ts(self) -> Iterator[Union[Tuple[float, T], Any]]: return super().iterate() def stream(self) -> Observable[Union[T, Any]]: - """Stream sensor data with original timing preserved (non-blocking).""" + def _subscribe(observer, scheduler=None): + from reactivex.disposable import CompositeDisposable, Disposable + + scheduler = scheduler or TimeoutScheduler() # default thread-based - def create_timed_stream(): iterator = self.iterate_ts() try: - prev_timestamp, first_data = next(iterator) + prev_ts, first_data = next(iterator) + except StopIteration: + observer.on_completed() + return Disposable() - yield just(first_data) + # Emit the first sample immediately + observer.on_next(first_data) - for timestamp, data in iterator: - time_diff = timestamp - prev_timestamp + disp = CompositeDisposable() - if time_diff > 0: - yield rx_timer(time_diff).pipe(ops.map(lambda _: data)) - else: - yield just(data) + def emit_next(prev_timestamp): + try: + ts, data = next(iterator) + except StopIteration: + observer.on_completed() + return - prev_timestamp = timestamp + delay = max(0.0, ts - prev_timestamp) - except StopIteration: - yield empty() + def _action(sc, _state=None): + observer.on_next(data) + emit_next(ts) # schedule the following sample + + # Schedule the next emission relative to previous timestamp + disp.add(scheduler.schedule_relative(delay, _action)) + + emit_next(prev_ts) + + return disp + + from reactivex import create - return concat(*create_timed_stream()) + return create(_subscribe) diff --git a/pyproject.toml b/pyproject.toml index 412b39f0a0..5cb8c5be9d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,8 +145,9 @@ dev = [ "ruff==0.11.10", "mypy==1.15.0", "pre_commit==4.2.0", - "pytest", - "pytest-asyncio==0.26.0" + "pytest==8.3.5", + "pytest-asyncio==0.26.0", + "textual==3.7.1" ] [tool.ruff] @@ -187,9 +188,12 @@ markers = [ "exclude: arbitrary exclusion from CI and default test exec", "tool: dev tooling", "needsdata: needs test data to be downloaded", - "ros: depend on ros"] + "ros: depend on ros", + "lcm: tests that run actual LCM bus (can't execute in CI)" -addopts = "-v -ra --color=yes -m 'not vis and not benchmark and not exclude and not tool and not needsdata and not ros and not heavy'" +] + +addopts = "-v -ra --color=yes -m 'not vis and not benchmark and not exclude and not tool and not needsdata and not lcm and not ros and not heavy'"