diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 70384ffc77..29db16c655 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -26,7 +26,6 @@ import reactivex as rx from reactivex import operators as ops from reactivex.disposable import Disposable -import rerun as rr import dimos.core.colors as colors from dimos.core.resource import Resource @@ -143,8 +142,6 @@ class Out(Stream[T], ObservableMixin[T]): def __init__(self, *argv, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*argv, **kwargs) - self._rerun_config: dict | None = None # type: ignore[type-arg] - self._rerun_last_log: float = 0.0 @property def transport(self) -> Transport[T]: @@ -175,11 +172,6 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] if not hasattr(self, "_transport") or self._transport is None: logger.warning(f"Trying to publish on Out {self} without a transport") return - - # Log to Rerun directly if configured - if self._rerun_config is not None: - self._log_to_rerun(msg) - self._transport.broadcast(self, msg) def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def] @@ -193,58 +185,6 @@ def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def] """ return self.transport.subscribe(cb, self) # type: ignore[arg-type, func-returns-value, no-any-return] - def autolog_to_rerun( - self, - entity_path: str, - rate_limit: float | None = None, - **rerun_kwargs: Any, - ) -> None: - """Configure this output to auto-log to Rerun (fire-and-forget). - - Call once in start() - messages auto-logged when published. - - Args: - entity_path: Rerun entity path (e.g., "world/map") - rate_limit: Max Hz to log (None = unlimited) - **rerun_kwargs: Passed to msg.to_rerun() for rendering config - (e.g., radii=0.02, colormap="turbo", colors=[255,0,0]) - - Example: - def start(self): - super().start() - # Just declare it - fire and forget! - self.global_map.autolog_to_rerun("world/map", rate_limit=5.0, radii=0.02) - """ - self._rerun_config = { - "entity_path": entity_path, - "rate_limit": rate_limit, - "rerun_kwargs": rerun_kwargs, - } - self._rerun_last_log = 0.0 - - def _log_to_rerun(self, msg: T) -> None: - """Log message to Rerun with rate limiting.""" - if not hasattr(msg, "to_rerun"): - return - - if self._rerun_config is None: - return - - import time - - config = self._rerun_config - - # Rate limiting - if config["rate_limit"] is not None: - now = time.monotonic() - min_interval = 1.0 / config["rate_limit"] - if now - self._rerun_last_log < min_interval: - return # Skip - too soon - self._rerun_last_log = now - - rerun_data = msg.to_rerun(**config["rerun_kwargs"]) - rr.log(config["entity_path"], rerun_data) - class RemoteStream(Stream[T]): @property diff --git a/dimos/dashboard/tf_rerun_module.py b/dimos/dashboard/tf_rerun_module.py new file mode 100644 index 0000000000..c862778cad --- /dev/null +++ b/dimos/dashboard/tf_rerun_module.py @@ -0,0 +1,112 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TF Rerun Module - Automatically visualize all transforms in Rerun. + +This module subscribes to the /tf LCM topic and logs ALL transforms +to Rerun, providing automatic visualization of the robot's TF tree. + +Usage: + # In blueprints: + from dimos.dashboard.tf_rerun_module import tf_rerun + + def my_robot(): + return ( + robot_connection() + + tf_rerun() # Add TF visualization + + other_modules() + ) +""" + +from typing import Any + +import rerun as rr + +from dimos.core import Module, rpc +from dimos.core.global_config import GlobalConfig +from dimos.dashboard.rerun_init import connect_rerun +from dimos.msgs.tf2_msgs import TFMessage +from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class TFRerunModule(Module): + """Subscribes to /tf LCM topic and logs all transforms to Rerun. + + This module automatically visualizes the TF tree in Rerun by: + - Subscribing to the /tf LCM topic (captures ALL transforms in the system) + - Logging each transform to its derived entity path (world/{child_frame_id}) + """ + + _global_config: GlobalConfig + _lcm: LCM | None = None + _unsubscribe: Any = None + + def __init__( + self, + global_config: GlobalConfig | None = None, + **kwargs: Any, + ) -> None: + """Initialize TFRerunModule. + + Args: + global_config: Optional global configuration for viewer backend settings + **kwargs: Additional arguments passed to parent Module + """ + super().__init__(**kwargs) + self._global_config = global_config or GlobalConfig() + + @rpc + def start(self) -> None: + """Start the TF visualization module.""" + super().start() + + # Only connect if Rerun backend is selected + if self._global_config.viewer_backend.startswith("rerun"): + connect_rerun(global_config=self._global_config) + + # Subscribe directly to LCM /tf topic (captures ALL transforms) + self._lcm = LCM() + self._lcm.start() + topic = Topic("/tf", TFMessage) + self._unsubscribe = self._lcm.subscribe(topic, self._on_tf_message) + logger.info("TFRerunModule: subscribed to /tf, logging all transforms to Rerun") + + def _on_tf_message(self, msg: TFMessage, topic: Topic) -> None: + """Log all transforms in TFMessage to Rerun. + + Args: + msg: TFMessage containing transforms to visualize + topic: The LCM topic (unused but required by callback signature) + """ + for entity_path, transform in msg.to_rerun(): # type: ignore[no-untyped-call] + rr.log(entity_path, transform) + + @rpc + def stop(self) -> None: + """Stop the TF visualization module and cleanup LCM subscription.""" + if self._unsubscribe: + self._unsubscribe() + self._unsubscribe = None + + if self._lcm: + self._lcm.stop() + self._lcm = None + + super().stop() + + +tf_rerun = TFRerunModule.blueprint diff --git a/dimos/mapping/costmapper.py b/dimos/mapping/costmapper.py index e637126a04..ee7512baba 100644 --- a/dimos/mapping/costmapper.py +++ b/dimos/mapping/costmapper.py @@ -82,21 +82,13 @@ def _rerun_worker(self) -> None: # Generate mesh + log to Rerun (blocks in background, not on data path) try: - # 2D image panel - rr.log( - "world/nav/costmap/image", - grid.to_rerun( - mode="image", - colormap="RdBu_r", - ), - ) # 3D floor overlay (expensive mesh generation) rr.log( "world/nav/costmap/floor", grid.to_rerun( mode="mesh", - colormap=None, # Grayscale: free=white, occupied=black - z_offset=0.02, + colormap=None, # Uses Foxglove-style colors (blue-purple free, black occupied) + z_offset=0.05, # 5cm above floor to avoid z-fighting ), ) diff --git a/dimos/msgs/nav_msgs/OccupancyGrid.py b/dimos/msgs/nav_msgs/OccupancyGrid.py index 62ee9aa072..3876b44fab 100644 --- a/dimos/msgs/nav_msgs/OccupancyGrid.py +++ b/dimos/msgs/nav_msgs/OccupancyGrid.py @@ -648,15 +648,27 @@ def _to_rerun_mesh(self, colormap: str | None = None, z_offset: float = 0.01): # Alpha: 180 for free, 220 for occupied alpha = np.where(cell_values == 0, 180, 220).astype(np.uint8) else: - # Default coloring: dark grey for free, black for occupied + # Foxglove-style coloring: blue-purple for free, black for occupied + # Free (0): #484981 = RGB(72, 73, 129) + # Occupied (100): #000000 = RGB(0, 0, 0) rgb = np.zeros((n_cells, 3), dtype=np.uint8) is_free = cell_values == 0 - # Free space: dark grey - rgb[is_free] = [40, 40, 40] - # Occupied: black to dark grey gradient (darker = more occupied) - intensity = (40 * (1 - cell_values / 100)).astype(np.uint8) - rgb[~is_free] = np.column_stack([intensity[~is_free]] * 3) - alpha = np.where(is_free, 150, 200).astype(np.uint8) + is_occupied = ~is_free + + # Free space: blue-purple #484981 + rgb[is_free] = [72, 73, 129] + + # Occupied: gradient from blue-purple to black based on cost + # cost 1 -> mostly blue-purple, cost 100 -> black + if np.any(is_occupied): + costs = cell_values[is_occupied].astype(np.float32) + # Linear interpolation: (1 - cost/100) * blue-purple + factor = (1 - costs / 100).clip(0, 1) + rgb[is_occupied, 0] = (72 * factor).astype(np.uint8) + rgb[is_occupied, 1] = (73 * factor).astype(np.uint8) + rgb[is_occupied, 2] = (129 * factor).astype(np.uint8) + + alpha = np.where(is_free, 180, 220).astype(np.uint8) # Combine RGB and alpha into RGBA colors_per_cell = np.column_stack([rgb, alpha]) # (n_cells, 4) diff --git a/dimos/msgs/nav_msgs/Path.py b/dimos/msgs/nav_msgs/Path.py index ff30b88fa6..e92eab17a4 100644 --- a/dimos/msgs/nav_msgs/Path.py +++ b/dimos/msgs/nav_msgs/Path.py @@ -233,11 +233,18 @@ def to_ros_msg(self) -> ROSPath: return ros_msg - def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128)): # type: ignore[no-untyped-def] + def to_rerun( # type: ignore[no-untyped-def] + self, + color: tuple[int, int, int] = (0, 255, 128), + z_offset: float = 0.2, + radii: float = 0.05, + ): """Convert to rerun LineStrips3D format. Args: color: RGB color tuple for the path line + z_offset: Height above floor to render path (default 0.2m to avoid costmap occlusion) + radii: Thickness of the path line (default 0.05m = 5cm) Returns: rr.LineStrips3D archetype for logging to rerun @@ -245,5 +252,6 @@ def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128)): # type: ignore if not self.poses: return rr.LineStrips3D([]) - points = [[p.x, p.y, p.z] for p in self.poses] - return rr.LineStrips3D([points], colors=[color]) + # Lift path above floor so it's visible over costmap + points = [[p.x, p.y, p.z + z_offset] for p in self.poses] + return rr.LineStrips3D([points], colors=[color], radii=radii) diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index d8b582be83..6ba1ae0ba1 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -16,6 +16,7 @@ from dimos_lcm.std_msgs import Bool, String from reactivex.disposable import Disposable +import rerun as rr from dimos.core import In, Module, Out, rpc from dimos.core.global_config import GlobalConfig @@ -50,22 +51,24 @@ def __init__(self, global_config: GlobalConfig | None = None) -> None: @rpc def start(self) -> None: super().start() - connect_rerun(global_config=self._global_config) - # Auto-log path to Rerun - self.path.autolog_to_rerun("world/nav/path") + if self._global_config.viewer_backend.startswith("rerun"): + connect_rerun(global_config=self._global_config) - unsub = self.odom.subscribe(self._planner.handle_odom) - self._disposables.add(Disposable(unsub)) + # Manual Rerun logging for path + def _log_path_to_rerun(path: Path) -> None: + rr.log("world/nav/path", path.to_rerun()) # type: ignore[no-untyped-call] - unsub = self.global_costmap.subscribe(self._planner.handle_global_costmap) - self._disposables.add(Disposable(unsub)) + self._disposables.add(self._planner.path.subscribe(_log_path_to_rerun)) - unsub = self.goal_request.subscribe(self._planner.handle_goal_request) - self._disposables.add(Disposable(unsub)) - - unsub = self.target.subscribe(self._planner.handle_goal_request) - self._disposables.add(Disposable(unsub)) + self._disposables.add(Disposable(self.odom.subscribe(self._planner.handle_odom))) + self._disposables.add( + Disposable(self.global_costmap.subscribe(self._planner.handle_global_costmap)) + ) + self._disposables.add( + Disposable(self.goal_request.subscribe(self._planner.handle_goal_request)) + ) + self._disposables.add(Disposable(self.target.subscribe(self._planner.handle_goal_request))) self._disposables.add(self._planner.path.subscribe(self.path.publish)) diff --git a/dimos/robot/unitree/connection/connection.py b/dimos/robot/unitree/connection/connection.py index 411710ce1c..bef0c0b127 100644 --- a/dimos/robot/unitree/connection/connection.py +++ b/dimos/robot/unitree/connection/connection.py @@ -38,6 +38,7 @@ from dimos.core.resource import Resource from dimos.msgs.geometry_msgs import Pose, Transform, Twist from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ImageFormat from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg from dimos.robot.unitree_webrtc.type.odometry import Odometry @@ -267,6 +268,7 @@ def video_stream(self) -> Observable[Image]: lambda frame: Image.from_numpy( # np.ascontiguousarray(frame.to_ndarray("rgb24")), frame.to_ndarray(format="rgb24"), # type: ignore[attr-defined] + format=ImageFormat.RGB, # Frame is RGB24, not BGR frame_id="camera_optical", ) ), diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 8173f27527..2d962af981 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -30,6 +30,7 @@ from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport +from dimos.dashboard.tf_rerun_module import tf_rerun from dimos.mapping.costmapper import cost_mapper from dimos.mapping.voxels import voxel_mapper from dimos.msgs.sensor_msgs import Image, PointCloud2 @@ -73,6 +74,7 @@ go2_connection(), linux if platform.system() == "Linux" else mac, websocket_vis(), + tf_rerun(), # Auto-visualize all TF transforms in Rerun ).global_config(n_dask_workers=4, robot_model="unitree_go2") nav = autoconnect( diff --git a/dimos/utils/metrics.py b/dimos/utils/metrics.py index 54c3d3ca10..bf7bf45cdc 100644 --- a/dimos/utils/metrics.py +++ b/dimos/utils/metrics.py @@ -18,6 +18,7 @@ from typing import Any, TypeVar, cast from dimos_lcm.std_msgs import Float32 +import rerun as rr from dimos.core import LCMTransport, Transport @@ -50,3 +51,40 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return cast("F", wrapper) return timed_decorator + + +def log_timing_to_rerun(entity_path: str) -> Callable[[F], F]: + """Decorator to log function execution time to Rerun. + + Automatically measures the execution time of the decorated function + and logs it as a scalar value to the specified Rerun entity path. + + Args: + entity_path: Rerun entity path for timing metrics + (e.g., "metrics/costmap/calc_ms") + + Returns: + Decorator function + + Example: + @log_timing_to_rerun("metrics/costmap/calc_ms") + def _calculate_costmap(self, msg): + # ... expensive computation + return result + + # Timing automatically logged to Rerun as a time series! + """ + + def decorator(func: F) -> F: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + start = time.perf_counter() + result = func(*args, **kwargs) + elapsed_ms = (time.perf_counter() - start) * 1000 + + rr.log(entity_path, rr.Scalars(elapsed_ms)) + return result + + return cast("F", wrapper) + + return decorator diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index e3638a6fe9..31aa0d3956 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -22,13 +22,12 @@ """ import asyncio -import os from pathlib import Path as FilePath import threading import time from typing import Any -from dimos_lcm.std_msgs import Bool +from dimos_lcm.std_msgs import Bool # type: ignore[import-untyped] from reactivex.disposable import Disposable import socketio # type: ignore[import-untyped] from starlette.applications import Starlette @@ -45,6 +44,7 @@ ) from dimos.core import In, Module, Out, rpc +from dimos.core.global_config import GlobalConfig from dimos.mapping.occupancy.gradient import gradient from dimos.mapping.occupancy.inflation import simple_inflate from dimos.mapping.types import LatLon @@ -90,13 +90,20 @@ class WebsocketVisModule(Module): cmd_vel: Out[Twist] movecmd_stamped: Out[TwistStamped] - def __init__(self, port: int = 7779, **kwargs) -> None: # type: ignore[no-untyped-def] + def __init__( + self, + port: int = 7779, + global_config: GlobalConfig | None = None, + **kwargs: Any, + ) -> None: """Initialize the WebSocket visualization module. Args: port: Port to run the web server on + global_config: Optional global config for viewer backend settings """ super().__init__(**kwargs) + self._global_config = global_config or GlobalConfig() self.port = port self._uvicorn_server_thread: threading.Thread | None = None @@ -204,8 +211,7 @@ def _create_server(self) -> None: async def serve_index(request): # type: ignore[no-untyped-def] """Serve appropriate HTML based on viewer mode.""" # If running native Rerun, redirect to standalone command center - viewer_backend = os.environ.get("VIEWER_BACKEND", "rerun-web").lower() - if viewer_backend == "rerun-native": + if self._global_config.viewer_backend == "rerun-native": return RedirectResponse(url="/command-center") # Otherwise serve full dashboard with Rerun iframe return FileResponse(_DASHBOARD_HTML, media_type="text/html")