diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index 14338d6a38..1560554eed 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -23,6 +23,9 @@ from types import MappingProxyType from typing import Any, Literal, get_args, get_origin, get_type_hints +import rerun as rr +import rerun.blueprint as rrb + from dimos.core.global_config import GlobalConfig from dimos.core.module import Module from dimos.core.module_coordinator import ModuleCoordinator @@ -280,6 +283,50 @@ def _connect_rpc_methods(self, module_coordinator: ModuleCoordinator) -> None: requested_method_name, rpc_methods_dot[requested_method_name] ) + def _init_rerun_blueprint(self, module_coordinator: ModuleCoordinator) -> None: + """Compose and send Rerun blueprint from module contributions. + + Collects rerun_views() from all modules and composes them into a unified layout. + """ + # Collect view contributions from all modules + side_panels = [] + for blueprint in self.blueprints: + if hasattr(blueprint.module, "rerun_views"): + views = blueprint.module.rerun_views() + if views: + side_panels.extend(views) + + # Always include latency panel if we have any panels + if side_panels: + side_panels.append( + rrb.TimeSeriesView( + name="Latency (ms)", + origin="/metrics", + contents=[ + "+ /metrics/voxel_map/latency_ms", + "+ /metrics/costmap/latency_ms", + ], + ) + ) + + # Compose final layout + if side_panels: + composed_blueprint = rrb.Blueprint( + rrb.Horizontal( + rrb.Spatial3DView( + name="3D View", + origin="world", + background=[0, 0, 0], + ), + rrb.Vertical(*side_panels, row_shares=[2] + [1] * (len(side_panels) - 1)), + column_shares=[3, 1], + ), + rrb.TimePanel(state="collapsed"), + rrb.SelectionPanel(state="collapsed"), + rrb.BlueprintPanel(state="collapsed"), + ) + rr.send_blueprint(composed_blueprint) + def build( self, global_config: GlobalConfig | None = None, @@ -294,6 +341,17 @@ def build( self._check_requirements() self._verify_no_name_conflicts() + # Initialize Rerun server before deploying modules (if backend is Rerun) + if global_config.rerun_enabled and global_config.viewer_backend.startswith("rerun"): + try: + from dimos.dashboard.rerun_init import init_rerun_server + + server_addr = init_rerun_server(viewer_mode=global_config.viewer_backend) + global_config = global_config.model_copy(update={"rerun_server_addr": server_addr}) + logger.info("Rerun server initialized", addr=server_addr) + except Exception as e: + logger.warning(f"Failed to initialize Rerun server: {e}") + module_coordinator = ModuleCoordinator(global_config=global_config) module_coordinator.start() @@ -303,6 +361,10 @@ def build( module_coordinator.start_all_modules() + # Compose and send Rerun blueprint from module contributions + if global_config.viewer_backend.startswith("rerun"): + self._init_rerun_blueprint(module_coordinator) + return module_coordinator diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index 8d8edc2598..dfa54830ac 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -14,12 +14,15 @@ from functools import cached_property import re +from typing import Literal, TypeAlias from pydantic_settings import BaseSettings, SettingsConfigDict from dimos.mapping.occupancy.path_map import NavigationStrategy from dimos.navigation.global_planner.types import AStarAlgorithm +ViewerBackend: TypeAlias = Literal["rerun-web", "rerun-native", "foxglove"] + def _get_all_numbers(s: str) -> list[float]: return [float(x) for x in re.findall(r"-?\d+\.?\d*", s)] @@ -29,6 +32,9 @@ class GlobalConfig(BaseSettings): robot_ip: str | None = None simulation: bool = False replay: bool = False + rerun_enabled: bool = True + rerun_server_addr: str | None = None + viewer_backend: ViewerBackend = "rerun-native" n_dask_workers: int = 2 memory_limit: str = "auto" mujoco_camera_position: str | None = None diff --git a/dimos/core/stream.py b/dimos/core/stream.py index 652223d9ab..70384ffc77 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -26,6 +26,7 @@ import reactivex as rx from reactivex import operators as ops from reactivex.disposable import Disposable +import rerun as rr import dimos.core.colors as colors from dimos.core.resource import Resource @@ -137,11 +138,13 @@ def __str__(self) -> str: ) -class Out(Stream[T]): +class Out(Stream[T], ObservableMixin[T]): _transport: Transport # type: ignore[type-arg] def __init__(self, *argv, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*argv, **kwargs) + self._rerun_config: dict | None = None # type: ignore[type-arg] + self._rerun_last_log: float = 0.0 @property def transport(self) -> Transport[T]: @@ -149,8 +152,7 @@ def transport(self) -> Transport[T]: @transport.setter def transport(self, value: Transport[T]) -> None: - # just for type checking - ... + self._transport = value @property def state(self) -> State: @@ -173,8 +175,76 @@ def publish(self, msg) -> None: # type: ignore[no-untyped-def] if not hasattr(self, "_transport") or self._transport is None: logger.warning(f"Trying to publish on Out {self} without a transport") return + + # Log to Rerun directly if configured + if self._rerun_config is not None: + self._log_to_rerun(msg) + self._transport.broadcast(self, msg) + def subscribe(self, cb) -> Callable[[], None]: # type: ignore[no-untyped-def] + """Subscribe to this output stream. + + Args: + cb: Callback function to receive messages + + Returns: + Unsubscribe function + """ + return self.transport.subscribe(cb, self) # type: ignore[arg-type, func-returns-value, no-any-return] + + def autolog_to_rerun( + self, + entity_path: str, + rate_limit: float | None = None, + **rerun_kwargs: Any, + ) -> None: + """Configure this output to auto-log to Rerun (fire-and-forget). + + Call once in start() - messages auto-logged when published. + + Args: + entity_path: Rerun entity path (e.g., "world/map") + rate_limit: Max Hz to log (None = unlimited) + **rerun_kwargs: Passed to msg.to_rerun() for rendering config + (e.g., radii=0.02, colormap="turbo", colors=[255,0,0]) + + Example: + def start(self): + super().start() + # Just declare it - fire and forget! + self.global_map.autolog_to_rerun("world/map", rate_limit=5.0, radii=0.02) + """ + self._rerun_config = { + "entity_path": entity_path, + "rate_limit": rate_limit, + "rerun_kwargs": rerun_kwargs, + } + self._rerun_last_log = 0.0 + + def _log_to_rerun(self, msg: T) -> None: + """Log message to Rerun with rate limiting.""" + if not hasattr(msg, "to_rerun"): + return + + if self._rerun_config is None: + return + + import time + + config = self._rerun_config + + # Rate limiting + if config["rate_limit"] is not None: + now = time.monotonic() + min_interval = 1.0 / config["rate_limit"] + if now - self._rerun_last_log < min_interval: + return # Skip - too soon + self._rerun_last_log = now + + rerun_data = msg.to_rerun(**config["rerun_kwargs"]) + rr.log(config["entity_path"], rerun_data) + class RemoteStream(Stream[T]): @property diff --git a/dimos/dashboard/__init__.py b/dimos/dashboard/__init__.py new file mode 100644 index 0000000000..fc97805936 --- /dev/null +++ b/dimos/dashboard/__init__.py @@ -0,0 +1,34 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dashboard module for visualization and monitoring. + +Rerun Initialization: + Main process (e.g., blueprints.build) starts Rerun server automatically. + Worker modules connect to the server via connect_rerun(). + +Usage in modules: + import rerun as rr + from dimos.dashboard.rerun_init import connect_rerun + + class MyModule(Module): + def start(self): + super().start() + connect_rerun() # Connect to Rerun server + rr.log("my/entity", my_data.to_rerun()) +""" + +from dimos.dashboard.rerun_init import connect_rerun, init_rerun_server, shutdown_rerun + +__all__ = ["connect_rerun", "init_rerun_server", "shutdown_rerun"] diff --git a/dimos/dashboard/dimos.rbl b/dimos/dashboard/dimos.rbl new file mode 100644 index 0000000000..160180e27a Binary files /dev/null and b/dimos/dashboard/dimos.rbl differ diff --git a/dimos/dashboard/rerun_init.py b/dimos/dashboard/rerun_init.py new file mode 100644 index 0000000000..81beb40d6a --- /dev/null +++ b/dimos/dashboard/rerun_init.py @@ -0,0 +1,165 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Rerun initialization with multi-process support. + +Architecture: + - Main process calls init_rerun_server() to start gRPC server + viewer + - Worker processes call connect_rerun() to connect to the server + - All processes share the same Rerun recording stream + +Viewer modes (set via VIEWER_BACKEND config or environment variable): + - "rerun-web" (default): Web viewer on port 9090 + - "rerun-native": Native Rerun viewer (requires display) + - "foxglove": Use Foxglove instead of Rerun + +Usage: + # Set via environment: + VIEWER_BACKEND=rerun-web # or rerun-native or foxglove + + # Or via .env file: + viewer_backend=rerun-native + + # In main process (blueprints.py handles this automatically): + from dimos.dashboard.rerun_init import init_rerun_server + server_addr = init_rerun_server(viewer_mode="rerun-web") + + # In worker modules: + from dimos.dashboard.rerun_init import connect_rerun + connect_rerun() + + # On shutdown: + from dimos.dashboard.rerun_init import shutdown_rerun + shutdown_rerun() +""" + +import atexit +import threading + +import rerun as rr + +from dimos.core.global_config import GlobalConfig +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +RERUN_GRPC_PORT = 9876 +RERUN_WEB_PORT = 9090 +RERUN_GRPC_ADDR = f"rerun+http://127.0.0.1:{RERUN_GRPC_PORT}/proxy" + +# Track initialization state +_server_started = False +_connected = False +_rerun_init_lock = threading.Lock() + + +def init_rerun_server(viewer_mode: str = "rerun-web") -> str: + """Initialize Rerun server in the main process. + + Starts the gRPC server and optionally the web/native viewer. + Should only be called once from the main process. + + Args: + viewer_mode: One of "rerun-web", "rerun-native", or "rerun-grpc-only" + + Returns: + Server address for workers to connect to. + + Raises: + RuntimeError: If server initialization fails. + """ + global _server_started + + if _server_started: + logger.debug("Rerun server already started") + return RERUN_GRPC_ADDR + + rr.init("dimos") + + if viewer_mode == "rerun-native": + # Spawn native viewer (requires display) + rr.spawn(port=RERUN_GRPC_PORT, connect=True) + logger.info("Rerun: spawned native viewer", port=RERUN_GRPC_PORT) + elif viewer_mode == "rerun-web": + # Start gRPC + web viewer (headless friendly) + server_uri = rr.serve_grpc(grpc_port=RERUN_GRPC_PORT) + rr.serve_web_viewer(web_port=RERUN_WEB_PORT, open_browser=False, connect_to=server_uri) + logger.info( + "Rerun: web viewer started", + web_port=RERUN_WEB_PORT, + url=f"http://localhost:{RERUN_WEB_PORT}", + ) + else: + # Just gRPC server, no viewer (connect externally) + rr.serve_grpc(grpc_port=RERUN_GRPC_PORT) + logger.info( + "Rerun: gRPC server only", + port=RERUN_GRPC_PORT, + connect_command=f"rerun --connect {RERUN_GRPC_ADDR}", + ) + + _server_started = True + + # Register shutdown handler + atexit.register(shutdown_rerun) + + return RERUN_GRPC_ADDR + + +def connect_rerun( + global_config: GlobalConfig | None = None, + server_addr: str | None = None, +) -> None: + """Connect to Rerun server from a worker process. + + Modules should check global_config.viewer_backend before calling this. + + Args: + global_config: Global configuration (checks viewer_backend) + server_addr: Server address to connect to. Defaults to RERUN_GRPC_ADDR. + """ + global _connected + + with _rerun_init_lock: + if _connected: + logger.debug("Already connected to Rerun server") + return + + # Skip if foxglove backend selected + if global_config and not global_config.viewer_backend.startswith("rerun"): + logger.debug("Rerun connection skipped", viewer_backend=global_config.viewer_backend) + return + + addr = server_addr or RERUN_GRPC_ADDR + + rr.init("dimos") + rr.connect_grpc(addr) + logger.info("Rerun: connected to server", addr=addr) + + _connected = True + + +def shutdown_rerun() -> None: + """Disconnect from Rerun and cleanup resources.""" + global _server_started, _connected + + if _server_started or _connected: + try: + rr.disconnect() + logger.info("Rerun: disconnected") + except Exception as e: + logger.warning("Rerun: error during disconnect", error=str(e)) + + _server_started = False + _connected = False diff --git a/dimos/mapping/costmapper.py b/dimos/mapping/costmapper.py index 022751e0cf..e637126a04 100644 --- a/dimos/mapping/costmapper.py +++ b/dimos/mapping/costmapper.py @@ -13,18 +13,28 @@ # limitations under the License. from dataclasses import asdict, dataclass, field +import queue +import threading +import time from reactivex import operators as ops +import rerun as rr +import rerun.blueprint as rrb from dimos.core import In, Module, Out, rpc +from dimos.core.global_config import GlobalConfig from dimos.core.module import ModuleConfig +from dimos.dashboard.rerun_init import connect_rerun from dimos.mapping.pointclouds.occupancy import ( OCCUPANCY_ALGOS, HeightCostConfig, OccupancyConfig, ) from dimos.msgs.nav_msgs import OccupancyGrid -from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.msgs.sensor_msgs import PointCloud2 +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() @dataclass @@ -37,27 +47,118 @@ class CostMapper(Module): default_config = Config config: Config - global_map: In[LidarMessage] + global_map: In[PointCloud2] global_costmap: Out[OccupancyGrid] + # Background Rerun logging (decouples viz from data pipeline) + _rerun_queue: queue.Queue[tuple[OccupancyGrid, float, float] | None] + _rerun_thread: threading.Thread | None = None + + @classmethod + def rerun_views(cls): # type: ignore[no-untyped-def] + """Return Rerun view blueprints for costmap visualization.""" + return [ + rrb.TimeSeriesView( + name="Costmap (ms)", + origin="/metrics/costmap", + contents=["+ /metrics/costmap/calc_ms"], + ), + ] + + def __init__(self, global_config: GlobalConfig | None = None, **kwargs: object) -> None: + super().__init__(**kwargs) + self._global_config = global_config or GlobalConfig() + self._rerun_queue = queue.Queue(maxsize=2) + + def _rerun_worker(self) -> None: + """Background thread: pull from queue and log to Rerun (non-blocking).""" + while True: + try: + item = self._rerun_queue.get(timeout=1.0) + if item is None: # Shutdown signal + break + + grid, calc_time_ms, rx_monotonic = item + + # Generate mesh + log to Rerun (blocks in background, not on data path) + try: + # 2D image panel + rr.log( + "world/nav/costmap/image", + grid.to_rerun( + mode="image", + colormap="RdBu_r", + ), + ) + # 3D floor overlay (expensive mesh generation) + rr.log( + "world/nav/costmap/floor", + grid.to_rerun( + mode="mesh", + colormap=None, # Grayscale: free=white, occupied=black + z_offset=0.02, + ), + ) + + # Log timing metrics + rr.log("metrics/costmap/calc_ms", rr.Scalars(calc_time_ms)) + latency_ms = (time.monotonic() - rx_monotonic) * 1000 + rr.log("metrics/costmap/latency_ms", rr.Scalars(latency_ms)) + except Exception as e: + logger.warning(f"Rerun logging error: {e}") + except queue.Empty: + continue + @rpc def start(self) -> None: super().start() + # Only start Rerun logging if Rerun backend is selected + if self._global_config.viewer_backend.startswith("rerun"): + connect_rerun(global_config=self._global_config) + + # Start background Rerun logging thread + self._rerun_thread = threading.Thread(target=self._rerun_worker, daemon=True) + self._rerun_thread.start() + logger.info("CostMapper: started async Rerun logging thread") + + def _publish_costmap(grid: OccupancyGrid, calc_time_ms: float, rx_monotonic: float) -> None: + # Publish to downstream FIRST (fast, not blocked by Rerun) + self.global_costmap.publish(grid) + + # Queue for async Rerun logging (non-blocking, drops if queue full) + if self._rerun_thread and self._rerun_thread.is_alive(): + try: + self._rerun_queue.put_nowait((grid, calc_time_ms, rx_monotonic)) + except queue.Full: + pass # Drop viz frame, data pipeline continues + + def _calculate_and_time( + msg: PointCloud2, + ) -> tuple[OccupancyGrid, float, float]: + rx_monotonic = time.monotonic() # Capture receipt time + start = time.perf_counter() + grid = self._calculate_costmap(msg) + elapsed_ms = (time.perf_counter() - start) * 1000 + return grid, elapsed_ms, rx_monotonic + self._disposables.add( self.global_map.observable() # type: ignore[no-untyped-call] - .pipe(ops.map(self._calculate_costmap)) - .subscribe( - self.global_costmap.publish, - ) + .pipe(ops.map(_calculate_and_time)) + .subscribe(lambda result: _publish_costmap(result[0], result[1], result[2])) ) @rpc def stop(self) -> None: + # Shutdown background Rerun thread + if self._rerun_thread and self._rerun_thread.is_alive(): + self._rerun_queue.put(None) # Shutdown signal + self._rerun_thread.join(timeout=2.0) + super().stop() # @timed() # TODO: fix thread leak in timed decorator - def _calculate_costmap(self, msg: LidarMessage) -> OccupancyGrid: + def _calculate_costmap(self, msg: PointCloud2) -> OccupancyGrid: fn = OCCUPANCY_ALGOS[self.config.algo] return fn(msg, **asdict(self.config.config)) diff --git a/dimos/mapping/voxels.py b/dimos/mapping/voxels.py index 6b122213c3..a36dc9bc17 100644 --- a/dimos/mapping/voxels.py +++ b/dimos/mapping/voxels.py @@ -13,6 +13,8 @@ # limitations under the License. from dataclasses import dataclass +import queue +import threading import time import numpy as np @@ -21,14 +23,21 @@ from reactivex import interval, operators as ops from reactivex.disposable import Disposable from reactivex.subject import Subject +import rerun as rr +import rerun.blueprint as rrb from dimos.core import In, Module, Out, rpc +from dimos.core.global_config import GlobalConfig from dimos.core.module import ModuleConfig +from dimos.dashboard.rerun_init import connect_rerun from dimos.msgs.sensor_msgs import PointCloud2 from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.utils.decorators import simple_mcache +from dimos.utils.logging_config import setup_logger from dimos.utils.reactive import backpressure +logger = setup_logger() + @dataclass class Config(ModuleConfig): @@ -46,10 +55,32 @@ class VoxelGridMapper(Module): config: Config lidar: In[LidarMessage] - global_map: Out[LidarMessage] - - def __init__(self, **kwargs: object) -> None: + global_map: Out[PointCloud2] + + @classmethod + def rerun_views(cls): # type: ignore[no-untyped-def] + """Return Rerun view blueprints for voxel map visualization.""" + return [ + rrb.TimeSeriesView( + name="Voxel Pipeline (ms)", + origin="/metrics/voxel_map", + contents=[ + "+ /metrics/voxel_map/extract_ms", + "+ /metrics/voxel_map/transport_ms", + "+ /metrics/voxel_map/publish_ms", + ], + ), + rrb.TimeSeriesView( + name="Voxel Count", + origin="/metrics/voxel_map", + contents=["+ /metrics/voxel_map/voxel_count"], + ), + ] + + def __init__(self, global_config: GlobalConfig | None = None, **kwargs: object) -> None: super().__init__(**kwargs) + self._global_config = global_config or GlobalConfig() + dev = ( o3c.Device(self.config.device) if (self.config.device.startswith("CUDA") and o3c.cuda.is_available()) @@ -72,11 +103,49 @@ def __init__(self, **kwargs: object) -> None: self._voxel_hashmap = self.vbg.hashmap() self._key_dtype = self._voxel_hashmap.key_tensor().dtype self._latest_frame_ts: float = 0.0 + # Monotonic timestamp of last received frame (for accurate latency in replay) + self._latest_frame_rx_monotonic: float | None = None + + # Background Rerun logging (decouples viz from data pipeline) + self._rerun_queue: queue.Queue[PointCloud2 | None] = queue.Queue(maxsize=2) + self._rerun_thread: threading.Thread | None = None + + def _rerun_worker(self) -> None: + """Background thread: pull from queue and log to Rerun (non-blocking).""" + while True: + try: + pc = self._rerun_queue.get(timeout=1.0) + if pc is None: # Shutdown signal + break + + # Log to Rerun (blocks in background, doesn't affect data pipeline) + try: + rr.log( + "world/map", + pc.to_rerun( + mode="boxes", + size=self.config.voxel_size, + colormap="turbo", + ), + ) + except Exception as e: + logger.warning(f"Rerun logging error: {e}") + except queue.Empty: + continue @rpc def start(self) -> None: super().start() + # Only start Rerun logging if Rerun backend is selected + if self._global_config.viewer_backend.startswith("rerun"): + connect_rerun(global_config=self._global_config) + + # Start background Rerun logging thread (decouples viz from data pipeline) + self._rerun_thread = threading.Thread(target=self._rerun_worker, daemon=True) + self._rerun_thread.start() + logger.info("VoxelGridMapper: started async Rerun logging thread") + # Subject to trigger publishing, with backpressure to drop if busy self._publish_trigger: Subject[None] = Subject() self._disposables.add( @@ -98,15 +167,53 @@ def start(self) -> None: @rpc def stop(self) -> None: + # Shutdown background Rerun thread + if self._rerun_thread and self._rerun_thread.is_alive(): + self._rerun_queue.put(None) # Shutdown signal + self._rerun_thread.join(timeout=2.0) + super().stop() def _on_frame(self, frame: LidarMessage) -> None: + # Track receipt time with monotonic clock (works correctly in replay) + self._latest_frame_rx_monotonic = time.monotonic() self.add_frame(frame) if self.config.publish_interval == 0: self._publish_trigger.on_next(None) def publish_global_map(self) -> None: - self.global_map.publish(self.get_global_pointcloud2()) + # Snapshot monotonic timestamp once (won't be overwritten during slow publish) + rx_monotonic = self._latest_frame_rx_monotonic + + start_total = time.perf_counter() + + # 1. Extract pointcloud from GPU hashmap + t1 = time.perf_counter() + pc = self.get_global_pointcloud2() + extract_ms = (time.perf_counter() - t1) * 1000 + + # 2. Publish to downstream (NO auto-logging - fast!) + t2 = time.perf_counter() + self.global_map.publish(pc) + publish_ms = (time.perf_counter() - t2) * 1000 + + # 3. Queue for async Rerun logging (non-blocking, drops if queue full) + try: + self._rerun_queue.put_nowait(pc) + except queue.Full: + pass # Drop viz frame, data pipeline continues + + # Log detailed timing breakdown to Rerun + total_ms = (time.perf_counter() - start_total) * 1000 + rr.log("metrics/voxel_map/publish_ms", rr.Scalars(total_ms)) + rr.log("metrics/voxel_map/extract_ms", rr.Scalars(extract_ms)) + rr.log("metrics/voxel_map/transport_ms", rr.Scalars(publish_ms)) + rr.log("metrics/voxel_map/voxel_count", rr.Scalars(float(len(pc)))) + + # Log pipeline latency (time from frame receipt to publish complete) + if rx_monotonic is not None: + latency_ms = (time.monotonic() - rx_monotonic) * 1000 + rr.log("metrics/voxel_map/latency_ms", rr.Scalars(latency_ms)) def size(self) -> int: return self._voxel_hashmap.size() # type: ignore[no-any-return] diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index 0d53c22dd5..406c5d7ac7 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -26,8 +26,8 @@ ) except ImportError: ROSPoseStamped = None # type: ignore[assignment, misc] - from plum import dispatch +import rerun as rr from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.Quaternion import Quaternion, QuaternionConvertable @@ -87,6 +87,31 @@ def __str__(self) -> str: f"euler=[{math.degrees(self.roll):.1f}, {math.degrees(self.pitch):.1f}, {math.degrees(self.yaw):.1f}])" ) + def to_rerun(self): # type: ignore[no-untyped-def] + """Convert to rerun Transform3D format. + + Returns a Transform3D that can be logged to Rerun to position + child entities in the transform hierarchy. + """ + return rr.Transform3D( + translation=[self.x, self.y, self.z], + rotation=rr.Quaternion( + xyzw=[ + self.orientation.x, + self.orientation.y, + self.orientation.z, + self.orientation.w, + ] + ), + ) + + def to_rerun_arrow(self, length: float = 0.5): # type: ignore[no-untyped-def] + """Convert to rerun Arrows3D format for visualization.""" + origin = [[self.x, self.y, self.z]] + forward = self.orientation.rotate_vector(Vector3(length, 0, 0)) + vector = [[forward.x, forward.y, forward.z]] + return rr.Arrows3D(origins=origin, vectors=vector) + def new_transform_to(self, name: str) -> Transform: return self.find_transform( PoseStamped( diff --git a/dimos/msgs/geometry_msgs/Transform.py b/dimos/msgs/geometry_msgs/Transform.py index e1a6f7d424..cb112ee5c9 100644 --- a/dimos/msgs/geometry_msgs/Transform.py +++ b/dimos/msgs/geometry_msgs/Transform.py @@ -34,6 +34,7 @@ ROSTransform = None # type: ignore[assignment, misc] ROSVector3 = None # type: ignore[assignment, misc] ROSQuaternion = None # type: ignore[assignment, misc] +import rerun as rr from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 @@ -359,3 +360,18 @@ def lcm_decode(cls, data: bytes | BinaryIO) -> Transform: child_frame_id=lcm_transform_stamped.child_frame_id, ts=ts, ) + + def to_rerun(self): # type: ignore[no-untyped-def] + """Convert to rerun Transform3D format with frame IDs. + + Returns: + rr.Transform3D archetype for logging to rerun with parent/child frames + """ + return rr.Transform3D( + translation=[self.translation.x, self.translation.y, self.translation.z], + rotation=rr.Quaternion( + xyzw=[self.rotation.x, self.rotation.y, self.rotation.z, self.rotation.w] + ), + parent_frame=self.frame_id, # type: ignore[call-arg] + child_frame=self.child_frame_id, # type: ignore[call-arg] + ) diff --git a/dimos/msgs/nav_msgs/OccupancyGrid.py b/dimos/msgs/nav_msgs/OccupancyGrid.py index 193abff756..62ee9aa072 100644 --- a/dimos/msgs/nav_msgs/OccupancyGrid.py +++ b/dimos/msgs/nav_msgs/OccupancyGrid.py @@ -15,20 +15,30 @@ from __future__ import annotations from enum import IntEnum +from functools import lru_cache import time -from typing import TYPE_CHECKING, BinaryIO +from typing import TYPE_CHECKING, Any, BinaryIO from dimos_lcm.nav_msgs import ( MapMetaData, OccupancyGrid as LCMOccupancyGrid, ) -from dimos_lcm.std_msgs import Time as LCMTime +from dimos_lcm.std_msgs import Time as LCMTime # type: ignore[import-untyped] +import matplotlib.pyplot as plt import numpy as np from PIL import Image +import rerun as rr from dimos.msgs.geometry_msgs import Pose, Vector3, VectorLike from dimos.types.timestamped import Timestamped + +@lru_cache(maxsize=16) +def _get_matplotlib_cmap(name: str): # type: ignore[no-untyped-def] + """Get a matplotlib colormap by name (cached for performance).""" + return plt.get_cmap(name) + + if TYPE_CHECKING: from pathlib import Path @@ -416,3 +426,245 @@ def cell_value(self, world_position: Vector3) -> int: return CostValues.UNKNOWN return int(self.grid[y, x]) + + def to_rerun( # type: ignore[no-untyped-def] + self, + colormap: str | None = None, + mode: str = "image", + z_offset: float = 0.01, + **kwargs: Any, + ): # type: ignore[no-untyped-def] + """Convert to Rerun visualization format. + + Args: + colormap: Optional colormap name (e.g., "RdBu_r" for blue=free, red=occupied). + If None, uses grayscale for image mode or default colors for 3D modes. + mode: Visualization mode: + - "image": 2D grayscale/colored image (default) + - "mesh": 3D textured plane overlay on floor + - "points": 3D points for occupied cells only + z_offset: Height offset for 3D modes (default 0.01m above floor) + **kwargs: Additional args (ignored for compatibility) + + Returns: + Rerun archetype for logging (rr.Image, rr.Mesh3D, or rr.Points3D) + + The visualization uses: + - Free space (value 0): white/blue + - Unknown space (value -1): gray/transparent + - Occupied space (value > 0): black/red with gradient + """ + if self.grid.size == 0: + if mode == "image": + return rr.Image(np.zeros((1, 1), dtype=np.uint8), color_model="L") + elif mode == "mesh": + return rr.Mesh3D(vertex_positions=[]) + else: + return rr.Points3D([]) + + if mode == "points": + return self._to_rerun_points(colormap, z_offset) + elif mode == "mesh": + return self._to_rerun_mesh(colormap, z_offset) + else: + return self._to_rerun_image(colormap) + + def _to_rerun_image(self, colormap: str | None = None): # type: ignore[no-untyped-def] + """Convert to 2D image visualization.""" + # Use existing cached visualization functions for supported palettes + if colormap in ("turbo", "rainbow"): + from dimos.mapping.occupancy.visualizations import rainbow_image, turbo_image + + if colormap == "turbo": + bgr_image = turbo_image(self.grid) + else: + bgr_image = rainbow_image(self.grid) + + # Convert BGR to RGB and flip for world coordinates + rgb_image = np.flipud(bgr_image[:, :, ::-1]) + return rr.Image(rgb_image, color_model="RGB") + + if colormap is not None: + # Use matplotlib colormap (cached for performance) + cmap = _get_matplotlib_cmap(colormap) + + grid_float = self.grid.astype(np.float32) + + # Create RGBA image + vis = np.zeros((self.height, self.width, 4), dtype=np.uint8) + + # Free space: low cost (blue in RdBu_r) + free_mask = self.grid == 0 + # Occupied: high cost (red in RdBu_r) + occupied_mask = self.grid > 0 + # Unknown: transparent gray + unknown_mask = self.grid == -1 + + # Map free to 0, costs to normalized value + if np.any(free_mask): + colors_free = (cmap(0.0)[:3] * np.array([255, 255, 255])).astype(np.uint8) + vis[free_mask, :3] = colors_free + vis[free_mask, 3] = 255 + + if np.any(occupied_mask): + # Normalize costs 1-100 to 0.5-1.0 range + costs = grid_float[occupied_mask] + cost_norm = 0.5 + (costs / 100) * 0.5 + colors_occ = (cmap(cost_norm)[:, :3] * 255).astype(np.uint8) + vis[occupied_mask, :3] = colors_occ + vis[occupied_mask, 3] = 255 + + if np.any(unknown_mask): + vis[unknown_mask] = [128, 128, 128, 100] # Semi-transparent gray + + # Flip vertically to match world coordinates (y=0 at bottom) + return rr.Image(np.flipud(vis), color_model="RGBA") + + # Grayscale visualization (no colormap) + vis_gray = np.zeros((self.height, self.width), dtype=np.uint8) + + # Free space = white + vis_gray[self.grid == 0] = 255 + + # Unknown = gray + vis_gray[self.grid == -1] = 128 + + # Occupied (100) = black, costs (1-99) = gradient + occupied_mask = self.grid > 0 + if np.any(occupied_mask): + # Map 1-100 to 127-0 (darker = more occupied) + costs = self.grid[occupied_mask].astype(np.float32) + vis_gray[occupied_mask] = (127 * (1 - costs / 100)).astype(np.uint8) + + # Flip vertically to match world coordinates (y=0 at bottom) + return rr.Image(np.flipud(vis_gray), color_model="L") + + def _to_rerun_points(self, colormap: str | None = None, z_offset: float = 0.01): # type: ignore[no-untyped-def] + """Convert to 3D points for occupied cells.""" + # Find occupied cells (cost > 0) + occupied_mask = self.grid > 0 + if not np.any(occupied_mask): + return rr.Points3D([]) + + # Get grid coordinates of occupied cells + gy, gx = np.where(occupied_mask) + costs = self.grid[occupied_mask].astype(np.float32) + + # Convert to world coordinates + ox = self.origin.position.x + oy = self.origin.position.y + wx = ox + (gx + 0.5) * self.resolution + wy = oy + (gy + 0.5) * self.resolution + wz = np.full_like(wx, z_offset) + + points = np.column_stack([wx, wy, wz]) + + # Determine colors + if colormap is not None: + # Normalize costs to 0-1 range + cost_norm = costs / 100.0 + cmap = _get_matplotlib_cmap(colormap) + point_colors = (cmap(cost_norm)[:, :3] * 255).astype(np.uint8) + else: + # Default: red gradient based on cost + intensity = (costs / 100.0 * 255).astype(np.uint8) + point_colors = np.column_stack( + [intensity, np.zeros_like(intensity), np.zeros_like(intensity)] + ) + + return rr.Points3D( + positions=points, + radii=self.resolution / 2, + colors=point_colors, + ) + + def _to_rerun_mesh(self, colormap: str | None = None, z_offset: float = 0.01): # type: ignore[no-untyped-def] + """Convert to 3D mesh overlay on floor plane. + + Only renders known cells (free or occupied), skipping unknown cells. + Uses per-vertex colors for proper alpha blending. + Fully vectorized for performance (~100x faster than loop version). + """ + # Only render known cells (not unknown = -1) + known_mask = self.grid != -1 + if not np.any(known_mask): + return rr.Mesh3D(vertex_positions=[]) + + # Get grid coordinates of known cells + gy, gx = np.where(known_mask) + n_cells = len(gy) + + ox = self.origin.position.x + oy = self.origin.position.y + r = self.resolution + + # === VECTORIZED VERTEX GENERATION === + # World positions of cell corners (bottom-left of each cell) + wx = ox + gx.astype(np.float32) * r + wy = oy + gy.astype(np.float32) * r + + # Each cell has 4 vertices: (wx,wy), (wx+r,wy), (wx+r,wy+r), (wx,wy+r) + # Shape: (n_cells, 4, 3) + vertices = np.zeros((n_cells, 4, 3), dtype=np.float32) + vertices[:, 0, 0] = wx + vertices[:, 0, 1] = wy + vertices[:, 0, 2] = z_offset + vertices[:, 1, 0] = wx + r + vertices[:, 1, 1] = wy + vertices[:, 1, 2] = z_offset + vertices[:, 2, 0] = wx + r + vertices[:, 2, 1] = wy + r + vertices[:, 2, 2] = z_offset + vertices[:, 3, 0] = wx + vertices[:, 3, 1] = wy + r + vertices[:, 3, 2] = z_offset + # Flatten to (n_cells*4, 3) + flat_vertices = vertices.reshape(-1, 3) + + # === VECTORIZED INDEX GENERATION === + # Base vertex indices for each cell: [0, 4, 8, 12, ...] + base_v = np.arange(n_cells, dtype=np.uint32) * 4 + # Two triangles per cell: (0,1,2) and (0,2,3) relative to base + indices = np.zeros((n_cells, 2, 3), dtype=np.uint32) + indices[:, 0, 0] = base_v + indices[:, 0, 1] = base_v + 1 + indices[:, 0, 2] = base_v + 2 + indices[:, 1, 0] = base_v + indices[:, 1, 1] = base_v + 2 + indices[:, 1, 2] = base_v + 3 + # Flatten to (n_cells*2, 3) + flat_indices = indices.reshape(-1, 3) + + # === VECTORIZED COLOR GENERATION === + cell_values = self.grid[gy, gx] # Get all cell values at once + + if colormap: + cmap = _get_matplotlib_cmap(colormap) + # Normalize costs: free(0) -> 0.0, cost(1-100) -> 0.5-1.0 + cost_norm = np.where(cell_values == 0, 0.0, 0.5 + (cell_values / 100) * 0.5) + # Sample colormap for all cells at once (returns Nx4 RGBA float) + rgba_float = cmap(cost_norm)[:, :3] # Drop alpha, we set our own + rgb = (rgba_float * 255).astype(np.uint8) + # Alpha: 180 for free, 220 for occupied + alpha = np.where(cell_values == 0, 180, 220).astype(np.uint8) + else: + # Default coloring: dark grey for free, black for occupied + rgb = np.zeros((n_cells, 3), dtype=np.uint8) + is_free = cell_values == 0 + # Free space: dark grey + rgb[is_free] = [40, 40, 40] + # Occupied: black to dark grey gradient (darker = more occupied) + intensity = (40 * (1 - cell_values / 100)).astype(np.uint8) + rgb[~is_free] = np.column_stack([intensity[~is_free]] * 3) + alpha = np.where(is_free, 150, 200).astype(np.uint8) + + # Combine RGB and alpha into RGBA + colors_per_cell = np.column_stack([rgb, alpha]) # (n_cells, 4) + # Repeat each color 4 times (one per vertex) + colors = np.repeat(colors_per_cell, 4, axis=0) # (n_cells*4, 4) + + return rr.Mesh3D( + vertex_positions=flat_vertices, + triangle_indices=flat_indices, + vertex_colors=colors, + ) diff --git a/dimos/msgs/nav_msgs/Path.py b/dimos/msgs/nav_msgs/Path.py index deecbb52e1..ff30b88fa6 100644 --- a/dimos/msgs/nav_msgs/Path.py +++ b/dimos/msgs/nav_msgs/Path.py @@ -30,6 +30,7 @@ from nav_msgs.msg import Path as ROSPath # type: ignore[attr-defined] except ImportError: ROSPath = None # type: ignore[assignment, misc] +import rerun as rr from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.types.timestamped import Timestamped @@ -231,3 +232,18 @@ def to_ros_msg(self) -> ROSPath: ros_msg.poses.append(pose.to_ros_msg()) return ros_msg + + def to_rerun(self, color: tuple[int, int, int] = (0, 255, 128)): # type: ignore[no-untyped-def] + """Convert to rerun LineStrips3D format. + + Args: + color: RGB color tuple for the path line + + Returns: + rr.LineStrips3D archetype for logging to rerun + """ + if not self.poses: + return rr.LineStrips3D([]) + + points = [[p.x, p.y, p.z] for p in self.poses] + return rr.LineStrips3D([points], colors=[color]) diff --git a/dimos/msgs/sensor_msgs/CameraInfo.py b/dimos/msgs/sensor_msgs/CameraInfo.py index 482ec44b48..c54b6565fa 100644 --- a/dimos/msgs/sensor_msgs/CameraInfo.py +++ b/dimos/msgs/sensor_msgs/CameraInfo.py @@ -20,6 +20,7 @@ from dimos_lcm.sensor_msgs import CameraInfo as LCMCameraInfo from dimos_lcm.std_msgs.Header import Header import numpy as np +import rerun as rr # Import ROS types try: @@ -372,6 +373,28 @@ def __eq__(self, other) -> bool: # type: ignore[no-untyped-def] and self.frame_id == other.frame_id ) + def to_rerun(self, image_plane_distance: float = 0.5): # type: ignore[no-untyped-def] + """Convert to Rerun Pinhole archetype for camera frustum visualization. + + Args: + image_plane_distance: Distance to draw the image plane in the frustum + + Returns: + rr.Pinhole archetype for logging to Rerun + """ + # Extract intrinsics from K matrix + # K = [fx, 0, cx, 0, fy, cy, 0, 0, 1] + fx, fy = self.K[0], self.K[4] + cx, cy = self.K[2], self.K[5] + + return rr.Pinhole( + focal_length=[fx, fy], + principal_point=[cx, cy], + width=self.width, + height=self.height, + image_plane_distance=image_plane_distance, + ) + class CalibrationProvider: """Provides lazy-loaded access to camera calibration YAML files in a directory.""" diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 4e07544444..cab6526f3b 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -16,7 +16,7 @@ import base64 import time -from typing import TYPE_CHECKING, Literal, TypedDict +from typing import TYPE_CHECKING, Any, Literal, TypedDict import cv2 from dimos_lcm.sensor_msgs.Image import Image as LCMImage @@ -319,6 +319,10 @@ def to_bgr(self) -> Image: def to_grayscale(self) -> Image: return Image(self._impl.to_grayscale()) + def to_rerun(self) -> Any: + """Convert to rerun Image format.""" + return self._impl.to_rerun() + def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> Image: return Image(self._impl.resize(width, height, interpolation)) diff --git a/dimos/msgs/sensor_msgs/PointCloud2.py b/dimos/msgs/sensor_msgs/PointCloud2.py index 78e979b998..1e842a0b49 100644 --- a/dimos/msgs/sensor_msgs/PointCloud2.py +++ b/dimos/msgs/sensor_msgs/PointCloud2.py @@ -21,11 +21,13 @@ from dimos_lcm.sensor_msgs.PointCloud2 import ( PointCloud2 as LCMPointCloud2, ) -from dimos_lcm.sensor_msgs.PointField import PointField -from dimos_lcm.std_msgs.Header import Header +from dimos_lcm.sensor_msgs.PointField import PointField # type: ignore[import-untyped] +from dimos_lcm.std_msgs.Header import Header # type: ignore[import-untyped] +import matplotlib.pyplot as plt import numpy as np import open3d as o3d # type: ignore[import-untyped] import open3d.core as o3c # type: ignore[import-untyped] +import rerun as rr from dimos.msgs.geometry_msgs import Vector3 @@ -44,6 +46,12 @@ from dimos.types.timestamped import Timestamped +@functools.lru_cache(maxsize=16) +def _get_matplotlib_cmap(name: str): # type: ignore[no-untyped-def] + """Get a matplotlib colormap by name (cached for performance).""" + return plt.get_cmap(name) + + # TODO: encode/decode need to be updated to work with full spectrum of pointcloud2 fields class PointCloud2(Timestamped): msg_name = "sensor_msgs.PointCloud2" @@ -410,6 +418,62 @@ def __len__(self) -> int: return 0 return int(self._pcd_tensor.point["positions"].shape[0]) + def to_rerun( # type: ignore[no-untyped-def] + self, + radii: float = 0.02, + colormap: str | None = None, + colors: list[int] | None = None, + mode: str = "boxes", + size: float | None = None, + fill_mode: str = "solid", + **kwargs, # type: ignore[no-untyped-def] + ): # type: ignore[no-untyped-def] + """Convert to Rerun Points3D or Boxes3D archetype. + + Args: + radii: Point radius for visualization (only for mode="points") + colormap: Optional colormap name (e.g., "turbo", "viridis") to color by height + colors: Optional RGB color [r, g, b] for all points (0-255) + mode: Visualization mode - "points" for spheres, "boxes" for cubes (default) + size: Box size for mode="boxes" (e.g., voxel_size). Defaults to radii*2. + fill_mode: Fill mode for boxes - "solid", "majorwireframe", or "densewireframe" + **kwargs: Additional args (ignored for compatibility) + + Returns: + rr.Points3D or rr.Boxes3D archetype for logging to Rerun + """ + points = self.as_numpy() + if len(points) == 0: + return rr.Points3D([]) if mode == "points" else rr.Boxes3D(centers=[]) + + # Determine colors + point_colors = None + if colormap is not None: + # Color by height (z-coordinate) + z = points[:, 2] + z_norm = (z - z.min()) / (z.max() - z.min() + 1e-8) + cmap = _get_matplotlib_cmap(colormap) + point_colors = (cmap(z_norm)[:, :3] * 255).astype(np.uint8) + elif colors is not None: + point_colors = colors + + if mode == "boxes": + # Use boxes for voxel visualization + box_size = size if size is not None else radii * 2 + half = box_size / 2 + return rr.Boxes3D( + centers=points, + half_sizes=[half, half, half], + colors=point_colors, + fill_mode=fill_mode, # type: ignore[arg-type] + ) + else: + return rr.Points3D( + positions=points, + radii=radii, + colors=point_colors, + ) + def filter_by_height( self, min_height: float | None = None, diff --git a/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py b/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py index 8de01f9a9d..f5d92a3bc6 100644 --- a/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py +++ b/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py @@ -22,6 +22,7 @@ import cv2 import numpy as np +import rerun as rr try: import cupy as cp # type: ignore[import-not-found] @@ -108,6 +109,37 @@ def _encode_nvimgcodec_cuda(bgr_cu, quality: int = 80) -> bytes: # type: ignore return bytes(bs0) +def format_to_rerun(data, fmt: ImageFormat): # type: ignore[no-untyped-def] + """Convert image data to Rerun archetype based on format. + + Args: + data: Image data (numpy array or cupy array on CPU) + fmt: ImageFormat enum value + + Returns: + Rerun archetype (rr.Image or rr.DepthImage) + """ + match fmt: + case ImageFormat.RGB: + return rr.Image(data, color_model="RGB") + case ImageFormat.RGBA: + return rr.Image(data, color_model="RGBA") + case ImageFormat.BGR: + return rr.Image(data, color_model="BGR") + case ImageFormat.BGRA: + return rr.Image(data, color_model="BGRA") + case ImageFormat.GRAY: + return rr.Image(data, color_model="L") + case ImageFormat.GRAY16: + return rr.Image(data, color_model="L") + case ImageFormat.DEPTH: + return rr.DepthImage(data) + case ImageFormat.DEPTH16: + return rr.DepthImage(data) + case _: + raise ValueError(f"Unsupported format for Rerun: {fmt}") + + class AbstractImage(ABC): data: Any format: ImageFormat @@ -165,6 +197,10 @@ def resize( ) -> AbstractImage: # pragma: no cover - abstract ... + @abstractmethod + def to_rerun(self) -> Any: # pragma: no cover - abstract + ... + @abstractmethod def sharpness(self) -> float: # pragma: no cover - abstract ... diff --git a/dimos/msgs/sensor_msgs/image_impls/CudaImage.py b/dimos/msgs/sensor_msgs/image_impls/CudaImage.py index 027719ffc6..8230daae29 100644 --- a/dimos/msgs/sensor_msgs/image_impls/CudaImage.py +++ b/dimos/msgs/sensor_msgs/image_impls/CudaImage.py @@ -647,6 +647,20 @@ def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) _resize_bilinear_hwc_cuda(self.data, height, width), self.format, self.frame_id, self.ts ) + def to_rerun(self): # type: ignore[no-untyped-def] + """Convert to rerun Image format. + + Transfers data from GPU to CPU and converts to appropriate format. + + Returns: + rr.Image or rr.DepthImage archetype for logging to rerun + """ + from dimos.msgs.sensor_msgs.image_impls.AbstractImage import format_to_rerun + + # Transfer to CPU + cpu_data = cp.asnumpy(self.data) + return format_to_rerun(cpu_data, self.format) + def crop(self, x: int, y: int, width: int, height: int) -> CudaImage: """Crop the image to the specified region. diff --git a/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py b/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py index ff6272e30c..250b951371 100644 --- a/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py +++ b/dimos/msgs/sensor_msgs/image_impls/NumpyImage.py @@ -126,6 +126,12 @@ def to_grayscale(self) -> NumpyImage: ) raise ValueError(f"Unsupported format: {self.format}") + def to_rerun(self): # type: ignore[no-untyped-def] + """Convert to rerun Image format.""" + from dimos.msgs.sensor_msgs.image_impls.AbstractImage import format_to_rerun + + return format_to_rerun(self.data, self.format) + def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> NumpyImage: return NumpyImage( cv2.resize(self.data, (width, height), interpolation=interpolation), diff --git a/dimos/msgs/tf2_msgs/TFMessage.py b/dimos/msgs/tf2_msgs/TFMessage.py index 225580b83e..29e890de47 100644 --- a/dimos/msgs/tf2_msgs/TFMessage.py +++ b/dimos/msgs/tf2_msgs/TFMessage.py @@ -159,3 +159,22 @@ def to_ros_msg(self) -> ROSTFMessage: ros_msg.transforms.append(transform.to_ros_transform_stamped()) return ros_msg + + def to_rerun(self): # type: ignore[no-untyped-def] + """Convert to a list of rerun Transform3D archetypes. + + Returns a list of tuples (entity_path, Transform3D) for each transform + in the message. The entity_path is derived from the child_frame_id. + + Returns: + List of (entity_path, rr.Transform3D) tuples + + Example: + for path, transform in tf_msg.to_rerun(): + rr.log(path, transform) + """ + results = [] + for transform in self.transforms: + entity_path = f"world/{transform.child_frame_id}" + results.append((entity_path, transform.to_rerun())) # type: ignore[no-untyped-call] + return results diff --git a/dimos/navigation/replanning_a_star/module.py b/dimos/navigation/replanning_a_star/module.py index 9c43c9037c..d8b582be83 100644 --- a/dimos/navigation/replanning_a_star/module.py +++ b/dimos/navigation/replanning_a_star/module.py @@ -19,6 +19,7 @@ from dimos.core import In, Module, Out, rpc from dimos.core.global_config import GlobalConfig +from dimos.dashboard.rerun_init import connect_rerun from dimos.msgs.geometry_msgs import PoseStamped, Twist from dimos.msgs.nav_msgs import OccupancyGrid, Path from dimos.msgs.sensor_msgs import Image @@ -49,6 +50,10 @@ def __init__(self, global_config: GlobalConfig | None = None) -> None: @rpc def start(self) -> None: super().start() + connect_rerun(global_config=self._global_config) + + # Auto-log path to Rerun + self.path.autolog_to_rerun("world/nav/path") unsub = self.odom.subscribe(self._planner.handle_odom) self._disposables.add(Disposable(unsub)) diff --git a/dimos/perception/detection/module2D.py b/dimos/perception/detection/module2D.py index fdf276bcae..cfca3b2192 100644 --- a/dimos/perception/detection/module2D.py +++ b/dimos/perception/detection/module2D.py @@ -56,7 +56,7 @@ class Detection2DModule(Module): config: Config detector: Detector - image: In[Image] + color_image: In[Image] detections: Out[Detection2DArray] annotations: Out[ImageAnnotations] @@ -82,7 +82,7 @@ def process_image_frame(self, image: Image) -> ImageDetections2D: @simple_mcache def sharp_image_stream(self) -> Observable[Image]: return backpressure( - self.image.pure_observable().pipe( + self.color_image.pure_observable().pipe( sharpness_barrier(self.config.max_freq), ) ) @@ -166,7 +166,7 @@ def deploy( # type: ignore[no-untyped-def] from dimos.core import LCMTransport detector = Detection2DModule(**kwargs) - detector.image.connect(camera.color_image) + detector.color_image.connect(camera.color_image) detector.annotations.transport = LCMTransport(f"{prefix}/annotations", ImageAnnotations) detector.detections.transport = LCMTransport(f"{prefix}/detections", Detection2DArray) diff --git a/dimos/perception/detection/module3D.py b/dimos/perception/detection/module3D.py index eddba06e3e..037376f995 100644 --- a/dimos/perception/detection/module3D.py +++ b/dimos/perception/detection/module3D.py @@ -37,7 +37,7 @@ class Detection3DModule(Detection2DModule): - image: In[Image] + color_image: In[Image] pointcloud: In[PointCloud2] detections: Out[Detection2DArray] @@ -113,7 +113,7 @@ def ask_vlm(self, question: str) -> str: from dimos.models.vl.qwen import QwenVlModel model = QwenVlModel() - image = self.image.get_next() + image = self.color_image.get_next() return model.query(image, question) # @skill @@ -130,7 +130,7 @@ def nav_vlm(self, question: str) -> str: from dimos.models.vl.qwen import QwenVlModel model = QwenVlModel() - image = self.image.get_next() + image = self.color_image.get_next() result = model.query_detections(image, question) print("VLM result:", result, "for", image, "and question", question) diff --git a/dimos/perception/detection/moduleDB.py b/dimos/perception/detection/moduleDB.py index e9dc1d9518..c37dff8dea 100644 --- a/dimos/perception/detection/moduleDB.py +++ b/dimos/perception/detection/moduleDB.py @@ -23,8 +23,7 @@ from lcm_msgs.foxglove_msgs import SceneUpdate # type: ignore[import-not-found] from reactivex.observable import Observable -from dimos import spec -from dimos.core import DimosCluster, In, Out, rpc +from dimos.core import In, Out, rpc from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Transform, Vector3 from dimos.msgs.sensor_msgs import Image, PointCloud2 from dimos.msgs.vision_msgs import Detection2DArray @@ -143,7 +142,7 @@ class ObjectDBModule(Detection3DModule, TableStr): goto: Callable[[PoseStamped], Any] | None = None - image: In[Image] + color_image: In[Image] pointcloud: In[PointCloud2] detections: Out[Detection2DArray] @@ -308,36 +307,6 @@ def __len__(self) -> int: return len(self.objects.values()) -def deploy( # type: ignore[no-untyped-def] - dimos: DimosCluster, - lidar: spec.Pointcloud, - camera: spec.Camera, - prefix: str = "/detectorDB", - **kwargs, -) -> Detection3DModule: - from dimos.core import LCMTransport - - detector = dimos.deploy(ObjectDBModule, camera_info=camera.camera_info_stream, **kwargs) # type: ignore[attr-defined] - - detector.image.connect(camera.color_image) - detector.pointcloud.connect(lidar.pointcloud) - - detector.annotations.transport = LCMTransport(f"{prefix}/annotations", ImageAnnotations) - detector.detections.transport = LCMTransport(f"{prefix}/detections", Detection2DArray) - detector.scene_update.transport = LCMTransport(f"{prefix}/scene_update", SceneUpdate) - - detector.detected_image_0.transport = LCMTransport(f"{prefix}/image/0", Image) - detector.detected_image_1.transport = LCMTransport(f"{prefix}/image/1", Image) - detector.detected_image_2.transport = LCMTransport(f"{prefix}/image/2", Image) - - detector.detected_pointcloud_0.transport = LCMTransport(f"{prefix}/pointcloud/0", PointCloud2) - detector.detected_pointcloud_1.transport = LCMTransport(f"{prefix}/pointcloud/1", PointCloud2) - detector.detected_pointcloud_2.transport = LCMTransport(f"{prefix}/pointcloud/2", PointCloud2) - - detector.start() - return detector # type: ignore[no-any-return] - - detectionDB_module = ObjectDBModule.blueprint -__all__ = ["ObjectDBModule", "deploy", "detectionDB_module"] +__all__ = ["ObjectDBModule", "detectionDB_module"] diff --git a/dimos/perception/detection/person_tracker.py b/dimos/perception/detection/person_tracker.py index 4d4fa149dc..6212080858 100644 --- a/dimos/perception/detection/person_tracker.py +++ b/dimos/perception/detection/person_tracker.py @@ -29,7 +29,7 @@ class PersonTracker(Module): detections: In[Detection2DArray] - image: In[Image] + color_image: In[Image] target: Out[PoseStamped] camera_info: CameraInfo @@ -76,7 +76,7 @@ def center_to_3d( def detections_stream(self) -> Observable[ImageDetections2D]: return backpressure( align_timestamped( - self.image.pure_observable(), + self.color_image.pure_observable(), self.detections.pure_observable().pipe( ops.filter(lambda d: d.detections_length > 0) # type: ignore[attr-defined] ), diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index df975d87df..e30d42dc32 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -17,8 +17,9 @@ # The blueprints are defined as import strings so as not to trigger unnecessary imports. all_blueprints = { "unitree-go2": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:nav", - "unitree-go2-nav": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:nav", "unitree-go2-basic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:basic", + "unitree-go2-nav": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:nav", + "unitree-go2-detection": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:detection", "unitree-go2-spatial": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:spatial", "unitree-go2-agentic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic", "unitree-go2-agentic-ollama": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_ollama", diff --git a/dimos/robot/foxglove_bridge.py b/dimos/robot/foxglove_bridge.py index 4a208dd78e..ed14a06495 100644 --- a/dimos/robot/foxglove_bridge.py +++ b/dimos/robot/foxglove_bridge.py @@ -15,6 +15,7 @@ import asyncio import logging import threading +from typing import TYPE_CHECKING, Any # this is missing, I'm just trying to import lcm_foxglove_bridge.py from dimos_lcm from dimos_lcm.foxglove_bridge import ( @@ -22,24 +23,46 @@ ) from dimos.core import DimosCluster, Module, rpc +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.core.global_config import GlobalConfig logging.getLogger("lcm_foxglove_bridge").setLevel(logging.ERROR) logging.getLogger("FoxgloveServer").setLevel(logging.ERROR) +logger = setup_logger() + class FoxgloveBridge(Module): _thread: threading.Thread _loop: asyncio.AbstractEventLoop - - def __init__(self, *args, shm_channels=None, jpeg_shm_channels=None, **kwargs) -> None: # type: ignore[no-untyped-def] + _global_config: "GlobalConfig | None" = None + + def __init__( + self, + *args: Any, + shm_channels: list[str] | None = None, + jpeg_shm_channels: list[str] | None = None, + global_config: "GlobalConfig | None" = None, + **kwargs: Any, + ) -> None: super().__init__(*args, **kwargs) self.shm_channels = shm_channels or [] self.jpeg_shm_channels = jpeg_shm_channels or [] + self._global_config = global_config @rpc def start(self) -> None: super().start() + # Skip if Rerun is the selected viewer backend + if self._global_config and self._global_config.viewer_backend.startswith("rerun"): + logger.info( + "Foxglove bridge skipped", viewer_backend=self._global_config.viewer_backend + ) + return + def run_bridge() -> None: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) diff --git a/dimos/robot/unitree/connection/go2.py b/dimos/robot/unitree/connection/go2.py index 0e9037f25d..03d7677f48 100644 --- a/dimos/robot/unitree/connection/go2.py +++ b/dimos/robot/unitree/connection/go2.py @@ -13,16 +13,20 @@ # limitations under the License. import logging +from pathlib import Path from threading import Thread import time from typing import Any, Protocol from reactivex.disposable import Disposable from reactivex.observable import Observable +import rerun as rr +import rerun.blueprint as rrb from dimos import spec from dimos.core import DimosCluster, In, LCMTransport, Module, Out, pSHMTransport, rpc from dimos.core.global_config import GlobalConfig +from dimos.dashboard.rerun_init import connect_rerun from dimos.msgs.geometry_msgs import ( PoseStamped, Quaternion, @@ -40,6 +44,9 @@ logger = setup_logger(level=logging.INFO) +# URDF path for Go2 robot +_GO2_URDF = Path(__file__).parent.parent / "go2" / "go2.urdf" + class Go2ConnectionProtocol(Protocol): """Protocol defining the interface for Go2 robot connections.""" @@ -137,6 +144,16 @@ class GO2Connection(Module, spec.Camera, spec.Pointcloud): _global_config: GlobalConfig _camera_info_thread: Thread | None = None + @classmethod + def rerun_views(cls): # type: ignore[no-untyped-def] + """Return Rerun view blueprints for GO2 camera visualization.""" + return [ + rrb.Spatial2DView( + name="Camera", + origin="world/robot/camera/rgb", + ), + ] + def __init__( # type: ignore[no-untyped-def] self, ip: str | None = None, @@ -179,9 +196,17 @@ def start(self) -> None: self.connection.start() + # Initialize Rerun world frame and load URDF (only if Rerun backend) + if self._global_config.viewer_backend.startswith("rerun"): + self._init_rerun_world() + + def onimage(image: Image) -> None: + self.color_image.publish(image) + rr.log("world/robot/camera/rgb", image.to_rerun()) + self._disposables.add(self.connection.lidar_stream().subscribe(self.lidar.publish)) self._disposables.add(self.connection.odom_stream().subscribe(self._publish_tf)) - self._disposables.add(self.connection.video_stream().subscribe(self.color_image.publish)) + self._disposables.add(self.connection.video_stream().subscribe(onimage)) self._disposables.add(Disposable(self.cmd_vel.subscribe(self.move))) self._camera_info_thread = Thread( @@ -193,6 +218,45 @@ def start(self) -> None: self.standup() # self.record("go2_bigoffice") + def _init_rerun_world(self) -> None: + """Set up Rerun world frame, load URDF, and static assets. + + Does NOT compose blueprint - that's handled by ModuleBlueprintSet.build(). + """ + connect_rerun(global_config=self._global_config) + + # Set up world coordinate system AND register it as a named frame + # This is KEY - it connects entity paths to the named frame system + rr.log( + "world", + rr.ViewCoordinates.RIGHT_HAND_Z_UP, + rr.CoordinateFrame("world"), # type: ignore[attr-defined] + static=True, + ) + + # Bridge the named frame "world" to the implicit frame hierarchy "tf#/world" + # This connects TF named frames to entity path hierarchy + rr.log( + "world", + rr.Transform3D( + parent_frame="world", # type: ignore[call-arg] + child_frame="tf#/world", # type: ignore[call-arg] + ), + static=True, + ) + + # Load robot URDF + if _GO2_URDF.exists(): + rr.log_file_from_path( + str(_GO2_URDF), + entity_path_prefix="world/robot", + static=True, + ) + logger.info(f"Loaded URDF from {_GO2_URDF}") + + # Log static camera pinhole (for frustum) + rr.log("world/robot/camera", _camera_info_static().to_rerun(), static=True) + @rpc def stop(self) -> None: self.liedown() @@ -248,10 +312,47 @@ def _odom_to_tf(cls, odom: PoseStamped) -> list[Transform]: ] def _publish_tf(self, msg: PoseStamped) -> None: - self.tf.publish(*self._odom_to_tf(msg)) + transforms = self._odom_to_tf(msg) + self.tf.publish(*transforms) if self.odom.transport: self.odom.publish(msg) + # Log to Rerun: robot pose (relative to parent entity "world") + rr.log( + "world/robot", + rr.Transform3D( + translation=[msg.x, msg.y, msg.z], + rotation=rr.Quaternion( + xyzw=[ + msg.orientation.x, + msg.orientation.y, + msg.orientation.z, + msg.orientation.w, + ] + ), + ), + ) + # Log axes as a child entity for visualization + rr.log("world/robot/axes", rr.TransformAxes3D(0.5)) # type: ignore[attr-defined] + + # Log camera transform (compose base_link -> camera_link -> camera_optical) + # transforms[1] is camera_link, transforms[2] is camera_optical + cam_tf = transforms[1] + transforms[2] # compose transforms + rr.log( + "world/robot/camera", + rr.Transform3D( + translation=[cam_tf.translation.x, cam_tf.translation.y, cam_tf.translation.z], + rotation=rr.Quaternion( + xyzw=[ + cam_tf.rotation.x, + cam_tf.rotation.y, + cam_tf.rotation.z, + cam_tf.rotation.w, + ] + ), + ), + ) + def publish_camera_info(self) -> None: while True: self.camera_info.publish(_camera_info_static()) diff --git a/dimos/robot/unitree/g1/g1detector.py b/dimos/robot/unitree/g1/g1detector.py index ca549025af..55986eb087 100644 --- a/dimos/robot/unitree/g1/g1detector.py +++ b/dimos/robot/unitree/g1/g1detector.py @@ -31,7 +31,7 @@ def deploy(dimos: DimosCluster, ip: str): # type: ignore[no-untyped-def] detector=YoloPersonDetector, ) - detector3d = moduleDB.deploy( + detector3d = moduleDB.deploy( # type: ignore[attr-defined] dimos, camera=camera, lidar=nav, diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index bf65447e25..8173f27527 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -16,6 +16,10 @@ import platform +from dimos_lcm.foxglove_msgs.ImageAnnotations import ( # type: ignore[import-untyped] + ImageAnnotations, +) + from dimos.agents.agent import llm_agent from dimos.agents.cli.human import human_input from dimos.agents.cli.web import web_input @@ -25,19 +29,21 @@ from dimos.agents.spec import Provider from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect -from dimos.core.transport import JpegLcmTransport, JpegShmTransport, pSHMTransport +from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport from dimos.mapping.costmapper import cost_mapper from dimos.mapping.voxels import voxel_mapper -from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs import Image, PointCloud2 +from dimos.msgs.vision_msgs import Detection2DArray from dimos.navigation.frontier_exploration import ( wavefront_frontier_explorer, ) from dimos.navigation.replanning_a_star.module import ( replanning_a_star_planner, ) +from dimos.perception.detection.moduleDB import ObjectDBModule, detectionDB_module from dimos.perception.spatial_perception import spatial_memory from dimos.robot.foxglove_bridge import foxglove_bridge -from dimos.robot.unitree.connection.go2 import go2_connection +from dimos.robot.unitree.connection.go2 import GO2Connection, go2_connection from dimos.robot.unitree_webrtc.unitree_skill_container import unitree_skills from dimos.utils.monitoring import utilization from dimos.web.websocket_vis.websocket_vis_module import websocket_vis @@ -71,12 +77,53 @@ nav = autoconnect( basic, - voxel_mapper(voxel_size=0.05), + voxel_mapper(voxel_size=0.1), cost_mapper(), replanning_a_star_planner(), wavefront_frontier_explorer(), ).global_config(n_dask_workers=6, robot_model="unitree_go2") +detection = ( + autoconnect( + nav, + detectionDB_module( + camera_info=GO2Connection.camera_info_static, + ), + ) + .remappings( + [ + (ObjectDBModule, "pointcloud", "global_map"), + ] + ) + .transports( + { + # Detection 3D module outputs + ("detections", ObjectDBModule): LCMTransport( + "/detector3d/detections", Detection2DArray + ), + ("annotations", ObjectDBModule): LCMTransport( + "/detector3d/annotations", ImageAnnotations + ), + # ("scene_update", ObjectDBModule): LCMTransport( + # "/detector3d/scene_update", SceneUpdate + # ), + ("detected_pointcloud_0", ObjectDBModule): LCMTransport( + "/detector3d/pointcloud/0", PointCloud2 + ), + ("detected_pointcloud_1", ObjectDBModule): LCMTransport( + "/detector3d/pointcloud/1", PointCloud2 + ), + ("detected_pointcloud_2", ObjectDBModule): LCMTransport( + "/detector3d/pointcloud/2", PointCloud2 + ), + ("detected_image_0", ObjectDBModule): LCMTransport("/detector3d/image/0", Image), + ("detected_image_1", ObjectDBModule): LCMTransport("/detector3d/image/1", Image), + ("detected_image_2", ObjectDBModule): LCMTransport("/detector3d/image/2", Image), + } + ) +) + + spatial = autoconnect( nav, spatial_memory(), diff --git a/dimos/web/command-center-extension/.gitignore b/dimos/web/command-center-extension/.gitignore index 3f7224ed26..1cb79e0e3c 100644 --- a/dimos/web/command-center-extension/.gitignore +++ b/dimos/web/command-center-extension/.gitignore @@ -1,5 +1,6 @@ *.foxe /dist +/dist-standalone /node_modules !/package.json !/package-lock.json diff --git a/dimos/web/command-center-extension/index.html b/dimos/web/command-center-extension/index.html new file mode 100644 index 0000000000..e1e9ce85ad --- /dev/null +++ b/dimos/web/command-center-extension/index.html @@ -0,0 +1,18 @@ + + +
+ + +