From 90b59dc8960c616d0ed0c655291ed9b9cca023c1 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 10 Jul 2025 18:51:21 -0700 Subject: [PATCH 01/21] fixes testing stream issue with blocking video on foxglove --- dimos/utils/test_testing.py | 30 ++++++++++++++++++++++++++++++ dimos/utils/testing.py | 18 +++++++++++++----- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/dimos/utils/test_testing.py b/dimos/utils/test_testing.py index 092a269862..bf69209617 100644 --- a/dimos/utils/test_testing.py +++ b/dimos/utils/test_testing.py @@ -16,8 +16,12 @@ import os import subprocess +from reactivex import operators as ops +import reactivex as rx from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.robot.unitree_webrtc.type.odometry import Odometry from dimos.utils import testing +from dimos.utils.data import get_data def test_sensor_replay(): @@ -36,3 +40,29 @@ def test_sensor_replay_cast(): counter += 1 assert isinstance(message, LidarMessage) assert counter == 500 + + +def test_timed_sensor_replay(): + data = get_data("unitree_office_walk") + odom_store = testing.TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) + + itermsgs = [] + for msg in odom_store.iterate(): + itermsgs.append(msg) + if len(itermsgs) > 9: + break + + assert len(itermsgs) == 10 + + print("\n") + + timed_msgs = [] + + for msg in odom_store.stream().pipe(ops.take(10), ops.to_list()).run(): + timed_msgs.append(msg) + + assert len(timed_msgs) == 10 + + for i in range(10): + print(itermsgs[i], timed_msgs[i]) + assert itermsgs[i] == timed_msgs[i] diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 8b46991c13..688d87fd8b 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -19,7 +19,15 @@ from pathlib import Path from typing import Any, Callable, Generic, Iterator, Optional, Tuple, TypeVar, Union -from reactivex import concat, empty, from_iterable, interval, just, merge, timer +from reactivex import ( + concat_with_iterable, + empty, + from_iterable, + interval, + just, + merge, + timer, +) from reactivex import operators as ops from reactivex import timer as rx_timer from reactivex.observable import Observable @@ -176,14 +184,14 @@ def create_timed_stream(): try: prev_timestamp, first_data = next(iterator) - yield just(first_data) for timestamp, data in iterator: time_diff = timestamp - prev_timestamp - if time_diff > 0: - yield rx_timer(time_diff).pipe(ops.map(lambda _: data)) + yield rx_timer(time_diff).pipe( + ops.map(lambda _, captured_data=data: captured_data) + ) else: yield just(data) @@ -192,4 +200,4 @@ def create_timed_stream(): except StopIteration: yield empty() - return concat(*create_timed_stream()) + return concat_with_iterable(create_timed_stream()) From e2945bb59f88e3d73397beb1d29fe24cdaafa8b5 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 10 Jul 2025 20:36:01 -0700 Subject: [PATCH 02/21] unitree replay working --- dimos/msgs/geometry_msgs/PoseStamped.py | 2 +- dimos/protocol/rpc/spec.py | 2 +- dimos/robot/global_planner/planner.py | 5 +++-- dimos/robot/local_planner/simple.py | 5 +++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index bfe5a32481..2a35ccf445 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -48,7 +48,7 @@ class PoseStamped(Pose, Timestamped): @dispatch def __init__(self, *args, ts: float = 0, frame_id: str = "", **kwargs) -> None: self.frame_id = frame_id - self.ts = ts if ts is not 0 else time.time() + self.ts = ts if ts != 0 else time.time() super().__init__(*args, **kwargs) def lcm_encode(self) -> bytes: diff --git a/dimos/protocol/rpc/spec.py b/dimos/protocol/rpc/spec.py index c9c2ca88a9..da329f4f1b 100644 --- a/dimos/protocol/rpc/spec.py +++ b/dimos/protocol/rpc/spec.py @@ -81,7 +81,7 @@ def call(*args, fname=fname): class RPCServer(Protocol): - def serve(self, f: Callable, name: str) -> None: ... + def serve_rpc(self, f: Callable, name: str) -> None: ... class RPC(RPCServer, RPCClient): ... diff --git a/dimos/robot/global_planner/planner.py b/dimos/robot/global_planner/planner.py index bd717ed959..55eea616a0 100644 --- a/dimos/robot/global_planner/planner.py +++ b/dimos/robot/global_planner/planner.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from typing import Callable, Optional -from dimos.core import In, Module, Out +from dimos.core import In, Module, Out, rpc from dimos.msgs.geometry_msgs import Vector3 from dimos.robot.global_planner.algo import astar from dimos.types.costmap import Costmap @@ -71,7 +71,8 @@ def __init__( self.get_costmap = get_costmap self.get_robot_pos = get_robot_pos - async def start(self): + @rpc + def start(self): self.target.subscribe(self.plan) def plan(self, goal: VectorLike) -> Path: diff --git a/dimos/robot/local_planner/simple.py b/dimos/robot/local_planner/simple.py index 7295909c8c..82f13f2903 100644 --- a/dimos/robot/local_planner/simple.py +++ b/dimos/robot/local_planner/simple.py @@ -21,7 +21,7 @@ from plum import dispatch from reactivex import operators as ops -from dimos.core import In, Module, Out +from dimos.core import In, Module, Out, rpc # from dimos.robot.local_planner.local_planner import LocalPlanner from dimos.types.costmap import Costmap @@ -85,7 +85,8 @@ def get_move_stream(self, frequency: float = 40.0) -> rx.Observable: self.frequency_spy("movement_test"), ) - async def start(self): + @rpc + def start(self): self.path.subscribe(self.set_goal) self.get_move_stream(frequency=20.0).subscribe(self.movecmd.publish) From 98911c236ed5c8398dc2293c58dc539996644a6c Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 10 Jul 2025 20:43:46 -0700 Subject: [PATCH 03/21] control module threading fix --- .../robot/unitree_webrtc/multiprocess_unitree_go2.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py index e2cb812ba0..3087ecdc9a 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py @@ -16,6 +16,7 @@ import contextvars import functools import time +import threading from typing import Callable from dask.distributed import get_client, get_worker @@ -80,6 +81,7 @@ class ConnectionModule(FakeRTC, Module): def __init__(self, ip: str, *args, **kwargs): Module.__init__(self, *args, **kwargs) + self.ip = ip @rpc @@ -115,13 +117,14 @@ def get_pos(self) -> Vector: class ControlModule(Module): plancmd: Out[Vector3] = None - async def start(self): - async def plancmd(): - await asyncio.sleep(4) + def start(self): + def plancmd(): + time.sleep(4) print(colors.red("requesting global plan")) self.plancmd.publish(Vector3([0, 0, 0])) - asyncio.create_task(plancmd()) + thread = threading.Thread(target=plancmd, daemon=True) + thread.start() class Unitree: From 9cf06f04415a5170dab33a6fef1eaed53abf0d10 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 10 Jul 2025 20:48:40 -0700 Subject: [PATCH 04/21] potential real unitree go2 fix --- .../multiprocess_unitree_go2.py | 163 +++++++++--------- 1 file changed, 80 insertions(+), 83 deletions(-) diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py index 3087ecdc9a..7f768b3c5a 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py @@ -15,8 +15,8 @@ import asyncio import contextvars import functools -import time import threading +import time from typing import Callable from dask.distributed import get_client, get_worker @@ -47,28 +47,52 @@ class FakeRTC(WebRTCRobot): def connect(self): ... + def standup(self): + print("standup supressed") + + def liedown(self): + print("liedown supressed") + + @rpc + def start(self): + # ensure that LFS data is available + data = get_data("unitree_office_walk") + self.lidar_stream().subscribe(self.lidar.publish) + self.odom_stream().subscribe(self.odom.publish) + self.video_stream().subscribe(self.video.publish) + self.movecmd.subscribe(self.move) + self._odom = getter_streaming(self.odom_stream()) + self._lidar = getter_streaming(self.lidar_stream()) + @functools.cache def lidar_stream(self): print("lidar stream start") lidar_store = TimedSensorReplay("unitree_office_walk/lidar", autocast=LidarMessage.from_msg) - return lidar_store.stream() + return backpressure(lidar_store.stream()) @functools.cache def odom_stream(self): print("odom stream start") odom_store = TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) - return odom_store.stream() + return backpressure(odom_store.stream()) @functools.cache - def video_stream(self): + def video_stream(self, freq_hz=0.5): print("video stream start") video_store = TimedSensorReplay("unitree_office_walk/video", autocast=Image.from_numpy) - return video_store.stream().pipe(ops.sample(0.5)) + return backpressure(video_store.stream().pipe(ops.sample(freq_hz))) def move(self, vector: Vector): print("move supressed", vector) +class RealRTC(WebRTCRobot): + @rpc + def start(self): + WebRTCRobot.__init__(self, ip=self.ip) + + +# inherit RealRTC instead of FakeRTC to run the real robot class ConnectionModule(FakeRTC, Module): movecmd: In[Vector] = None odom: Out[Vector3] = None @@ -80,26 +104,8 @@ class ConnectionModule(FakeRTC, Module): _lidar: Callable[[], LidarMessage] def __init__(self, ip: str, *args, **kwargs): - Module.__init__(self, *args, **kwargs) - self.ip = ip - - @rpc - def start(self): - # ensure that LFS data is available - data = get_data("unitree_office_walk") - # Since TimedSensorReplay is now non-blocking, we can subscribe directly - self.lidar_stream().subscribe(self.lidar.publish) - self.odom_stream().subscribe(self.odom.publish) - self.video_stream().subscribe(self.video.publish) - - print("movecmd sub") - self.movecmd.subscribe(print) - print("sub ok") - self._odom = getter_streaming(self.odom_stream()) - self._lidar = getter_streaming(self.lidar_stream()) - - print("ConnectionModule started") + Module.__init__(self, *args, **kwargs) @rpc def get_local_costmap(self) -> Costmap: @@ -117,6 +123,7 @@ def get_pos(self) -> Vector: class ControlModule(Module): plancmd: Out[Vector3] = None + @rpc def start(self): def plancmd(): time.sleep(4) @@ -127,83 +134,73 @@ def plancmd(): thread.start() -class Unitree: - def __init__(self, ip: str): - self.ip = ip - - async def start(self): - dimos = None - if not dimos: - dimos = core.start(4) - - connection = dimos.deploy(ConnectionModule, self.ip) - - # This enables LCM transport - # Ensures system multicast, udp sizes are auto-adjusted if needed - pubsub.lcm.autoconf() +async def run(ip): + dimos = core.start(3) + connection = dimos.deploy(ConnectionModule, ip) - connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) - connection.odom.transport = core.LCMTransport("/odom", Odometry) - connection.video.transport = core.LCMTransport("/video", Image) - connection.movecmd.transport = core.LCMTransport("/move", Vector3) + # This enables LCM transport + # Ensures system multicast, udp sizes are auto-adjusted if needed + # TODO: this doesn't seem to work atm and LCMTransport instantiation can fail + pubsub.lcm.autoconf() - mapper = dimos.deploy(Map, voxel_size=0.5) + connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) + connection.odom.transport = core.LCMTransport("/odom", Odometry) + connection.video.transport = core.LCMTransport("/video", Image) + connection.movecmd.transport = core.LCMTransport("/move", Vector3) - local_planner = dimos.deploy( - SimplePlanner, - get_costmap=connection.get_local_costmap, - get_robot_pos=connection.get_pos, - ) + mapper = dimos.deploy(Map, voxel_size=0.5) - global_planner = dimos.deploy( - AstarPlanner, - get_costmap=mapper.costmap, - get_robot_pos=connection.get_pos, - ) + local_planner = dimos.deploy( + SimplePlanner, + get_costmap=connection.get_local_costmap, + get_robot_pos=connection.get_pos, + ) - global_planner.path.transport = core.pLCMTransport("/global_path") + global_planner = dimos.deploy( + AstarPlanner, + get_costmap=mapper.costmap, + get_robot_pos=connection.get_pos, + ) - local_planner.path.connect(global_planner.path) - local_planner.movecmd.connect(connection.movecmd) + global_planner.path.transport = core.pLCMTransport("/global_path") - ctrl = dimos.deploy(ControlModule) + local_planner.path.connect(global_planner.path) + local_planner.movecmd.connect(connection.movecmd) - mapper.lidar.connect(connection.lidar) + ctrl = dimos.deploy(ControlModule) - ctrl.plancmd.transport = core.LCMTransport("/global_target", Vector3) - global_planner.target.connect(ctrl.plancmd) + mapper.lidar.connect(connection.lidar) - # we review the structure - print("\n") - for module in [connection, mapper, local_planner, global_planner, ctrl]: - print(module.io().result(), "\n") + ctrl.plancmd.transport = core.LCMTransport("/global_target", Vector3) + global_planner.target.connect(ctrl.plancmd) - print(colors.green("starting mapper")) - mapper.start() + # we review the structure + print("\n") + for module in [connection, mapper, local_planner, global_planner, ctrl]: + print(module.io().result(), "\n") - print(colors.green("starting connection")) - connection.start() + print(colors.green("starting mapper")) + mapper.start() - print(colors.green("local planner start")) - local_planner.start() + print(colors.green("starting connection")) + connection.start() - print(colors.green("starting global planner")) - global_planner.start() + print(colors.green("local planner start")) + local_planner.start() - print(colors.green("starting ctrl")) - ctrl.start() + print(colors.green("starting global planner")) + global_planner.start() - print(colors.red("READY")) + print(colors.green("starting ctrl")) + ctrl.start() - await asyncio.sleep(3) + print(colors.red("READY")) - print("querying system") - print(mapper.costmap()) - # global_planner.dask_receive_msg("target", Vector3([0, 0, 0])).result() - time.sleep(20) + await asyncio.sleep(20) + print("querying system") + print(mapper.costmap()) + await asyncio.sleep(60) if __name__ == "__main__": - unitree = Unitree("Bla") - asyncio.run(unitree.start()) - time.sleep(30) + asyncio.run(run("192.168.9.140")) From bcb0b80200679cca283e307d603fd169590b0de6 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 10 Jul 2025 21:37:42 -0700 Subject: [PATCH 05/21] all working --- dimos/robot/local_planner/simple.py | 22 +++++-- dimos/robot/unitree_webrtc/connection.py | 41 +++++++++++-- .../multiprocess_unitree_go2.py | 60 +++++++++++-------- 3 files changed, 86 insertions(+), 37 deletions(-) diff --git a/dimos/robot/local_planner/simple.py b/dimos/robot/local_planner/simple.py index 82f13f2903..46abd7c303 100644 --- a/dimos/robot/local_planner/simple.py +++ b/dimos/robot/local_planner/simple.py @@ -15,13 +15,14 @@ import math import time from dataclasses import dataclass -from typing import Callable, Optional +from typing import Any, Callable, Optional import reactivex as rx from plum import dispatch from reactivex import operators as ops from dimos.core import In, Module, Out, rpc +from dimos.msgs.geometry_msgs import Vector3 # from dimos.robot.local_planner.local_planner import LocalPlanner from dimos.types.costmap import Costmap @@ -64,17 +65,24 @@ def transform_to_robot_frame(global_vector: Vector, robot_position: Position) -> class SimplePlanner(Module): path: In[Path] = None - movecmd: Out[Vector] = None + movecmd: Out[Vector3] = None get_costmap: Callable[[], Costmap] get_robot_pos: Callable[[], Position] + set_move: Callable[[Vector3], Any] goal: Optional[Vector] = None speed: float = 0.3 - def __init__(self, get_costmap: Callable[[], Costmap], get_robot_pos: Callable[[], Vector]): + def __init__( + self, + get_costmap: Callable[[], Costmap], + get_robot_pos: Callable[[], Vector], + set_move: Callable[[Vector3], Any], + ): Module.__init__(self) self.get_costmap = get_costmap self.get_robot_pos = get_robot_pos + self.set_move = set_move def get_move_stream(self, frequency: float = 40.0) -> rx.Observable: return rx.interval(1.0 / frequency, scheduler=get_scheduler()).pipe( @@ -88,7 +96,9 @@ def get_move_stream(self, frequency: float = 40.0) -> rx.Observable: @rpc def start(self): self.path.subscribe(self.set_goal) - self.get_move_stream(frequency=20.0).subscribe(self.movecmd.publish) + # weird bug with this + # self.get_move_stream(frequency=20.0).subscribe(self.movecmd.publish) + self.get_move_stream(frequency=20.0).subscribe(self.set_move) @dispatch def set_goal(self, goal: Path, stop_event=None, goal_theta=None) -> bool: @@ -172,11 +182,11 @@ def _test_translational_movement(self) -> Vector: if phase < 0.5: # First half: move LEFT (positive X according to our documentation) - movement = Vector(0.2, 0, 0) # Move left at 0.2 m/s + movement = Vector3(0.2, 0, 0) # Move left at 0.2 m/s direction = "LEFT (positive X)" else: # Second half: move RIGHT (negative X according to our documentation) - movement = Vector(-0.2, 0, 0) # Move right at 0.2 m/s + movement = Vector3(-0.2, 0, 0) # Move right at 0.2 m/s direction = "RIGHT (negative X)" print("=== LEFT-RIGHT MOVEMENT TEST ===") diff --git a/dimos/robot/unitree_webrtc/connection.py b/dimos/robot/unitree_webrtc/connection.py index df8469a98b..81beec6244 100644 --- a/dimos/robot/unitree_webrtc/connection.py +++ b/dimos/robot/unitree_webrtc/connection.py @@ -30,6 +30,7 @@ from reactivex.subject import Subject from dimos.core import In, Module, Out, rpc +from dimos.msgs.sensor_msgs import Image from dimos.robot.connection_interface import ConnectionInterface from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg @@ -44,6 +45,7 @@ class WebRTCRobot(ConnectionInterface): def __init__(self, ip: str, mode: str = "ai"): self.ip = ip + print("IP IS", ip) self.mode = mode self.conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalSTA, ip=self.ip) self.connect() @@ -95,6 +97,8 @@ def move(self, velocity: Vector, duration: float = 0.0) -> bool: """ x, y, yaw = velocity.x, velocity.y, velocity.z + print("MOVE REQUESTED", velocity) + # WebRTC coordinate mapping: # x - Positive right, negative left # y - positive forward, negative backwards @@ -132,9 +136,25 @@ async def async_move_duration(): # Generic conversion of unitree subscription to Subject (used for all subs) def unitree_sub_stream(self, topic_name: str): + def subscribe_in_thread(cb): + # Run the subscription in the background thread that has the event loop + def run_subscription(): + self.conn.datachannel.pub_sub.subscribe(topic_name, cb) + + # Use call_soon_threadsafe to run in the background thread + self.loop.call_soon_threadsafe(run_subscription) + + def unsubscribe_in_thread(cb): + # Run the unsubscription in the background thread that has the event loop + def run_unsubscription(): + self.conn.datachannel.pub_sub.unsubscribe(topic_name) + + # Use call_soon_threadsafe to run in the background thread + self.loop.call_soon_threadsafe(run_unsubscription) + return callback_to_observable( - start=lambda cb: self.conn.datachannel.pub_sub.subscribe(topic_name, cb), - stop=lambda: self.conn.datachannel.pub_sub.unsubscribe(topic_name), + start=subscribe_in_thread, + stop=unsubscribe_in_thread, ) # Generic sync API call (we jump into the client thread) @@ -217,15 +237,25 @@ async def accept_track(track: MediaStreamTrack) -> VideoMessage: if stop_event.is_set(): return frame = await track.recv() - subject.on_next(frame.to_ndarray(format="bgr24")) + subject.on_next(Image.from_numpy(frame.to_ndarray(format="bgr24"))) self.conn.video.add_track_callback(accept_track) - self.conn.video.switchVideoChannel(True) + + # Run the video channel switching in the background thread + def switch_video_channel(): + self.conn.video.switchVideoChannel(True) + + self.loop.call_soon_threadsafe(switch_video_channel) def stop(cb): stop_event.set() # Signal the loop to stop self.conn.video.track_callbacks.remove(accept_track) - self.conn.video.switchVideoChannel(False) + + # Run the video channel switching off in the background thread + def switch_video_channel_off(): + self.conn.video.switchVideoChannel(False) + + self.loop.call_soon_threadsafe(switch_video_channel_off) return subject.pipe(ops.finally_action(stop)) @@ -247,6 +277,7 @@ def get_video_stream(self, fps: int = 30) -> Observable[VideoMessage]: if stream is None: print("Warning: Video stream is not available") return stream + except Exception as e: print(f"Error getting video stream: {e}") return None diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py index 7f768b3c5a..f9603e95fc 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py @@ -45,6 +45,10 @@ # can be swapped in for WebRTCRobot class FakeRTC(WebRTCRobot): + def __init__(self, *args, **kwargs): + # ensures we download msgs from lfs store + data = get_data("unitree_office_walk") + def connect(self): ... def standup(self): @@ -53,48 +57,34 @@ def standup(self): def liedown(self): print("liedown supressed") - @rpc - def start(self): - # ensure that LFS data is available - data = get_data("unitree_office_walk") - self.lidar_stream().subscribe(self.lidar.publish) - self.odom_stream().subscribe(self.odom.publish) - self.video_stream().subscribe(self.video.publish) - self.movecmd.subscribe(self.move) - self._odom = getter_streaming(self.odom_stream()) - self._lidar = getter_streaming(self.lidar_stream()) - @functools.cache def lidar_stream(self): print("lidar stream start") lidar_store = TimedSensorReplay("unitree_office_walk/lidar", autocast=LidarMessage.from_msg) - return backpressure(lidar_store.stream()) + return lidar_store.stream() @functools.cache def odom_stream(self): print("odom stream start") odom_store = TimedSensorReplay("unitree_office_walk/odom", autocast=Odometry.from_msg) - return backpressure(odom_store.stream()) + return odom_store.stream() @functools.cache def video_stream(self, freq_hz=0.5): print("video stream start") video_store = TimedSensorReplay("unitree_office_walk/video", autocast=Image.from_numpy) - return backpressure(video_store.stream().pipe(ops.sample(freq_hz))) + return video_store.stream().pipe(ops.sample(freq_hz)) def move(self, vector: Vector): print("move supressed", vector) -class RealRTC(WebRTCRobot): - @rpc - def start(self): - WebRTCRobot.__init__(self, ip=self.ip) +class RealRTC(WebRTCRobot): ... # inherit RealRTC instead of FakeRTC to run the real robot -class ConnectionModule(FakeRTC, Module): - movecmd: In[Vector] = None +class ConnectionModule(RealRTC, Module): + movecmd: In[Vector3] = None odom: Out[Vector3] = None lidar: Out[LidarMessage] = None video: Out[VideoMessage] = None @@ -103,10 +93,25 @@ class ConnectionModule(FakeRTC, Module): _odom: Callable[[], Odometry] _lidar: Callable[[], LidarMessage] + @rpc + def move(self, vector: Vector3): + super().move(vector) + def __init__(self, ip: str, *args, **kwargs): self.ip = ip Module.__init__(self, *args, **kwargs) + @rpc + def start(self): + super().__init__(self.ip) + # ensure that LFS data is available + self.lidar_stream().subscribe(self.lidar.publish) + self.odom_stream().subscribe(self.odom.publish) + self.video_stream().subscribe(self.video.publish) + self.movecmd.subscribe(self.move) + self._odom = getter_streaming(self.odom_stream()) + self._lidar = getter_streaming(self.lidar_stream()) + @rpc def get_local_costmap(self) -> Costmap: return self._lidar().costmap() @@ -128,14 +133,14 @@ def start(self): def plancmd(): time.sleep(4) print(colors.red("requesting global plan")) - self.plancmd.publish(Vector3([0, 0, 0])) + self.plancmd.publish(Vector3([0.750893, -6.017522, 0.307474])) thread = threading.Thread(target=plancmd, daemon=True) thread.start() async def run(ip): - dimos = core.start(3) + dimos = core.start(4) connection = dimos.deploy(ConnectionModule, ip) # This enables LCM transport @@ -154,6 +159,7 @@ async def run(ip): SimplePlanner, get_costmap=connection.get_local_costmap, get_robot_pos=connection.get_pos, + set_move=connection.move, ) global_planner = dimos.deploy( @@ -191,16 +197,18 @@ async def run(ip): print(colors.green("starting global planner")) global_planner.start() - print(colors.green("starting ctrl")) - ctrl.start() + # uncomment to move the bot + # print(colors.green("starting ctrl")) + # ctrl.start() print(colors.red("READY")) - await asyncio.sleep(20) + await asyncio.sleep(10) print("querying system") print(mapper.costmap()) await asyncio.sleep(60) if __name__ == "__main__": - asyncio.run(run("192.168.9.140")) + asyncio.run(run("192.168.9.179")) + # asyncio.run(run("192.168.9.140")) From e43b03b180018f138cac3fff7f4e59dbf3a11bb8 Mon Sep 17 00:00:00 2001 From: lesh Date: Thu, 10 Jul 2025 23:21:30 -0700 Subject: [PATCH 06/21] removed pytest dep from core --- dimos/core/__init__.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 09011caa03..5df6d4e803 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -4,7 +4,6 @@ import time from typing import Optional -import pytest from dask.distributed import Client, LocalCluster from rich.console import Console @@ -82,14 +81,6 @@ def deploy( return dask_client -@pytest.fixture -def dimos(): - process_count = 3 # we chill - client = start(process_count) - yield client - stop(client) - - def start(n: Optional[int] = None) -> Client: console = Console() if not n: From 340bf2407d9e15ce58c6f9c785ad91588bf4db00 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:02:18 -0700 Subject: [PATCH 07/21] lcmspy cli --- bin/lcmspy | 7 ++ dimos/utils/cli/lcmspy.py | 196 +++++++++++++++++++++++++++++ dimos/utils/cli/lcmspy_cli.py | 130 ++++++++++++++++++++ dimos/utils/cli/test_lcmspy.py | 217 +++++++++++++++++++++++++++++++++ pyproject.toml | 5 +- 5 files changed, 553 insertions(+), 2 deletions(-) create mode 100755 bin/lcmspy create mode 100755 dimos/utils/cli/lcmspy.py create mode 100644 dimos/utils/cli/lcmspy_cli.py create mode 100644 dimos/utils/cli/test_lcmspy.py diff --git a/bin/lcmspy b/bin/lcmspy new file mode 100755 index 0000000000..0efa14fce3 --- /dev/null +++ b/bin/lcmspy @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +# current script dir + ..dimos + + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +python $script_dir/../dimos/utils/cli/lcmspy_cli.py "$@" diff --git a/dimos/utils/cli/lcmspy.py b/dimos/utils/cli/lcmspy.py new file mode 100755 index 0000000000..d0345891f0 --- /dev/null +++ b/dimos/utils/cli/lcmspy.py @@ -0,0 +1,196 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time +from collections import deque +from dataclasses import dataclass +from enum import Enum + +import lcm + +from dimos.protocol.service.lcmservice import LCMConfig, LCMService + + +class BandwidthUnit(Enum): + BPS = "B/s" + KBPS = "kB/s" + MBPS = "MB/s" + GBPS = "GB/s" + + +class Topic: + history_window: float = 60.0 + + def __init__(self, name: str, history_window: float = 60.0): + self.name = name + # Store (timestamp, data_size) tuples for statistics + self.message_history = deque() + self.history_window = history_window + + def msg(self, data: bytes): + # print(f"> msg {self.__str__()} {len(data)} bytes") + datalen = len(data) + self.message_history.append((time.time(), datalen)) + self._cleanup_old_messages() + + def _cleanup_old_messages(self, max_age: float = None): + """Remove messages older than max_age seconds""" + current_time = time.time() + while self.message_history and current_time - self.message_history[0][0] > ( + max_age or self.history_window + ): + self.message_history.popleft() + + def _get_messages_in_window(self, time_window: float): + """Get messages within the specified time window""" + current_time = time.time() + cutoff_time = current_time - time_window + return [(ts, size) for ts, size in self.message_history if ts >= cutoff_time] + + # avg msg freq in the last n seconds + def freq(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + return len(messages) / time_window + + # avg bandwidth in kB/s in the last n seconds + def kbps(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + total_bytes = sum(size for _, size in messages) + total_kbytes = total_bytes / 1000 # Convert bytes to kB + return total_kbytes / time_window + + def kbps_hr(self, time_window: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + """Return human-readable bandwidth with appropriate units""" + kbps_val = self.kbps(time_window) + + if kbps_val >= 1024: + return round(kbps_val / 1024, round_to), BandwidthUnit.MBPS + elif kbps_val >= 1: + return round(kbps_val, round_to), BandwidthUnit.KBPS + else: + # Convert to B/s for small values + bps = kbps_val * 1000 + return round(bps, round_to), BandwidthUnit.BPS + + # avg msg size in the last n seconds + def size(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + total_size = sum(size for _, size in messages) + return total_size / len(messages) + + def __str__(self): + return f"topic({self.name})" + + +@dataclass +class LCMSpyConfig(LCMConfig): + topic_history_window: float = 60.0 + + +class LCMSpy(LCMService, Topic): + default_config = LCMSpyConfig + topic = dict[str, Topic] + graph_log_window: float = 1.0 + topic_class: type[Topic] = Topic + + def __init__(self, **kwargs): + super().__init__(**kwargs) + Topic.__init__(self, name="total", history_window=self.config.topic_history_window) + self.topic = {} + self.l = lcm.LCM(self.config.url) if self.config.url else lcm.LCM() + + def start(self): + super().start() + self.l.subscribe("/.*", self.msg) + + def stop(self): + """Stop the LCM spy and clean up resources""" + super().stop() + + def msg(self, topic, data): + Topic.msg(self, data) + + if topic not in self.topic: + print(self.config) + self.topic[topic] = self.topic_class( + topic, history_window=self.config.topic_history_window + ) + self.topic[topic].msg(data) + + +class GraphTopic(Topic): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.freq_history = deque(maxlen=20) + self.bandwidth_history = deque(maxlen=20) + + def update_graphs(self, step_window: float = 1.0): + """Update historical data for graphing""" + freq = self.freq(step_window) + kbps = self.kbps(step_window) + self.freq_history.append(freq) + self.bandwidth_history.append(kbps) + + +@dataclass +class GraphLCMSpyConfig(LCMSpyConfig): + graph_log_window: float = 1.0 + + +class GraphLCMSpy(LCMSpy, GraphTopic): + default_config = GraphLCMSpyConfig + + graph_log_thread: threading.Thread | None = None + graph_log_stop_event: threading.Event = threading.Event() + topic_class: type[Topic] = GraphTopic + + def __init__(self, **kwargs): + super().__init__(**kwargs) + GraphTopic.__init__(self, name="total", history_window=self.config.topic_history_window) + + def start(self): + super().start() + self.graph_log_thread = threading.Thread(target=self.graph_log, daemon=True) + self.graph_log_thread.start() + + def graph_log(self): + while not self.graph_log_stop_event.is_set(): + self.update_graphs(self.config.graph_log_window) # Update global history + for topic in self.topic.values(): + topic.update_graphs(self.config.graph_log_window) + time.sleep(self.config.graph_log_window) + + def stop(self): + """Stop the graph logging and LCM spy""" + self.graph_log_stop_event.set() + if self.graph_log_thread and self.graph_log_thread.is_alive(): + self.graph_log_thread.join(timeout=1.0) + super().stop() + + +if __name__ == "__main__": + lcm_spy = LCMSpy() + lcm_spy.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("LCM Spy stopped.") diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py new file mode 100644 index 0000000000..3e92e6fa13 --- /dev/null +++ b/dimos/utils/cli/lcmspy_cli.py @@ -0,0 +1,130 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import math +import random +import threading +from typing import List + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.color import Color +from textual.containers import Container +from textual.reactive import reactive +from textual.renderables.sparkline import Sparkline as SparklineRenderable +from textual.widgets import DataTable, Footer, Header, Label, Sparkline + +from dimos.utils.cli.lcmspy import GraphLCMSpy +from dimos.utils.cli.lcmspy import GraphTopic as SpyTopic + + +def gradient(max_value: float, value: float) -> str: + ratio = min(value / max_value, 1.0) + green = Color(0, 255, 0) + red = Color(255, 0, 0) + color = green.blend(red, ratio) + + return color.hex + + +def topic_text(topic_name: str) -> Text: + if "#" in topic_name: + parts = topic_name.split("#", 1) + return Text(parts[0], style="white") + Text("#" + parts[1], style="blue") + + if topic_name[:4] == "/rpc": + return Text(topic_name[:4], style="red") + Text(topic_name[4:], style="white") + + return Text(topic_name, style="white") + + +class LCMSpyApp(App): + """A real-time CLI dashboard for LCM traffic statistics using Textual.""" + + CSS = """ + Screen { + layout: vertical; + } + DataTable { + height: 2fr; + width: 1fr; + border: none; + background: black; + } + """ + + refresh_interval: float = 0.5 # seconds + show_command_palette = reactive(True) + + BINDINGS = [ + ("q", "quit"), + ("ctrl+c", "quit"), + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.spy = GraphLCMSpy(autoconf=True, graph_log_window=0.5) + self.table: DataTable | None = None + + def compose(self) -> ComposeResult: + # yield Header() + + self.table = DataTable(zebra_stripes=False, cursor_type=None) + self.table.add_column("Topic", width=30) + self.table.add_column("Freq (Hz)") + self.table.add_column("Bandwidth") + yield self.table + yield Footer() + + def on_mount(self): + self.theme = "flexoki" + self.spy.start() + self.set_interval(self.refresh_interval, self.refresh_table) + + async def on_unmount(self): + self.spy.stop() + + def refresh_table(self): + topics: List[SpyTopic] = list(self.spy.topic.values()) + topics.sort(key=lambda t: t.kbps(5.0), reverse=True) + self.table.clear(columns=False) + + for t in topics: + freq = t.freq(5.0) + kbps = t.kbps(5.0) + bw_val, bw_unit = t.kbps_hr(5.0) + + self.table.add_row( + topic_text(t.name), + Text(f"{freq:.1f}", style=gradient(10, freq)), + Text(f"{bw_val} {bw_unit.value}", style=gradient(1024 * 3, kbps)), + ) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "web": + # get script file + import os + + from textual_serve.server import Server + + server = Server(f"python {os.path.abspath(__file__)}") + server.serve() + else: + LCMSpyApp().run() diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py new file mode 100644 index 0000000000..c491638e34 --- /dev/null +++ b/dimos/utils/cli/test_lcmspy.py @@ -0,0 +1,217 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import pytest + +from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.utils.lcmspy import LCMSpy, GraphLCMSpy, GraphTopic + + +def test_spy_basic(): + lcm = PickleLCM(autoconf=True) + lcm.start() + + lcmspy = LCMSpy() + lcmspy.start() + + video_topic = Topic(topic="/video") + odom_topic = Topic(topic="/odom") + + for i in range(5): + lcm.publish(video_topic, f"video frame {i}") + time.sleep(0.1) + if i % 2 == 0: + lcm.publish(odom_topic, f"odometry data {i / 2}") + + # Wait a bit for messages to be processed + time.sleep(0.5) + + # Test statistics for video topic + video_topic_spy = lcmspy.topic["/video"] + assert video_topic_spy is not None + + # Test frequency (should be around 10 Hz for 5 messages in ~0.5 seconds) + freq = video_topic_spy.freq(1.0) + assert freq > 0 + print(f"Video topic frequency: {freq:.2f} Hz") + + # Test bandwidth + kbps = video_topic_spy.kbps(1.0) + assert kbps > 0 + print(f"Video topic bandwidth: {kbps:.2f} kbps") + + # Test average message size + avg_size = video_topic_spy.size(1.0) + assert avg_size > 0 + print(f"Video topic average message size: {avg_size:.2f} bytes") + + # Test statistics for odom topic + odom_topic_spy = lcmspy.topic["/odom"] + assert odom_topic_spy is not None + + freq = odom_topic_spy.freq(1.0) + assert freq > 0 + print(f"Odom topic frequency: {freq:.2f} Hz") + + kbps = odom_topic_spy.kbps(1.0) + assert kbps > 0 + print(f"Odom topic bandwidth: {kbps:.2f} kbps") + + avg_size = odom_topic_spy.size(1.0) + assert avg_size > 0 + print(f"Odom topic average message size: {avg_size:.2f} bytes") + + print(f"Video topic: {video_topic_spy}") + print(f"Odom topic: {odom_topic_spy}") + + +def test_topic_statistics_direct(): + """Test Topic statistics directly without LCM""" + from dimos.utils.lcmspy import Topic as TopicSpy + + topic = TopicSpy("/test") + + # Add some test messages + test_data = [b"small", b"medium sized message", b"very long message for testing purposes"] + + for i, data in enumerate(test_data): + topic.msg(data) + time.sleep(0.1) # Simulate time passing + + # Test statistics over 1 second window + freq = topic.freq(1.0) + kbps = topic.kbps(1.0) + avg_size = topic.size(1.0) + + assert freq > 0 + assert kbps > 0 + assert avg_size > 0 + + print(f"Direct test - Frequency: {freq:.2f} Hz") + print(f"Direct test - Bandwidth: {kbps:.2f} kbps") + print(f"Direct test - Avg size: {avg_size:.2f} bytes") + + +def test_topic_cleanup(): + """Test that old messages are properly cleaned up""" + from dimos.utils.lcmspy import Topic as TopicSpy + + topic = TopicSpy("/test") + + # Add a message + topic.msg(b"test message") + initial_count = len(topic.message_history) + assert initial_count == 1 + + # Simulate time passing by manually adding old timestamps + old_time = time.time() - 70 # 70 seconds ago + topic.message_history.appendleft((old_time, 10)) + + # Trigger cleanup + topic._cleanup_old_messages(max_age=60.0) + + # Should only have the recent message + assert len(topic.message_history) == 1 + assert topic.message_history[0][0] > time.time() - 10 # Recent message + + +def test_graph_topic_basic(): + """Test GraphTopic basic functionality""" + topic = GraphTopic("/test_graph") + + # Add some messages and update graphs + topic.msg(b"test message") + topic.update_graphs(1.0) + + # Should have history data + assert len(topic.freq_history) == 1 + assert len(topic.bandwidth_history) == 1 + assert topic.freq_history[0] > 0 + assert topic.bandwidth_history[0] > 0 + + +def test_graph_lcmspy_basic(): + """Test GraphLCMSpy basic functionality""" + spy = GraphLCMSpy(graph_log_window=0.1) + spy.start() + time.sleep(0.2) # Wait for thread to start + + # Simulate a message + spy.msg("/test", b"test data") + time.sleep(0.2) # Wait for graph update + + # Should create GraphTopic with history + topic = spy.topic["/test"] + assert isinstance(topic, GraphTopic) + assert len(topic.freq_history) > 0 + assert len(topic.bandwidth_history) > 0 + + spy.stop() + + +def test_lcmspy_global_totals(): + """Test that LCMSpy tracks global totals as a Topic itself""" + spy = LCMSpy() + spy.start() + + # Send messages to different topics + spy.msg("/video", b"video frame data") + spy.msg("/odom", b"odometry data") + spy.msg("/imu", b"imu data") + + # The spy itself should have accumulated all messages + assert len(spy.message_history) == 3 + + # Check global statistics + global_freq = spy.freq(1.0) + global_kbps = spy.kbps(1.0) + global_size = spy.size(1.0) + + assert global_freq > 0 + assert global_kbps > 0 + assert global_size > 0 + + print(f"Global frequency: {global_freq:.2f} Hz") + print(f"Global bandwidth: {spy.kbps_hr(1.0)}") + print(f"Global avg message size: {global_size:.0f} bytes") + + spy.stop() + + +def test_graph_lcmspy_global_totals(): + """Test that GraphLCMSpy tracks global totals with history""" + spy = GraphLCMSpy(graph_log_window=0.1) + spy.start() + time.sleep(0.2) + + # Send messages + spy.msg("/video", b"video frame data") + spy.msg("/odom", b"odometry data") + time.sleep(0.2) # Wait for graph update + + # Update global graphs + spy.update_graphs(1.0) + + # Should have global history + assert len(spy.freq_history) == 1 + assert len(spy.bandwidth_history) == 1 + assert spy.freq_history[0] > 0 + assert spy.bandwidth_history[0] > 0 + + print(f"Global frequency history: {spy.freq_history[0]:.2f} Hz") + print(f"Global bandwidth history: {spy.bandwidth_history[0]:.2f} kB/s") + + spy.stop() diff --git a/pyproject.toml b/pyproject.toml index 412b39f0a0..af355ecd3a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,8 +145,9 @@ dev = [ "ruff==0.11.10", "mypy==1.15.0", "pre_commit==4.2.0", - "pytest", - "pytest-asyncio==0.26.0" + "pytest==8.3.5", + "pytest-asyncio==0.26.0", + "textual==3.7.1" ] [tool.ruff] From 634d4c4474f303db7b11cf47a765312cf430caec Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:37:44 -0700 Subject: [PATCH 08/21] total traffic measure --- dimos/utils/cli/lcmspy.py | 44 ++++++++++++++++++++++++----------- dimos/utils/cli/lcmspy_cli.py | 8 ++++--- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/dimos/utils/cli/lcmspy.py b/dimos/utils/cli/lcmspy.py index d0345891f0..aa5b48b297 100755 --- a/dimos/utils/cli/lcmspy.py +++ b/dimos/utils/cli/lcmspy.py @@ -24,10 +24,22 @@ class BandwidthUnit(Enum): - BPS = "B/s" - KBPS = "kB/s" - MBPS = "MB/s" - GBPS = "GB/s" + BP = "B" + KBP = "kB" + MBP = "MB" + GBP = "GB" + + +def human_readable_bytes(bytes_value: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + """Convert bytes to human-readable format with appropriate units""" + if bytes_value >= 1024**3: # GB + return round(bytes_value / (1024**3), round_to), BandwidthUnit.GBP + elif bytes_value >= 1024**2: # MB + return round(bytes_value / (1024**2), round_to), BandwidthUnit.MBP + elif bytes_value >= 1024: # KB + return round(bytes_value / 1024, round_to), BandwidthUnit.KBP + else: + return round(bytes_value, round_to), BandwidthUnit.BP class Topic: @@ -38,11 +50,14 @@ def __init__(self, name: str, history_window: float = 60.0): # Store (timestamp, data_size) tuples for statistics self.message_history = deque() self.history_window = history_window + # Total traffic accumulator (doesn't get cleaned up) + self.total_traffic_bytes = 0 def msg(self, data: bytes): # print(f"> msg {self.__str__()} {len(data)} bytes") datalen = len(data) self.message_history.append((time.time(), datalen)) + self.total_traffic_bytes += datalen self._cleanup_old_messages() def _cleanup_old_messages(self, max_age: float = None): @@ -78,15 +93,9 @@ def kbps(self, time_window: float) -> float: def kbps_hr(self, time_window: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: """Return human-readable bandwidth with appropriate units""" kbps_val = self.kbps(time_window) - - if kbps_val >= 1024: - return round(kbps_val / 1024, round_to), BandwidthUnit.MBPS - elif kbps_val >= 1: - return round(kbps_val, round_to), BandwidthUnit.KBPS - else: - # Convert to B/s for small values - bps = kbps_val * 1000 - return round(bps, round_to), BandwidthUnit.BPS + # Convert kB/s to B/s for human_readable_bytes + bps = kbps_val * 1000 + return human_readable_bytes(bps, round_to) # avg msg size in the last n seconds def size(self, time_window: float) -> float: @@ -96,6 +105,15 @@ def size(self, time_window: float) -> float: total_size = sum(size for _, size in messages) return total_size / len(messages) + def total_traffic(self) -> int: + """Return total traffic passed in bytes since the beginning""" + return self.total_traffic_bytes + + def total_traffic_hr(self) -> tuple[float, BandwidthUnit]: + """Return human-readable total traffic with appropriate units""" + total_bytes = self.total_traffic() + return human_readable_bytes(total_bytes) + def __str__(self): return f"topic({self.name})" diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py index 3e92e6fa13..188dd793de 100644 --- a/dimos/utils/cli/lcmspy_cli.py +++ b/dimos/utils/cli/lcmspy_cli.py @@ -87,6 +87,7 @@ def compose(self) -> ComposeResult: self.table.add_column("Topic", width=30) self.table.add_column("Freq (Hz)") self.table.add_column("Bandwidth") + self.table.add_column("Total Traffic") yield self.table yield Footer() @@ -100,18 +101,20 @@ async def on_unmount(self): def refresh_table(self): topics: List[SpyTopic] = list(self.spy.topic.values()) - topics.sort(key=lambda t: t.kbps(5.0), reverse=True) + topics.sort(key=lambda t: t.total_traffic(), reverse=True) self.table.clear(columns=False) for t in topics: freq = t.freq(5.0) kbps = t.kbps(5.0) bw_val, bw_unit = t.kbps_hr(5.0) + total_val, total_unit = t.total_traffic_hr() self.table.add_row( topic_text(t.name), Text(f"{freq:.1f}", style=gradient(10, freq)), - Text(f"{bw_val} {bw_unit.value}", style=gradient(1024 * 3, kbps)), + Text(f"{bw_val} {bw_unit.value}/s", style=gradient(1024 * 3, kbps)), + Text(f"{total_val} {total_unit.value}"), ) @@ -119,7 +122,6 @@ def refresh_table(self): import sys if len(sys.argv) > 1 and sys.argv[1] == "web": - # get script file import os from textual_serve.server import Server From 8e032b31fe4e5ebe751b77993ca5e275543d1788 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:39:20 -0700 Subject: [PATCH 09/21] lcmspy test import fix --- dimos/utils/cli/test_lcmspy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index c491638e34..01a7ab30c6 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -17,7 +17,7 @@ import pytest from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic -from dimos.utils.lcmspy import LCMSpy, GraphLCMSpy, GraphTopic +from dimos.utils.cli.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy def test_spy_basic(): @@ -80,7 +80,7 @@ def test_spy_basic(): def test_topic_statistics_direct(): """Test Topic statistics directly without LCM""" - from dimos.utils.lcmspy import Topic as TopicSpy + from dimos.utils.cli.lcmspy import Topic as TopicSpy topic = TopicSpy("/test") @@ -107,7 +107,7 @@ def test_topic_statistics_direct(): def test_topic_cleanup(): """Test that old messages are properly cleaned up""" - from dimos.utils.lcmspy import Topic as TopicSpy + from dimos.utils.cli.lcmspy import Topic as TopicSpy topic = TopicSpy("/test") From 8447b51b79a7d0c2c6c3e5c00788f2e385ee2349 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:52:44 -0700 Subject: [PATCH 10/21] scheduler fix for replay --- .../multiprocess_unitree_go2.py | 6 ++- dimos/utils/testing.py | 50 +++++++++++++------ 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py index f9603e95fc..3700a1a945 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py @@ -83,7 +83,7 @@ class RealRTC(WebRTCRobot): ... # inherit RealRTC instead of FakeRTC to run the real robot -class ConnectionModule(RealRTC, Module): +class ConnectionModule(FakeRTC, Module): movecmd: In[Vector3] = None odom: Out[Vector3] = None lidar: Out[LidarMessage] = None @@ -210,5 +210,7 @@ async def run(ip): if __name__ == "__main__": - asyncio.run(run("192.168.9.179")) + import os + + asyncio.run(run(os.getenv("ROBOT_IP"))) # asyncio.run(run("192.168.9.140")) diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 688d87fd8b..d05688fbbd 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -27,6 +27,7 @@ just, merge, timer, + concat, ) from reactivex import operators as ops from reactivex import timer as rx_timer @@ -177,27 +178,44 @@ def iterate_ts(self) -> Iterator[Union[Tuple[float, T], Any]]: return super().iterate() def stream(self) -> Observable[Union[T, Any]]: - """Stream sensor data with original timing preserved (non-blocking).""" + def _subscribe(observer, scheduler=None): + from reactivex.disposable import CompositeDisposable, Disposable + + scheduler = scheduler or TimeoutScheduler() # default thread-based - def create_timed_stream(): iterator = self.iterate_ts() try: - prev_timestamp, first_data = next(iterator) - yield just(first_data) + prev_ts, first_data = next(iterator) + except StopIteration: + observer.on_completed() + return Disposable() - for timestamp, data in iterator: - time_diff = timestamp - prev_timestamp - if time_diff > 0: - yield rx_timer(time_diff).pipe( - ops.map(lambda _, captured_data=data: captured_data) - ) - else: - yield just(data) + # Emit the first sample immediately + observer.on_next(first_data) - prev_timestamp = timestamp + disp = CompositeDisposable() - except StopIteration: - yield empty() + def emit_next(prev_timestamp): + try: + ts, data = next(iterator) + except StopIteration: + observer.on_completed() + return + + delay = max(0.0, ts - prev_timestamp) + + def _action(sc, _state=None): + observer.on_next(data) + emit_next(ts) # schedule the following sample + + # Schedule the next emission relative to previous timestamp + disp.add(scheduler.schedule_relative(delay, _action)) + + emit_next(prev_ts) + + return disp + + from reactivex import create - return concat_with_iterable(create_timed_stream()) + return create(_subscribe) From dad44e077d79c31316ba9c2f44f3abc0e21fc971 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 17:04:25 -0700 Subject: [PATCH 11/21] global map publishing --- .../multiprocess_unitree_go2.py | 4 ++- dimos/robot/unitree_webrtc/type/map.py | 26 +++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py index 3700a1a945..823f781b20 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py @@ -153,7 +153,9 @@ async def run(ip): connection.video.transport = core.LCMTransport("/video", Image) connection.movecmd.transport = core.LCMTransport("/move", Vector3) - mapper = dimos.deploy(Map, voxel_size=0.5) + mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=10.0) + + mapper.global_map.transport = core.LCMTransport("/global_map", LidarMessage) local_planner = dimos.deploy( SimplePlanner, diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index a9ead5d95d..898bd473b5 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -14,10 +14,11 @@ from dataclasses import dataclass from typing import Optional, Tuple - +import time import numpy as np import open3d as o3d import reactivex.operators as ops +from reactivex import interval from reactivex.observable import Observable from dimos.core import In, Module, Out, rpc @@ -27,17 +28,38 @@ class Map(Module): lidar: In[LidarMessage] = None + global_map: Out[LidarMessage] = None pointcloud: o3d.geometry.PointCloud = o3d.geometry.PointCloud() - def __init__(self, voxel_size: float = 0.05, cost_resolution: float = 0.05, **kwargs): + def __init__( + self, + voxel_size: float = 0.05, + cost_resolution: float = 0.05, + global_publish_interval: Optional[float] = None, + **kwargs, + ): self.voxel_size = voxel_size self.cost_resolution = cost_resolution + self.global_publish_interval = global_publish_interval super().__init__(**kwargs) @rpc def start(self): self.lidar.subscribe(self.add_frame) + if self.global_publish_interval is not None: + interval(self.global_publish_interval).subscribe( + lambda _: self.global_map.publish(self.to_lidar_message()) + ) + + def to_lidar_message(self) -> LidarMessage: + return LidarMessage( + pointcloud=self.pointcloud, + origin=[0.0, 0.0, 0.0], + resolution=self.voxel_size, + ts=time.time(), + ) + @rpc def add_frame(self, frame: LidarMessage) -> "Map": """Voxelise *frame* and splice it into the running map.""" From 8d278c7746c1e2ff49b91a4e4a2cde0405107926 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 17:26:37 -0700 Subject: [PATCH 12/21] foxglove dash --- .../unitree_webrtc/unitree_foxglove.json | 221 ++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 dimos/robot/unitree_webrtc/unitree_foxglove.json diff --git a/dimos/robot/unitree_webrtc/unitree_foxglove.json b/dimos/robot/unitree_webrtc/unitree_foxglove.json new file mode 100644 index 0000000000..0f8f8b0ecd --- /dev/null +++ b/dimos/robot/unitree_webrtc/unitree_foxglove.json @@ -0,0 +1,221 @@ +{ + "configById": { + "3D!18i6zy7": { + "layers": { + "845139cb-26bc-40b3-8161-8ab60af4baf5": { + "visible": true, + "frameLocked": true, + "label": "Grid", + "instanceId": "845139cb-26bc-40b3-8161-8ab60af4baf5", + "layerId": "foxglove.Grid", + "lineWidth": 0.5, + "position": [ + 0, + 0, + 0 + ], + "rotation": [ + 0, + 0, + 0 + ], + "order": 1, + "size": 30, + "divisions": 30 + } + }, + "cameraState": { + "perspective": true, + "distance": 41.010920913295585, + "phi": 12.77189691842678, + "thetaOffset": 91.84171266855309, + "targetOffset": [ + 5.610652121013281, + -2.2558250159447826, + 5.235694681126227e-16 + ], + "target": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": { + "enableStats": true, + "ignoreColladaUpAxis": false, + "syncCamera": false + }, + "transforms": {}, + "topics": { + "/lidar": { + "stixelsEnabled": false, + "visible": true, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointShape": "cube", + "pointSize": 8, + "explicitAlpha": 1, + "minValue": -0.1, + "decayTime": 0, + "cubeSize": 0.06 + }, + "/odom": { + "visible": true, + "axisScale": 1 + }, + "/video": { + "visible": false + }, + "/global_map": { + "visible": true, + "colorField": "z", + "colorMode": "colormap", + "colorMap": "turbo", + "pointSize": 6, + "minValue": -0.1, + "decayTime": 0, + "pointShape": "square", + "cubeOutline": false, + "cubeSize": 0.05, + "gradient": [ + "#06011dff", + "#d1e2e2ff" + ], + "stixelsEnabled": false + } + }, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/estimate", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": {}, + "foxglovePanelTitle": "test" + }, + "Image!3mnp456": { + "cameraState": { + "distance": 20, + "perspective": true, + "phi": 60, + "target": [ + 0, + 0, + 0 + ], + "targetOffset": [ + 0, + 0, + 0 + ], + "targetOrientation": [ + 0, + 0, + 0, + 1 + ], + "thetaOffset": 45, + "fovy": 45, + "near": 0.5, + "far": 5000 + }, + "followMode": "follow-pose", + "scene": { + "enableStats": true + }, + "transforms": {}, + "topics": {}, + "layers": {}, + "publish": { + "type": "point", + "poseTopic": "/move_base_simple/goal", + "pointTopic": "/clicked_point", + "poseEstimateTopic": "/initialpose", + "poseEstimateXDeviation": 0.5, + "poseEstimateYDeviation": 0.5, + "poseEstimateThetaDeviation": 0.26179939 + }, + "imageMode": { + "imageTopic": "/video", + "colorMode": "gradient" + }, + "foxglovePanelTitle": "/video" + }, + "RawMessages!os6rgs": { + "diffEnabled": true, + "diffMethod": "previous message", + "diffTopicPath": "/lidar", + "showFullMessageForDiff": false, + "topicPath": "/odom", + "fontSize": 12 + }, + "Plot!a1gj37": { + "paths": [ + { + "timestampMethod": "receiveTime", + "value": "/odom.pose.position.y", + "enabled": true, + "color": "#4e98e2" + }, + { + "timestampMethod": "receiveTime", + "value": "/odom.pose.position.x", + "enabled": true, + "color": "#f5774d" + }, + { + "timestampMethod": "receiveTime", + "value": "/odom.pose.position.z", + "enabled": true, + "color": "#f7df71" + } + ], + "showXAxisLabels": true, + "showYAxisLabels": true, + "showLegend": true, + "legendDisplay": "floating", + "showPlotValuesInLegend": false, + "isSynced": true, + "xAxisVal": "timestamp", + "sidebarDimension": 240 + } + }, + "globalVariables": {}, + "userNodes": {}, + "playbackConfig": { + "speed": 1 + }, + "drawerConfig": { + "tracks": [] + }, + "layout": { + "first": "3D!18i6zy7", + "second": { + "first": "Image!3mnp456", + "second": { + "first": "RawMessages!os6rgs", + "second": "Plot!a1gj37", + "direction": "row", + "splitPercentage": 21.133036282622545 + }, + "direction": "column", + "splitPercentage": 45.83732057416268 + }, + "direction": "row", + "splitPercentage": 61.01509144891709 + } +} From 279785496ba090661dafcd6d9a8ea0791408c65d Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 18:40:15 -0700 Subject: [PATCH 13/21] test fix, moved stuff to multiprocess/ --- dimos/core/test_core.py | 12 +++++++++--- .../individual_node_example.py} | 0 .../unitree_webrtc/{ => multiprocess}/test_actors.py | 0 .../{ => multiprocess}/test_tooling.py | 0 .../{ => multiprocess}/unitree_foxglove.json | 0 .../unitree_go2.py} | 5 +++-- 6 files changed, 12 insertions(+), 5 deletions(-) rename dimos/robot/unitree_webrtc/{multiprocess_individual_node.py => multiprocess/individual_node_example.py} (100%) rename dimos/robot/unitree_webrtc/{ => multiprocess}/test_actors.py (100%) rename dimos/robot/unitree_webrtc/{ => multiprocess}/test_tooling.py (100%) rename dimos/robot/unitree_webrtc/{ => multiprocess}/unitree_foxglove.json (100%) rename dimos/robot/unitree_webrtc/{multiprocess_unitree_go2.py => multiprocess/unitree_go2.py} (99%) diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index e71036c402..ace435b54b 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -24,10 +24,10 @@ Out, RemoteOut, ZenohTransport, - dimos, pLCMTransport, rpc, start, + stop, ) from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.odometry import Odometry @@ -35,8 +35,14 @@ from dimos.utils.testing import SensorReplay # never delete this line -if dimos: - ... + + +@pytest.fixture +def dimos(): + """Fixture to create a Dimos client for testing.""" + client = start(2) + yield client + stop(client) class RobotClient(Module): diff --git a/dimos/robot/unitree_webrtc/multiprocess_individual_node.py b/dimos/robot/unitree_webrtc/multiprocess/individual_node_example.py similarity index 100% rename from dimos/robot/unitree_webrtc/multiprocess_individual_node.py rename to dimos/robot/unitree_webrtc/multiprocess/individual_node_example.py diff --git a/dimos/robot/unitree_webrtc/test_actors.py b/dimos/robot/unitree_webrtc/multiprocess/test_actors.py similarity index 100% rename from dimos/robot/unitree_webrtc/test_actors.py rename to dimos/robot/unitree_webrtc/multiprocess/test_actors.py diff --git a/dimos/robot/unitree_webrtc/test_tooling.py b/dimos/robot/unitree_webrtc/multiprocess/test_tooling.py similarity index 100% rename from dimos/robot/unitree_webrtc/test_tooling.py rename to dimos/robot/unitree_webrtc/multiprocess/test_tooling.py diff --git a/dimos/robot/unitree_webrtc/unitree_foxglove.json b/dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json similarity index 100% rename from dimos/robot/unitree_webrtc/unitree_foxglove.json rename to dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json diff --git a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py similarity index 99% rename from dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py rename to dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py index 823f781b20..8af1ef6635 100644 --- a/dimos/robot/unitree_webrtc/multiprocess_unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py @@ -153,7 +153,7 @@ async def run(ip): connection.video.transport = core.LCMTransport("/video", Image) connection.movecmd.transport = core.LCMTransport("/move", Vector3) - mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=10.0) + mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=5.0) mapper.global_map.transport = core.LCMTransport("/global_map", LidarMessage) @@ -208,7 +208,8 @@ async def run(ip): await asyncio.sleep(10) print("querying system") print(mapper.costmap()) - await asyncio.sleep(60) + while True: + await asyncio.sleep(1) if __name__ == "__main__": From 44967d04fd52517f71215097c3ad039e612b1815 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 18:41:56 -0700 Subject: [PATCH 14/21] trying to fix tests in CI --- dimos/utils/cli/test_lcmspy.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index 01a7ab30c6..46edbe0b36 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -17,8 +17,11 @@ import pytest from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.protocol.service.lcmservice import autoconf from dimos.utils.cli.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy +autoconf() + def test_spy_basic(): lcm = PickleLCM(autoconf=True) From d88146b318643cf0b59b0c1bf197fcde82ba7c0c Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:05:35 -0700 Subject: [PATCH 15/21] lcmservice autoconf fix --- dimos/protocol/service/lcmservice.py | 32 +++++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 516354642b..5f8c747864 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -18,6 +18,8 @@ import sys import threading import traceback +import os +from functools import cache from dataclasses import dataclass from typing import Any, Callable, Optional, Protocol, runtime_checkable @@ -26,17 +28,29 @@ from dimos.protocol.service.spec import Service +@cache +def check_root() -> bool: + """Return True if the current process is running as root (UID 0).""" + try: + return os.geteuid() == 0 # type: ignore[attr-defined] + except AttributeError: + # Platforms without geteuid (e.g. Windows) – assume non-root. + return False + + def check_multicast() -> list[str]: """Check if multicast configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check if loopback interface has multicast enabled try: result = subprocess.run(["ip", "link", "show", "lo"], capture_output=True, text=True) if "MULTICAST" not in result.stdout: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") except Exception: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") # Check if multicast route exists try: @@ -44,9 +58,9 @@ def check_multicast() -> list[str]: ["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True ) if not result.stdout.strip(): - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") except Exception: - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") return commands_needed @@ -55,22 +69,24 @@ def check_buffers() -> list[str]: """Check if buffer configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check current buffer settings try: result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True) current_max = int(result.stdout.split("=")[1].strip()) if current_max < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") try: result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True) current_default = int(result.stdout.split("=")[1].strip()) if current_default < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") return commands_needed From 22fc0035a74c86f824137d3ce66dcb0a38fba818 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:06:24 -0700 Subject: [PATCH 16/21] lcmservice autoconf fix --- dimos/protocol/service/lcmservice.py | 32 +++++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 516354642b..5f8c747864 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -18,6 +18,8 @@ import sys import threading import traceback +import os +from functools import cache from dataclasses import dataclass from typing import Any, Callable, Optional, Protocol, runtime_checkable @@ -26,17 +28,29 @@ from dimos.protocol.service.spec import Service +@cache +def check_root() -> bool: + """Return True if the current process is running as root (UID 0).""" + try: + return os.geteuid() == 0 # type: ignore[attr-defined] + except AttributeError: + # Platforms without geteuid (e.g. Windows) – assume non-root. + return False + + def check_multicast() -> list[str]: """Check if multicast configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check if loopback interface has multicast enabled try: result = subprocess.run(["ip", "link", "show", "lo"], capture_output=True, text=True) if "MULTICAST" not in result.stdout: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") except Exception: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") # Check if multicast route exists try: @@ -44,9 +58,9 @@ def check_multicast() -> list[str]: ["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True ) if not result.stdout.strip(): - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") except Exception: - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") return commands_needed @@ -55,22 +69,24 @@ def check_buffers() -> list[str]: """Check if buffer configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check current buffer settings try: result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True) current_max = int(result.stdout.split("=")[1].strip()) if current_max < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") try: result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True) current_default = int(result.stdout.split("=")[1].strip()) if current_default < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") return commands_needed From e08469971039e779c03a8f0149eef5ad4de56200 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:18:43 -0700 Subject: [PATCH 17/21] lcmservice test fixes --- dimos/protocol/service/test_lcmservice.py | 79 ++++++++++++----------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index 53d8c7fd12..c5b86cac35 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -23,9 +23,15 @@ autoconf, check_buffers, check_multicast, + check_root, ) +def get_sudo_prefix() -> str: + """Return 'sudo ' if not running as root, empty string if running as root.""" + return "" if check_root() else "sudo " + + def test_check_multicast_all_configured(): """Test check_multicast when system is properly configured.""" with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: @@ -63,7 +69,8 @@ def test_check_multicast_missing_multicast_flag(): ] result = check_multicast() - assert result == ["sudo ifconfig lo multicast"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}ifconfig lo multicast"] def test_check_multicast_missing_route(): @@ -83,7 +90,8 @@ def test_check_multicast_missing_route(): ] result = check_multicast() - assert result == ["sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] def test_check_multicast_all_missing(): @@ -103,9 +111,10 @@ def test_check_multicast_all_missing(): ] result = check_multicast() + sudo = get_sudo_prefix() expected = [ - "sudo ifconfig lo multicast", - "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + f"{sudo}ifconfig lo multicast", + f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", ] assert result == expected @@ -117,9 +126,10 @@ def test_check_multicast_subprocess_exception(): mock_run.side_effect = Exception("Command failed") result = check_multicast() + sudo = get_sudo_prefix() expected = [ - "sudo ifconfig lo multicast", - "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + f"{sudo}ifconfig lo multicast", + f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", ] assert result == expected @@ -151,7 +161,8 @@ def test_check_buffers_low_max_buffer(): ] result = check_buffers() - assert result == ["sudo sysctl -w net.core.rmem_max=2097152"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}sysctl -w net.core.rmem_max=2097152"] def test_check_buffers_low_default_buffer(): @@ -166,7 +177,8 @@ def test_check_buffers_low_default_buffer(): ] result = check_buffers() - assert result == ["sudo sysctl -w net.core.rmem_default=2097152"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}sysctl -w net.core.rmem_default=2097152"] def test_check_buffers_both_low(): @@ -181,9 +193,10 @@ def test_check_buffers_both_low(): ] result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -195,9 +208,10 @@ def test_check_buffers_subprocess_exception(): mock_run.side_effect = Exception("Command failed") result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -212,9 +226,10 @@ def test_check_buffers_parsing_error(): ] result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -267,29 +282,26 @@ def test_autoconf_with_config_needed_success(): # Command execution calls type( "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo ifconfig lo multicast - type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sudo route add... - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo sysctl rmem_max - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo sysctl rmem_default + )(), # ifconfig lo multicast + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # route add... + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_max + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_default ] with patch("builtins.print") as mock_print: autoconf() + sudo = get_sudo_prefix() # Verify the expected print calls expected_calls = [ ("System configuration required. Executing commands...",), - (" Running: sudo ifconfig lo multicast",), + (f" Running: {sudo}ifconfig lo multicast",), (" ✓ Success",), - (" Running: sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), + (f" Running: {sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), (" ✓ Success",), - (" Running: sudo sysctl -w net.core.rmem_max=2097152",), + (f" Running: {sudo}sysctl -w net.core.rmem_max=2097152",), (" ✓ Success",), - (" Running: sudo sysctl -w net.core.rmem_default=2097152",), + (f" Running: {sudo}sysctl -w net.core.rmem_default=2097152",), (" ✓ Success",), ("System configuration completed.",), ] @@ -318,20 +330,11 @@ def test_autoconf_with_command_failures(): # Command execution calls - first succeeds, second fails type( "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo ifconfig lo multicast + )(), # ifconfig lo multicast subprocess.CalledProcessError( 1, - [ - "sudo", - "route", - "add", - "-net", - "224.0.0.0", - "netmask", - "240.0.0.0", - "dev", - "lo", - ], + get_sudo_prefix().split() + + ["route", "add", "-net", "224.0.0.0", "netmask", "240.0.0.0", "dev", "lo"], "Permission denied", "Operation not permitted", ), From e4452346e1c8514232a8842bfe851eb7d0c18343 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:24:47 -0700 Subject: [PATCH 18/21] lcmspy doesn't limit topic display width --- dimos/utils/cli/lcmspy_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py index 188dd793de..47bf2b8a56 100644 --- a/dimos/utils/cli/lcmspy_cli.py +++ b/dimos/utils/cli/lcmspy_cli.py @@ -84,7 +84,7 @@ def compose(self) -> ComposeResult: # yield Header() self.table = DataTable(zebra_stripes=False, cursor_type=None) - self.table.add_column("Topic", width=30) + self.table.add_column("Topic") self.table.add_column("Freq (Hz)") self.table.add_column("Bandwidth") self.table.add_column("Total Traffic") From 89a818917a02f05ca8ab9f54bf99415eb0d8f30d Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:31:08 -0700 Subject: [PATCH 19/21] disabled LCM tests in CI --- dimos/utils/cli/test_lcmspy.py | 6 ++++++ pyproject.toml | 7 +++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index 46edbe0b36..a77bb03d20 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -23,6 +23,7 @@ autoconf() +@pytest.mark.lcm def test_spy_basic(): lcm = PickleLCM(autoconf=True) lcm.start() @@ -81,6 +82,7 @@ def test_spy_basic(): print(f"Odom topic: {odom_topic_spy}") +@pytest.mark.lcm def test_topic_statistics_direct(): """Test Topic statistics directly without LCM""" from dimos.utils.cli.lcmspy import Topic as TopicSpy @@ -131,6 +133,7 @@ def test_topic_cleanup(): assert topic.message_history[0][0] > time.time() - 10 # Recent message +@pytest.mark.lcm def test_graph_topic_basic(): """Test GraphTopic basic functionality""" topic = GraphTopic("/test_graph") @@ -146,6 +149,7 @@ def test_graph_topic_basic(): assert topic.bandwidth_history[0] > 0 +@pytest.mark.lcm def test_graph_lcmspy_basic(): """Test GraphLCMSpy basic functionality""" spy = GraphLCMSpy(graph_log_window=0.1) @@ -165,6 +169,7 @@ def test_graph_lcmspy_basic(): spy.stop() +@pytest.mark.lcm def test_lcmspy_global_totals(): """Test that LCMSpy tracks global totals as a Topic itself""" spy = LCMSpy() @@ -194,6 +199,7 @@ def test_lcmspy_global_totals(): spy.stop() +@pytest.mark.lcm def test_graph_lcmspy_global_totals(): """Test that GraphLCMSpy tracks global totals with history""" spy = GraphLCMSpy(graph_log_window=0.1) diff --git a/pyproject.toml b/pyproject.toml index af355ecd3a..5cb8c5be9d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -188,9 +188,12 @@ markers = [ "exclude: arbitrary exclusion from CI and default test exec", "tool: dev tooling", "needsdata: needs test data to be downloaded", - "ros: depend on ros"] + "ros: depend on ros", + "lcm: tests that run actual LCM bus (can't execute in CI)" -addopts = "-v -ra --color=yes -m 'not vis and not benchmark and not exclude and not tool and not needsdata and not ros and not heavy'" +] + +addopts = "-v -ra --color=yes -m 'not vis and not benchmark and not exclude and not tool and not needsdata and not lcm and not ros and not heavy'" From d9a06dcb9eb2833dca9a6031582205513f12a635 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:47:29 -0700 Subject: [PATCH 20/21] mini updates to foxglove dash, publishing global map every 2.5 seconds --- .../multiprocess/unitree_foxglove.json | 18 +++++++++--------- .../unitree_webrtc/multiprocess/unitree_go2.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json b/dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json index 0f8f8b0ecd..932ee65fd5 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json @@ -26,13 +26,13 @@ }, "cameraState": { "perspective": true, - "distance": 41.010920913295585, - "phi": 12.77189691842678, - "thetaOffset": 91.84171266855309, + "distance": 22.16066481982733, + "phi": 34.816360162458444, + "thetaOffset": 110.38709006815255, "targetOffset": [ - 5.610652121013281, - -2.2558250159447826, - 5.235694681126227e-16 + 0.6543165797799305, + -5.069343424147507, + 5.603018248159094e-16 ], "target": [ 0, @@ -63,8 +63,8 @@ "colorField": "z", "colorMode": "colormap", "colorMap": "turbo", - "pointShape": "cube", - "pointSize": 8, + "pointShape": "square", + "pointSize": 10, "explicitAlpha": 1, "minValue": -0.1, "decayTime": 0, @@ -82,7 +82,7 @@ "colorField": "z", "colorMode": "colormap", "colorMap": "turbo", - "pointSize": 6, + "pointSize": 10, "minValue": -0.1, "decayTime": 0, "pointShape": "square", diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py index 8af1ef6635..659eafde0c 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py @@ -153,7 +153,7 @@ async def run(ip): connection.video.transport = core.LCMTransport("/video", Image) connection.movecmd.transport = core.LCMTransport("/move", Vector3) - mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=5.0) + mapper = dimos.deploy(Map, voxel_size=0.5, global_publish_interval=2.5) mapper.global_map.transport = core.LCMTransport("/global_map", LidarMessage) From d351271b116daf642402b8e7ef6710380dde6374 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 22:42:40 -0700 Subject: [PATCH 21/21] PR fixes --- .../foxglove_unitree_lcm_dashboard.json | 0 dimos/robot/unitree_webrtc/connection.py | 3 --- 2 files changed, 3 deletions(-) rename dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json => assets/foxglove_unitree_lcm_dashboard.json (100%) diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json b/assets/foxglove_unitree_lcm_dashboard.json similarity index 100% rename from dimos/robot/unitree_webrtc/multiprocess/unitree_foxglove.json rename to assets/foxglove_unitree_lcm_dashboard.json diff --git a/dimos/robot/unitree_webrtc/connection.py b/dimos/robot/unitree_webrtc/connection.py index 81beec6244..711adf640a 100644 --- a/dimos/robot/unitree_webrtc/connection.py +++ b/dimos/robot/unitree_webrtc/connection.py @@ -45,7 +45,6 @@ class WebRTCRobot(ConnectionInterface): def __init__(self, ip: str, mode: str = "ai"): self.ip = ip - print("IP IS", ip) self.mode = mode self.conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalSTA, ip=self.ip) self.connect() @@ -97,8 +96,6 @@ def move(self, velocity: Vector, duration: float = 0.0) -> bool: """ x, y, yaw = velocity.x, velocity.y, velocity.z - print("MOVE REQUESTED", velocity) - # WebRTC coordinate mapping: # x - Positive right, negative left # y - positive forward, negative backwards