diff --git a/dimos/core/stream.py b/dimos/core/stream.py index e69073f278..fe835f8f5a 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -39,6 +39,35 @@ T = TypeVar("T") +class ObservableMixin(Generic[T]): + # subscribes and returns the first value it receives + # might be nicer to write without rxpy but had this snippet ready + def get_next(self, timeout=10.0) -> T: + try: + return ( + self.observable() + .pipe(ops.first(), *([ops.timeout(timeout)] if timeout is not None else [])) + .run() + ) + except Exception as e: + raise Exception(f"No value received after {timeout} seconds") from e + + def hot_latest(self) -> Callable[[], T]: + return reactive.getter_streaming(self.observable()) + + def pure_observable(self): + def _subscribe(observer, scheduler=None): + unsubscribe = self.subscribe(observer.on_next) + return Disposable(unsubscribe) + + return rx.create(_subscribe) + + # default return is backpressured because most + # use cases will want this by default + def observable(self): + return backpressure(self.pure_observable()) + + class State(enum.Enum): UNBOUND = "unbound" # descriptor defined but not bound READY = "ready" # bound to owner but not yet connected @@ -46,7 +75,7 @@ class State(enum.Enum): FLOWING = "flowing" # runtime: data observed -class Transport(Protocol[T]): +class Transport(ObservableMixin[T]): # used by local Output def broadcast(self, selfstream: Out[T], value: T): ... @@ -153,10 +182,13 @@ class RemoteOut(RemoteStream[T]): def connect(self, other: RemoteIn[T]): return other.connect(self) + def subscribe(self, cb) -> Callable[[], None]: + return self.transport.subscribe(cb, self) + # representation of Input # as views from inside of the module -class In(Stream[T]): +class In(Stream[T], ObservableMixin[T]): connection: Optional[RemoteOut[T]] = None _transport: Transport @@ -183,36 +215,9 @@ def transport(self) -> Transport[T]: def state(self) -> State: # noqa: D401 return State.UNBOUND if self.owner is None else State.READY - # subscribes and returns the first value it receives - # might be nicer to write without rxpy but had this snippet ready - def get_next(self, timeout=10.0) -> T: - try: - return ( - self.observable() - .pipe(ops.first(), *([ops.timeout(timeout)] if timeout is not None else [])) - .run() - ) - except Exception as e: - raise Exception(f"No value received after {timeout} seconds") from e - - def hot_latest(self) -> Callable[[], T]: - return reactive.getter_streaming(self.observable()) - - def pure_observable(self): - def _subscribe(observer, scheduler=None): - unsubscribe = self.subscribe(observer.on_next) - return Disposable(unsubscribe) - - return rx.create(_subscribe) - - # default return is backpressured because most - # use cases will want this by default - def observable(self): - return backpressure(self.pure_observable()) - # returns unsubscribe function def subscribe(self, cb) -> Callable[[], None]: - return self.transport.subscribe(self, cb) + return self.transport.subscribe(cb, self) # representation of input outside of module diff --git a/dimos/core/transport.py b/dimos/core/transport.py index e2a7b7320a..8874482e0a 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -37,7 +37,7 @@ ) import dimos.core.colors as colors -from dimos.core.stream import In, Transport +from dimos.core.stream import In, RemoteIn, Transport from dimos.protocol.pubsub.lcmpubsub import LCM, PickleLCM from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic @@ -75,7 +75,7 @@ def broadcast(self, _, msg): self.lcm.publish(self.topic, msg) - def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None: + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: if not self._started: self.lcm.start() self._started = True @@ -99,7 +99,7 @@ def broadcast(self, _, msg): self.lcm.publish(self.topic, msg) - def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None: + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: if not self._started: self.lcm.start() self._started = True @@ -144,7 +144,7 @@ def dask_register_subscriber(self, remoteInput: RemoteIn[T]) -> None: self.subscribers.append(remoteInput) # for inputs - def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None: + def subscribe(self, callback: Callable[[T], None], selfstream: In[T]) -> None: if not self._started: selfstream.connection.owner.dask_register_subscriber( selfstream.connection.name, selfstream diff --git a/dimos/robot/unitree_webrtc/type/lidar.py b/dimos/robot/unitree_webrtc/type/lidar.py index 8481647d18..03ab277c82 100644 --- a/dimos/robot/unitree_webrtc/type/lidar.py +++ b/dimos/robot/unitree_webrtc/type/lidar.py @@ -63,7 +63,7 @@ def __init__(self, **kwargs): ) self.origin = kwargs.get("origin") - self.resolution = kwargs.get("resolution") + self.resolution = kwargs.get("resolution", 0.05) @classmethod def from_msg(cls: "LidarMessage", raw_message: RawLidarMsg) -> "LidarMessage": diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index 46d311b039..b9e820eea8 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -91,6 +91,7 @@ def add_frame(self, frame: LidarMessage) -> "Map": """Voxelise *frame* and splice it into the running map.""" new_pct = frame.pointcloud.voxel_down_sample(voxel_size=self.voxel_size) self.pointcloud = splice_cylinder(self.pointcloud, new_pct, shrink=0.5) + return self def consume(self, observable: Observable[LidarMessage]) -> Observable["Map"]: """Reactive operator that folds a stream of `LidarMessage` into the map.""" diff --git a/dimos/utils/cli/boxglove/boxglove.py b/dimos/utils/cli/boxglove/boxglove.py new file mode 100644 index 0000000000..eabd13800b --- /dev/null +++ b/dimos/utils/cli/boxglove/boxglove.py @@ -0,0 +1,296 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Optional + +import numpy as np +import reactivex.operators as ops +from rich.text import Text +from textual.app import App, ComposeResult +from textual.color import Color +from textual.containers import Container +from textual.reactive import reactive +from textual.widgets import Footer, Header, Label, Static + +from dimos import core +from dimos.msgs.geometry_msgs import Pose, PoseStamped, Transform, Vector3 +from dimos.msgs.nav_msgs import OccupancyGrid +from dimos.msgs.sensor_msgs import Image, PointCloud2 +from dimos.robot.unitree_webrtc.multiprocess.unitree_go2_navonly import ConnectionModule +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.utils.cli.boxglove.connection import Connection + +if TYPE_CHECKING: + from reactivex.disposable import Disposable + + from dimos.msgs.nav_msgs import OccupancyGrid + + +blocks = "█▗▖▝▘" +shades = "█░░░░" +crosses = "┼┌┐└┘" +quadrant = "█▟▙▜▛" +triangles = "◼◢◣◥◤" # 45-degree triangular blocks + + +alphabet = crosses + +# Box drawing characters for smooth edges +top_left = alphabet[1] # Quadrant lower right +top_right = alphabet[2] # Quadrant lower left +bottom_left = alphabet[3] # Quadrant upper right +bottom_right = alphabet[4] # Quadrant upper left +full = alphabet[0] # Full block + + +class OccupancyGridApp(App): + """A Textual app for visualizing OccupancyGrid data in real-time.""" + + CSS = """ + Screen { + layout: vertical; + overflow: hidden; + } + + #grid-container { + width: 100%; + height: 1fr; + overflow: hidden; + margin: 0; + padding: 0; + } + + #grid-display { + width: 100%; + height: 100%; + margin: 0; + padding: 0; + } + + Footer { + dock: bottom; + height: 1; + } + """ + + # Reactive properties + grid_data: reactive[Optional["OccupancyGrid"]] = reactive(None) + + BINDINGS = [ + ("q", "quit", "Quit"), + ("ctrl+c", "quit", "Quit"), + ] + + def __init__(self, connection: Connection, *args, **kwargs): + super().__init__(*args, **kwargs) + self.connection = connection + self.subscription: Optional[Disposable] = None + self.grid_display: Optional[Static] = None + self.cached_grid: Optional["OccupancyGrid"] = None + + def compose(self) -> ComposeResult: + """Create the app layout.""" + # Container for the grid (no scrolling since we scale to fit) + with Container(id="grid-container"): + self.grid_display = Static("", id="grid-display") + yield self.grid_display + + yield Footer() + + def on_mount(self) -> None: + """Subscribe to the connection when the app starts.""" + self.theme = "flexoki" + + # Subscribe to the OccupancyGrid stream + def on_grid(grid: "OccupancyGrid") -> None: + self.grid_data = grid + + def on_error(error: Exception) -> None: + self.notify(f"Error: {error}", severity="error") + + self.subscription = self.connection().subscribe(on_next=on_grid, on_error=on_error) + + async def on_unmount(self) -> None: + """Clean up subscription when app closes.""" + if self.subscription: + self.subscription.dispose() + + def watch_grid_data(self, grid: Optional["OccupancyGrid"]) -> None: + """Update display when new grid data arrives.""" + if grid is None: + return + + # Cache the grid for rerendering on terminal resize + self.cached_grid = grid + + # Render the grid as ASCII art + grid_text = self.render_grid(grid) + self.grid_display.update(grid_text) + + def on_resize(self, event) -> None: + """Handle terminal resize events.""" + if self.cached_grid: + # Re-render with new terminal dimensions + grid_text = self.render_grid(self.cached_grid) + self.grid_display.update(grid_text) + + def render_grid(self, grid: "OccupancyGrid") -> Text: + """Render the OccupancyGrid as colored ASCII art, scaled to fit terminal.""" + text = Text() + + # Get the actual container dimensions + container = self.query_one("#grid-container") + content_width = container.content_size.width + content_height = container.content_size.height + + # Each cell will be 2 chars wide to make square pixels + terminal_width = max(1, content_width // 2) + terminal_height = max(1, content_height) + + # Handle edge cases + if grid.width == 0 or grid.height == 0: + return text # Return empty text for empty grid + + # Calculate scaling factors (as floats for smoother scaling) + scale_x = grid.width / terminal_width + scale_y = grid.height / terminal_height + + # Use the larger scale to ensure the grid fits + scale_float = max(1.0, max(scale_x, scale_y)) + + # For smoother resizing, we'll use fractional scaling + # This means we might sample between grid cells + render_width = min(int(grid.width / scale_float), terminal_width) + render_height = min(int(grid.height / scale_float), terminal_height) + + # Store both integer and float scale for different uses + scale = int(np.ceil(scale_float)) # For legacy compatibility + + # Adjust render dimensions to use all available space + # This reduces jumping by allowing fractional cell sizes + actual_scale_x = grid.width / render_width if render_width > 0 else 1 + actual_scale_y = grid.height / render_height if render_height > 0 else 1 + + # Function to get value with fractional scaling + def get_cell_value(grid_data: np.ndarray, x: int, y: int) -> int: + # Use fractional coordinates for smoother scaling + y_center = int((y + 0.5) * actual_scale_y) + x_center = int((x + 0.5) * actual_scale_x) + + # Clamp to grid bounds + y_center = max(0, min(y_center, grid.height - 1)) + x_center = max(0, min(x_center, grid.width - 1)) + + # For now, just sample the center point + # Could do area averaging for smoother results + return grid_data[y_center, x_center] + + # Helper function to check if a cell is an obstacle + def is_obstacle(grid_data: np.ndarray, x: int, y: int) -> bool: + if x < 0 or x >= render_width or y < 0 or y >= render_height: + return False + value = get_cell_value(grid_data, x, y) + return value > 90 # Consider cells with >90% probability as obstacles + + # Character and color mapping with intelligent obstacle rendering + def get_cell_char_and_style(grid_data: np.ndarray, x: int, y: int) -> tuple[str, str]: + value = get_cell_value(grid_data, x, y) + norm_value = min(value, 100) / 100.0 + + if norm_value > 0.9: + # Check neighbors for intelligent character selection + top = is_obstacle(grid_data, x, y + 1) + bottom = is_obstacle(grid_data, x, y - 1) + left = is_obstacle(grid_data, x - 1, y) + right = is_obstacle(grid_data, x + 1, y) + + # Count neighbors + neighbor_count = sum([top, bottom, left, right]) + + # Select character based on neighbor configuration + if neighbor_count == 4: + # All neighbors are obstacles - use full block + symbol = full + full + elif neighbor_count == 3: + # Three neighbors - use full block (interior edge) + symbol = full + full + elif neighbor_count == 2: + # Two neighbors - check configuration + if top and bottom: + symbol = full + full # Vertical corridor + elif left and right: + symbol = full + full # Horizontal corridor + elif top and left: + symbol = bottom_right + " " + elif top and right: + symbol = " " + bottom_left + elif bottom and left: + symbol = top_right + " " + elif bottom and right: + symbol = " " + top_left + else: + symbol = full + full + elif neighbor_count == 1: + # One neighbor - point towards it + if top: + symbol = bottom_left + bottom_right + elif bottom: + symbol = top_left + top_right + elif left: + symbol = top_right + bottom_right + elif right: + symbol = top_left + bottom_left + else: + symbol = full + full + else: + # No neighbors - isolated obstacle + symbol = full + full + + return symbol, None + else: + return " ", None + + # Render the scaled grid row by row (flip Y axis for proper display) + for y in range(render_height - 1, -1, -1): + for x in range(render_width): + char, style = get_cell_char_and_style(grid.grid, x, y) + text.append(char, style=style) + if y > 0: # Add newline except for last row + text.append("\n") + + # Could show scale info in footer status if needed + + return text + + +def main(): + """Run the OccupancyGrid visualizer with a connection.""" + # app = OccupancyGridApp(core.LCMTransport("/global_costmap", OccupancyGrid).observable) + + app = OccupancyGridApp( + lambda: core.LCMTransport("/lidar", LidarMessage) + .observable() + .pipe(ops.map(lambda msg: msg.costmap())) + ) + app.run() + import time + + while True: + time.sleep(1) + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/cli/boxglove/connection.py b/dimos/utils/cli/boxglove/connection.py new file mode 100644 index 0000000000..2c1f91469c --- /dev/null +++ b/dimos/utils/cli/boxglove/connection.py @@ -0,0 +1,72 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pickle +import time +from typing import Callable + +import reactivex as rx +from reactivex import operators as ops +from reactivex.disposable import Disposable +from reactivex.observable import Observable + +from dimos.msgs.nav_msgs import OccupancyGrid +from dimos.msgs.sensor_msgs import PointCloud2 +from dimos.protocol.pubsub import lcm +from dimos.robot.unitree_webrtc.type.lidar import LidarMessage +from dimos.robot.unitree_webrtc.type.map import Map +from dimos.utils.data import get_data +from dimos.utils.reactive import backpressure +from dimos.utils.testing import TimedSensorReplay + +Connection = Callable[[], Observable[OccupancyGrid]] + + +def live_connection() -> Observable[OccupancyGrid]: + def subscribe(observer, scheduler=None): + lcm.autoconf() + l = lcm.LCM() + + def on_message(grid: OccupancyGrid, _): + observer.on_next(grid) + + l.subscribe(lcm.Topic("/global_costmap", OccupancyGrid), on_message) + l.start() + + def dispose(): + l.stop() + + return Disposable(dispose) + + return rx.create(subscribe) + + +def recorded_connection() -> Observable[OccupancyGrid]: + lidar_store = TimedSensorReplay("unitree_office_walk/lidar", autocast=LidarMessage.from_msg) + mapper = Map() + return backpressure( + lidar_store.stream(speed=1).pipe( + ops.map(mapper.add_frame), + ops.map(lambda _: mapper.costmap().inflate(0.1).gradient()), + ) + ) + + +def single_message() -> Observable[OccupancyGrid]: + pointcloud_pickle = get_data("lcm_msgs") / "sensor_msgs/PointCloud2.pickle" + with open(pointcloud_pickle, "rb") as f: + pointcloud = PointCloud2.lcm_decode(pickle.load(f)) + mapper = Map() + mapper.add_frame(pointcloud) + return rx.just(mapper.costmap()) diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index d05688fbbd..b70b8ed71f 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -20,6 +20,7 @@ from typing import Any, Callable, Generic, Iterator, Optional, Tuple, TypeVar, Union from reactivex import ( + concat, concat_with_iterable, empty, from_iterable, @@ -27,7 +28,6 @@ just, merge, timer, - concat, ) from reactivex import operators as ops from reactivex import timer as rx_timer @@ -177,7 +177,7 @@ def iterate(self) -> Iterator[Union[T, Any]]: def iterate_ts(self) -> Iterator[Union[Tuple[float, T], Any]]: return super().iterate() - def stream(self) -> Observable[Union[T, Any]]: + def stream(self, speed=1.0) -> Observable[Union[T, Any]]: def _subscribe(observer, scheduler=None): from reactivex.disposable import CompositeDisposable, Disposable @@ -203,7 +203,7 @@ def emit_next(prev_timestamp): observer.on_completed() return - delay = max(0.0, ts - prev_timestamp) + delay = max(0.0, ts - prev_timestamp) / speed def _action(sc, _state=None): observer.on_next(data) diff --git a/flake.nix b/flake.nix index fbbf45591d..7101de506f 100644 --- a/flake.nix +++ b/flake.nix @@ -17,7 +17,7 @@ devPackages = with pkgs; [ ### Core shell & utils bashInteractive coreutils gh - stdenv.cc.cc.lib + stdenv.cc.cc.lib pcre2 ### Python + static analysis python312 python312Packages.pip python312Packages.setuptools @@ -57,7 +57,7 @@ pkgs.xorg.libXrender pkgs.xorg.libXdamage pkgs.xorg.libXcomposite pkgs.xorg.libxcb pkgs.xorg.libXScrnSaver pkgs.xorg.libXxf86vm pkgs.udev pkgs.portaudio pkgs.SDL2.dev pkgs.zlib pkgs.glib pkgs.gtk3 - pkgs.gdk-pixbuf pkgs.gobject-introspection pkgs.lcm]}:$LD_LIBRARY_PATH" + pkgs.gdk-pixbuf pkgs.gobject-introspection pkgs.lcm pkgs.pcre2]}:$LD_LIBRARY_PATH" export DISPLAY=:0