Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions dimos/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import reactivex as rx
from reactivex import operators as ops
from reactivex.disposable import Disposable
import rerun as rr

import dimos.core.colors as colors
from dimos.core.resource import Resource
Expand Down Expand Up @@ -143,8 +142,6 @@ class Out(Stream[T], ObservableMixin[T]):

def __init__(self, *argv, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(*argv, **kwargs)
self._rerun_config: dict | None = None # type: ignore[type-arg]
self._rerun_last_log: float = 0.0

@property
def transport(self) -> Transport[T]:
Expand Down Expand Up @@ -175,11 +172,6 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def]
if not hasattr(self, "_transport") or self._transport is None:
logger.warning(f"Trying to publish on Out {self} without a transport")
return

# Log to Rerun directly if configured
if self._rerun_config is not None:
self._log_to_rerun(msg)

self._transport.broadcast(self, msg)

def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def]
Expand All @@ -193,58 +185,6 @@ def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def]
"""
return self.transport.subscribe(cb, self) # type: ignore[arg-type, func-returns-value, no-any-return]

def autolog_to_rerun(
self,
entity_path: str,
rate_limit: float | None = None,
**rerun_kwargs: Any,
) -> None:
"""Configure this output to auto-log to Rerun (fire-and-forget).

Call once in start() - messages auto-logged when published.

Args:
entity_path: Rerun entity path (e.g., "world/map")
rate_limit: Max Hz to log (None = unlimited)
**rerun_kwargs: Passed to msg.to_rerun() for rendering config
(e.g., radii=0.02, colormap="turbo", colors=[255,0,0])

Example:
def start(self):
super().start()
# Just declare it - fire and forget!
self.global_map.autolog_to_rerun("world/map", rate_limit=5.0, radii=0.02)
"""
self._rerun_config = {
"entity_path": entity_path,
"rate_limit": rate_limit,
"rerun_kwargs": rerun_kwargs,
}
self._rerun_last_log = 0.0

def _log_to_rerun(self, msg: T) -> None:
"""Log message to Rerun with rate limiting."""
if not hasattr(msg, "to_rerun"):
return

if self._rerun_config is None:
return

import time

config = self._rerun_config

# Rate limiting
if config["rate_limit"] is not None:
now = time.monotonic()
min_interval = 1.0 / config["rate_limit"]
if now - self._rerun_last_log < min_interval:
return # Skip - too soon
self._rerun_last_log = now

rerun_data = msg.to_rerun(**config["rerun_kwargs"])
rr.log(config["entity_path"], rerun_data)


class RemoteStream(Stream[T]):
@property
Expand Down
112 changes: 112 additions & 0 deletions dimos/dashboard/tf_rerun_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""TF Rerun Module - Automatically visualize all transforms in Rerun.

This module subscribes to the /tf LCM topic and logs ALL transforms
to Rerun, providing automatic visualization of the robot's TF tree.

Usage:
# In blueprints:
from dimos.dashboard.tf_rerun_module import tf_rerun

def my_robot():
return (
robot_connection()
+ tf_rerun() # Add TF visualization
+ other_modules()
)
"""

from typing import Any

import rerun as rr

from dimos.core import Module, rpc
from dimos.core.global_config import GlobalConfig
from dimos.dashboard.rerun_init import connect_rerun
from dimos.msgs.tf2_msgs import TFMessage
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
from dimos.utils.logging_config import setup_logger

logger = setup_logger()


class TFRerunModule(Module):
"""Subscribes to /tf LCM topic and logs all transforms to Rerun.

This module automatically visualizes the TF tree in Rerun by:
- Subscribing to the /tf LCM topic (captures ALL transforms in the system)
- Logging each transform to its derived entity path (world/{child_frame_id})
"""

_global_config: GlobalConfig
_lcm: LCM | None = None
_unsubscribe: Any = None

def __init__(
self,
global_config: GlobalConfig | None = None,
**kwargs: Any,
) -> None:
"""Initialize TFRerunModule.

Args:
global_config: Optional global configuration for viewer backend settings
**kwargs: Additional arguments passed to parent Module
"""
super().__init__(**kwargs)
self._global_config = global_config or GlobalConfig()

@rpc
def start(self) -> None:
"""Start the TF visualization module."""
super().start()

# Only connect if Rerun backend is selected
if self._global_config.viewer_backend.startswith("rerun"):
connect_rerun(global_config=self._global_config)

# Subscribe directly to LCM /tf topic (captures ALL transforms)
self._lcm = LCM()
self._lcm.start()
topic = Topic("/tf", TFMessage)
self._unsubscribe = self._lcm.subscribe(topic, self._on_tf_message)
logger.info("TFRerunModule: subscribed to /tf, logging all transforms to Rerun")

def _on_tf_message(self, msg: TFMessage, topic: Topic) -> None:
"""Log all transforms in TFMessage to Rerun.

Args:
msg: TFMessage containing transforms to visualize
topic: The LCM topic (unused but required by callback signature)
"""
for entity_path, transform in msg.to_rerun(): # type: ignore[no-untyped-call]
rr.log(entity_path, transform)

@rpc
def stop(self) -> None:
"""Stop the TF visualization module and cleanup LCM subscription."""
if self._unsubscribe:
self._unsubscribe()
self._unsubscribe = None

if self._lcm:
self._lcm.stop()
self._lcm = None

super().stop()


tf_rerun = TFRerunModule.blueprint
12 changes: 2 additions & 10 deletions dimos/mapping/costmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,13 @@ def _rerun_worker(self) -> None:

# Generate mesh + log to Rerun (blocks in background, not on data path)
try:
# 2D image panel
rr.log(
"world/nav/costmap/image",
grid.to_rerun(
mode="image",
colormap="RdBu_r",
),
)
# 3D floor overlay (expensive mesh generation)
rr.log(
"world/nav/costmap/floor",
grid.to_rerun(
mode="mesh",
colormap=None, # Grayscale: free=white, occupied=black
z_offset=0.02,
colormap=None, # Uses Foxglove-style colors (blue-purple free, black occupied)
z_offset=0.05, # 5cm above floor to avoid z-fighting
),
)

Expand Down
26 changes: 19 additions & 7 deletions dimos/msgs/nav_msgs/OccupancyGrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,15 +648,27 @@ def _to_rerun_mesh(self, colormap: str | None = None, z_offset: float = 0.01):
# Alpha: 180 for free, 220 for occupied
alpha = np.where(cell_values == 0, 180, 220).astype(np.uint8)
else:
# Default coloring: dark grey for free, black for occupied
# Foxglove-style coloring: blue-purple for free, black for occupied
# Free (0): #484981 = RGB(72, 73, 129)
# Occupied (100): #000000 = RGB(0, 0, 0)
rgb = np.zeros((n_cells, 3), dtype=np.uint8)
is_free = cell_values == 0
# Free space: dark grey
rgb[is_free] = [40, 40, 40]
# Occupied: black to dark grey gradient (darker = more occupied)
intensity = (40 * (1 - cell_values / 100)).astype(np.uint8)
rgb[~is_free] = np.column_stack([intensity[~is_free]] * 3)
alpha = np.where(is_free, 150, 200).astype(np.uint8)
is_occupied = ~is_free

# Free space: blue-purple #484981
rgb[is_free] = [72, 73, 129]

# Occupied: gradient from blue-purple to black based on cost
# cost 1 -> mostly blue-purple, cost 100 -> black
if np.any(is_occupied):
costs = cell_values[is_occupied].astype(np.float32)
# Linear interpolation: (1 - cost/100) * blue-purple
factor = (1 - costs / 100).clip(0, 1)
rgb[is_occupied, 0] = (72 * factor).astype(np.uint8)
rgb[is_occupied, 1] = (73 * factor).astype(np.uint8)
rgb[is_occupied, 2] = (129 * factor).astype(np.uint8)

alpha = np.where(is_free, 180, 220).astype(np.uint8)

# Combine RGB and alpha into RGBA
colors_per_cell = np.column_stack([rgb, alpha]) # (n_cells, 4)
Expand Down
14 changes: 11 additions & 3 deletions dimos/msgs/nav_msgs/Path.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,25 @@ def to_ros_msg(self) -> ROSPath:

return ros_msg

def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128)): # type: ignore[no-untyped-def]
def to_rerun( # type: ignore[no-untyped-def]
self,
color: tuple[int, int, int] = (0, 255, 128),
z_offset: float = 0.2,
radii: float = 0.05,
):
"""Convert to rerun LineStrips3D format.

Args:
color: RGB color tuple for the path line
z_offset: Height above floor to render path (default 0.2m to avoid costmap occlusion)
radii: Thickness of the path line (default 0.05m = 5cm)

Returns:
rr.LineStrips3D archetype for logging to rerun
"""
if not self.poses:
return rr.LineStrips3D([])

points = [[p.x, p.y, p.z] for p in self.poses]
return rr.LineStrips3D([points], colors=[color])
# Lift path above floor so it's visible over costmap
points = [[p.x, p.y, p.z + z_offset] for p in self.poses]
return rr.LineStrips3D([points], colors=[color], radii=radii)
27 changes: 15 additions & 12 deletions dimos/navigation/replanning_a_star/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from dimos_lcm.std_msgs import Bool, String
from reactivex.disposable import Disposable
import rerun as rr

from dimos.core import In, Module, Out, rpc
from dimos.core.global_config import GlobalConfig
Expand Down Expand Up @@ -50,22 +51,24 @@ def __init__(self, global_config: GlobalConfig | None = None) -> None:
@rpc
def start(self) -> None:
super().start()
connect_rerun(global_config=self._global_config)

# Auto-log path to Rerun
self.path.autolog_to_rerun("world/nav/path")
if self._global_config.viewer_backend.startswith("rerun"):
connect_rerun(global_config=self._global_config)

unsub = self.odom.subscribe(self._planner.handle_odom)
self._disposables.add(Disposable(unsub))
# Manual Rerun logging for path
def _log_path_to_rerun(path: Path) -> None:
rr.log("world/nav/path", path.to_rerun()) # type: ignore[no-untyped-call]

unsub = self.global_costmap.subscribe(self._planner.handle_global_costmap)
self._disposables.add(Disposable(unsub))
self._disposables.add(self._planner.path.subscribe(_log_path_to_rerun))

unsub = self.goal_request.subscribe(self._planner.handle_goal_request)
self._disposables.add(Disposable(unsub))

unsub = self.target.subscribe(self._planner.handle_goal_request)
self._disposables.add(Disposable(unsub))
self._disposables.add(Disposable(self.odom.subscribe(self._planner.handle_odom)))
self._disposables.add(
Disposable(self.global_costmap.subscribe(self._planner.handle_global_costmap))
)
self._disposables.add(
Disposable(self.goal_request.subscribe(self._planner.handle_goal_request))
)
self._disposables.add(Disposable(self.target.subscribe(self._planner.handle_goal_request)))

self._disposables.add(self._planner.path.subscribe(self.path.publish))

Expand Down
2 changes: 2 additions & 0 deletions dimos/robot/unitree/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from dimos.core.resource import Resource
from dimos.msgs.geometry_msgs import Pose, Transform, Twist
from dimos.msgs.sensor_msgs import Image
from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ImageFormat
from dimos.robot.unitree_webrtc.type.lidar import LidarMessage
from dimos.robot.unitree_webrtc.type.lowstate import LowStateMsg
from dimos.robot.unitree_webrtc.type.odometry import Odometry
Expand Down Expand Up @@ -267,6 +268,7 @@ def video_stream(self) -> Observable[Image]:
lambda frame: Image.from_numpy(
# np.ascontiguousarray(frame.to_ndarray("rgb24")),
frame.to_ndarray(format="rgb24"), # type: ignore[attr-defined]
format=ImageFormat.RGB, # Frame is RGB24, not BGR
frame_id="camera_optical",
)
),
Expand Down
2 changes: 2 additions & 0 deletions dimos/robot/unitree_webrtc/unitree_go2_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE
from dimos.core.blueprints import autoconnect
from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport
from dimos.dashboard.tf_rerun_module import tf_rerun
from dimos.mapping.costmapper import cost_mapper
from dimos.mapping.voxels import voxel_mapper
from dimos.msgs.sensor_msgs import Image, PointCloud2
Expand Down Expand Up @@ -73,6 +74,7 @@
go2_connection(),
linux if platform.system() == "Linux" else mac,
websocket_vis(),
tf_rerun(), # Auto-visualize all TF transforms in Rerun
).global_config(n_dask_workers=4, robot_model="unitree_go2")

nav = autoconnect(
Expand Down
Loading