From 91811a6c56c406947d80c15424ae42ffde97b71f Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Thu, 1 Jan 2026 22:09:20 -0800 Subject: [PATCH 01/13] fix: Make path visualization thicker and elevated above floor - Add radii parameter (default 0.05m = 5cm thick) - Add z_offset parameter (default 0.2m above floor) - Prevents path from being occluded by costmap mesh Path now clearly visible in Rerun 3D viewer. --- dimos/msgs/nav_msgs/Path.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dimos/msgs/nav_msgs/Path.py b/dimos/msgs/nav_msgs/Path.py index ff30b88fa6..8510e6a194 100644 --- a/dimos/msgs/nav_msgs/Path.py +++ b/dimos/msgs/nav_msgs/Path.py @@ -233,11 +233,13 @@ def to_ros_msg(self) -> ROSPath: return ros_msg - def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128)): # type: ignore[no-untyped-def] + def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128), z_offset: float = 0.2, radii: float = 0.05): # type: ignore[no-untyped-def] """Convert to rerun LineStrips3D format. Args: color: RGB color tuple for the path line + z_offset: Height above floor to render path (default 0.2m to avoid costmap occlusion) + radii: Thickness of the path line (default 0.05m = 5cm) Returns: rr.LineStrips3D archetype for logging to rerun @@ -245,5 +247,6 @@ def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128)): # type: ignore if not self.poses: return rr.LineStrips3D([]) - points = [[p.x, p.y, p.z] for p in self.poses] - return rr.LineStrips3D([points], colors=[color]) + # Lift path above floor so it's visible over costmap + points = [[p.x, p.y, p.z + z_offset] for p in self.poses] + return rr.LineStrips3D([points], colors=[color], radii=radii) From 6f69518bee271b36d40ec9d338318c1b424e0528 Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Thu, 1 Jan 2026 22:59:26 -0800 Subject: [PATCH 02/13] feat:some rerun visualization improvements - Fix costmap colors: use turbo colormap, remove broken image panel, increase z_offset to 0.05 (turbo is bullshit i will fix this) - Fix websocket_vis: use global_config instead of os.environ for viewer_backend - Add TFRerunModule: auto-visualize all TF transforms via LCM subscription - Add autolog_to_rerun_async(): async logging with background thread for Out streams - Add log_timing_to_rerun(): decorator for timing metrics in Rerun - Wire TFRerunModule into go2 blueprints Addresses feedback from issue #922 --- dimos/core/stream.py | 100 +++++++++++++++- dimos/dashboard/tf_rerun_module.py | 112 ++++++++++++++++++ dimos/mapping/costmapper.py | 12 +- dimos/msgs/nav_msgs/Path.py | 7 +- .../unitree_webrtc/unitree_go2_blueprints.py | 2 + dimos/utils/metrics.py | 38 ++++++ .../web/websocket_vis/websocket_vis_module.py | 14 ++- 7 files changed, 265 insertions(+), 20 deletions(-) create mode 100644 dimos/dashboard/tf_rerun_module.py diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 70384ffc77..24a4589a90 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -15,6 +15,9 @@ from __future__ import annotations import enum +import queue +import threading +import time from typing import ( TYPE_CHECKING, Any, @@ -145,6 +148,10 @@ def __init__(self, *argv, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*argv, **kwargs) self._rerun_config: dict | None = None # type: ignore[type-arg] self._rerun_last_log: float = 0.0 + # Async logging state + self._rerun_queue: queue.Queue | None = None # type: ignore[type-arg] + self._rerun_thread: threading.Thread | None = None + self._rerun_async: bool = False @property def transport(self) -> Transport[T]: @@ -176,9 +183,16 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] logger.warning(f"Trying to publish on Out {self} without a transport") return - # Log to Rerun directly if configured + # Log to Rerun (async or sync depending on config) if self._rerun_config is not None: - self._log_to_rerun(msg) + if self._rerun_async and self._rerun_queue is not None: + # Queue for async logging (non-blocking, drops if full) + try: + self._rerun_queue.put_nowait(msg) + except queue.Full: + pass # Drop frame, data pipeline continues + else: + self._log_to_rerun(msg) self._transport.broadcast(self, msg) @@ -222,16 +236,92 @@ def start(self): } self._rerun_last_log = 0.0 + def autolog_to_rerun_async( + self, + entity_path: str, + rate_limit: float | None = None, + **rerun_kwargs: Any, + ) -> None: + """Configure async Rerun logging with background thread (non-blocking). + + Creates a queue and worker thread. Messages are queued and logged + in the background, preventing Rerun from blocking the data pipeline. + + Best for expensive visualizations (meshes, large point clouds) where + blocking the main thread would cause latency. + + Args: + entity_path: Rerun entity path (e.g., "world/map") + rate_limit: Max Hz to log (None = unlimited) + **rerun_kwargs: Passed to msg.to_rerun() for rendering config + + Example: + def start(self): + super().start() + # Non-blocking logging with background thread + self.global_map.autolog_to_rerun_async( + "world/map", + rate_limit=10.0, + mode="boxes", + colormap="turbo", + ) + """ + self._rerun_config = { + "entity_path": entity_path, + "rate_limit": rate_limit, + "rerun_kwargs": rerun_kwargs, + } + self._rerun_last_log = 0.0 + self._rerun_async = True + self._rerun_queue = queue.Queue(maxsize=2) + + def _worker() -> None: + """Background thread: pull from queue and log to Rerun.""" + last_log_time = 0.0 + while True: + try: + msg = self._rerun_queue.get(timeout=1.0) # type: ignore[union-attr] + if msg is None: # Shutdown signal + break + + if not hasattr(msg, "to_rerun"): + continue + + # Rate limiting + if rate_limit is not None: + now = time.monotonic() + min_interval = 1.0 / rate_limit + if now - last_log_time < min_interval: + continue # Skip - too soon + last_log_time = now + + rerun_data = msg.to_rerun(**rerun_kwargs) + rr.log(entity_path, rerun_data) + except queue.Empty: + continue + except Exception as e: + logger.warning(f"Rerun async logging error: {e}") + + self._rerun_thread = threading.Thread(target=_worker, daemon=True) + self._rerun_thread.start() + logger.debug(f"Started async Rerun logging thread for {entity_path}") + + def stop_rerun_async(self) -> None: + """Stop the async Rerun logging thread.""" + if self._rerun_queue is not None and self._rerun_thread is not None: + self._rerun_queue.put(None) # Shutdown signal + self._rerun_thread.join(timeout=2.0) + self._rerun_thread = None + self._rerun_queue = None + def _log_to_rerun(self, msg: T) -> None: - """Log message to Rerun with rate limiting.""" + """Log message to Rerun with rate limiting (synchronous).""" if not hasattr(msg, "to_rerun"): return if self._rerun_config is None: return - import time - config = self._rerun_config # Rate limiting diff --git a/dimos/dashboard/tf_rerun_module.py b/dimos/dashboard/tf_rerun_module.py new file mode 100644 index 0000000000..5260740d60 --- /dev/null +++ b/dimos/dashboard/tf_rerun_module.py @@ -0,0 +1,112 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TF Rerun Module - Automatically visualize all transforms in Rerun. + +This module subscribes to the /tf LCM topic and logs ALL transforms +to Rerun, providing automatic visualization of the robot's TF tree. + +Usage: + # In blueprints: + from dimos.dashboard.tf_rerun_module import tf_rerun + + def my_robot(): + return ( + robot_connection() + + tf_rerun() # Add TF visualization + + other_modules() + ) +""" + +from typing import Any + +import rerun as rr + +from dimos.core import Module, rpc +from dimos.core.global_config import GlobalConfig +from dimos.dashboard.rerun_init import connect_rerun +from dimos.msgs.tf2_msgs import TFMessage +from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class TFRerunModule(Module): + """Subscribes to /tf LCM topic and logs all transforms to Rerun. + + This module automatically visualizes the TF tree in Rerun by: + - Subscribing to the /tf LCM topic (captures ALL transforms in the system) + - Logging each transform to its derived entity path (world/{child_frame_id}) + """ + + _global_config: GlobalConfig + _lcm: LCM | None = None + _unsubscribe: Any = None + + def __init__( + self, + global_config: GlobalConfig | None = None, + **kwargs: Any, + ) -> None: + """Initialize TFRerunModule. + + Args: + global_config: Optional global configuration for viewer backend settings + **kwargs: Additional arguments passed to parent Module + """ + super().__init__(**kwargs) + self._global_config = global_config or GlobalConfig() + + @rpc + def start(self) -> None: + """Start the TF visualization module.""" + super().start() + + # Only connect if Rerun backend is selected + if self._global_config.viewer_backend.startswith("rerun"): + connect_rerun(global_config=self._global_config) + + # Subscribe directly to LCM /tf topic (captures ALL transforms) + self._lcm = LCM() + self._lcm.start() + topic = Topic("/tf", TFMessage) + self._unsubscribe = self._lcm.subscribe(topic, self._on_tf_message) + logger.info("TFRerunModule: subscribed to /tf, logging all transforms to Rerun") + + def _on_tf_message(self, msg: TFMessage, topic: Topic) -> None: + """Log all transforms in TFMessage to Rerun. + + Args: + msg: TFMessage containing transforms to visualize + topic: The LCM topic (unused but required by callback signature) + """ + for entity_path, transform in msg.to_rerun(): + rr.log(entity_path, transform) + + @rpc + def stop(self) -> None: + """Stop the TF visualization module and cleanup LCM subscription.""" + if self._unsubscribe: + self._unsubscribe() + self._unsubscribe = None + + if self._lcm: + self._lcm.stop() + self._lcm = None + + super().stop() + + +tf_rerun = TFRerunModule.blueprint diff --git a/dimos/mapping/costmapper.py b/dimos/mapping/costmapper.py index e637126a04..7d1579d080 100644 --- a/dimos/mapping/costmapper.py +++ b/dimos/mapping/costmapper.py @@ -82,21 +82,13 @@ def _rerun_worker(self) -> None: # Generate mesh + log to Rerun (blocks in background, not on data path) try: - # 2D image panel - rr.log( - "world/nav/costmap/image", - grid.to_rerun( - mode="image", - colormap="RdBu_r", - ), - ) # 3D floor overlay (expensive mesh generation) rr.log( "world/nav/costmap/floor", grid.to_rerun( mode="mesh", - colormap=None, # Grayscale: free=white, occupied=black - z_offset=0.02, + colormap="turbo", # Matches Foxglove colormap (blue-purple gradient) + z_offset=0.05, # 5cm above floor to avoid z-fighting ), ) diff --git a/dimos/msgs/nav_msgs/Path.py b/dimos/msgs/nav_msgs/Path.py index 8510e6a194..01ea4dc75d 100644 --- a/dimos/msgs/nav_msgs/Path.py +++ b/dimos/msgs/nav_msgs/Path.py @@ -233,7 +233,12 @@ def to_ros_msg(self) -> ROSPath: return ros_msg - def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128), z_offset: float = 0.2, radii: float = 0.05): # type: ignore[no-untyped-def] + def to_rerun( + self, + color: tuple[int, int, int] = (0, 255, 128), + z_offset: float = 0.2, + radii: float = 0.05, + ): # type: ignore[no-untyped-def] """Convert to rerun LineStrips3D format. Args: diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 8173f27527..2d962af981 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -30,6 +30,7 @@ from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport +from dimos.dashboard.tf_rerun_module import tf_rerun from dimos.mapping.costmapper import cost_mapper from dimos.mapping.voxels import voxel_mapper from dimos.msgs.sensor_msgs import Image, PointCloud2 @@ -73,6 +74,7 @@ go2_connection(), linux if platform.system() == "Linux" else mac, websocket_vis(), + tf_rerun(), # Auto-visualize all TF transforms in Rerun ).global_config(n_dask_workers=4, robot_model="unitree_go2") nav = autoconnect( diff --git a/dimos/utils/metrics.py b/dimos/utils/metrics.py index 54c3d3ca10..bf7bf45cdc 100644 --- a/dimos/utils/metrics.py +++ b/dimos/utils/metrics.py @@ -18,6 +18,7 @@ from typing import Any, TypeVar, cast from dimos_lcm.std_msgs import Float32 +import rerun as rr from dimos.core import LCMTransport, Transport @@ -50,3 +51,40 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return cast("F", wrapper) return timed_decorator + + +def log_timing_to_rerun(entity_path: str) -> Callable[[F], F]: + """Decorator to log function execution time to Rerun. + + Automatically measures the execution time of the decorated function + and logs it as a scalar value to the specified Rerun entity path. + + Args: + entity_path: Rerun entity path for timing metrics + (e.g., "metrics/costmap/calc_ms") + + Returns: + Decorator function + + Example: + @log_timing_to_rerun("metrics/costmap/calc_ms") + def _calculate_costmap(self, msg): + # ... expensive computation + return result + + # Timing automatically logged to Rerun as a time series! + """ + + def decorator(func: F) -> F: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + start = time.perf_counter() + result = func(*args, **kwargs) + elapsed_ms = (time.perf_counter() - start) * 1000 + + rr.log(entity_path, rr.Scalars(elapsed_ms)) + return result + + return cast("F", wrapper) + + return decorator diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index e3638a6fe9..7e848f8639 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -22,7 +22,6 @@ """ import asyncio -import os from pathlib import Path as FilePath import threading import time @@ -45,6 +44,7 @@ ) from dimos.core import In, Module, Out, rpc +from dimos.core.global_config import GlobalConfig from dimos.mapping.occupancy.gradient import gradient from dimos.mapping.occupancy.inflation import simple_inflate from dimos.mapping.types import LatLon @@ -90,13 +90,20 @@ class WebsocketVisModule(Module): cmd_vel: Out[Twist] movecmd_stamped: Out[TwistStamped] - def __init__(self, port: int = 7779, **kwargs) -> None: # type: ignore[no-untyped-def] + def __init__( + self, + port: int = 7779, + global_config: GlobalConfig | None = None, + **kwargs: Any, + ) -> None: """Initialize the WebSocket visualization module. Args: port: Port to run the web server on + global_config: Optional global config for viewer backend settings """ super().__init__(**kwargs) + self._global_config = global_config or GlobalConfig() self.port = port self._uvicorn_server_thread: threading.Thread | None = None @@ -204,8 +211,7 @@ def _create_server(self) -> None: async def serve_index(request): # type: ignore[no-untyped-def] """Serve appropriate HTML based on viewer mode.""" # If running native Rerun, redirect to standalone command center - viewer_backend = os.environ.get("VIEWER_BACKEND", "rerun-web").lower() - if viewer_backend == "rerun-native": + if self._global_config.viewer_backend == "rerun-native": return RedirectResponse(url="/command-center") # Otherwise serve full dashboard with Rerun iframe return FileResponse(_DASHBOARD_HTML, media_type="text/html") From 76df3b6712fab4a069b3ae439d079610b8f5b905 Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Thu, 1 Jan 2026 23:01:08 -0800 Subject: [PATCH 03/13] fix: Use Foxglove-style colors for costmap (blue-purple free, black occupied) - Free space: #484981 (blue-purple) - Occupied: gradient to #000000 (black) - Replaces incorrect turbo colormap --- dimos/mapping/costmapper.py | 2 +- dimos/msgs/nav_msgs/OccupancyGrid.py | 26 +++++++++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/dimos/mapping/costmapper.py b/dimos/mapping/costmapper.py index 7d1579d080..ee7512baba 100644 --- a/dimos/mapping/costmapper.py +++ b/dimos/mapping/costmapper.py @@ -87,7 +87,7 @@ def _rerun_worker(self) -> None: "world/nav/costmap/floor", grid.to_rerun( mode="mesh", - colormap="turbo", # Matches Foxglove colormap (blue-purple gradient) + colormap=None, # Uses Foxglove-style colors (blue-purple free, black occupied) z_offset=0.05, # 5cm above floor to avoid z-fighting ), ) diff --git a/dimos/msgs/nav_msgs/OccupancyGrid.py b/dimos/msgs/nav_msgs/OccupancyGrid.py index 62ee9aa072..3876b44fab 100644 --- a/dimos/msgs/nav_msgs/OccupancyGrid.py +++ b/dimos/msgs/nav_msgs/OccupancyGrid.py @@ -648,15 +648,27 @@ def _to_rerun_mesh(self, colormap: str | None = None, z_offset: float = 0.01): # Alpha: 180 for free, 220 for occupied alpha = np.where(cell_values == 0, 180, 220).astype(np.uint8) else: - # Default coloring: dark grey for free, black for occupied + # Foxglove-style coloring: blue-purple for free, black for occupied + # Free (0): #484981 = RGB(72, 73, 129) + # Occupied (100): #000000 = RGB(0, 0, 0) rgb = np.zeros((n_cells, 3), dtype=np.uint8) is_free = cell_values == 0 - # Free space: dark grey - rgb[is_free] = [40, 40, 40] - # Occupied: black to dark grey gradient (darker = more occupied) - intensity = (40 * (1 - cell_values / 100)).astype(np.uint8) - rgb[~is_free] = np.column_stack([intensity[~is_free]] * 3) - alpha = np.where(is_free, 150, 200).astype(np.uint8) + is_occupied = ~is_free + + # Free space: blue-purple #484981 + rgb[is_free] = [72, 73, 129] + + # Occupied: gradient from blue-purple to black based on cost + # cost 1 -> mostly blue-purple, cost 100 -> black + if np.any(is_occupied): + costs = cell_values[is_occupied].astype(np.float32) + # Linear interpolation: (1 - cost/100) * blue-purple + factor = (1 - costs / 100).clip(0, 1) + rgb[is_occupied, 0] = (72 * factor).astype(np.uint8) + rgb[is_occupied, 1] = (73 * factor).astype(np.uint8) + rgb[is_occupied, 2] = (129 * factor).astype(np.uint8) + + alpha = np.where(is_free, 180, 220).astype(np.uint8) # Combine RGB and alpha into RGBA colors_per_cell = np.column_stack([rgb, alpha]) # (n_cells, 4) From 00c5cd5440b9101226151a51c9efa575818df08b Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Thu, 1 Jan 2026 23:58:23 -0800 Subject: [PATCH 04/13] fix: Remove autolog_to_rerun from A* planner, use manual logging - Removes dependency on stream.py autolog_to_rerun() method - Uses manual Rerun logging like CostMapper/VoxelGridMapper - Keeps stream.py clean without Rerun-specific code --- dimos/core/stream.py | 174 +----------------- dimos/navigation/replanning_a_star/module.py | 15 +- .../web/websocket_vis/websocket_vis_module.py | 2 +- 3 files changed, 19 insertions(+), 172 deletions(-) diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 24a4589a90..f49f558a48 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -1,4 +1,4 @@ -# Copyright 2025-2026 Dimensional Inc. +# Copyright 2025 Dimensional Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,9 +15,6 @@ from __future__ import annotations import enum -import queue -import threading -import time from typing import ( TYPE_CHECKING, Any, @@ -29,7 +26,6 @@ import reactivex as rx from reactivex import operators as ops from reactivex.disposable import Disposable -import rerun as rr import dimos.core.colors as colors from dimos.core.resource import Resource @@ -141,17 +137,11 @@ def __str__(self) -> str: ) -class Out(Stream[T], ObservableMixin[T]): +class Out(Stream[T]): _transport: Transport # type: ignore[type-arg] def __init__(self, *argv, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*argv, **kwargs) - self._rerun_config: dict | None = None # type: ignore[type-arg] - self._rerun_last_log: float = 0.0 - # Async logging state - self._rerun_queue: queue.Queue | None = None # type: ignore[type-arg] - self._rerun_thread: threading.Thread | None = None - self._rerun_async: bool = False @property def transport(self) -> Transport[T]: @@ -159,7 +149,8 @@ def transport(self) -> Transport[T]: @transport.setter def transport(self, value: Transport[T]) -> None: - self._transport = value + # just for type checking + ... @property def state(self) -> State: @@ -182,159 +173,8 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] if not hasattr(self, "_transport") or self._transport is None: logger.warning(f"Trying to publish on Out {self} without a transport") return - - # Log to Rerun (async or sync depending on config) - if self._rerun_config is not None: - if self._rerun_async and self._rerun_queue is not None: - # Queue for async logging (non-blocking, drops if full) - try: - self._rerun_queue.put_nowait(msg) - except queue.Full: - pass # Drop frame, data pipeline continues - else: - self._log_to_rerun(msg) - self._transport.broadcast(self, msg) - def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def] - """Subscribe to this output stream. - - Args: - cb: Callback function to receive messages - - Returns: - Unsubscribe function - """ - return self.transport.subscribe(cb, self) # type: ignore[arg-type, func-returns-value, no-any-return] - - def autolog_to_rerun( - self, - entity_path: str, - rate_limit: float | None = None, - **rerun_kwargs: Any, - ) -> None: - """Configure this output to auto-log to Rerun (fire-and-forget). - - Call once in start() - messages auto-logged when published. - - Args: - entity_path: Rerun entity path (e.g., "world/map") - rate_limit: Max Hz to log (None = unlimited) - **rerun_kwargs: Passed to msg.to_rerun() for rendering config - (e.g., radii=0.02, colormap="turbo", colors=[255,0,0]) - - Example: - def start(self): - super().start() - # Just declare it - fire and forget! - self.global_map.autolog_to_rerun("world/map", rate_limit=5.0, radii=0.02) - """ - self._rerun_config = { - "entity_path": entity_path, - "rate_limit": rate_limit, - "rerun_kwargs": rerun_kwargs, - } - self._rerun_last_log = 0.0 - - def autolog_to_rerun_async( - self, - entity_path: str, - rate_limit: float | None = None, - **rerun_kwargs: Any, - ) -> None: - """Configure async Rerun logging with background thread (non-blocking). - - Creates a queue and worker thread. Messages are queued and logged - in the background, preventing Rerun from blocking the data pipeline. - - Best for expensive visualizations (meshes, large point clouds) where - blocking the main thread would cause latency. - - Args: - entity_path: Rerun entity path (e.g., "world/map") - rate_limit: Max Hz to log (None = unlimited) - **rerun_kwargs: Passed to msg.to_rerun() for rendering config - - Example: - def start(self): - super().start() - # Non-blocking logging with background thread - self.global_map.autolog_to_rerun_async( - "world/map", - rate_limit=10.0, - mode="boxes", - colormap="turbo", - ) - """ - self._rerun_config = { - "entity_path": entity_path, - "rate_limit": rate_limit, - "rerun_kwargs": rerun_kwargs, - } - self._rerun_last_log = 0.0 - self._rerun_async = True - self._rerun_queue = queue.Queue(maxsize=2) - - def _worker() -> None: - """Background thread: pull from queue and log to Rerun.""" - last_log_time = 0.0 - while True: - try: - msg = self._rerun_queue.get(timeout=1.0) # type: ignore[union-attr] - if msg is None: # Shutdown signal - break - - if not hasattr(msg, "to_rerun"): - continue - - # Rate limiting - if rate_limit is not None: - now = time.monotonic() - min_interval = 1.0 / rate_limit - if now - last_log_time < min_interval: - continue # Skip - too soon - last_log_time = now - - rerun_data = msg.to_rerun(**rerun_kwargs) - rr.log(entity_path, rerun_data) - except queue.Empty: - continue - except Exception as e: - logger.warning(f"Rerun async logging error: {e}") - - self._rerun_thread = threading.Thread(target=_worker, daemon=True) - self._rerun_thread.start() - logger.debug(f"Started async Rerun logging thread for {entity_path}") - - def stop_rerun_async(self) -> None: - """Stop the async Rerun logging thread.""" - if self._rerun_queue is not None and self._rerun_thread is not None: - self._rerun_queue.put(None) # Shutdown signal - self._rerun_thread.join(timeout=2.0) - self._rerun_thread = None - self._rerun_queue = None - - def _log_to_rerun(self, msg: T) -> None: - """Log message to Rerun with rate limiting (synchronous).""" - if not hasattr(msg, "to_rerun"): - return - - if self._rerun_config is None: - return - - config = self._rerun_config - - # Rate limiting - if config["rate_limit"] is not None: - now = time.monotonic() - min_interval = 1.0 / config["rate_limit"] - if now - self._rerun_last_log < min_interval: - return # Skip - too soon - self._rerun_last_log = now - - rerun_data = msg.to_rerun(**config["rerun_kwargs"]) - rr.log(config["entity_path"], rerun_data) - class RemoteStream(Stream[T]): @property @@ -382,7 +222,7 @@ def __reduce__(self): # type: ignore[no-untyped-def] @property def transport(self) -> Transport[T]: if not self._transport and self.connection: - self._transport = self.connection.transport + self._transport = self.connection.transport # type: ignore[union-attr] return self._transport @transport.setter @@ -417,7 +257,7 @@ def transport(self) -> Transport[T]: def publish(self, msg) -> None: # type: ignore[no-untyped-def] self.transport.broadcast(self, msg) # type: ignore[arg-type] - @transport.setter # type: ignore[attr-defined, no-redef, untyped-decorator] + @transport.setter # type: ignore[attr-defined, misc, no-redef, untyped-decorator] def transport(self, value: Transport[T]) -> None: self.owner.set_transport(self.name, value).result() # type: ignore[union-attr] - self._transport = value + self._transport = value \ No newline at end of file diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index d8b582be83..cf1180c15b 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -50,10 +50,17 @@ def __init__(self, global_config: GlobalConfig | None = None) -> None: @rpc def start(self) -> None: super().start() - connect_rerun(global_config=self._global_config) - - # Auto-log path to Rerun - self.path.autolog_to_rerun("world/nav/path") + + if self._global_config.viewer_backend.startswith("rerun"): + connect_rerun(global_config=self._global_config) + + # Manual Rerun logging for path + def _log_path_to_rerun(path: Path) -> None: + import rerun as rr + rr.log("world/nav/path", path.to_rerun()) + + unsub = self.path.subscribe(_log_path_to_rerun) + self._disposables.add(Disposable(unsub)) unsub = self.odom.subscribe(self._planner.handle_odom) self._disposables.add(Disposable(unsub)) diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index 7e848f8639..8afd9655c4 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -27,7 +27,7 @@ import time from typing import Any -from dimos_lcm.std_msgs import Bool +from dimos_lcm.std_msgs import Bool # type: ignore[import-untyped] from reactivex.disposable import Disposable import socketio # type: ignore[import-untyped] from starlette.applications import Starlette From 71bf75a64b955e7f0578030d5121d8badca9d74f Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 00:05:11 -0800 Subject: [PATCH 05/13] fix: Subscribe to internal planner path, not module Out stream Out streams don't have subscribe() method - fixed to use self._planner.path --- dimos/navigation/replanning_a_star/module.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index cf1180c15b..8553a702f8 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -25,6 +25,7 @@ from dimos.msgs.sensor_msgs import Image from dimos.navigation.base import NavigationInterface, NavigationState from dimos.navigation.replanning_a_star.global_planner import GlobalPlanner +import rerun as rr class ReplanningAStarPlanner(Module, NavigationInterface): @@ -56,10 +57,9 @@ def start(self) -> None: # Manual Rerun logging for path def _log_path_to_rerun(path: Path) -> None: - import rerun as rr rr.log("world/nav/path", path.to_rerun()) - unsub = self.path.subscribe(_log_path_to_rerun) + unsub = self._planner.path.subscribe(_log_path_to_rerun) self._disposables.add(Disposable(unsub)) unsub = self.odom.subscribe(self._planner.handle_odom) From 4e22048e9eb572158877f3d2c96e7b5dbd7b2f70 Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 00:10:08 -0800 Subject: [PATCH 06/13] style: Apply pre-commit formatting and linting fixes --- dimos/core/stream.py | 4 ++-- dimos/navigation/replanning_a_star/module.py | 8 ++++---- dimos/web/websocket_vis/websocket_vis_module.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dimos/core/stream.py b/dimos/core/stream.py index f49f558a48..782a7b5b12 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -1,4 +1,4 @@ -# Copyright 2025 Dimensional Inc. +# Copyright 2025-2026 Dimensional Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -260,4 +260,4 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] @transport.setter # type: ignore[attr-defined, misc, no-redef, untyped-decorator] def transport(self, value: Transport[T]) -> None: self.owner.set_transport(self.name, value).result() # type: ignore[union-attr] - self._transport = value \ No newline at end of file + self._transport = value diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index 8553a702f8..7ceb168545 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -16,6 +16,7 @@ from dimos_lcm.std_msgs import Bool, String from reactivex.disposable import Disposable +import rerun as rr from dimos.core import In, Module, Out, rpc from dimos.core.global_config import GlobalConfig @@ -25,7 +26,6 @@ from dimos.msgs.sensor_msgs import Image from dimos.navigation.base import NavigationInterface, NavigationState from dimos.navigation.replanning_a_star.global_planner import GlobalPlanner -import rerun as rr class ReplanningAStarPlanner(Module, NavigationInterface): @@ -51,14 +51,14 @@ def __init__(self, global_config: GlobalConfig | None = None) -> None: @rpc def start(self) -> None: super().start() - + if self._global_config.viewer_backend.startswith("rerun"): connect_rerun(global_config=self._global_config) - + # Manual Rerun logging for path def _log_path_to_rerun(path: Path) -> None: rr.log("world/nav/path", path.to_rerun()) - + unsub = self._planner.path.subscribe(_log_path_to_rerun) self._disposables.add(Disposable(unsub)) diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index 8afd9655c4..31aa0d3956 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -27,7 +27,7 @@ import time from typing import Any -from dimos_lcm.std_msgs import Bool # type: ignore[import-untyped] +from dimos_lcm.std_msgs import Bool # type: ignore[import-untyped] from reactivex.disposable import Disposable import socketio # type: ignore[import-untyped] from starlette.applications import Starlette From 1df04afe26e206c5a4fcf7d41fef56ba6ef4ec09 Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 01:01:28 -0800 Subject: [PATCH 07/13] fix: correct stream.py --- dimos/core/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 782a7b5b12..652223d9ab 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -222,7 +222,7 @@ def __reduce__(self): # type: ignore[no-untyped-def] @property def transport(self) -> Transport[T]: if not self._transport and self.connection: - self._transport = self.connection.transport # type: ignore[union-attr] + self._transport = self.connection.transport return self._transport @transport.setter @@ -257,7 +257,7 @@ def transport(self) -> Transport[T]: def publish(self, msg) -> None: # type: ignore[no-untyped-def] self.transport.broadcast(self, msg) # type: ignore[arg-type] - @transport.setter # type: ignore[attr-defined, misc, no-redef, untyped-decorator] + @transport.setter # type: ignore[attr-defined, no-redef, untyped-decorator] def transport(self, value: Transport[T]) -> None: self.owner.set_transport(self.name, value).result() # type: ignore[union-attr] self._transport = value From 2897a1cdb33f2c0559f9759eeb9a4e9511624eda Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 01:32:51 -0800 Subject: [PATCH 08/13] refactor: Remove all Rerun code from stream.py - Remove import rerun - Remove autolog_to_rerun() and _log_to_rerun() methods - Remove _rerun_config and _rerun_last_log from __init__ - Keep Out.subscribe() and ObservableMixin (core functionality) --- dimos/core/stream.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 652223d9ab..49e8ab7c41 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -137,7 +137,7 @@ def __str__(self) -> str: ) -class Out(Stream[T]): +class Out(Stream[T], ObservableMixin[T]): _transport: Transport # type: ignore[type-arg] def __init__(self, *argv, **kwargs) -> None: # type: ignore[no-untyped-def] @@ -149,8 +149,7 @@ def transport(self) -> Transport[T]: @transport.setter def transport(self, value: Transport[T]) -> None: - # just for type checking - ... + self._transport = value @property def state(self) -> State: @@ -175,6 +174,17 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] return self._transport.broadcast(self, msg) + def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def] + """Subscribe to this output stream. + + Args: + cb: Callback function to receive messages + + Returns: + Unsubscribe function + """ + return self.transport.subscribe(cb, self) # type: ignore[arg-type, func-returns-value, no-any-return] + class RemoteStream(Stream[T]): @property @@ -260,4 +270,4 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] @transport.setter # type: ignore[attr-defined, no-redef, untyped-decorator] def transport(self, value: Transport[T]) -> None: self.owner.set_transport(self.name, value).result() # type: ignore[union-attr] - self._transport = value + self._transport = value \ No newline at end of file From 2b9a9993e3feb5893664b3eaebddbda93535c715 Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 01:35:06 -0800 Subject: [PATCH 09/13] style: Apply linter formatting --- dimos/core/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 49e8ab7c41..29db16c655 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -270,4 +270,4 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] @transport.setter # type: ignore[attr-defined, no-redef, untyped-decorator] def transport(self, value: Transport[T]) -> None: self.owner.set_transport(self.name, value).result() # type: ignore[union-attr] - self._transport = value \ No newline at end of file + self._transport = value From 717768b73ead60d299c77ef028353b8d5593364b Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 02:39:23 -0800 Subject: [PATCH 10/13] fix: Address mypy errors from Rerun changes - Add type ignore for to_rerun() calls - Remove Disposable() double-wrapping, add subscriptions directly - Move type ignore comment for Path.to_rerun() --- dimos/dashboard/tf_rerun_module.py | 2 +- dimos/msgs/nav_msgs/Path.py | 4 ++-- dimos/navigation/replanning_a_star/module.py | 20 ++++++-------------- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/dimos/dashboard/tf_rerun_module.py b/dimos/dashboard/tf_rerun_module.py index 5260740d60..c862778cad 100644 --- a/dimos/dashboard/tf_rerun_module.py +++ b/dimos/dashboard/tf_rerun_module.py @@ -92,7 +92,7 @@ def _on_tf_message(self, msg: TFMessage, topic: Topic) -> None: msg: TFMessage containing transforms to visualize topic: The LCM topic (unused but required by callback signature) """ - for entity_path, transform in msg.to_rerun(): + for entity_path, transform in msg.to_rerun(): # type: ignore[no-untyped-call] rr.log(entity_path, transform) @rpc diff --git a/dimos/msgs/nav_msgs/Path.py b/dimos/msgs/nav_msgs/Path.py index 01ea4dc75d..e92eab17a4 100644 --- a/dimos/msgs/nav_msgs/Path.py +++ b/dimos/msgs/nav_msgs/Path.py @@ -233,12 +233,12 @@ def to_ros_msg(self) -> ROSPath: return ros_msg - def to_rerun( + def to_rerun( # type: ignore[no-untyped-def] self, color: tuple[int, int, int] = (0, 255, 128), z_offset: float = 0.2, radii: float = 0.05, - ): # type: ignore[no-untyped-def] + ): """Convert to rerun LineStrips3D format. Args: diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index 7ceb168545..2c62110f4d 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -57,22 +57,14 @@ def start(self) -> None: # Manual Rerun logging for path def _log_path_to_rerun(path: Path) -> None: - rr.log("world/nav/path", path.to_rerun()) + rr.log("world/nav/path", path.to_rerun()) # type: ignore[no-untyped-call] - unsub = self._planner.path.subscribe(_log_path_to_rerun) - self._disposables.add(Disposable(unsub)) + self._disposables.add(self._planner.path.subscribe(_log_path_to_rerun)) - unsub = self.odom.subscribe(self._planner.handle_odom) - self._disposables.add(Disposable(unsub)) - - unsub = self.global_costmap.subscribe(self._planner.handle_global_costmap) - self._disposables.add(Disposable(unsub)) - - unsub = self.goal_request.subscribe(self._planner.handle_goal_request) - self._disposables.add(Disposable(unsub)) - - unsub = self.target.subscribe(self._planner.handle_goal_request) - self._disposables.add(Disposable(unsub)) + self._disposables.add(self.odom.subscribe(self._planner.handle_odom)) + self._disposables.add(self.global_costmap.subscribe(self._planner.handle_global_costmap)) + self._disposables.add(self.goal_request.subscribe(self._planner.handle_goal_request)) + self._disposables.add(self.target.subscribe(self._planner.handle_goal_request)) self._disposables.add(self._planner.path.subscribe(self.path.publish)) From d8f6187a43a818ec949d4d6f2b0665bf597cca78 Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 03:01:32 -0800 Subject: [PATCH 11/13] fix: Correct camera image format from BGR to RGB Frame is decoded as rgb24 but Image.from_numpy() was defaulting to BGR, causing red/blue color swap in Rerun camera feed --- dimos/robot/unitree/connection/connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dimos/robot/unitree/connection/connection.py b/dimos/robot/unitree/connection/connection.py index 411710ce1c..bef0c0b127 100644 --- a/dimos/robot/unitree/connection/connection.py +++ b/dimos/robot/unitree/connection/connection.py @@ -38,6 +38,7 @@ from dimos.core.resource import Resource from dimos.msgs.geometry_msgs import Pose, Transform, Twist from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ImageFormat from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg from dimos.robot.unitree_webrtc.type.odometry import Odometry @@ -267,6 +268,7 @@ def video_stream(self) -> Observable[Image]: lambda frame: Image.from_numpy( # np.ascontiguousarray(frame.to_ndarray("rgb24")), frame.to_ndarray(format="rgb24"), # type: ignore[attr-defined] + format=ImageFormat.RGB, # Frame is RGB24, not BGR frame_id="camera_optical", ) ), From 8d82163af65d98e60438f4eacbf32c3da356e96f Mon Sep 17 00:00:00 2001 From: Nabla7 Date: Fri, 2 Jan 2026 04:45:07 -0800 Subject: [PATCH 12/13] fix: Camera RGB color swap and additional cleanup - Fix camera image format: specify RGB when decoding rgb24 frames - Wrap module input subscriptions with Disposable() - Add type ignores for untyped to_rerun() calls --- dimos/navigation/replanning_a_star/module.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index 2c62110f4d..b644676772 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -61,10 +61,10 @@ def _log_path_to_rerun(path: Path) -> None: self._disposables.add(self._planner.path.subscribe(_log_path_to_rerun)) - self._disposables.add(self.odom.subscribe(self._planner.handle_odom)) - self._disposables.add(self.global_costmap.subscribe(self._planner.handle_global_costmap)) - self._disposables.add(self.goal_request.subscribe(self._planner.handle_goal_request)) - self._disposables.add(self.target.subscribe(self._planner.handle_goal_request)) + self._disposables.add(Disposable(self.odom.subscribe(self._planner.handle_odom))) + self._disposables.add(Disposable(self.global_costmap.subscribe(self._planner.handle_global_costmap))) + self._disposables.add(Disposable(self.goal_request.subscribe(self._planner.handle_goal_request))) + self._disposables.add(Disposable(self.target.subscribe(self._planner.handle_goal_request))) self._disposables.add(self._planner.path.subscribe(self.path.publish)) From 18c75bd126e2a9e85e4c2dd8bebe1f3917207616 Mon Sep 17 00:00:00 2001 From: Nabla7 <49974392+Nabla7@users.noreply.github.com> Date: Fri, 2 Jan 2026 12:46:11 +0000 Subject: [PATCH 13/13] CI code cleanup --- dimos/navigation/replanning_a_star/module.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index b644676772..6ba1ae0ba1 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -62,8 +62,12 @@ def _log_path_to_rerun(path: Path) -> None: self._disposables.add(self._planner.path.subscribe(_log_path_to_rerun)) self._disposables.add(Disposable(self.odom.subscribe(self._planner.handle_odom))) - self._disposables.add(Disposable(self.global_costmap.subscribe(self._planner.handle_global_costmap))) - self._disposables.add(Disposable(self.goal_request.subscribe(self._planner.handle_goal_request))) + self._disposables.add( + Disposable(self.global_costmap.subscribe(self._planner.handle_global_costmap)) + ) + self._disposables.add( + Disposable(self.goal_request.subscribe(self._planner.handle_goal_request)) + ) self._disposables.add(Disposable(self.target.subscribe(self._planner.handle_goal_request))) self._disposables.add(self._planner.path.subscribe(self.path.publish))