diff --git a/.github/workflows/code-cleanup.yml b/.github/workflows/code-cleanup.yml index 7177b3f9d1..745f3852c1 100644 --- a/.github/workflows/code-cleanup.yml +++ b/.github/workflows/code-cleanup.yml @@ -1,7 +1,8 @@ name: code-cleanup on: - push: pull_request: + paths-ignore: + - '**.md' permissions: contents: write diff --git a/README.md b/README.md index 991481c841..3f0d25949b 100644 --- a/README.md +++ b/README.md @@ -1,252 +1,215 @@ +
- banner_bordered_trimmed -

Program Atoms

-

The Agentive Operating System for Generalist Robotics

+![Dimensional](assets/dimensional-dark.svg#gh-dark-mode-only) +![Dimensional](assets/dimensional-light.svg#gh-light-mode-only) -
+

The Agentive Operating System for Generalist Robotics

[![Discord](https://img.shields.io/discord/1341146487186391173?style=flat-square&logo=discord&logoColor=white&label=Discord&color=5865F2)](https://discord.gg/dimos) [![Stars](https://img.shields.io/github/stars/dimensionalOS/dimos?style=flat-square)](https://github.com/dimensionalOS/dimos/stargazers) [![Forks](https://img.shields.io/github/forks/dimensionalOS/dimos?style=flat-square)](https://github.com/dimensionalOS/dimos/fork) [![Contributors](https://img.shields.io/github/contributors/dimensionalOS/dimos?style=flat-square)](https://github.com/dimensionalOS/dimos/graphs/contributors) -
![Nix](https://img.shields.io/badge/Nix-flakes-5277C3?style=flat-square&logo=NixOS&logoColor=white) ![NixOS](https://img.shields.io/badge/NixOS-supported-5277C3?style=flat-square&logo=NixOS&logoColor=white) -![CUDA](https://img.shields.io/badge/CUDA-12.x-76B900?style=flat-square&logo=nvidia&logoColor=white) +![CUDA](https://img.shields.io/badge/CUDA-supported-76B900?style=flat-square&logo=nvidia&logoColor=white) [![Docker](https://img.shields.io/badge/Docker-ready-2496ED?style=flat-square&logo=docker&logoColor=white)](https://www.docker.com/) -

- Features • - Installation • - Documentation • - Development • - Contributing -

+ -
+[Hardware](#hardware) • +[Installation](#installation) • +[Development](#development) • +[Multi Language](#multi-language-support) • +[ROS](#ros-interop) -> \[!NOTE] -> -> ⚠️ **Alpha Pre-Release: Expect Breaking Changes** ⚠️ +⚠️ **Alpha Pre-Release: Expect Breaking Changes** ⚠️ -# The Dimensional Framework + -Dimensional is the open-source, universal operating system for generalist robotics. On DimOS, developers -can design, build, and run physical ("dimensional") applications that run on any humanoid, quadruped, -drone, or wheeled embodiment. - -**Programming physical robots is now as simple as programming digital software**: Composable, Modular, Repeatable. - -Core Features: -- **Navigation:** Production navigation stack for any robot with lidar: SLAM, terrain analysis, collision - avoidance, route planning, exploration. -- **Dashboard:** The DimOS command center gives developers the tooling to debug, visualize, compose, and - test dimensional applications in real-time. Control your robot via waypoint, agent query, keyboard, - VR, more. -- **Modules:** Standalone components (equivalent to ROS nodes) that publish and subscribe to typed - In/Out streams that communicate over DimOS transports. The primary components of Dimensional. -- **Agents (experimental):** DimOS agents understand physical space, subscribe to sensor streams, and call - **physical** tools. Emergence appears when agents have physical agency. -- **MCP (experimental):** Vibecode robots by giving your AI editor (Cursor, Claude Code) MCP access to run - physical commands (move forward 1 meter, jump, etc.). -- **Manipulation (unreleased)** Classical (OMPL, IK, GraspGen), Agentive (TAMP), and VLA-native manipulation stack runs out-of-the-box on any DimOS supported arm embodiment. -- **Transport/Middleware:** DimOS native Python transport supports LCM, DDS, and SHM, plus ROS 2. -- **Robot integrations:** We integrate with the majority of hardware OEMs and are moving fast to cover - them all. 
Supported and/or immediate roadmap: + - | Category | Platforms | - | --- | --- | - | Quadrupeds | Unitree Go2, Unitree B1, AGIBOT D1 Max/Pro, Dobot Rover | - | Drones | DJI Mavic 2, Holybro x500 | - | Humanoids | Unitree G1, Booster K1, AGIBOT X2, ABIBOT A2 | - | Arms | OpenARMs, xARM 6/7, AgileX Piper, HighTorque Pantera | +# Intro + +Dimensional is the modern operating system for generalist robotics. We are setting the next-generation SDK standard, integrating with the majority of robot manufacturers. + +With a simple install and no ROS required, build physical applications entirely in python that run on any humanoid, quadruped, or drone. + +Dimensional is agent native -- "vibecode" your robots in natural language and build (local & hosted) multi-agent systems that work seamlessly with your hardware. Agents run as native modules — subscribing to any embedded stream, from perception (lidar, camera) and spatial memory down to control loops and motor drivers. + + + + + + + + + + + + + + + + + +
+ Navigation + + Perception +
+

Navigation and Mapping

+ SLAM, dynamic obstacle avoidance, route planning, and autonomous exploration — via both DimOS-native and ROS
Watch video +
+

Perception

+ Detectors, 3D projections, VLMs, audio processing +
+ Agents + + Spatial Memory +
+

Agentive Control, MCP

+ "hey Robot, go find the kitchen"
Watch video +
+

Spatial Memory

+ Spatio-temporal RAG, dynamic memory, object localization and permanence
Watch video +
+ + +# Hardware + + + + + + + + + + + + + + + + + +
+

Quadruped

+ +
+

Humanoid

+ +
+

Arm

+ +
+

Drone

+ +
+

Misc

+ +
+ 🟩 Unitree Go2 Pro/Air
+ 🟥 Unitree B1
+
+ 🟥 Unitree G1
+
+ 🟥 xArm
+ 🟥 AgileX Piper
+
+ 🟥 MAVLink
+ 🟥 DJI SDK
+
+ 🟥 Force-Torque Sensor
+
+
+
+🟩 stable 🟨 beta 🟧 alpha 🟥 experimental -# Getting Started +
-## Installation +# Installation -Supported/tested matrix: +## Step 1: System Install -| Platform | Status | Tested | Required System deps | -| --- | --- | --- | --- | -| Linux | supported | Ubuntu 22.04, 24.04 | See below | -| macOS | experimental beta | not CI-tested | `brew install gnu-sed gcc portaudio git-lfs libjpeg-turbo python; export ARCHFLAGS="-arch $(uname -m)"` | +To set up your system dependencies, follow one of these guides: -Note: macOS is usable but expect inconsistent/flaky behavior (rather than hard errors/crashes). Setting `ARCHFLAGS` is likely optional, but some systems it is required to avoid a `clang` error. +- 🟩 [Ubuntu 22.04 / 24.04](docs/installation/ubuntu.md) +- 🟩 [NixOS / General Linux](docs/installation/nix.md) +- 🟧 [macOS](docs/installation/osx.md) -```sh -sudo apt-get update -sudo apt-get install -y curl g++ portaudio19-dev git-lfs libturbojpeg python3-dev -# install uv for python -curl -LsSf https://astral.sh/uv/install.sh | sh && export PATH="$HOME/.local/bin:$PATH" -``` +## Step 2: Python Installs -Option 1: Install in a virtualenv +### Quickstart -```sh +```bash +uv pip install dimos[base,unitree] -uv venv && . 
.venv/bin/activate -uv pip install 'dimos[base,unitree]' -# replay recorded data to test that the system is working -# IMPORTANT: First replay run will show a black rerun window while 2.4 GB downloads from LFS +# Replay a recorded Go2 session (no hardware needed) +# NOTE: First run will show a black rerun window while ~2.4 GB downloads from LFS dimos --replay run unitree-go2 ``` -Option 2: Run without installing +```bash +# Install with simulation support +uv pip install dimos[base,unitree,sim] -```sh -uvx --from 'dimos[base,unitree]' dimos --replay run unitree-go2 -``` - -### Test Installation - -#### Control a robot in a simulation (no robot required) - - -```sh +# Run Go2 in MuJoCo simulation export DISPLAY=:1 # Or DISPLAY=:0 if getting GLFW/OpenGL X11 errors -# ignore the warp warnings -dimos --viewer-backend rerun-web --simulation run unitree-go2 -``` +dimos --simulation run unitree-go2 -#### Control a real robot (Unitree Go2 over WebRTC) +# Run G1 humanoid in simulation +dimos --simulation run unitree-g1-sim +``` -```sh +```bash +# Control a real robot (Unitree Go2 over WebRTC) export ROBOT_IP= -dimos --viewer-backend rerun-web run unitree-go2 +dimos run unitree-go2 ``` -After running dimOS open http://localhost:7779 to control robot movement. - -#### Dimensional Agents - -> \[!NOTE] -> -> **Experimental Beta: Potential unstoppable robot sentience** +### Develop on DimOS ```sh -export OPENAI_API_KEY= -dimos --viewer-backend rerun-web run unitree-go2-agentic -``` +export GIT_LFS_SKIP_SMUDGE=1 +git clone -b dev https://github.com/dimensionalOS/dimos.git +cd dimos -After running that, open a new terminal and run the following to start giving instructions to the agent. 
-```sh -# activate the venv in this new terminal +uv venv --python 3.12 source .venv/bin/activate -# then tell the agent "explore the room" -# then tell it to go to something, ex: "go to the door" -humancli -``` - -# The Dimensional Library - -### Modules - -Modules are subsystems on a robot that operate autonomously and communicate with other subsystems using standardized messages. See below a simple robot connection module that sends streams of continuous `cmd_vel` to the robot and recieves `color_image` to a simple `Listener` module. - -```python -import threading, time, numpy as np -from dimos.core import In, Module, Out, rpc, autoconnect -from dimos.msgs.geometry_msgs import Twist -from dimos.msgs.sensor_msgs import Image, ImageFormat - -class RobotConnection(Module): - cmd_vel: In[Twist] - color_image: Out[Image] - - @rpc - def start(self): - threading.Thread(target=self._image_loop, daemon=True).start() +uv sync --all-extras - def _image_loop(self): - while True: - img = Image.from_numpy( - np.zeros((120, 160, 3), np.uint8), - format=ImageFormat.RGB, - frame_id="camera_optical", - ) - self.color_image.publish(img) - time.sleep(0.2) - -class Listener(Module): - color_image: In[Image] - - @rpc - def start(self): - self.color_image.subscribe(lambda img: print(f"image {img.width}x{img.height}")) - -if __name__ == "__main__": - autoconnect( - RobotConnection.blueprint(), - Listener.blueprint(), - ).build().loop() +# Run full test suite +uv run pytest dimos ``` -### Blueprints - -Blueprints are how robots are constructed on Dimensional; instructions for how to construct and wire modules. You compose them with -`autoconnect(...)`, which connects streams by `(name, type)` and returns a `Blueprint`. +### Step 3 - Profit -Blueprints can be composed, remapped, and have transports overridden if `autoconnect()` fails due to conflicting variable names or `In[]` and `Out[]` message types. 
- -A blueprint example that connects the image stream from a robot to an LLM Agent for reasoning and action execution. -```python -from dimos.core import autoconnect, LCMTransport -from dimos.msgs.sensor_msgs import Image -from dimos.robot.unitree.go2.connection import go2_connection -from dimos.agents.agent import agent - -blueprint = autoconnect( - go2_connection(), - agent(), -).transports({("color_image", Image): LCMTransport("/color_image", Image)}) - -# Run the blueprint -if __name__ == "__main__": - blueprint.build().loop() -``` +DimOS Demo # Development -```sh -GIT_LFS_SKIP_SMUDGE=1 git clone -b dev https://github.com/dimensionalOS/dimos.git -cd dimos -``` - -Then pick one of two development paths: - -Option A: Devcontainer -```sh -./bin/dev -``` - -Option B: Editable install with uv -```sh -uv venv && . .venv/bin/activate -uv pip install -e '.[base,dev]' -``` - -For system deps, Nix setups, and testing, see `/docs/development/README.md`. - -### Monitoring & Debugging +## API -DimOS comes with a number of monitoring tools: -- Run `lcmspy` to see how fast messages are being published on streams. -- Run `agentspy` to see the agent's status over time. -- If you suspect there is a bug within DimOS itself, you can enable extreme logging by prefixing the dimos command with `DIMOS_LOG_LEVEL=DEBUG RERUN_SAVE=1 `. Ex: `DIMOS_LOG_LEVEL=DEBUG RERUN_SAVE=1 dimos --replay run unitree-go2` +- [Modules](docs/usage/modules.md) +- [LCM](docs/usage/lcm.md) +- [Blueprints](docs/usage/blueprints.md) +- [Transports](docs/usage/transports/index.md) +- [Data Streams](docs/usage/data_streams/README.md) +- [Configuration](docs/usage/configuration.md) +- [Visualization](docs/usage/visualization.md) +## Multi Language Support -# Documentation +Python is our glue and prototyping language, but we support many languages via LCM interop. -Concepts: -- [Modules](/docs/usage/modules.md): The building blocks of DimOS, modules run in parallel and are singleton python classes. 
-- [Streams](/docs/usage/sensor_streams/index.md): How modules communicate, a Pub / Sub system. -- [Blueprints](/dimos/core/README_BLUEPRINTS.md): a way to group modules together and define their connections to each other. -- [RPC](/dimos/core/README_BLUEPRINTS.md#calling-the-methods-of-other-modules): how one module can call a method on another module (arguments get serialized to JSON-like binary data). -- [Skills](/dimos/core/README_BLUEPRINTS.md#defining-skills): An RPC function, except it can be called by an AI agent (a tool for an AI). +Check our language interop examples: +- [C++](examples/language-interop/cpp/) +- [Lua](examples/language-interop/lua/) +- [TypeScript](examples/language-interop/ts/) -## Contributing +## ROS interop -We welcome contributions! See our [Bounty List](https://docs.google.com/spreadsheets/d/1tzYTPvhO7Lou21cU6avSWTQOhACl5H8trSvhtYtsk8U/edit?usp=sharing) for open requests for contributions. If you would like to suggest a feature or sponsor a bounty, open an issue. 
+For researchers, we can talk to ROS directly via [ROS Transports](docs/usage/transports/index.md), or host dockerized ROS deployments as first-class DimOS modules, allowing you easy installation and portability diff --git a/assets/dimensional-dark.svg b/assets/dimensional-dark.svg new file mode 100644 index 0000000000..95edaadc0e --- /dev/null +++ b/assets/dimensional-dark.svg @@ -0,0 +1,23 @@ + + + + diff --git a/assets/dimensional-light.svg b/assets/dimensional-light.svg new file mode 100644 index 0000000000..f0a107bd10 --- /dev/null +++ b/assets/dimensional-light.svg @@ -0,0 +1,23 @@ + + + + diff --git a/assets/dimensional-text.svg b/assets/dimensional-text.svg new file mode 100644 index 0000000000..0cf32b3d19 --- /dev/null +++ b/assets/dimensional-text.svg @@ -0,0 +1,20 @@ + + + + diff --git a/assets/dimensional.svg b/assets/dimensional.svg new file mode 100644 index 0000000000..f36a0dea40 --- /dev/null +++ b/assets/dimensional.svg @@ -0,0 +1,23 @@ + + + + diff --git a/assets/readme/agentic_control.gif b/assets/readme/agentic_control.gif new file mode 100644 index 0000000000..f9f5970441 --- /dev/null +++ b/assets/readme/agentic_control.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:eb0411de5e5967be8773d5d95e692a6a5859f75bb400164451a3b383b1025fb4 +size 2416274 diff --git a/assets/readme/agents.png b/assets/readme/agents.png new file mode 100644 index 0000000000..b05bee0b03 --- /dev/null +++ b/assets/readme/agents.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a255d32f9a0ecff12d99dda9b8a51e0958ac282d7a0f814f93fd39261afaf84d +size 477123 diff --git a/assets/readme/dimos_demo.gif b/assets/readme/dimos_demo.gif new file mode 100644 index 0000000000..5a68bd72ac --- /dev/null +++ b/assets/readme/dimos_demo.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fda7f7a859ce98002e0faef88fb2942f395e19995b36b585c48447ec5a9435ee +size 24011189 diff --git a/assets/readme/lidar.gif 
b/assets/readme/lidar.gif new file mode 100644 index 0000000000..8302c2957d --- /dev/null +++ b/assets/readme/lidar.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d47badc970572aa7badf98c908490c8b86ea9f1cafbb18507cfdb5d08655cdfb +size 5900150 diff --git a/assets/readme/lidar.png b/assets/readme/lidar.png new file mode 100644 index 0000000000..1b499de10f --- /dev/null +++ b/assets/readme/lidar.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:65b1797fd9ac8edae5dce0691397b6aca2e975badfd58462ed8e20a4dace655e +size 927067 diff --git a/assets/readme/navigation.gif b/assets/readme/navigation.gif new file mode 100644 index 0000000000..1402b1e85a --- /dev/null +++ b/assets/readme/navigation.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:64e7965f421916cdb71667a9ed99eab96c14c64bd195bd483628d1b9b9a4e95c +size 4395592 diff --git a/assets/readme/navigation.png b/assets/readme/navigation.png new file mode 100644 index 0000000000..16819a5007 --- /dev/null +++ b/assets/readme/navigation.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:472cabca4b0d661658bf9ffbde78e636668e9ef6499dc38ea0f552557d735bd9 +size 617989 diff --git a/assets/readme/perception.png b/assets/readme/perception.png new file mode 100644 index 0000000000..7ec15aabbf --- /dev/null +++ b/assets/readme/perception.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:48e4c61c1ec588d56d61a74fd9f0d9251eadc042e7a514fb1896826d52a32988 +size 797817 diff --git a/assets/readme/spacer.png b/assets/readme/spacer.png new file mode 100644 index 0000000000..8745fc9687 --- /dev/null +++ b/assets/readme/spacer.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4a16ec40112698cf02b9abd3d18c8db65ce40f48f2c61076b45de58695f16532 +size 66 diff --git a/assets/readme/spatial_memory.gif b/assets/readme/spatial_memory.gif new file mode 100644 index 0000000000..070c65270b --- /dev/null +++ 
b/assets/readme/spatial_memory.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:50b9cb7898ae8d238a088252fd96d2278a1be96a0dbb761839bc58c99c17f7a7 +size 4655580 diff --git a/dimos/msgs/sensor_msgs/PointCloud2.py b/dimos/msgs/sensor_msgs/PointCloud2.py index 39a0146e4c..bd89c0a0cb 100644 --- a/dimos/msgs/sensor_msgs/PointCloud2.py +++ b/dimos/msgs/sensor_msgs/PointCloud2.py @@ -609,7 +609,7 @@ def __len__(self) -> int: def to_rerun( self, voxel_size: float = 0.05, - colormap: str | None = "turbo", + colormap: str | None = None, colors: list[int] | None = None, mode: str = "boxes", size: float | None = None, @@ -636,6 +636,8 @@ def to_rerun( if len(points) == 0: return rr.Points3D([]) if mode == "points" else rr.Boxes3D(centers=[]) + if colors is None and colormap is None: + colormap = "turbo" # Default colormap if no colors provided # Determine colors point_colors = None if colormap is not None: diff --git a/dimos/utils/docs/doclinks.md b/dimos/utils/docs/doclinks.md new file mode 100644 index 0000000000..dce2e67fec --- /dev/null +++ b/dimos/utils/docs/doclinks.md @@ -0,0 +1,96 @@ +# doclinks + +A Markdown link resolver that automatically fills in correct file paths for code references in documentation. + +## What it does + +When writing docs, you can use placeholder links like: + + +```markdown +See [`service/spec.py`]() for the implementation. +``` + + +Running `doclinks` resolves these to actual paths: + + +```markdown +See [`service/spec.py`](/dimos/protocol/service/spec.py) for the implementation. 
+``` + + +## Features + + +- **Code file links**: `[`filename.py`]()` resolves to the file's path +- **Symbol line linking**: If another backticked term appears on the same line, it finds that symbol in the file and adds `#L`: + ```markdown + See `Configurable` in [`config.py`]() + → [`config.py`](/path/config.py#L42) + ``` +- **Doc-to-doc links**: `[Modules](.md)` resolves to `modules.md` or `modules/index.md` + +- **Multiple link modes**: absolute, relative, or GitHub URLs +- **Watch mode**: Automatically re-process on file changes +- **Ignore regions**: Skip sections with `` comments + +## Usage + +```bash +# Process a single file +doclinks docs/guide.md + +# Process a directory recursively +doclinks docs/ + +# Relative links (from doc location) +doclinks --link-mode relative docs/ + +# GitHub links +doclinks --link-mode github \ + --github-url https://github.com/org/repo docs/ + +# Dry run (preview changes) +doclinks --dry-run docs/ + +# CI check (exit 1 if changes needed) +doclinks --check docs/ + +# Watch mode (auto-update on changes) +doclinks --watch docs/ +``` + +## Options + +| Option | Description | +|--------------------|-------------------------------------------------| +| `--root PATH` | Repository root (default: auto-detect git root) | +| `--link-mode MODE` | `absolute` (default), `relative`, or `github` | +| `--github-url URL` | Base GitHub URL (required for github mode) | +| `--github-ref REF` | Branch/ref for GitHub links (default: `main`) | +| `--dry-run` | Show changes without modifying files | +| `--check` | Exit with error if changes needed (for CI) | +| `--watch` | Watch for changes and re-process | + +## Link patterns + + +| Pattern | Description | +|----------------------|------------------------------------------------| +| `[`file.py`]()` | Code file reference (empty or any link) | +| `[`path/file.py`]()` | Code file with partial path for disambiguation | +| `[`file.py`](#L42)` | Preserves existing line fragments | +| `[Doc Name](.md)` | 
Doc-to-doc link (resolves by name) | + + +## How resolution works + +The tool builds an index of all files in the repo. For `/dimos/protocol/service/spec.py`, it creates lookup entries for: + +- `spec.py` +- `service/spec.py` +- `protocol/service/spec.py` +- `dimos/protocol/service/spec.py` + +Use longer paths when multiple files share the same name. diff --git a/dimos/utils/docs/doclinks.py b/dimos/utils/docs/doclinks.py index c04fe7ef16..67d5897b28 100644 --- a/dimos/utils/docs/doclinks.py +++ b/dimos/utils/docs/doclinks.py @@ -529,7 +529,7 @@ def process_file(md_path: Path, quiet: bool = False) -> tuple[bool, list[str]]: watch_paths = args.paths if args.paths else [str(root / "docs")] - class MarkdownHandler(FileSystemEventHandler): # type: ignore[misc] + class MarkdownHandler(FileSystemEventHandler): def on_modified(self, event: Any) -> None: if not event.is_directory and event.src_path.endswith(".md"): process_file(Path(event.src_path)) diff --git a/docs/agents/docs/assets/codeblocks_example.svg b/docs/agents/docs/assets/codeblocks_example.svg new file mode 100644 index 0000000000..3ba6c37a4b --- /dev/null +++ b/docs/agents/docs/assets/codeblocks_example.svg @@ -0,0 +1,47 @@ + + + + + + + + +A + +A + + + +B + +B + + + +A->B + + + + + +C + +C + + + +A->C + + + + + +B->C + + + + + diff --git a/docs/agents/docs/assets/pikchr_basic.svg b/docs/agents/docs/assets/pikchr_basic.svg new file mode 100644 index 0000000000..5410d35577 --- /dev/null +++ b/docs/agents/docs/assets/pikchr_basic.svg @@ -0,0 +1,12 @@ + + +Step 1 + + + +Step 2 + + + +Step 3 + diff --git a/docs/agents/docs/assets/pikchr_branch.svg b/docs/agents/docs/assets/pikchr_branch.svg new file mode 100644 index 0000000000..e7b2b86596 --- /dev/null +++ b/docs/agents/docs/assets/pikchr_branch.svg @@ -0,0 +1,16 @@ + + +Input + + + +Process + + + +Path A + + + +Path B + diff --git a/docs/agents/docs/assets/pikchr_explicit.svg b/docs/agents/docs/assets/pikchr_explicit.svg new file mode 100644 index 
0000000000..a6a913fcb4 --- /dev/null +++ b/docs/agents/docs/assets/pikchr_explicit.svg @@ -0,0 +1,8 @@ + + +Step 1 + + + +Step 2 + diff --git a/docs/agents/docs/assets/pikchr_labels.svg b/docs/agents/docs/assets/pikchr_labels.svg new file mode 100644 index 0000000000..b11fe64bca --- /dev/null +++ b/docs/agents/docs/assets/pikchr_labels.svg @@ -0,0 +1,5 @@ + + +Box +label below + diff --git a/docs/agents/docs/assets/pikchr_sizing.svg b/docs/agents/docs/assets/pikchr_sizing.svg new file mode 100644 index 0000000000..3a0c433cb1 --- /dev/null +++ b/docs/agents/docs/assets/pikchr_sizing.svg @@ -0,0 +1,13 @@ + + +short + + + +.subscribe() + + + +two lines +of text + diff --git a/docs/agents/docs/codeblocks.md b/docs/agents/docs/codeblocks.md new file mode 100644 index 0000000000..323f1c0c50 --- /dev/null +++ b/docs/agents/docs/codeblocks.md @@ -0,0 +1,314 @@ +# Executable Code Blocks + +We use [md-babel-py](https://github.com/leshy/md-babel-py/) to execute code blocks in markdown and insert results. + +## Golden Rule + +**All code blocks must be executable.** Never write illustrative/pseudo code blocks. If you're showing an API usage pattern, create a minimal working example that actually runs. This ensures documentation stays correct as the codebase evolves. + +## Running + +```sh skip +md-babel-py run document.md # edit in-place +md-babel-py run document.md --stdout # preview to stdout +md-babel-py run document.md --dry-run # show what would run +``` + +## Supported Languages + +Python, Shell (sh), Node.js, plus visualization: Matplotlib, Graphviz, Pikchr, Asymptote, OpenSCAD, Diagon. 
+ +## Code Block Flags + +Add flags after the language identifier: + +| Flag | Effect | +|------|--------| +| `session=NAME` | Share state between blocks with same session name | +| `output=path.png` | Write output to file instead of inline | +| `no-result` | Execute but don't insert result | +| `skip` | Don't execute this block | +| `expected-error` | Block is expected to fail | + +## Examples + +# md-babel-py + +Execute code blocks in markdown files and insert the results. + +![Demo](assets/screencast.gif) + +**Use cases:** +- Keep documentation examples up-to-date automatically +- Validate code snippets in docs actually work +- Generate diagrams and charts from code in markdown +- Literate programming with executable documentation + +## Languages + +### Shell + +```sh +echo "cwd: $(pwd)" +``` + + +``` +cwd: /work +``` + +### Python + +```python session=example +a = "hello world" +print(a) +``` + + +``` +hello world +``` + +Sessions preserve state between code blocks: + +```python session=example +print(a, "again") +``` + + +``` +hello world again +``` + +### Node.js + +```node +console.log("Hello from Node.js"); +console.log(`Node version: ${process.version}`); +``` + + +``` +Hello from Node.js +Node version: v22.21.1 +``` + +### Matplotlib + +```python output=assets/matplotlib-demo.svg +import matplotlib.pyplot as plt +import numpy as np +plt.style.use('dark_background') +x = np.linspace(0, 4 * np.pi, 200) +plt.figure(figsize=(8, 4)) +plt.plot(x, np.sin(x), label='sin(x)', linewidth=2) +plt.plot(x, np.cos(x), label='cos(x)', linewidth=2) +plt.xlabel('x') +plt.ylabel('y') +plt.legend() +plt.grid(alpha=0.3) +plt.savefig('{output}', transparent=True) +``` + + +![output](assets/matplotlib-demo.svg) + +### Pikchr + +SQLite's diagram language: + +
+diagram source + +```pikchr fold output=assets/pikchr-demo.svg +color = white +fill = none +linewid = 0.4in + +# Input file +In: file "README.md" fit +arrow + +# Processing +Parse: box "Parse" rad 5px fit +arrow +Exec: box "Execute" rad 5px fit + +# Fan out to languages +arrow from Exec.e right 0.3in then up 0.4in then right 0.3in +Sh: oval "Shell" fit +arrow from Exec.e right 0.3in then right 0.3in +Node: oval "Node" fit +arrow from Exec.e right 0.3in then down 0.4in then right 0.3in +Py: oval "Python" fit + +# Merge back +X: dot at (Py.e.x + 0.3in, Node.e.y) invisible +line from Sh.e right until even with X then down to X +line from Node.e to X +line from Py.e right until even with X then up to X +Out: file "README.md" fit with .w at (X.x + 0.3in, X.y) +arrow from X to Out.w +``` + +
+ + +![output](assets/pikchr-demo.svg) + +### Asymptote + +Vector graphics: + +```asymptote output=assets/histogram.svg +import graph; +import stats; + +size(400,200,IgnoreAspect); +defaultpen(white); + +int n=10000; +real[] a=new real[n]; +for(int i=0; i < n; ++i) a[i]=Gaussrand(); + +draw(graph(Gaussian,min(a),max(a)),orange); + +int N=bins(a); + +histogram(a,min(a),max(a),N,normalize=true,low=0,rgb(0.4,0.6,0.8),rgb(0.2,0.4,0.6),bars=true); + +xaxis("$x$",BottomTop,LeftTicks,p=white); +yaxis("$dP/dx$",LeftRight,RightTicks(trailingzero),p=white); +``` + + +![output](assets/histogram.svg) + +### Graphviz + +```dot output=assets/graph.svg +A -> B -> C +A -> C +``` + + +![output](assets/graph.svg) + +### OpenSCAD + +```openscad output=assets/cube-sphere.png +cube([10, 10, 10]); +sphere(r=7); +``` + + +![output](assets/cube-sphere.png) + +### Diagon + +ASCII art diagrams: + +```diagon mode=Math +1 + 1/2 + sum(i,0,10) +``` + + +``` + 10 + ___ + 1 ╲ +1 + ─ + ╱ i + 2 ‾‾‾ + 0 +``` + +```diagon mode=GraphDAG +A -> B -> C +A -> C +``` + + +``` +┌───┐ +│A │ +└┬─┬┘ + │┌▽┐ + ││B│ + │└┬┘ +┌▽─▽┐ +│C │ +└───┘ +``` + +## Install + +### Nix (recommended) + +```sh skip +# Run directly from GitHub +nix run github:leshy/md-babel-py -- run README.md --stdout + +# Or clone and run locally +nix run . 
-- run README.md --stdout +``` + +### Docker + +```sh skip +# Pull from Docker Hub +docker run -v $(pwd):/work lesh/md-babel-py:main run /work/README.md --stdout + +# Or build locally via Nix +nix build .#docker # builds tarball to ./result +docker load < result # loads image from tarball +docker run -v $(pwd):/work md-babel-py:latest run /work/file.md --stdout +``` + +### pipx + +```sh skip +pipx install md-babel-py +# or: uv pip install md-babel-py +md-babel-py run README.md --stdout +``` + +If not using nix or docker, evaluators require system dependencies: + +| Language | System packages | +|-----------|-----------------------------| +| python | python3 | +| node | nodejs | +| dot | graphviz | +| asymptote | asymptote, texlive, dvisvgm | +| pikchr | pikchr | +| openscad | openscad, xvfb, imagemagick | +| diagon | diagon | + +```sh skip +# Arch Linux +sudo pacman -S python nodejs graphviz asymptote texlive-basic openscad xorg-server-xvfb imagemagick + +# Debian/Ubuntu +sudo apt-get install python3 nodejs graphviz asymptote texlive xvfb imagemagick openscad +``` + +Note: pikchr and diagon may need to be built from source. Use Docker or Nix for full evaluator support. + +## Usage + +```sh skip +# Edit file in-place +md-babel-py run document.md + +# Output to separate file +md-babel-py run document.md --output result.md + +# Print to stdout +md-babel-py run document.md --stdout + +# Only run specific languages +md-babel-py run document.md --lang python,sh + +# Dry run - show what would execute +md-babel-py run document.md --dry-run +``` diff --git a/docs/agents/docs/doclinks.md b/docs/agents/docs/doclinks.md new file mode 100644 index 0000000000..d5533c5983 --- /dev/null +++ b/docs/agents/docs/doclinks.md @@ -0,0 +1,21 @@ +When writing or editing markdown documentation, use `doclinks` tool to resolve file references. 
+ +Full documentation if needed: [`utils/docs/doclinks.md`](/dimos/utils/docs/doclinks.md) + +## Syntax + + +| Pattern | Example | +|-------------|-----------------------------------------------------| +| Code file | `[`service/spec.py`]()` → resolves path | +| With symbol | `Configurable` in `[`spec.py`]()` → adds `#L` | +| Doc link | `[Configuration](.md)` → resolves to doc | + + +## Usage + +```bash +doclinks docs/guide.md # single file +doclinks docs/ # directory +doclinks --dry-run ... # preview only +``` diff --git a/docs/agents/docs/index.md b/docs/agents/docs/index.md new file mode 100644 index 0000000000..bec2ce79e6 --- /dev/null +++ b/docs/agents/docs/index.md @@ -0,0 +1,192 @@ + +# Code Blocks + +**All code blocks must be executable.** +Never write illustrative/pseudo code blocks. +If you're showing an API usage pattern, create a minimal working example that actually runs. This ensures documentation stays correct as the codebase evolves. + +After writing a code block in your markdown file, you can run it by executing +`md-babel-py run document.md` + +more information on this tool is in [codeblocks](/docs/agents/docs_agent/codeblocks.md) + + +# Code or Docs Links + +After adding a link to a doc run + +`doclinks document.md` + +### Code file references +```markdown +See [`service/spec.py`](/dimos/protocol/service/spec.py) for the implementation. +``` + +After running doclinks, becomes: +```markdown +See [`service/spec.py`](/dimos/protocol/service/spec.py) for the implementation. +``` + +### Symbol auto-linking +Mention a symbol on the same line to auto-link to its line number: +```markdown +The `Configurable` class is defined in [`service/spec.py`](/dimos/protocol/service/spec.py#L22). +``` + +Becomes: +```markdown +The `Configurable` class is defined in [`service/spec.py`](/dimos/protocol/service/spec.py#L22). +``` +### Doc-to-doc references +Use `.md` as the link target: +```markdown +See [Configuration](/docs/api/configuration.md) for more details. 
+```
+
+Becomes:
+```markdown
+See [Configuration](/docs/concepts/configuration.md) for more details.
+```
+
+More information on this is in [doclinks](/docs/agents/docs/doclinks.md)
+
+
+# Pikchr
+
+[Pikchr](https://pikchr.org/) is a diagram language from SQLite. Use it for flowcharts and architecture diagrams.
+
+**Important:** Always wrap pikchr blocks in `
` tags so the source is collapsed by default on GitHub. The rendered SVG stays visible outside the fold. Code blocks (Python, etc.) should NOT be folded—they're meant to be read. + +## Basic syntax + +
+diagram source + +```pikchr fold output=assets/pikchr_basic.svg +color = white +fill = none + +A: box "Step 1" rad 5px fit wid 170% ht 170% +arrow right 0.3in +B: box "Step 2" rad 5px fit wid 170% ht 170% +arrow right 0.3in +C: box "Step 3" rad 5px fit wid 170% ht 170% +``` + +
+ + +![output](assets/pikchr_basic.svg) + +## Box sizing + +Use `fit` with percentage scaling to auto-size boxes with padding: + +
+diagram source + +```pikchr fold output=assets/pikchr_sizing.svg +color = white +fill = none + +# fit wid 170% ht 170% = auto-size + padding +A: box "short" rad 5px fit wid 170% ht 170% +arrow right 0.3in +B: box ".subscribe()" rad 5px fit wid 170% ht 170% +arrow right 0.3in +C: box "two lines" "of text" rad 5px fit wid 170% ht 170% +``` + +
+ + +![output](assets/pikchr_sizing.svg) + +The pattern `fit wid 170% ht 170%` means: auto-size to text, then scale width by 170% and height by 170%. + +For explicit sizing (when you need consistent box sizes): + +
+diagram source + +```pikchr fold output=assets/pikchr_explicit.svg +color = white +fill = none + +A: box "Step 1" rad 5px fit wid 170% ht 170% +arrow right 0.3in +B: box "Step 2" rad 5px fit wid 170% ht 170% +``` + +
+ + +![output](assets/pikchr_explicit.svg) + +## Common settings + +Always start with: + +``` +color = white # text color +fill = none # transparent box fill +``` + +## Branching paths + +
+diagram source + +```pikchr fold output=assets/pikchr_branch.svg +color = white +fill = none + +A: box "Input" rad 5px fit wid 170% ht 170% +arrow +B: box "Process" rad 5px fit wid 170% ht 170% + +# Branch up +arrow from B.e right 0.3in then up 0.35in then right 0.3in +C: box "Path A" rad 5px fit wid 170% ht 170% + +# Branch down +arrow from B.e right 0.3in then down 0.35in then right 0.3in +D: box "Path B" rad 5px fit wid 170% ht 170% +``` + +
+ + +![output](assets/pikchr_branch.svg) + +**Tip:** For tree/hierarchy diagrams, prefer left-to-right layout (root on left, children branching right). This reads more naturally and avoids awkward vertical stacking. + +## Adding labels + +
+diagram source + +```pikchr fold output=assets/pikchr_labels.svg +color = white +fill = none + +A: box "Box" rad 5px fit wid 170% ht 170% +text "label below" at (A.x, A.y - 0.4in) +``` + +
+ + +![output](assets/pikchr_labels.svg) + +## Reference + +| Element | Syntax | +|---------|--------| +| Box | `box "text" rad 5px wid Xin ht Yin` | +| Arrow | `arrow right 0.3in` | +| Oval | `oval "text" wid Xin ht Yin` | +| Text | `text "label" at (X, Y)` | +| Named point | `A: box ...` then reference `A.e`, `A.n`, `A.x`, `A.y` | + +See [pikchr.org/home/doc/trunk/doc/userman.md](https://pikchr.org/home/doc/trunk/doc/userman.md) for full documentation. diff --git a/docs/agents/index.md b/docs/agents/index.md new file mode 100644 index 0000000000..ec9d66e886 --- /dev/null +++ b/docs/agents/index.md @@ -0,0 +1,19 @@ +# For Agents + +These docs are mostly for coding agents + +```sh +tree . -P '*.md' --prune +``` + + +``` +. +├── docs +│   ├── codeblocks.md +│   ├── doclinks.md +│   └── index.md +└── index.md + +2 directories, 4 files +``` diff --git a/docs/capabilities/agents/readme.md b/docs/capabilities/agents/readme.md new file mode 100644 index 0000000000..57be659e9c --- /dev/null +++ b/docs/capabilities/agents/readme.md @@ -0,0 +1 @@ +# Agents diff --git a/docs/capabilities/navigation/native/assets/1-lidar.png b/docs/capabilities/navigation/native/assets/1-lidar.png new file mode 100644 index 0000000000..6584ee90cb --- /dev/null +++ b/docs/capabilities/navigation/native/assets/1-lidar.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2d76742ada18d20dc0e3a3be04159d3412e7df6acee8596ff37916f0f269d3e0 +size 597386 diff --git a/docs/capabilities/navigation/native/assets/2-globalmap.png b/docs/capabilities/navigation/native/assets/2-globalmap.png new file mode 100644 index 0000000000..55541a8fcb --- /dev/null +++ b/docs/capabilities/navigation/native/assets/2-globalmap.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:bc2f27ec2dcc4048acde6b53229c7596b3a7f6ed6afad30c4cd062cf5751bd24 +size 1104485 diff --git a/docs/capabilities/navigation/native/assets/3-globalcostmap.png 
b/docs/capabilities/navigation/native/assets/3-globalcostmap.png new file mode 100644 index 0000000000..907d0b0448 --- /dev/null +++ b/docs/capabilities/navigation/native/assets/3-globalcostmap.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d1f9e6c142b220f1a4be7b08950f628a2d34e26caba8a1f5c100726bec6c88ef +size 793366 diff --git a/docs/capabilities/navigation/native/assets/4-navcostmap.png b/docs/capabilities/navigation/native/assets/4-navcostmap.png new file mode 100644 index 0000000000..6c40bce0e0 --- /dev/null +++ b/docs/capabilities/navigation/native/assets/4-navcostmap.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9ee4332e3d92162ddf41a0137c2ab5b6a885d758aa5a27037e413cdd4d946436 +size 741912 diff --git a/docs/capabilities/navigation/native/assets/5-all.png b/docs/capabilities/navigation/native/assets/5-all.png new file mode 100644 index 0000000000..655be72c1c --- /dev/null +++ b/docs/capabilities/navigation/native/assets/5-all.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:1a777d315beac6f4773adcb5c27384fd983720083941b4f62060958ddf6c16d2 +size 1209867 diff --git a/docs/capabilities/navigation/native/assets/go2_blueprint.svg b/docs/capabilities/navigation/native/assets/go2_blueprint.svg new file mode 100644 index 0000000000..51b0e7c40f --- /dev/null +++ b/docs/capabilities/navigation/native/assets/go2_blueprint.svg @@ -0,0 +1,188 @@ + + + + + + +modules + +cluster_mapping + +mapping + + +cluster_navigation + +navigation + + +cluster_robot + +robot + + +cluster_visualization + +visualization + + + +CostMapper + +CostMapper + + + +chan_global_costmap_OccupancyGrid + + + +global_costmap:OccupancyGrid + + + +CostMapper->chan_global_costmap_OccupancyGrid + + + + +VoxelGridMapper + +VoxelGridMapper + + + +chan_global_map_PointCloud2 + + + +global_map:PointCloud2 + + + +VoxelGridMapper->chan_global_map_PointCloud2 + + + + +ReplanningAStarPlanner + +ReplanningAStarPlanner + + + 
+chan_cmd_vel_Twist + + + +cmd_vel:Twist + + + +ReplanningAStarPlanner->chan_cmd_vel_Twist + + + + +chan_goal_reached_Bool + + + +goal_reached:Bool + + + +ReplanningAStarPlanner->chan_goal_reached_Bool + + + + +WavefrontFrontierExplorer + +WavefrontFrontierExplorer + + + +chan_goal_request_PoseStamped + + + +goal_request:PoseStamped + + + +WavefrontFrontierExplorer->chan_goal_request_PoseStamped + + + + +GO2Connection + +GO2Connection + + + +chan_lidar_PointCloud2 + + + +lidar:PointCloud2 + + + +GO2Connection->chan_lidar_PointCloud2 + + + + +RerunBridgeModule + +RerunBridgeModule + + + +chan_cmd_vel_Twist->GO2Connection + + + + + +chan_global_costmap_OccupancyGrid->ReplanningAStarPlanner + + + + + +chan_global_costmap_OccupancyGrid->WavefrontFrontierExplorer + + + + + +chan_global_map_PointCloud2->CostMapper + + + + + +chan_goal_reached_Bool->WavefrontFrontierExplorer + + + + + +chan_goal_request_PoseStamped->ReplanningAStarPlanner + + + + + +chan_lidar_PointCloud2->VoxelGridMapper + + + + + diff --git a/docs/capabilities/navigation/native/assets/go2nav_dataflow.svg b/docs/capabilities/navigation/native/assets/go2nav_dataflow.svg new file mode 100644 index 0000000000..94bb3e39ee --- /dev/null +++ b/docs/capabilities/navigation/native/assets/go2nav_dataflow.svg @@ -0,0 +1,22 @@ + + +Go2 + + + +VoxelGridMapper + + + +CostMapper + + + +Navigation +PointCloud2 +PointCloud2 +OccupancyGrid + + +Twist + diff --git a/docs/capabilities/navigation/native/assets/noros_nav.gif b/docs/capabilities/navigation/native/assets/noros_nav.gif new file mode 100644 index 0000000000..ab47bb9cb5 --- /dev/null +++ b/docs/capabilities/navigation/native/assets/noros_nav.gif @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:60f842cd2fda539338443b3c501197fbb875f5c5f3883ba3ffdd17005e9bd786 +size 612786 diff --git a/docs/capabilities/navigation/native/index.md b/docs/capabilities/navigation/native/index.md new file mode 100644 index 0000000000..115c6f0ee2 --- /dev/null +++ 
b/docs/capabilities/navigation/native/index.md @@ -0,0 +1,144 @@ +# Go2 Non-ROS Navigation + + + +The Go2 navigation stack runs entirely without ROS. It uses a **column-carving voxel map** strategy: each new LiDAR frame replaces the corresponding region of the global map entirely, ensuring the map always reflects the latest observations. + +## Data Flow + +
+diagram source + +```pikchr fold output=assets/go2nav_dataflow.svg +color = white +fill = none + +Go2: box "Go2" rad 5px fit wid 170% ht 170% +arrow right 0.5in +Vox: box "VoxelGridMapper" rad 5px fit wid 170% ht 170% +arrow right 0.5in +Cost: box "CostMapper" rad 5px fit wid 170% ht 170% +arrow right 0.5in +Nav: box "Navigation" rad 5px fit wid 170% ht 170% + +M1: dot at 1/2 way between Go2.e and Vox.w invisible +text "PointCloud2" italic at (M1.x, Go2.n.y + 0.15in) + +M2: dot at 1/2 way between Vox.e and Cost.w invisible +text "PointCloud2" italic at (M2.x, Vox.n.y + 0.15in) + +M3: dot at 1/2 way between Cost.e and Nav.w invisible +text "OccupancyGrid" italic at (M3.x, Cost.n.y + 0.15in) + +arrow dashed from Nav.s down 0.3in then left until even with Go2.s then to Go2.s +M4: dot at 1/2 way between Go2.s and Nav.s invisible +text "Twist" italic at (M4.x, Nav.s.y - 0.45in) +``` + +
+
+
+![output](assets/go2nav_dataflow.svg)
+
+## Pipeline Steps
+
+### 1. LiDAR Frame — [`GO2Connection`](/dimos/robot/unitree/go2/connection.py)
+
+We don't connect to the LiDAR directly — instead we use Unitree's WebRTC client (via [legion's webrtc driver](https://github.com/legion1581/unitree_webrtc_connect)), which streams a heavily preprocessed 5cm voxel grid rather than raw point cloud data. This allows us to support stock, unjailbroken Go2 Air and Pro models out of the box.
+
+![LiDAR frame](assets/1-lidar.png)
+
+### 2. Global Voxel Map — [`VoxelGridMapper`](/dimos/mapping/voxels.py)
+
+The [`VoxelGridMapper`](/dimos/mapping/voxels.py) maintains a sparse 3D occupancy grid using Open3D's `VoxelBlockGrid` backed by a hash map. Each voxel is a 5cm cube by default.
+
+The voxel hash map provides O(1) insert/erase/lookup, so this is efficient even with millions of voxels. The grid runs on **CUDA** by default for speed, with CPU fallback.
+
+Each incoming LiDAR frame is spliced into the global map via column carving. We consider any previously mapped voxels in the space of a received LiDAR frame stale. By erasing entire Z-columns in the footprint, we guarantee:
+
+- No ghost obstacles from previous passes
+- Dynamic objects (people, doors) get cleared automatically
+- The latest observation always wins
+
+We don't have proper loop closure or stable odometry; we trust the odometry the Go2 reports, which is surprisingly stable but does drift eventually. You can reliably map and navigate very large spaces (500m² in our tests), but you won't make it down the street to a supermarket.
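The column-carving rule above can be sketched in a few lines of plain Python. This is a toy under illustrative names (`key`, `splice_frame`, and a `set` of integer indices standing in for Open3D's `VoxelBlockGrid`), but it shows why the latest observation always wins:

```python
# Toy sketch of column carving; the real mapper uses Open3D's VoxelBlockGrid.
VOXEL = 0.05  # 5 cm voxels

def key(x, y, z, voxel=VOXEL):
    """Quantize a world-space point to an integer voxel index."""
    return (round(x / voxel), round(y / voxel), round(z / voxel))

def splice_frame(global_map, frame_points):
    """Erase every Z-column touched by the new frame, then insert its voxels."""
    voxels = {key(*p) for p in frame_points}
    footprint = {(vx, vy) for vx, vy, _ in voxels}
    kept = {v for v in global_map if (v[0], v[1]) not in footprint}
    return kept | voxels

# A stale obstacle at z = 0.5m vanishes once its column is re-observed:
world = {key(0.0, 0.0, 0.5)}                    # ghost from a previous pass
world = splice_frame(world, [(0.0, 0.0, 0.0)])  # new frame sees only the floor
print(key(0.0, 0.0, 0.5) in world)              # False - carved away
```

Disabling the carve (the `carve_columns` option) corresponds to skipping the `kept` filter and only unioning new voxels in.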
+
+
+#### Configuration
+
+| Parameter          | Default   | Description                                             |
+|--------------------|-----------|---------------------------------------------------------|
+| `voxel_size`       | 0.05      | Voxel cube size in meters                               |
+| `block_count`      | 2,000,000 | Max voxels in hash map                                  |
+| `device`           | `CUDA:0`  | Compute device (`CUDA:0` or `CPU:0`)                    |
+| `carve_columns`    | `true`    | Enable column carving (disable for append-only mapping) |
+| `publish_interval` | 0         | Seconds between map publishes (0 = every frame)         |
+
+![Global map](assets/2-globalmap.png)
+
+### 3. Global Costmap — [`CostMapper`](/dimos/mapping/costmapper.py)
+
+The [`CostMapper`](/dimos/mapping/costmapper.py) converts the 3D voxel map into a 2D occupancy grid. The default algorithm (`height_cost`) maps the rate of change of Z, with some smoothing.
+
+Algorithm settings are in [`occupancy.py`](/dimos/mapping/pointclouds/occupancy.py) and can be configured per robot.
+
+
+#### Configuration
+
+```python skip
+@dataclass(frozen=True)
+class HeightCostConfig(OccupancyConfig):
+    """Config for height-cost based occupancy (terrain slope analysis)."""
+    can_pass_under: float = 0.6
+    can_climb: float = 0.15
+    ignore_noise: float = 0.05
+    smoothing: float = 1.0
+```
+
+| Cost | Meaning                                                  |
+|------|----------------------------------------------------------|
+| 0    | Flat, easy to traverse                                   |
+| 50   | Moderate slope (~7.5cm rise per cell in case of go2)     |
+| 100  | Steep or impassable (≥15cm rise per cell in case of go2) |
+| -1   | Unknown (no observations)                                |
+
+![Global costmap](assets/3-globalcostmap.png)
+
+### 4. Navigation Costmap — [`ReplanningAStarPlanner`](/dimos/navigation/replanning_a_star/module.py)
+
+The planner processes the terrain gradient and computes its own algorithm-specific costmap, prioritizing safe, free paths while remaining willing to path aggressively through tight spaces if it has to.
+
+We run the planner in a constant loop so it dynamically reacts to obstacles it encounters.
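The replan-every-tick pattern in step 4 can be sketched as a toy loop: plan on a plain cost grid, take one step, plan again, so a newly observed obstacle reroutes the robot on the very next iteration. All names here (`astar`, the list-of-lists grid) are illustrative, not the `ReplanningAStarPlanner` API, which consumes `OccupancyGrid` and pose streams:

```python
# Illustrative "replan every tick" control loop; astar() is a hypothetical
# stand-in for the real planner's search over its derived costmap.
import heapq

def astar(grid, start, goal):
    """A* over a 2D cost grid; cells with cost >= 100 are treated as walls."""
    def h(p):
        return abs(p[0] - goal[0]) + abs(p[1] - goal[1])
    frontier = [(h(start), 0, start, None)]
    came = {}
    best_g = {start: 0}
    while frontier:
        _, g, cur, parent = heapq.heappop(frontier)
        if cur in came:
            continue
        came[cur] = parent
        if cur == goal:
            path = []
            while cur is not None:
                path.append(cur)
                cur = came[cur]
            return path[::-1]
        x, y = cur
        for nxt in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
            if 0 <= nxt[0] < len(grid) and 0 <= nxt[1] < len(grid[0]) and grid[nxt[0]][nxt[1]] < 100:
                ng = g + 1 + grid[nxt[0]][nxt[1]]  # step cost plus terrain cost
                if ng < best_g.get(nxt, float("inf")):
                    best_g[nxt] = ng
                    heapq.heappush(frontier, (ng + h(nxt), ng, nxt, cur))
    return None

# Replan from the current pose on every tick, so an obstacle that appears
# mid-route changes the path on the next iteration.
grid = [[0] * 5 for _ in range(5)]
pos, goal = (0, 0), (4, 4)
for tick in range(20):
    path = astar(grid, pos, goal)
    if path is None or pos == goal:
        break
    pos = path[1]
    if tick == 2:
        grid[2][2] = 100  # a new obstacle shows up; the next plan avoids it
print(pos)
```

The real planner additionally derives its own cost layer from the terrain gradient before searching, rather than reading the grid directly.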
+ +![Navigation costmap with path](assets/4-navcostmap.png) + +### 5. All Layers Combined + +All visualization layers shown together + +![All layers](assets/5-all.png) + +## Blueprint Composition + +The navigation stack is composed in the [`unitree_go2`](/dimos/robot/unitree/go2/blueprints/__init__.py) blueprint: + +```python fold output=assets/go2_blueprint.svg +from dimos.core.blueprints import autoconnect +from dimos.core.introspection import to_svg +from dimos.mapping.costmapper import cost_mapper +from dimos.mapping.voxels import voxel_mapper +from dimos.navigation.frontier_exploration import wavefront_frontier_explorer +from dimos.navigation.replanning_a_star.module import replanning_a_star_planner +from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic + +unitree_go2 = autoconnect( + unitree_go2_basic, # robot connection + visualization + voxel_mapper(voxel_size=0.05), # 3D voxel mapping + cost_mapper(), # 2D costmap generation + replanning_a_star_planner(), # path planning + wavefront_frontier_explorer(), # exploration +).global_config(n_dask_workers=6, robot_model="unitree_go2") + +to_svg(unitree_go2, "assets/go2_blueprint.svg") +``` + + +![output](assets/go2_blueprint.svg) diff --git a/docs/capabilities/navigation/readme.md b/docs/capabilities/navigation/readme.md new file mode 100644 index 0000000000..af26c07f94 --- /dev/null +++ b/docs/capabilities/navigation/readme.md @@ -0,0 +1,10 @@ +# Navigation + + +## Non-ROS + +- [Go2 Navigation](native/index.md) — column-carving voxel mapping + slope-based costmap + +## ROS + +See [ROS Transports](/docs/api/transports.md) for bridging DimOS streams to ROS topics. 
diff --git a/docs/capabilities/perception/readme.md b/docs/capabilities/perception/readme.md new file mode 100644 index 0000000000..5d6e089dbf --- /dev/null +++ b/docs/capabilities/perception/readme.md @@ -0,0 +1,3 @@ +# Perception + +## Detections diff --git a/docs/development/README.md b/docs/development/README.md index c7d903e690..48a3ab8d79 100644 --- a/docs/development/README.md +++ b/docs/development/README.md @@ -71,38 +71,6 @@ uv add git+https://github.com/openai/CLIP.git uv add git+https://github.com/facebookresearch/detectron2.git ``` -### Optional: DDS Transport Support - -The `dds` extra provides DDS (Data Distribution Service) transport support via [Eclipse Cyclone DDS](https://cyclonedds.io/docs/cyclonedds-python/latest/). This requires installing system libraries before the Python package can be built. - -**Ubuntu/Debian:** - -```bash -# Install the CycloneDDS development library -sudo apt install cyclonedds-dev - -# Create a compatibility directory structure -# (required because Ubuntu's multiarch layout doesn't match the expected CMake layout) -sudo mkdir -p /opt/cyclonedds/{lib,bin,include} -sudo ln -sf /usr/lib/x86_64-linux-gnu/libddsc.so* /opt/cyclonedds/lib/ -sudo ln -sf /usr/lib/x86_64-linux-gnu/libcycloneddsidl.so* /opt/cyclonedds/lib/ -sudo ln -sf /usr/bin/idlc /opt/cyclonedds/bin/ -sudo ln -sf /usr/bin/ddsperf /opt/cyclonedds/bin/ -sudo ln -sf /usr/include/dds /opt/cyclonedds/include/ - -# Install with the dds extra -CYCLONEDDS_HOME=/opt/cyclonedds uv pip install -e '.[dds]' -``` - -To install all extras including DDS: - -```bash -CYCLONEDDS_HOME=/opt/cyclonedds uv sync --extra dds -``` - - - - ## Setup Option B: Nix Flake + direnv ### Why pick this option? 
(pros/cons/when-to-use)
diff --git a/docs/hardware/integration_guide.md b/docs/hardware/integration_guide.md
new file mode 100644
index 0000000000..805e6ad418
--- /dev/null
+++ b/docs/hardware/integration_guide.md
@@ -0,0 +1,3 @@
+# New Hardware Integration Guide
+
+TODO: Document how to add support for new hardware platforms.
diff --git a/docs/installation/nix.md b/docs/installation/nix.md
new file mode 100644
index 0000000000..f0e1d03089
--- /dev/null
+++ b/docs/installation/nix.md
@@ -0,0 +1,82 @@
+# Nix install (required for nix-managed dimos)
+
+You need to have [nix](https://nixos.org/) installed and [flakes](https://nixos.wiki/wiki/Flakes) enabled.
+
+The [official install docs](https://nixos.org/download/) are recommended, but here is a quickstart:
+
+```sh
+# Install Nix https://nixos.org/download/
+curl --proto '=https' --tlsv1.2 -sSf -L https://install.determinate.systems/nix | sh -s -- install
+. /nix/var/nix/profiles/default/etc/profile.d/nix-daemon.sh
+
+# make sure nix-flakes are enabled
+mkdir -p "$HOME/.config/nix"; echo "experimental-features = nix-command flakes" >> "$HOME/.config/nix/nix.conf"
+```
+
+# Install Direnv (optional)
+
+[direnv](https://direnv.net) is a convenient helper that allows you to automatically enter your nix and python env once you cd into the project dir.
+
+You can skip this step if you intend to type `nix develop` by hand.
+ +Following [direnv install docs](https://direnv.net/docs/installation.html) is recommended (many distros have native package support) + +but a quick oneliner binary install is +```sh +curl -sfL https://direnv.net/install.sh | bash +``` + +# Using DimOS as a library + +```sh +mkdir myproject +cd myproject + +# if on nixos you can pull our flake +wget https://raw.githubusercontent.com/dimensionalOS/dimos/refs/heads/main/flake.nix +wget https://raw.githubusercontent.com/dimensionalOS/dimos/refs/heads/main/flake.lock + +# if using direnv (recommended) +wget https://raw.githubusercontent.com/dimensionalOS/dimos/refs/heads/main/.envrc.nix -O .envrc +direnv allow + +# if using nix develop directly instead of direnv, +# nix develop + +uv venv --python "3.12" +source .venv/bin/activate + +# this will just pull everything (big checkout) +# depending on what you are working on you might not need everything, +# check your respective platform guides +uv pip install dimos[misc,sim,visualization,agents,web,perception,unitree,manipulation,cpu,dev] +``` + +# Developing on DimOS + +```sh +# this allows getting large files on-demand (and not pulling all immediately) +export GIT_LFS_SKIP_SMUDGE=1 +git clone -b dev https://github.com/dimensionalOS/dimos.git +cd dimos + +# if using direnv (recommended) +cp .envrc.nix .envrc +direnv allow + +# if using nix develop directly instead of direnv, +# nix develop + +# create venv +uv venv --python 3.12 + +source .venv/bin/activate + +uv sync --all-extras + +# type check +uv run mypy dimos + +# tests (around a minute to run) +uv run pytest dimos +``` diff --git a/docs/installation/osx.md b/docs/installation/osx.md new file mode 100644 index 0000000000..7555c178f2 --- /dev/null +++ b/docs/installation/osx.md @@ -0,0 +1,71 @@ +# osx install + +## macOS 12.6 or newer +```sh +# install homebrew +/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" +# install dependencies +brew install gnu-sed gcc 
portaudio git-lfs libjpeg-turbo python pre-commit + +# install uv +curl -LsSf https://astral.sh/uv/install.sh | sh && export PATH="$HOME/.local/bin:$PATH" +``` + +# Install Direnv (optional) + +[direnv](https://direnv.net) is a convenient helper that allows you to automatically enter your python env once you cd into the project dir. + +You can skip this step if you intend to activate manually. + +Following [direnv install docs](https://direnv.net/docs/installation.html) is recommended (many distros have native package support) + +but a quick oneliner binary install is +```sh +curl -sfL https://direnv.net/install.sh | bash +``` + +# Using DimOS as a library + +```sh +mkdir myproject +cd myproject + +uv venv --python "3.12" + +# if using direnv (recommended) +wget https://raw.githubusercontent.com/dimensionalOS/dimos/refs/heads/main/.envrc.venv -O .envrc +direnv allow +# if not using direnv +# source .venv/bin/activate + +# this will just pull everything (big checkout) +# depending on what you are working on you might not need everything, +# check your respective platform guides +uv pip install dimos[misc,sim,visualization,agents,web,perception,unitree,manipulation,cpu,dev] +``` + +# Developing on DimOS + +```sh +# this allows getting large files on-demand (and not pulling all immediately) +export GIT_LFS_SKIP_SMUDGE=1 +git clone -b dev https://github.com/dimensionalOS/dimos.git +cd dimos + +# create venv +uv venv --python 3.12 + +# if using direnv (recommended) +direnv allow + +# if not using direnv +# source .venv/bin/activate + +uv sync --all-extras + +# type check +uv run mypy dimos + +# tests (around a minute to run) +uv run pytest dimos +``` diff --git a/docs/installation/ubuntu.md b/docs/installation/ubuntu.md new file mode 100644 index 0000000000..af4a8b8d07 --- /dev/null +++ b/docs/installation/ubuntu.md @@ -0,0 +1,68 @@ +# System Dependencies Install (Ubuntu 22.04 or 24.04) + +```sh +sudo apt-get update +sudo apt-get install -y curl g++ portaudio19-dev 
git-lfs libturbojpeg python3-dev pre-commit + +# install uv +curl -LsSf https://astral.sh/uv/install.sh | sh && export PATH="$HOME/.local/bin:$PATH" +``` + +# Install Direnv (optional) + +[direnv](https://direnv.net) is a convenient helper that allows you to automatically enter your python env once you cd into the project dir. + +You can skip this step if you intend to activate manually. + +Following [direnv install docs](https://direnv.net/docs/installation.html) is recommended (many distros have native package support) + +but a quick oneliner binary install is +```sh +curl -sfL https://direnv.net/install.sh | bash +``` + +# Using DimOS as a library + +```sh +mkdir myproject +cd myproject + +uv venv --python "3.12" + +# if using direnv (recommended) +wget https://raw.githubusercontent.com/dimensionalOS/dimos/refs/heads/main/.envrc.venv -O .envrc +direnv allow +# if not using direnv +# source .venv/bin/activate + +# this will just pull everything (big checkout) +# depending on what you are working on you might not need everything, +# check your respective platform guides +uv pip install dimos[misc,sim,visualization,agents,web,perception,unitree,manipulation,cpu,dev] +``` + +# Developing on DimOS + +```sh +# this allows getting large files on-demand (and not pulling all immediately) +export GIT_LFS_SKIP_SMUDGE=1 +git clone -b dev https://github.com/dimensionalOS/dimos.git +cd dimos + +# create venv +uv venv --python 3.12 + +# if using direnv (recommended) +direnv allow + +# if not using direnv +# source .venv/bin/activate + +uv sync --all-extras + +# type check +uv run mypy dimos + +# tests (around a minute to run) +uv run pytest dimos +``` diff --git a/docs/platforms/quadruped/go2/index.md b/docs/platforms/quadruped/go2/index.md new file mode 100644 index 0000000000..3ce7c425de --- /dev/null +++ b/docs/platforms/quadruped/go2/index.md @@ -0,0 +1 @@ +# GO2 Getting started diff --git a/docs/todo.md b/docs/todo.md new file mode 100644 index 0000000000..464090415c --- 
/dev/null +++ b/docs/todo.md @@ -0,0 +1 @@ +# TODO diff --git a/docs/usage/data_streams/README.md b/docs/usage/data_streams/README.md new file mode 100644 index 0000000000..dc2ce6c91d --- /dev/null +++ b/docs/usage/data_streams/README.md @@ -0,0 +1,41 @@ +# Sensor Streams + +Dimos uses reactive streams (RxPY) to handle sensor data. This approach naturally fits robotics where multiple sensors emit data asynchronously at different rates, and downstream processors may be slower than the data sources. + +## Guides + +| Guide | Description | +|----------------------------------------------|---------------------------------------------------------------| +| [ReactiveX Fundamentals](reactivex.md) | Observables, subscriptions, and disposables | +| [Advanced Streams](advanced_streams.md) | Backpressure, parallel subscribers, synchronous getters | +| [Quality-Based Filtering](quality_filter.md) | Select highest quality frames when downsampling streams | +| [Temporal Alignment](temporal_alignment.md) | Match messages from multiple sensors by timestamp | +| [Storage & Replay](storage_replay.md) | Record sensor streams to disk and replay with original timing | + +## Quick Example + +```python +from reactivex import operators as ops +from dimos.utils.reactive import backpressure +from dimos.types.timestamped import align_timestamped +from dimos.msgs.sensor_msgs.Image import sharpness_barrier + +# Camera at 30fps, lidar at 10Hz +camera_stream = camera.observable() +lidar_stream = lidar.observable() + +# Pipeline: filter blurry frames -> align with lidar -> handle slow consumers +processed = ( + camera_stream.pipe( + sharpness_barrier(10.0), # Keep sharpest frame per 100ms window (10Hz) + ) +) + +aligned = align_timestamped( + backpressure(processed), # Camera as primary + lidar_stream, # Lidar as secondary + match_tolerance=0.1, +) + +aligned.subscribe(lambda pair: process_frame_with_pointcloud(*pair)) +``` diff --git a/docs/usage/data_streams/advanced_streams.md 
b/docs/usage/data_streams/advanced_streams.md new file mode 100644 index 0000000000..187d432af2 --- /dev/null +++ b/docs/usage/data_streams/advanced_streams.md @@ -0,0 +1,295 @@ +# Advanced Stream Handling + +> **Prerequisite:** Read [ReactiveX Fundamentals](reactivex.md) first for Observable basics. + +## Backpressure and Parallel Subscribers to Hardware + +In robotics, we deal with hardware that produces data at its own pace - a camera outputs 30fps whether you're ready or not. We can't tell the camera to slow down. And we often have multiple consumers: one module wants every frame for recording, another runs slow ML inference and only needs the latest frame. + +**The problem:** A fast producer can overwhelm a slow consumer, causing memory buildup or dropped frames. We might have multiple subscribers to the same hardware that operate at different speeds. + + +
Pikchr + +```pikchr fold output=assets/backpressure.svg +color = white +fill = none + +Fast: box "Camera" "60 fps" rad 5px fit wid 130% ht 130% +arrow right 0.4in +Queue: box "queue" rad 5px fit wid 170% ht 170% +arrow right 0.4in +Slow: box "ML Model" "2 fps" rad 5px fit wid 130% ht 130% + +text "items pile up!" at (Queue.x, Queue.y - 0.45in) +``` + +
+ + +![output](assets/backpressure.svg) + + +**The solution:** The `backpressure()` wrapper handles this by: + +1. **Sharing the source** - Camera runs once, all subscribers share the stream +2. **Per-subscriber speed** - Fast subscribers get every frame, slow ones get the latest when ready +3. **No blocking** - Slow subscribers never block the source or each other + +```python session=bp +import time +import reactivex as rx +from reactivex import operators as ops +from reactivex.scheduler import ThreadPoolScheduler +from dimos.utils.reactive import backpressure + +# We need this scaffolding here. Normally DimOS handles this. +scheduler = ThreadPoolScheduler(max_workers=4) + +# Simulate fast source +source = rx.interval(0.05).pipe(ops.take(20)) +safe = backpressure(source, scheduler=scheduler) + +fast_results = [] +slow_results = [] + +safe.subscribe(lambda x: fast_results.append(x)) + +def slow_handler(x): + time.sleep(0.15) + slow_results.append(x) + +safe.subscribe(slow_handler) + +time.sleep(1.5) +print(f"fast got {len(fast_results)} items: {fast_results[:5]}...") +print(f"slow got {len(slow_results)} items (skipped {len(fast_results) - len(slow_results)})") +scheduler.executor.shutdown(wait=True) +``` + + +``` +fast got 20 items: [0, 1, 2, 3, 4]... +slow got 7 items (skipped 13) +``` + +### How it works + + +
Pikchr + +```pikchr fold output=assets/backpressure_solution.svg +color = white +fill = none +linewid = 0.3in + +Source: box "Camera" "60 fps" rad 5px fit wid 170% ht 170% +arrow +Core: box "backpressure" rad 5px fit wid 170% ht 170% +arrow from Core.e right 0.3in then up 0.35in then right 0.3in +Fast: box "Fast Sub" rad 5px fit wid 170% ht 170% +arrow from Core.e right 0.3in then down 0.35in then right 0.3in +SlowPre: box "LATEST" rad 5px fit wid 170% ht 170% +arrow +Slow: box "Slow Sub" rad 5px fit wid 170% ht 170% +``` + +
+ + +![output](assets/backpressure_solution.svg) + +The `LATEST` strategy means: when the slow subscriber finishes processing, it gets whatever the most recent value is, skipping any values that arrived while it was busy. + +### Usage in modules + +Most module streams offer backpressured observables. + +```python session=bp +from dimos.core import Module, In +from dimos.msgs.sensor_msgs import Image + +class MLModel(Module): + color_image: In[Image] + def start(self): + # no reactivex, simple callback + self.color_image.subscribe(...) + # backpressured + self.color_image.observable().subscribe(...) + # non-backpressured - will pile up queue + self.color_image.pure_observable().subscribe(...) + + +``` + +## Getting Values Synchronously + +Sometimes you don't want a stream, you just want to call a function and get the latest value. + +If you are doing this periodically as a part of a processing loop, it is very likely that your code will be much cleaner and safer using actual reactivex pipeline. 
So bias towards checking our [reactivex quick guide](reactivex.md) and the [official docs](https://rxpy.readthedocs.io/).
+
+(TODO: make this example actually executable)
+
+```python skip
+    self.color_image.observable().pipe(
+        # takes the best image from a stream every 200ms,
+        # ensuring we are feeding our detector with highest quality frames
+        quality_barrier(lambda x: x["quality"], target_frequency=0.2),
+
+        # converts Image into Person detections
+        ops.map(detect_person),
+
+        # converts Detection2D to Twist pointing in the direction of a detection
+        ops.map(detection2d_to_twist),
+
+        # emits the latest value every 50ms making our control loop run at 20hz
+        # despite detections running at 200ms
+        ops.sample(0.05),
+    ).subscribe(self.twist.publish)  # shoots off the Twist out of the module
+```
+
+
+If you'd still like to switch to synchronous fetching, we provide two approaches: `getter_hot()` and `getter_cold()`.
+
+|                  | `getter_hot()`                 | `getter_cold()`                  |
+|------------------|--------------------------------|----------------------------------|
+| **Subscription** | Stays active in background     | Fresh subscription each call     |
+| **Read speed**   | Instant (value already cached) | Slower (waits for value)         |
+| **Resources**    | Keeps connection open          | Opens/closes each call           |
+| **Use when**     | Frequent reads, need latest    | Occasional reads, save resources |
+diagram source + +```pikchr fold output=assets/getter_hot_cold.svg +color = white +fill = none + +H_Title: box "getter_hot()" rad 5px fit wid 170% ht 170% + +Sub: box "subscribe" rad 5px fit wid 170% ht 170% with .n at H_Title.s + (0, -0.5in) +arrow from H_Title.s to Sub.n +arrow right from Sub.e +Cache: box "Cache" rad 5px fit wid 170% ht 170% + +# blocking box around subscribe->cache (one-time setup) +Blk0: box dashed color 0x5c9ff0 with .nw at Sub.nw + (-0.1in, 0.25in) wid (Cache.e.x - Sub.w.x + 0.2in) ht 0.7in rad 5px +text "blocking" italic with .n at Blk0.n + (0, -0.05in) + +arrow right from Cache.e +Getter: box "getter" rad 5px fit wid 170% ht 170% + +arrow from Getter.e right 0.3in then down 0.25in then right 0.2in +G1: box invis "call()" color 0x8cbdf2 fit wid 150% +arrow right 0.4in from G1.e +box invis "instant" fit wid 150% + +arrow from Getter.e right 0.3in then down 0.7in then right 0.2in +G2: box invis "call()" color 0x8cbdf2 fit wid 150% +arrow right 0.4in from G2.e +box invis "instant" fit wid 150% + +text "always subscribed" italic with .n at Blk0.s + (0, -0.1in) + + +# === getter_cold section === +C_Title: box "getter_cold()" rad 5px fit wid 170% ht 170% with .nw at H_Title.sw + (0, -1.6in) + +arrow down 0.3in from C_Title.s +ColdGetter: box "getter" rad 5px fit wid 170% ht 170% + +# Branch to first call +arrow from ColdGetter.e right 0.3in then down 0.3in then right 0.2in +Cold1: box invis "call()" color 0x8cbdf2 fit wid 150% +arrow right 0.4in from Cold1.e +Sub1: box invis "subscribe" fit wid 150% +arrow right 0.4in from Sub1.e +Wait1: box invis "wait" fit wid 150% +arrow right 0.4in from Wait1.e +Val1: box invis "value" fit wid 150% +arrow right 0.4in from Val1.e +Disp1: box invis "dispose " fit wid 150% + +# blocking box around first row +Blk1: box dashed color 0x5c9ff0 with .nw at Cold1.nw + (-0.1in, 0.25in) wid (Disp1.e.x - Cold1.w.x + 0.2in) ht 0.7in rad 5px +text "blocking" italic with .n at Blk1.n + (0, -0.05in) + +# Branch to second 
call +arrow from ColdGetter.e right 0.3in then down 1.2in then right 0.2in +Cold2: box invis "call()" color 0x8cbdf2 fit wid 150% +arrow right 0.4in from Cold2.e +Sub2: box invis "subscribe" fit wid 150% +arrow right 0.4in from Sub2.e +Wait2: box invis "wait" fit wid 150% +arrow right 0.4in from Wait2.e +Val2: box invis "value" fit wid 150% +arrow right 0.4in from Val2.e +Disp2: box invis "dispose " fit wid 150% + +# blocking box around second row +Blk2: box dashed color 0x5c9ff0 with .nw at Cold2.nw + (-0.1in, 0.25in) wid (Disp2.e.x - Cold2.w.x + 0.2in) ht 0.7in rad 5px +text "blocking" italic with .n at Blk2.n + (0, -0.05in) +``` + +
![output](assets/getter_hot_cold.svg)

**Prefer `getter_cold()`** when you can afford to wait and warm-up isn't expensive. It's simpler (no cleanup needed) and doesn't hold resources. Only use `getter_hot()` when you need instant reads or the source is expensive to start.

### `getter_hot()` - Background subscription, instant reads

Subscribes immediately and keeps updating in the background. Each call returns the cached latest value instantly.

```python session=sync
import time
import reactivex as rx
from reactivex import operators as ops
from dimos.utils.reactive import getter_hot

source = rx.interval(0.1).pipe(ops.take(10))

get_val = getter_hot(source, timeout=5.0)  # blocks until first message, with 5s timeout
# Alternatively, pass nonblocking=True to avoid blocking
# (but get_val() may return None until the first value arrives):
# get_val = getter_hot(source, nonblocking=True)

print("first call:", get_val())  # instant - value already there
time.sleep(0.35)
print("after 350ms:", get_val())  # instant - returns cached latest
time.sleep(0.35)
print("after 700ms:", get_val())

get_val.dispose()  # Don't forget to clean up!
```


```
first call: 0
after 350ms: 3
after 700ms: 6
```

### `getter_cold()` - Fresh subscription each call

Each call creates a new subscription, waits for one value, and cleans up.
Slower but doesn't hold resources: + +```python session=sync +from dimos.utils.reactive import getter_cold + +source = rx.of(0, 1, 2, 3, 4) +get_val = getter_cold(source, timeout=5.0) + +# Each call creates fresh subscription, gets first value +print("call 1:", get_val()) # subscribes, gets 0, disposes +print("call 2:", get_val()) # subscribes again, gets 0, disposes +print("call 3:", get_val()) # subscribes again, gets 0, disposes +``` + + +``` +call 1: 0 +call 2: 0 +call 3: 0 +``` diff --git a/docs/usage/data_streams/assets/alignment_flow.svg b/docs/usage/data_streams/assets/alignment_flow.svg new file mode 100644 index 0000000000..72aeb337f3 --- /dev/null +++ b/docs/usage/data_streams/assets/alignment_flow.svg @@ -0,0 +1,22 @@ + + +Primary +arrives + + + +Check +secondaries + + + +Emit +match +all found + + + +Buffer +primary +waiting... + diff --git a/docs/usage/data_streams/assets/alignment_overview.svg b/docs/usage/data_streams/assets/alignment_overview.svg new file mode 100644 index 0000000000..8abada6d02 --- /dev/null +++ b/docs/usage/data_streams/assets/alignment_overview.svg @@ -0,0 +1,18 @@ + + +Camera +30 fps + + + +align_timestamped + +Lidar +10 Hz + + + + + +(image, pointcloud) + diff --git a/docs/usage/data_streams/assets/alignment_timeline.png b/docs/usage/data_streams/assets/alignment_timeline.png new file mode 100644 index 0000000000..235ddd7be0 --- /dev/null +++ b/docs/usage/data_streams/assets/alignment_timeline.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:cfea5a6aac40182b25decb9ddaeb387ed97a7708e2c51a48f47453c8df7adf57 +size 16136 diff --git a/docs/usage/data_streams/assets/alignment_timeline2.png b/docs/usage/data_streams/assets/alignment_timeline2.png new file mode 100644 index 0000000000..2bf8ec5eef --- /dev/null +++ b/docs/usage/data_streams/assets/alignment_timeline2.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:22b64923637d05f8f40c9f7c0f0597ee894dc4f31a0f10674aeb809101b54765 
+size 23471 diff --git a/docs/usage/data_streams/assets/alignment_timeline3.png b/docs/usage/data_streams/assets/alignment_timeline3.png new file mode 100644 index 0000000000..61ddc3b54b --- /dev/null +++ b/docs/usage/data_streams/assets/alignment_timeline3.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b8e9589dcd5308f511a2ec7d41bd36978204ccfe1441907bd139029b0489d605 +size 9969 diff --git a/docs/usage/data_streams/assets/backpressure.svg b/docs/usage/data_streams/assets/backpressure.svg new file mode 100644 index 0000000000..b3d69af6fb --- /dev/null +++ b/docs/usage/data_streams/assets/backpressure.svg @@ -0,0 +1,15 @@ + + +Camera +60 fps + + + +queue + + + +ML Model +2 fps +items pile up! + diff --git a/docs/usage/data_streams/assets/backpressure_solution.svg b/docs/usage/data_streams/assets/backpressure_solution.svg new file mode 100644 index 0000000000..454a8f460b --- /dev/null +++ b/docs/usage/data_streams/assets/backpressure_solution.svg @@ -0,0 +1,21 @@ + + +Camera +60 fps + + + +backpressure + + + +Fast Sub + + + +LATEST + + + +Slow Sub + diff --git a/docs/usage/data_streams/assets/frame_mosaic.jpg b/docs/usage/data_streams/assets/frame_mosaic.jpg new file mode 100644 index 0000000000..5c3fbf8350 --- /dev/null +++ b/docs/usage/data_streams/assets/frame_mosaic.jpg @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e83934e1179651fbca6c9b62cceb7425d1b2f0e8da18a63d4d95bcb4e6ac33ca +size 88206 diff --git a/docs/usage/data_streams/assets/frame_mosaic2.jpg b/docs/usage/data_streams/assets/frame_mosaic2.jpg new file mode 100644 index 0000000000..5e3032acf2 --- /dev/null +++ b/docs/usage/data_streams/assets/frame_mosaic2.jpg @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2d73f683e92fda39bac9d1bb840f1fc375c821b4099714829e81f3e739f4d602 +size 91036 diff --git a/docs/usage/data_streams/assets/getter_hot_cold.svg b/docs/usage/data_streams/assets/getter_hot_cold.svg new file mode 100644 index 
0000000000..d2f336459c --- /dev/null +++ b/docs/usage/data_streams/assets/getter_hot_cold.svg @@ -0,0 +1,71 @@ + + +getter_hot() + +subscribe + + + + + +Cache + +blocking + + + +getter + + +call() + + +instant + + +call() + + +instant +always subscribed + +getter_cold() + + + +getter + + +call() + + +subscribe + + +wait + + +value + + +dispose   + +blocking + + +call() + + +subscribe + + +wait + + +value + + +dispose   + +blocking + diff --git a/docs/usage/data_streams/assets/observable_flow.svg b/docs/usage/data_streams/assets/observable_flow.svg new file mode 100644 index 0000000000..d7e0e021d6 --- /dev/null +++ b/docs/usage/data_streams/assets/observable_flow.svg @@ -0,0 +1,16 @@ + + +observable + + + +.pipe(ops) + + + +.subscribe() + + + +callback + diff --git a/docs/usage/data_streams/assets/sharpness_graph.svg b/docs/usage/data_streams/assets/sharpness_graph.svg new file mode 100644 index 0000000000..3d61d12d7c --- /dev/null +++ b/docs/usage/data_streams/assets/sharpness_graph.svg @@ -0,0 +1,1414 @@ + + + + + + + + 1980-01-01T00:00:00+00:00 + image/svg+xml + + + Matplotlib v3.10.8, https://matplotlib.org/ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/usage/data_streams/assets/sharpness_graph2.svg b/docs/usage/data_streams/assets/sharpness_graph2.svg new file mode 100644 index 0000000000..37c1032de0 --- /dev/null +++ b/docs/usage/data_streams/assets/sharpness_graph2.svg @@ -0,0 +1,1429 @@ + + + + + + + + 1980-01-01T00:00:00+00:00 + image/svg+xml + + + Matplotlib v3.10.8, https://matplotlib.org/ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/usage/data_streams/quality_filter.md b/docs/usage/data_streams/quality_filter.md new file mode 100644 index 0000000000..db21da9c54 --- /dev/null +++ b/docs/usage/data_streams/quality_filter.md @@ -0,0 +1,316 @@ +# Quality-Based Stream Filtering + +When processing sensor streams, you often want to reduce frequency while keeping the best quality data. For discrete data like images that can't be averaged or merged, instead of blindly dropping frames, `quality_barrier` selects the highest quality item within each time window. + +## The Problem + +A camera outputs 30fps, but your ML model only needs 2fps. Simple approaches: + +- **`sample(0.5)`** - Takes whatever frame happens to land on the interval tick +- **`throttle_first(0.5)`** - Takes the first frame, ignores the rest + +Both ignore quality. You might get a blurry frame when a sharp one was available. 
## The Solution: `quality_barrier`

```python session=qb
import reactivex as rx
from reactivex import operators as ops
from dimos.utils.reactive import quality_barrier

# Simulated sensor data with quality scores
data = [
    {"id": 1, "quality": 0.3},
    {"id": 2, "quality": 0.9},  # best overall
    {"id": 3, "quality": 0.5},
    {"id": 4, "quality": 0.2},
    {"id": 5, "quality": 0.8},
    {"id": 6, "quality": 0.4},
]

source = rx.of(*data)

# Select best quality item per window (2 items per second = 0.5s windows)
result = source.pipe(
    quality_barrier(lambda x: x["quality"], target_frequency=2.0),
    ops.to_list(),
).run()

print("Selected:", [r["id"] for r in result])
print("Qualities:", [r["quality"] for r in result])
```


```
Selected: [2]
Qualities: [0.9]
```

Because `rx.of` emits everything synchronously, all six items land in a single window, so only the single best item (id 2) comes out. With a real sensor stream spread out over time, you'd get one item per window.

## Image Sharpness Filtering

For camera streams, we provide `sharpness_barrier`, which uses the image's sharpness score.

Let's use real camera data from the Unitree Go2 robot to demonstrate.
We use the [Sensor Storage & Replay](/docs/usage/data_streams/storage_replay.md) toolkit, which provides access to recorded robot data:

```python session=qb
from dimos.utils.testing import TimedSensorReplay
from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier

# Load recorded Go2 camera frames
video_replay = TimedSensorReplay("unitree_go2_bigoffice/video")

# Use stream() with seek to skip blank frames, speed=10x to collect faster
input_frames = video_replay.stream(seek=5.0, duration=1.4, speed=10.0).pipe(
    ops.to_list()
).run()

def show_frames(frames):
    for i, frame in enumerate(frames[:10]):
        print(f"  Frame {i}: {frame.sharpness:.3f}")

print(f"Loaded {len(input_frames)} frames from Go2 camera")
print(f"Frame resolution: {input_frames[0].width}x{input_frames[0].height}")
print("Sharpness scores:")
show_frames(input_frames)
```


```
Loaded 20 frames from Go2 camera
Frame resolution: 1280x720
Sharpness scores:
  Frame 0: 0.351
  Frame 1: 0.227
  Frame 2: 0.223
  Frame 3: 0.267
  Frame 4: 0.295
  Frame 5: 0.307
  Frame 6: 0.328
  Frame 7: 0.348
  Frame 8: 0.346
  Frame 9: 0.322
```

Using `sharpness_barrier` to select the sharpest frames:

```python session=qb
# Create a stream from the recorded frames

sharp_frames = video_replay.stream(seek=5.0, duration=1.5, speed=1.0).pipe(
    sharpness_barrier(2.0),
    ops.to_list()
).run()

print(f"Output: {len(sharp_frames)} frame(s) (selected sharpest per window)")
show_frames(sharp_frames)
```


```
Output: 3 frame(s) (selected sharpest per window)
  Frame 0: 0.351
  Frame 1: 0.352
  Frame 2: 0.360
```
+Visualization helpers + +```python session=qb fold no-result +import matplotlib +import matplotlib.pyplot as plt +import math + +def plot_mosaic(frames, selected, path, cols=5): + matplotlib.use('Agg') + rows = math.ceil(len(frames) / cols) + aspect = frames[0].width / frames[0].height + fig_w, fig_h = 12, 12 * rows / (cols * aspect) + + fig, axes = plt.subplots(rows, cols, figsize=(fig_w, fig_h)) + fig.patch.set_facecolor('black') + for i, ax in enumerate(axes.flat): + if i < len(frames): + ax.imshow(frames[i].data) + for spine in ax.spines.values(): + spine.set_color('lime' if frames[i] in selected else 'black') + spine.set_linewidth(4 if frames[i] in selected else 0) + ax.set_xticks([]); ax.set_yticks([]) + else: + ax.axis('off') + plt.subplots_adjust(wspace=0.02, hspace=0.02, left=0, right=1, top=1, bottom=0) + plt.savefig(path, facecolor='black', dpi=100, bbox_inches='tight', pad_inches=0) + plt.close() + +def plot_sharpness(frames, selected, path): + matplotlib.use('svg') + plt.style.use('dark_background') + sharpness = [f.sharpness for f in frames] + selected_idx = [i for i, f in enumerate(frames) if f in selected] + + plt.figure(figsize=(10, 3)) + plt.plot(sharpness, 'o-', label='All frames', color='#b5e4f4', alpha=0.7) + for i, idx in enumerate(selected_idx): + plt.axvline(x=idx, color='lime', linestyle='--', label='Selected' if i == 0 else None) + plt.xlabel('Frame'); plt.ylabel('Sharpness') + plt.xticks(range(len(sharpness))) + plt.legend(); plt.grid(alpha=0.3); plt.tight_layout() + plt.savefig(path, transparent=True) + plt.close() +``` + +
Visualizing which frames were selected (green border = selected as sharpest in window):

```python session=qb output=assets/frame_mosaic.jpg
plot_mosaic(input_frames, sharp_frames, '{output}')
```


![output](assets/frame_mosaic.jpg)

```python session=qb output=assets/sharpness_graph.svg
plot_sharpness(input_frames, sharp_frames, '{output}')
```


![output](assets/sharpness_graph.svg)

Let's request a higher frequency.

```python session=qb
sharp_frames = video_replay.stream(seek=5.0, duration=1.5, speed=1.0).pipe(
    sharpness_barrier(4.0),
    ops.to_list()
).run()

print(f"Output: {len(sharp_frames)} frame(s) (selected sharpest per window)")
show_frames(sharp_frames)
```


```
Output: 6 frame(s) (selected sharpest per window)
  Frame 0: 0.351
  Frame 1: 0.348
  Frame 2: 0.346
  Frame 3: 0.352
  Frame 4: 0.360
  Frame 5: 0.329
```

```python session=qb output=assets/frame_mosaic2.jpg
plot_mosaic(input_frames, sharp_frames, '{output}')
```


![output](assets/frame_mosaic2.jpg)


```python session=qb output=assets/sharpness_graph2.svg
plot_sharpness(input_frames, sharp_frames, '{output}')
```


![output](assets/sharpness_graph2.svg)

As you can see, the system strikes a balance between the requested frequency and the quality that's actually available.

### Usage in Camera Module

Here's how it's used in the actual camera module:

```python skip
from dimos.core.module import Module

class CameraModule(Module):
    frequency: float = 2.0  # Target output frequency

    @rpc
    def start(self) -> None:
        stream = self.hardware.image_stream()

        if self.config.frequency > 0:
            stream = stream.pipe(sharpness_barrier(self.config.frequency))

        self._disposables.add(
            stream.subscribe(self.color_image.publish),
        )
```

### How Sharpness is Calculated

The sharpness score (0.0 to 1.0) is computed using Sobel edge detection.

From [`Image.py`](/dimos/msgs/sensor_msgs/Image.py):

```python session=qb
import cv2

# Get a frame and show the calculation
img = input_frames[10]
gray = img.to_grayscale()

# Sobel gradients - use .data to get the underlying numpy array
sx = cv2.Sobel(gray.data, cv2.CV_32F, 1, 0, ksize=5)
sy = cv2.Sobel(gray.data, cv2.CV_32F, 0, 1, ksize=5)
magnitude = cv2.magnitude(sx, sy)

print(f"Mean gradient magnitude: {magnitude.mean():.2f}")
print(f"Normalized sharpness: {img.sharpness:.3f}")
```


```
Mean gradient magnitude: 230.00
Normalized sharpness: 0.332
```

## Custom Quality Functions

You can use `quality_barrier` with any quality metric:

```python session=qb
# Example: select by "confidence" field
detections = [
    {"name": "cat", "confidence": 0.7},
    {"name": "dog", "confidence": 0.95},  # best
    {"name": "bird", "confidence": 0.6},
]

result = rx.of(*detections).pipe(
    quality_barrier(lambda d: d["confidence"], target_frequency=2.0),
    ops.to_list(),
).run()

print(f"Selected: {result[0]['name']} (conf: {result[0]['confidence']})")
```


```
Selected: dog (conf: 0.95)
```

## API Reference

### `quality_barrier(quality_func, target_frequency)`

RxPY pipe operator that selects the highest quality item within each time window.

| Parameter | Type | Description |
|--------------------|------------------------|------------------------------------------------------|
| `quality_func` | `Callable[[T], float]` | Function that returns a quality score for each item |
| `target_frequency` | `float` | Output frequency in Hz (e.g., 2.0 for 2 items/second)|

**Returns:** A pipe operator for use with `.pipe()`

### `sharpness_barrier(target_frequency)`

Convenience wrapper for images that uses `image.sharpness` as the quality function.
+ +| Parameter | Type | Description | +|--------------------|---------|--------------------------| +| `target_frequency` | `float` | Output frequency in Hz | + +**Returns:** A pipe operator for use with `.pipe()` diff --git a/docs/usage/data_streams/reactivex.md b/docs/usage/data_streams/reactivex.md new file mode 100644 index 0000000000..45873b471b --- /dev/null +++ b/docs/usage/data_streams/reactivex.md @@ -0,0 +1,494 @@ +# ReactiveX (RxPY) Quick Reference + +RxPY provides composable asynchronous data streams. This is a practical guide focused on common patterns in this codebase. + +## Quick Start: Using an Observable + +Given a function that returns an `Observable`, here's how to use it: + +```python session=rx +import reactivex as rx +from reactivex import operators as ops + +# Create an observable that emits 0,1,2,3,4 +source = rx.of(0, 1, 2, 3, 4) + +# Subscribe and print each value +received = [] +source.subscribe(lambda x: received.append(x)) +print("received:", received) +``` + + +``` +received: [0, 1, 2, 3, 4] +``` + +## The `.pipe()` Pattern + +Chain operators using `.pipe()`: + +```python session=rx +# Transform values: multiply by 2, then filter > 4 +result = [] + +# We build another observable. It's passive until `subscribe` is called. 
+observable = source.pipe( + ops.map(lambda x: x * 2), + ops.filter(lambda x: x > 4), +) + +observable.subscribe(lambda x: result.append(x)) + +print("transformed:", result) +``` + + +``` +transformed: [6, 8] +``` + +## Common Operators + +### Transform: `map` + +```python session=rx +rx.of(1, 2, 3).pipe( + ops.map(lambda x: f"item_{x}") +).subscribe(print) +``` + + +``` +item_1 +item_2 +item_3 + +``` + +### Filter: `filter` + +```python session=rx +rx.of(1, 2, 3, 4, 5).pipe( + ops.filter(lambda x: x % 2 == 0) +).subscribe(print) +``` + + +``` +2 +4 + +``` + +### Limit emissions: `take` + +```python session=rx +rx.of(1, 2, 3, 4, 5).pipe( + ops.take(3) +).subscribe(print) +``` + + +``` +1 +2 +3 + +``` + +### Flatten nested observables: `flat_map` + +```python session=rx +# For each input, emit multiple values +rx.of(1, 2).pipe( + ops.flat_map(lambda x: rx.of(x, x * 10, x * 100)) +).subscribe(print) +``` + + +``` +1 +10 +100 +2 +20 +200 + +``` + +## Rate Limiting + +### `sample(interval)` - Emit latest value every N seconds + +Takes the most recent value at each interval. Good for continuous streams where you want the freshest data. + +```python session=rx +# Use blocking .run() to collect results properly +results = rx.interval(0.05).pipe( + ops.take(10), + ops.sample(0.2), + ops.to_list(), +).run() +print("sample() got:", results) +``` + + +``` +sample() got: [2, 6, 9] +``` + +### `throttle_first(interval)` - Emit first, then block for N seconds + +Takes the first value then ignores subsequent values for the interval. Good for user input debouncing. 
+ +```python session=rx +results = rx.interval(0.05).pipe( + ops.take(10), + ops.throttle_first(0.15), + ops.to_list(), +).run() +print("throttle_first() got:", results) +``` + + +``` +throttle_first() got: [0, 3, 6, 9] +``` + +### Difference Between `sample` and `throttle_first` + +```python session=rx +# sample: takes LATEST value at each interval tick +# throttle_first: takes FIRST value then blocks + +# With fast emissions (0,1,2,3,4,5,6,7,8,9) every 50ms: +# sample(0.2s) -> gets value at 200ms, 400ms marks -> [2, 6, 9] +# throttle_first(0.15s) -> gets 0, blocks, then 3, blocks, then 6... -> [0,3,6,9] +print("sample: latest value at each tick") +print("throttle_first: first value, then block") +``` + + +``` +sample: latest value at each tick +throttle_first: first value, then block +``` + + +## What is an Observable? + +An Observable is like a list, but instead of holding all values at once, it produces values over time. + +| | List | Iterator | Observable | +|-------------|-----------------------|-----------------------|------------------| +| **Values** | All exist now | Generated on demand | Arrive over time | +| **Control** | You pull (`for x in`) | You pull (`next()`) | Pushed to you | +| **Size** | Finite | Can be infinite | Can be infinite | +| **Async** | No | Yes (with asyncio) | Yes | +| **Cancel** | N/A | Stop calling `next()` | `.dispose()` | + +The key difference from iterators: with an Observable, **you don't control when values arrive**. A camera produces frames at 30fps whether you're ready or not. An iterator waits for you to call `next()`. + +**Observables are lazy.** An Observable is just a description of work to be done - it sits there doing nothing until you call `.subscribe()`. That's when it "wakes up" and starts producing values. + +This means you can build complex pipelines, pass them around, and nothing happens until someone subscribes. + +**The three things an Observable can tell you:** + +1. 
**"Here's a value"** (`on_next`) - A new value arrived +2. **"Something went wrong"** (`on_error`) - An error occurred, stream stops +3. **"I'm done"** (`on_completed`) - No more values coming + +**The basic pattern:** + +``` +observable.subscribe(what_to_do_with_each_value) +``` + +That's it. You create or receive an Observable, then subscribe to start receiving values. + +When you subscribe, data flows through a pipeline: + +
+diagram source + +```pikchr fold output=assets/observable_flow.svg +color = white +fill = none + +Obs: box "observable" rad 5px fit wid 170% ht 170% +arrow right 0.3in +Pipe: box ".pipe(ops)" rad 5px fit wid 170% ht 170% +arrow right 0.3in +Sub: box ".subscribe()" rad 5px fit wid 170% ht 170% +arrow right 0.3in +Handler: box "callback" rad 5px fit wid 170% ht 170% +``` + +
![output](assets/observable_flow.svg)

Here's the full subscribe signature with all three callbacks:

```python session=rx
rx.of(1, 2, 3).subscribe(
    on_next=lambda x: print(f"value: {x}"),
    on_error=lambda e: print(f"error: {e}"),
    on_completed=lambda: print("done")
)
```


```
value: 1
value: 2
value: 3
done

```

## Disposables: Cancelling Subscriptions

When you subscribe, you get back a `Disposable`. This is your "cancel button":

```python session=rx
import reactivex as rx

source = rx.interval(0.1)  # emits 0, 1, 2, ... every 100ms forever
subscription = source.subscribe(lambda x: print(x))

# Later, when you're done:
subscription.dispose()  # Stop receiving values, clean up resources
print("disposed")
```


```
disposed
```

**Why does this matter?**

- Observables can be infinite (sensor feeds, websockets, timers)
- Without disposing, you leak memory and keep processing values forever
- Disposing also cleans up any resources the Observable opened (connections, file handles, etc.)

**Rule of thumb:** whenever you subscribe, keep the returned disposable and call `disposable.dispose()` once you're done with the stream.
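For subscriptions with a clearly scoped lifetime, one convenient pattern (a sketch, not a dimos API) is a small context manager that guarantees disposal on exit, even if the body raises:

```python
from contextlib import contextmanager

@contextmanager
def subscribing(observable, *args, **kwargs):
    """Subscribe on entry; always dispose on exit."""
    subscription = observable.subscribe(*args, **kwargs)
    try:
        yield subscription
    finally:
        subscription.dispose()
```

Usage is then `with subscribing(source, on_next): ...` and the subscription is cleaned up automatically when the block ends. This works with anything whose `subscribe()` returns an object with a `.dispose()` method.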
+ +**In dimos modules:** Every `Module` has a `self._disposables` (a `CompositeDisposable`) that automatically disposes everything when the module closes: + +```python session=rx +import time +from dimos.core import Module + +class MyModule(Module): + def start(self): + source = rx.interval(0.05) + self._disposables.add(source.subscribe(lambda x: print(f"got {x}"))) + +module = MyModule() +module.start() +time.sleep(0.25) + +# unsubscribes disposables +module.stop() +``` + + +``` +got 0 +got 1 +got 2 +got 3 +got 4 +``` + +## Creating Observables + +There are two common callback patterns in APIs. Use the appropriate helper: + +| Pattern | Example | Helper | +|---------|---------|--------| +| Register/unregister with same callback | `sensor.register(cb)` / `sensor.unregister(cb)` | `callback_to_observable` | +| Subscribe returns unsub function | `unsub = pubsub.subscribe(cb)` | `to_observable` | + +### From register/unregister APIs + +Use `callback_to_observable` when the API has separate register and unregister functions that take the same callback reference: + +```python session=create +import reactivex as rx +from reactivex import operators as ops +from dimos.utils.reactive import callback_to_observable + +class MockSensor: + def __init__(self): + self._callbacks = [] + def register(self, cb): + self._callbacks.append(cb) + def unregister(self, cb): + self._callbacks.remove(cb) + def emit(self, value): + for cb in self._callbacks: + cb(value) + +sensor = MockSensor() + +obs = callback_to_observable( + start=sensor.register, + stop=sensor.unregister +) + +received = [] +sub = obs.subscribe(lambda x: received.append(x)) + +sensor.emit("reading_1") +sensor.emit("reading_2") +print("received:", received) + +sub.dispose() +print("callbacks after dispose:", len(sensor._callbacks)) +``` + + +``` +received: ['reading_1', 'reading_2'] +callbacks after dispose: 0 +``` + +### From subscribe-returns-unsub APIs + +Use `to_observable` when the subscribe function returns an 
unsubscribe callable:

```python session=create
from dimos.utils.reactive import to_observable

class MockPubSub:
    def __init__(self):
        self._callbacks = []
    def subscribe(self, cb):
        self._callbacks.append(cb)
        return lambda: self._callbacks.remove(cb)  # returns unsub function
    def publish(self, value):
        for cb in self._callbacks:
            cb(value)

pubsub = MockPubSub()

obs = to_observable(pubsub.subscribe)

received = []
sub = obs.subscribe(lambda x: received.append(x))

pubsub.publish("msg_1")
pubsub.publish("msg_2")
print("received:", received)

sub.dispose()
print("callbacks after dispose:", len(pubsub._callbacks))
```


```
received: ['msg_1', 'msg_2']
callbacks after dispose: 0
```

### From scratch with `rx.create`

```python session=create
from reactivex.disposable import Disposable

def custom_subscribe(observer, scheduler=None):
    observer.on_next("first")
    observer.on_next("second")
    observer.on_completed()
    return Disposable(lambda: print("cleaned up"))

obs = rx.create(custom_subscribe)

results = []
obs.subscribe(
    on_next=lambda x: results.append(x),
    on_completed=lambda: results.append("DONE")
)
print("results:", results)
```


```
cleaned up
results: ['first', 'second', 'DONE']
```

Note that `cleaned up` prints first: completing the stream auto-disposes the subscription, which runs the cleanup function before the final `print` executes.

## CompositeDisposable

As we've seen, subscriptions should be disposed when you're done with them to prevent leaks:

```python session=dispose
import time
import reactivex as rx
from reactivex import operators as ops

source = rx.interval(0.1).pipe(ops.take(100))
received = []

subscription = source.subscribe(lambda x: received.append(x))
time.sleep(0.25)
subscription.dispose()
time.sleep(0.2)

print(f"received {len(received)} items before dispose")
```


```
received 2 items before dispose
```

For multiple subscriptions, use `CompositeDisposable`:

```python session=dispose
from reactivex.disposable import CompositeDisposable

disposables = CompositeDisposable()

s1 =
rx.of(1,2,3).subscribe(lambda x: None) +s2 = rx.of(4,5,6).subscribe(lambda x: None) + +disposables.add(s1) +disposables.add(s2) + +print("subscriptions:", len(disposables)) +disposables.dispose() +print("after dispose:", disposables.is_disposed) +``` + + +``` +subscriptions: 2 +after dispose: True +``` + +## Reference + +| Operator | Purpose | Example | +|-----------------------|------------------------------------------|---------------------------------------| +| `map(fn)` | Transform each value | `ops.map(lambda x: x * 2)` | +| `filter(pred)` | Keep values matching predicate | `ops.filter(lambda x: x > 0)` | +| `take(n)` | Take first n values | `ops.take(10)` | +| `first()` | Take first value only | `ops.first()` | +| `sample(sec)` | Emit latest every interval | `ops.sample(0.5)` | +| `throttle_first(sec)` | Emit first, block for interval | `ops.throttle_first(0.5)` | +| `flat_map(fn)` | Map + flatten nested observables | `ops.flat_map(lambda x: rx.of(x, x))` | +| `observe_on(sched)` | Switch scheduler | `ops.observe_on(pool_scheduler)` | +| `replay(n)` | Cache last n values for late subscribers | `ops.replay(buffer_size=1)` | +| `timeout(sec)` | Error if no value within timeout | `ops.timeout(5.0)` | + +See [RxPY documentation](https://rxpy.readthedocs.io/) for complete operator reference. diff --git a/docs/usage/data_streams/storage_replay.md b/docs/usage/data_streams/storage_replay.md new file mode 100644 index 0000000000..c5cbe306a8 --- /dev/null +++ b/docs/usage/data_streams/storage_replay.md @@ -0,0 +1,231 @@ +# Sensor Storage and Replay + +Record sensor streams to disk and replay them with original timing. Useful for testing, debugging, and creating reproducible datasets. 
+ +## Quick Start + +### Recording + +```python skip +from dimos.utils.testing.replay import TimedSensorStorage + +# Create storage (directory in data folder) +storage = TimedSensorStorage("my_recording") + +# Save frames from a stream +camera_stream.subscribe(storage.save_one) + +# Or save manually +storage.save(frame1, frame2, frame3) +``` + +### Replaying + +```python skip +from dimos.utils.testing.replay import TimedSensorReplay + +# Load recording +replay = TimedSensorReplay("my_recording") + +# Iterate at original speed +for frame in replay.iterate_realtime(): + process(frame) + +# Or as an Observable stream +replay.stream(speed=1.0).subscribe(process) +``` + +## TimedSensorStorage + +Stores sensor data with timestamps as pickle files. Each frame is saved as `000.pickle`, `001.pickle`, etc. + +```python skip +from dimos.utils.testing.replay import TimedSensorStorage + +storage = TimedSensorStorage("lidar_capture") + +# Save individual frames +storage.save_one(lidar_msg) # Returns frame count + +# Save multiple frames +storage.save(frame1, frame2, frame3) + +# Subscribe to a stream +lidar_stream.subscribe(storage.save_one) + +# Or pipe through (emits frame count) +lidar_stream.pipe( + ops.flat_map(storage.save_stream) +).subscribe() +``` + +**Storage location:** Files are saved to the data directory under the given name. The directory must not already contain pickle files (prevents accidental overwrites). + +**What gets stored:** By default, if a frame has a `.raw_msg` attribute, that's pickled instead of the full object. You can customize with the `autocast` parameter: + +```python skip +# Custom serialization +storage = TimedSensorStorage( + "custom_capture", + autocast=lambda frame: frame.to_dict() +) +``` + +## TimedSensorReplay + +Replays stored sensor data with timestamp-aware iteration and seeking. 
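Conceptually, realtime playback (as in `iterate_realtime()` above) just sleeps until each frame's recorded offset, scaled by `speed`, before handing it over. A stdlib-only sketch of that core timing loop — an illustration of the idea, not the dimos implementation:

```python
import time

def replay_realtime(stamped_frames, speed=1.0):
    """Yield frames so inter-frame gaps match the recording, scaled by speed.

    stamped_frames: iterable of (timestamp_seconds, frame) pairs.
    """
    wall_start = time.monotonic()
    first_ts = None
    for ts, frame in stamped_frames:
        if first_ts is None:
            first_ts = ts
        # Offset into the (speed-scaled) recording where this frame belongs
        target = (ts - first_ts) / speed
        delay = target - (time.monotonic() - wall_start)
        if delay > 0:
            time.sleep(delay)
        yield frame
```

The real class adds seeking, duration limits, looping, and the Observable wrapper on top of this loop.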
+ +### Basic Iteration + +```python skip +from dimos.utils.testing.replay import TimedSensorReplay + +replay = TimedSensorReplay("lidar_capture") + +# Iterate all frames (ignores timing) +for frame in replay.iterate(): + process(frame) + +# Iterate with timestamps +for ts, frame in replay.iterate_ts(): + print(f"Frame at {ts}: {frame}") + +# Iterate with relative timestamps (from start) +for relative_ts, frame in replay.iterate_duration(): + print(f"At {relative_ts:.2f}s: {frame}") +``` + +### Realtime Playback + +```python skip +# Play at original speed (blocks between frames) +for frame in replay.iterate_realtime(): + process(frame) + +# Play at 2x speed +for frame in replay.iterate_realtime(speed=2.0): + process(frame) + +# Play at half speed +for frame in replay.iterate_realtime(speed=0.5): + process(frame) +``` + +### Seeking and Slicing + +```python skip +# Start 10 seconds into the recording +for ts, frame in replay.iterate_ts(seek=10.0): + process(frame) + +# Play only 5 seconds starting at 10s +for ts, frame in replay.iterate_ts(seek=10.0, duration=5.0): + process(frame) + +# Loop forever +for frame in replay.iterate(loop=True): + process(frame) +``` + +### Finding Specific Frames + +```python skip +# Find frame closest to absolute timestamp +frame = replay.find_closest(1704067200.0) + +# Find frame closest to relative time (30s from start) +frame = replay.find_closest_seek(30.0) + +# With tolerance (returns None if no match within 0.1s) +frame = replay.find_closest(timestamp, tolerance=0.1) +``` + +### Observable Stream + +The `.stream()` method returns an Observable that emits frames with original timing: + +```python skip +# Stream at original speed +replay.stream(speed=1.0).subscribe(process) + +# Stream at 2x with seeking +replay.stream( + speed=2.0, + seek=10.0, # Start 10s in + duration=30.0, # Play for 30s + loop=True # Loop forever +).subscribe(process) +``` + +## Usage: Stub Connections for Testing + +A common pattern is creating replay-based 
connection stubs for testing without hardware. From [`robot/unitree/go2/connection.py`](/dimos/robot/unitree/go2/connection.py#L83):

```python skip
class ReplayConnection(UnitreeWebRTCConnection):
    dir_name = "unitree_go2_bigoffice"

    def __init__(self, **kwargs) -> None:
        get_data(self.dir_name)
        self.replay_config = {
            "loop": kwargs.get("loop"),
            "seek": kwargs.get("seek"),
            "duration": kwargs.get("duration"),
        }

    def lidar_stream(self):
        lidar_store = TimedSensorReplay(f"{self.dir_name}/lidar")
        return lidar_store.stream(**self.replay_config)

    def video_stream(self):
        video_store = TimedSensorReplay(f"{self.dir_name}/video")
        return video_store.stream(**self.replay_config)
```

This allows running the full perception pipeline against recorded data:

```python skip
# Use replay connection instead of real hardware
connection = ReplayConnection(loop=True, seek=5.0)
robot = GO2Connection(connection=connection)
```

This approach is still fairly primitive; a higher-order API that records the full I/O of any module is planned, but remains a work in progress.

## Data Format

Each pickle file contains a tuple `(timestamp, data)`:

- **timestamp**: Unix timestamp (float) when the frame was captured
- **data**: The sensor data (or the result of `autocast`, if provided)

Files are numbered sequentially: `000.pickle`, `001.pickle`, etc.

Recordings are stored in the `data/` directory. See [Data Loading](/docs/development/large_file_management.md) for how data storage works, including Git LFS handling for large datasets.
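Because the format is just sequentially numbered pickles of `(timestamp, data)` tuples, recordings can be produced or read without dimos at all. A stdlib-only sketch (`write_frames`/`read_frames` are illustrative helper names, not part of the API):

```python
import pickle
import tempfile
from pathlib import Path

def write_frames(directory, frames):
    """Write (timestamp, data) tuples as 000.pickle, 001.pickle, ..."""
    for i, frame in enumerate(frames):
        (Path(directory) / f"{i:03d}.pickle").write_bytes(pickle.dumps(frame))

def read_frames(directory):
    """Read frames back in sequence order."""
    paths = sorted(Path(directory).glob("*.pickle"))
    return [pickle.loads(p.read_bytes()) for p in paths]

with tempfile.TemporaryDirectory() as d:
    write_frames(d, [(1704067200.0, {"scan": [1, 2]}), (1704067200.1, {"scan": [3]})])
    frames = read_frames(d)

print(len(frames), frames[0][0])  # 2 1704067200.0
```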
+ +## API Reference + +### TimedSensorStorage + +| Method | Description | +|------------------------------|------------------------------------------| +| `save_one(frame)` | Save a single frame, returns frame count | +| `save(*frames)` | Save multiple frames | +| `save_stream(observable)` | Pipe an observable through storage | +| `consume_stream(observable)` | Subscribe and save without returning | + +### TimedSensorReplay + +| Method | Description | +|--------------------------------------------------|---------------------------------------| +| `iterate(loop=False)` | Iterate frames (no timing) | +| `iterate_ts(seek, duration, loop)` | Iterate with absolute timestamps | +| `iterate_duration(...)` | Iterate with relative timestamps | +| `iterate_realtime(speed, ...)` | Iterate with blocking to match timing | +| `stream(speed, seek, duration, loop)` | Observable with original timing | +| `find_closest(timestamp, tolerance)` | Find frame by absolute timestamp | +| `find_closest_seek(relative_seconds, tolerance)` | Find frame by relative time | +| `first()` | Get first frame | +| `first_timestamp()` | Get first timestamp | +| `load(name)` | Load specific frame by name/index | diff --git a/docs/usage/data_streams/temporal_alignment.md b/docs/usage/data_streams/temporal_alignment.md new file mode 100644 index 0000000000..66230c9d54 --- /dev/null +++ b/docs/usage/data_streams/temporal_alignment.md @@ -0,0 +1,313 @@ +# Temporal Message Alignment + +Robots have multiple sensors emitting data at different rates and latencies. A camera might run at 30fps, while lidar scans at 10Hz, and each has different processing delays. For perception tasks like projecting 2D detections into 3D pointclouds, we need to match data from these streams by timestamp. + +`align_timestamped` solves this by buffering messages and matching them within a time tolerance. + +
Pikchr + +```pikchr fold output=assets/alignment_overview.svg +color = white +fill = none + +Cam: box "Camera" "30 fps" rad 5px fit wid 170% ht 170% +arrow from Cam.e right 0.4in then down 0.35in then right 0.4in +Align: box "align_timestamped" rad 5px fit wid 170% ht 170% + +Lidar: box "Lidar" "10 Hz" rad 5px fit wid 170% ht 170% with .s at (Cam.s.x, Cam.s.y - 0.7in) +arrow from Lidar.e right 0.4in then up 0.35in then right 0.4in + +arrow from Align.e right 0.4in +Out: box "(image, pointcloud)" rad 5px fit wid 170% ht 170% +``` + +
+ + +![output](assets/alignment_overview.svg) + + +## Basic Usage + +Below we set up replay of real camera and lidar data from the Unitree Go2 robot. You can check it if you're interested. + +
+Stream Setup + +You can read more about [sensor storage here](storage_replay.md) and [LFS data storage here](/docs/development/large_file_management.md). + +```python session=align no-result +from reactivex import Subject +from dimos.utils.testing import TimedSensorReplay +from dimos.types.timestamped import Timestamped, align_timestamped +from reactivex import operators as ops +import reactivex as rx + +# Load recorded Go2 sensor streams +video_replay = TimedSensorReplay("unitree_go2_bigoffice/video") +lidar_replay = TimedSensorReplay("unitree_go2_bigoffice/lidar") + +# This is a bit tricky. We find the first video frame timestamp, then add 2 seconds to it. +seek_ts = video_replay.first_timestamp() + 2 + +# Lists to collect items as they flow through streams +video_frames = [] +lidar_scans = [] + +# We are using from_timestamp=... and not seek=... because seek seeks through recording +# timestamps, from_timestamp matches actual message timestamp. +# It's possible for sensor data to come in late, but with correct capture time timestamps +video_stream = video_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe( + ops.do_action(lambda x: video_frames.append(x)) +) + +lidar_stream = lidar_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe( + ops.do_action(lambda x: lidar_scans.append(x)) +) + +``` + + +
+ +Streams would normally come from an actual robot into your module via `In` inputs. [`detection/module3D.py`](/dimos/perception/detection/module3D.py#L11) is a good example of this. + +Assume we have them. Let's align them. + +```python session=align +# Align video (primary) with lidar (secondary) +# match_tolerance: max time difference for a match (seconds) +# buffer_size: how long to keep messages waiting for matches (seconds) +aligned_pairs = align_timestamped( + video_stream, + lidar_stream, + match_tolerance=0.025, # 25ms tolerance + buffer_size=5.0, # how long to wait for match +).pipe(ops.to_list()).run() + +print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans") +print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames") + +# Show a matched pair +if aligned_pairs: + img, pc = aligned_pairs[0] + dt = abs(img.ts - pc.ts) + print(f"\nFirst matched pair: Δ{dt*1000:.1f}ms") +``` + + +``` +Video: 29 frames, Lidar: 15 scans +Aligned pairs: 11 out of 29 video frames + +First matched pair: Δ11.3ms +``` + +
+Visualization helper + +```python session=align fold no-result +import matplotlib +import matplotlib.pyplot as plt + +def plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, path): + """Single timeline: video above axis, lidar below, green lines for matches.""" + matplotlib.use('Agg') + plt.style.use('dark_background') + + # Get base timestamp for relative times (frames have .ts attribute) + base_ts = video_frames[0].ts + video_ts = [f.ts - base_ts for f in video_frames] + lidar_ts = [s.ts - base_ts for s in lidar_scans] + + # Find matched timestamps + matched_video_ts = set(img.ts for img, _ in aligned_pairs) + matched_lidar_ts = set(pc.ts for _, pc in aligned_pairs) + + fig, ax = plt.subplots(figsize=(12, 2.5)) + + # Video markers above axis (y=0.3) - circles, cyan when matched + for frame in video_frames: + rel_ts = frame.ts - base_ts + matched = frame.ts in matched_video_ts + ax.plot(rel_ts, 0.3, 'o', color='cyan' if matched else '#688', markersize=8) + + # Lidar markers below axis (y=-0.3) - squares, orange when matched + for scan in lidar_scans: + rel_ts = scan.ts - base_ts + matched = scan.ts in matched_lidar_ts + ax.plot(rel_ts, -0.3, 's', color='orange' if matched else '#a86', markersize=8) + + # Green lines connecting matched pairs + for img, pc in aligned_pairs: + img_rel = img.ts - base_ts + pc_rel = pc.ts - base_ts + ax.plot([img_rel, pc_rel], [0.3, -0.3], '-', color='lime', alpha=0.6, linewidth=1) + + # Axis styling + ax.axhline(y=0, color='white', linewidth=0.5, alpha=0.3) + ax.set_xlim(-0.1, max(video_ts + lidar_ts) + 0.1) + ax.set_ylim(-0.6, 0.6) + ax.set_xlabel('Time (s)') + ax.set_yticks([0.3, -0.3]) + ax.set_yticklabels(['Video', 'Lidar']) + ax.set_title(f'{len(aligned_pairs)} matched from {len(video_frames)} video + {len(lidar_scans)} lidar') + plt.tight_layout() + plt.savefig(path, transparent=True) + plt.close() +``` + +
+ +```python session=align output=assets/alignment_timeline.png +plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, '{output}') +``` + + +![output](assets/alignment_timeline.png) + +If we loosen up our match tolerance, we might get multiple pairs matching the same lidar frame. + +```python session=align +aligned_pairs = align_timestamped( + video_stream, + lidar_stream, + match_tolerance=0.05, # 50ms tolerance + buffer_size=5.0, # how long to wait for match +).pipe(ops.to_list()).run() + +print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans") +print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames") +``` + + +``` +Video: 58 frames, Lidar: 30 scans +Aligned pairs: 23 out of 58 video frames +``` + + +```python session=align output=assets/alignment_timeline2.png +plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, '{output}') +``` + + +![output](assets/alignment_timeline2.png) + +## Combine Frame Alignment with a Quality Filter + +More on [quality filtering here](quality_filter.md). 
```python session=align
from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier

# Lists to collect items as they flow through streams
video_frames = []
lidar_scans = []

video_stream = video_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
    sharpness_barrier(3.0),
    ops.do_action(lambda x: video_frames.append(x))
)

lidar_stream = lidar_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
    ops.do_action(lambda x: lidar_scans.append(x))
)

aligned_pairs = align_timestamped(
    video_stream,
    lidar_stream,
    match_tolerance=0.025,  # 25ms tolerance
    buffer_size=5.0,  # how long to wait for match
).pipe(ops.to_list()).run()

print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")
```


```
Video: 6 frames, Lidar: 15 scans
Aligned pairs: 1 out of 6 video frames
```

```python session=align output=assets/alignment_timeline3.png
plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, '{output}')
```


![output](assets/alignment_timeline3.png)

The filter is very picky, but the data that survives is high quality: the single aligned pair is the sharpest frame in this window, matched with its closest lidar scan.

## How It Works

The primary stream (first argument) drives emissions. When a primary message arrives:

1. **Immediate match**: If matching secondaries already exist in buffers, emit immediately
2. **Deferred match**: If secondaries are missing, buffer the primary and wait

When secondary messages arrive:
1. Add to buffer for future primary matches
2. Check buffered primaries - if this completes a match, emit
+diagram source + +```pikchr fold output=assets/alignment_flow.svg +color = white +fill = none +linewid = 0.35in + +Primary: box "Primary" "arrives" rad 5px fit wid 170% ht 170% +arrow +Check: box "Check" "secondaries" rad 5px fit wid 170% ht 170% + +arrow from Check.e right 0.35in then up 0.4in then right 0.35in +Emit: box "Emit" "match" rad 5px fit wid 170% ht 170% +text "all found" at (Emit.w.x - 0.4in, Emit.w.y + 0.15in) + +arrow from Check.e right 0.35in then down 0.4in then right 0.35in +Buffer: box "Buffer" "primary" rad 5px fit wid 170% ht 170% +text "waiting..." at (Buffer.w.x - 0.4in, Buffer.w.y - 0.15in) +``` + +
+ + +![output](assets/alignment_flow.svg) + +## Parameters + +| Parameter | Type | Default | Description | +|--------------------------|--------------------|----------|-------------------------------------------------| +| `primary_observable` | `Observable[T]` | required | Primary stream that drives output timing | +| `*secondary_observables` | `Observable[S]...` | required | One or more secondary streams to align | +| `match_tolerance` | `float` | 0.1 | Max time difference for a match (seconds) | +| `buffer_size` | `float` | 1.0 | How long to buffer unmatched messages (seconds) | + + + +## Usage in Modules + +Every module `In` port exposes an `.observable()` method that returns a backpressured stream of incoming messages. This makes it easy to align inputs from multiple sensors. + +From [`detection/module3D.py`](/dimos/perception/detection/module3D.py), projecting 2D detections into 3D pointclouds: + +```python skip +class Detection3DModule(Detection2DModule): + color_image: In[Image] + pointcloud: In[PointCloud2] + + def start(self): + # Align 2D detections with pointcloud data + self.detection_stream_3d = align_timestamped( + backpressure(self.detection_stream_2d()), + self.pointcloud.observable(), + match_tolerance=0.25, + buffer_size=20.0, + ).pipe(ops.map(detection2d_to_3d)) +``` + +The 2D detection stream (camera + ML model) is the primary, matched with raw pointcloud data from lidar. The longer `buffer_size=20.0` accounts for variable ML inference times. diff --git a/docs/usage/transports/dds.md b/docs/usage/transports/dds.md new file mode 100644 index 0000000000..1aec0bafe5 --- /dev/null +++ b/docs/usage/transports/dds.md @@ -0,0 +1,26 @@ +# Installing DDS Transport Libs on Ubuntu + +The `dds` extra provides DDS (Data Distribution Service) transport support via [Eclipse Cyclone DDS](https://cyclonedds.io/docs/cyclonedds-python/latest/). This requires installing system libraries before the Python package can be built. 
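The immediate/deferred behavior described under "How It Works" can be sketched as a tiny single-secondary aligner in plain Python (a hypothetical class with no buffer expiry, not the real `align_timestamped`):

```python
class Aligner:
    """Minimal sketch of primary-driven alignment with one secondary stream."""

    def __init__(self, match_tolerance):
        self.tol = match_tolerance
        self.pending = []      # buffered primary (ts, msg) awaiting a match
        self.secondaries = []  # buffered secondary (ts, msg)
        self.emitted = []      # matched (primary_msg, secondary_msg) pairs

    def _try_match(self, p_ts, p_msg):
        # Nearest buffered secondary; emit if it is within tolerance
        best = min(self.secondaries, key=lambda s: abs(s[0] - p_ts), default=None)
        if best and abs(best[0] - p_ts) <= self.tol:
            self.emitted.append((p_msg, best[1]))
            return True
        return False

    def on_primary(self, ts, msg):
        # Immediate match if a close-enough secondary is already buffered,
        # otherwise defer: buffer the primary and wait
        if not self._try_match(ts, msg):
            self.pending.append((ts, msg))

    def on_secondary(self, ts, msg):
        self.secondaries.append((ts, msg))
        # A new secondary may complete previously buffered primaries
        self.pending = [(t, m) for t, m in self.pending if not self._try_match(t, m)]

a = Aligner(match_tolerance=0.025)
a.on_primary(0.000, "img0")    # no secondary yet -> buffered
a.on_secondary(0.010, "pc0")   # completes the buffered img0
a.on_primary(0.033, "img1")    # immediate match with pc0
print(a.emitted)  # [('img0', 'pc0'), ('img1', 'pc0')]
```

A late secondary retroactively completes a buffered primary, while a primary arriving after its secondary matches immediately.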
+ +```bash +# Install the CycloneDDS development library +sudo apt install cyclonedds-dev + +# Create a compatibility directory structure +# (required because Ubuntu's multiarch layout doesn't match the expected CMake layout) +sudo mkdir -p /opt/cyclonedds/{lib,bin,include} +sudo ln -sf /usr/lib/x86_64-linux-gnu/libddsc.so* /opt/cyclonedds/lib/ +sudo ln -sf /usr/lib/x86_64-linux-gnu/libcycloneddsidl.so* /opt/cyclonedds/lib/ +sudo ln -sf /usr/bin/idlc /opt/cyclonedds/bin/ +sudo ln -sf /usr/bin/ddsperf /opt/cyclonedds/bin/ +sudo ln -sf /usr/include/dds /opt/cyclonedds/include/ + +# Install with the dds extra +CYCLONEDDS_HOME=/opt/cyclonedds uv pip install -e '.[dds]' +``` + +To install all extras including DDS: + +```bash +CYCLONEDDS_HOME=/opt/cyclonedds uv sync --extra dds +``` diff --git a/docs/usage/transports/index.md b/docs/usage/transports/index.md new file mode 100644 index 0000000000..748cf03aa1 --- /dev/null +++ b/docs/usage/transports/index.md @@ -0,0 +1,437 @@ +# Transports + +Transports connect **module streams** across **process boundaries** and/or **networks**. + +* **Module**: a running component (e.g., camera, mapping, nav). +* **Stream**: a unidirectional flow of messages owned by a module (one broadcaster → many receivers). +* **Topic**: the name/identifier used by a transport or pubsub backend. +* **Message**: payload carried on a stream (often `dimos.msgs.*`, but can be bytes / images / pointclouds / etc.). + +Each edge in the graph is a **transported stream** (potentially different protocols). Each node is a **module**: + +![go2_nav](../assets/go2_nav.svg) + +## What the transport layer guarantees (and what it doesn’t) + +Modules **don’t** know or care *how* data moves. They just: + +* emit messages (broadcast) +* subscribe to messages (receive) + +A transport is responsible for the mechanics of delivery (IPC, sockets, Redis, ROS 2, etc.). 
+ +**Important:** delivery semantics depend on the backend: + +* Some are **best-effort** (e.g., UDP multicast / LCM): loss can happen. +* Some can be **reliable** (e.g., TCP-backed, Redis, some DDS configs) but may add latency/backpressure. + +So: treat the API as uniform, but pick a backend whose semantics match the task. + +--- + +## Benchmarks + +Quick view on performance of our pubsub backends: + +```sh skip +python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py +``` + +![Benchmark results](../assets/pubsub_benchmark.png) + +--- + +## Abstraction layers + +
Pikchr + +```pikchr output=../assets/abstraction_layers.svg fold +color = white +fill = none +linewid = 0.5in +boxwid = 1.0in +boxht = 0.4in + +# Boxes with labels +B: box "Blueprints" rad 10px +arrow +M: box "Modules" rad 5px +arrow +T: box "Transports" rad 5px +arrow +P: box "PubSub" rad 5px + +# Descriptions below +text "robot configs" at B.s + (0.1, -0.2in) +text "camera, nav" at M.s + (0, -0.2in) +text "LCM, SHM, ROS" at T.s + (0, -0.2in) +text "pub/sub API" at P.s + (0, -0.2in) +``` + +
+ + +![output](../assets/abstraction_layers.svg) + +We’ll go through these layers top-down. + +--- + +## Using transports with blueprints + +See [Blueprints](blueprints.md) for the blueprint API. + +From [`unitree/go2/blueprints/__init__.py`](/dimos/robot/unitree/go2/blueprints/__init__.py). + +Example: rebind a few streams from the default `LCMTransport` to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) so you can visualize in **rviz2**. + +```python skip +nav = autoconnect( + basic, + voxel_mapper(voxel_size=0.1), + cost_mapper(), + replanning_a_star_planner(), + wavefront_frontier_explorer(), +).global_config(n_dask_workers=6, robot_model="unitree_go2") + +ros = nav.transports( + { + ("lidar", PointCloud2): ROSTransport("lidar", PointCloud2), + ("global_map", PointCloud2): ROSTransport("global_map", PointCloud2), + ("odom", PoseStamped): ROSTransport("odom", PoseStamped), + ("color_image", Image): ROSTransport("color_image", Image), + } +) +``` + +--- + +## Using transports with modules + +Each **stream** on a module can use a different transport. Set `.transport` on the stream **before starting** modules. 
+ +```python ansi=false +import time + +from dimos.core import In, Module, start +from dimos.core.transport import LCMTransport +from dimos.hardware.sensors.camera.module import CameraModule +from dimos.msgs.sensor_msgs import Image + + +class ImageListener(Module): + image: In[Image] + + def start(self): + super().start() + self.image.subscribe(lambda img: print(f"Received: {img.shape}")) + + +if __name__ == "__main__": + # Start local cluster and deploy modules to separate processes + dimos = start(2) + + camera = dimos.deploy(CameraModule, frequency=2.0) + listener = dimos.deploy(ImageListener) + + # Choose a transport for the stream (example: LCM typed channel) + camera.color_image.transport = LCMTransport("/camera/rgb", Image) + + # Connect listener input to camera output + listener.image.connect(camera.color_image) + + camera.start() + listener.start() + + time.sleep(2) + dimos.stop() +``` + + + +``` +Initialized dimos local cluster with 2 workers, memory limit: auto +2026-01-24T13:17:50.190559Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule +2026-01-24T13:17:50.218466Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=1 +2026-01-24T13:17:50.229474Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener +2026-01-24T13:17:50.250199Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=0 +Received: (480, 640, 3) +Received: (480, 640, 3) +Received: (480, 640, 3) +``` + +See [Modules](modules.md) for more on module architecture. + +--- + +## Inspecting LCM traffic (CLI) + +`lcmspy` shows topic frequency/bandwidth stats: + +![lcmspy](../assets/lcmspy.png) + +`dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically: + +```sh skip +Listening on /camera/rgb (inferring from typed LCM channels like '/camera/rgb#pkg.Msg')... 
(Ctrl+C to stop) +Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) +``` + +--- + +## Implementing a transport + +At the stream layer, a transport is implemented by subclassing `Transport` (see [`core/stream.py`](/dimos/core/stream.py#L83)) and implementing: + +* `broadcast(...)` +* `subscribe(...)` + +Your `Transport.__init__` args can be anything meaningful for your backend: + +* `(ip, port)` +* a shared-memory segment name +* a filesystem path +* a Redis channel + +Encoding is an implementation detail, but we encourage using LCM-compatible message types when possible. + +### Encoding helpers + +Many of our message types provide `lcm_encode` / `lcm_decode` for compact, language-agnostic binary encoding (often faster than pickle). For details, see [LCM](/docs/usage/lcm.md). + +--- + +## PubSub transports + +Even though transport can be anything (TCP connection, unix socket) for now all our transport backends implement the `PubSub` interface. + +* `publish(topic, message)` +* `subscribe(topic, callback) -> unsubscribe` + +```python +from dimos.protocol.pubsub.spec import PubSub +import inspect + +print(inspect.getsource(PubSub.publish)) +print(inspect.getsource(PubSub.subscribe)) +``` + + +```python + @abstractmethod + def publish(self, topic: TopicT, message: MsgT) -> None: + """Publish a message to a topic.""" + ... + + @abstractmethod + def subscribe( + self, topic: TopicT, callback: Callable[[MsgT, TopicT], None] + ) -> Callable[[], None]: + """Subscribe to a topic with a callback. returns unsubscribe function""" + ... +``` + +Topic/message types are flexible: bytes, JSON, or our ROS-compatible [LCM](/docs/usage/lcm.md) types. We also have pickle-based transports for arbitrary Python objects. + +### LCM (UDP multicast) + +LCM is UDP multicast. It’s very fast on a robot LAN, but it’s **best-effort** (packets can drop). 
For local emission it can autoconfigure the system so that it is more robust and faster than more common protocols such as ROS or DDS.

```python
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
from dimos.msgs.geometry_msgs import Vector3

lcm = LCM(autoconf=True)
lcm.start()

received = []
topic = Topic("/robot/velocity", Vector3)

lcm.subscribe(topic, lambda msg, t: received.append(msg))
lcm.publish(topic, Vector3(1.0, 0.0, 0.5))

import time
time.sleep(0.1)

print(f"Received velocity: x={received[0].x}, y={received[0].y}, z={received[0].z}")
lcm.stop()
```


```
Received velocity: x=1.0, y=0.0, z=0.5
```

### Shared memory (IPC)

Shared memory offers the highest performance, but only works on the **same machine**.

```python
from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory

shm = PickleSharedMemory(prefer="cpu")
shm.start()

received = []
shm.subscribe("test/topic", lambda msg, topic: received.append(msg))
shm.publish("test/topic", {"data": [1, 2, 3]})

import time
time.sleep(0.1)

print(f"Received: {received}")
shm.stop()
```


```
Received: [{'data': [1, 2, 3]}]
```

### DDS Transport

The DDS backend communicates over the network using the Data Distribution Service protocol (via Cyclone DDS):

```python session=dds_demo ansi=false
from dataclasses import dataclass
from cyclonedds.idl import IdlStruct

from dimos.protocol.pubsub.impl.ddspubsub import DDS, Topic

@dataclass
class SensorReading(IdlStruct):
    value: float

dds = DDS()
dds.start()

received = []
sensor_topic = Topic(name="sensors/temperature", data_type=SensorReading)

dds.subscribe(sensor_topic, lambda msg, t: received.append(msg))
dds.publish(sensor_topic, SensorReading(value=22.5))

import time
time.sleep(0.1)

print(f"Received: {received}")
dds.stop()
```


```
Received: [SensorReading(value=22.5)]
```

---

## A minimal transport: `Memory`

The simplest toy backend is `Memory` (single process).
Start from there when implementing a new pubsub backend. + +```python +from dimos.protocol.pubsub.memory import Memory + +bus = Memory() +received = [] + +unsubscribe = bus.subscribe("sensor/data", lambda msg, topic: received.append(msg)) + +bus.publish("sensor/data", {"temperature": 22.5}) +bus.publish("sensor/data", {"temperature": 23.0}) + +print(f"Received {len(received)} messages:") +for msg in received: + print(f" {msg}") + +unsubscribe() +``` + + +``` +Received 2 messages: + {'temperature': 22.5} + {'temperature': 23.0} +``` + +See [`memory.py`](/dimos/protocol/pubsub/impl/memory.py) for the complete source. + +--- + +## Encode/decode mixins + +Transports often need to serialize messages before sending and deserialize after receiving. + +`PubSubEncoderMixin` at [`pubsub/spec.py`](/dimos/protocol/pubsub/spec.py#L95) provides a clean way to add encoding/decoding to any pubsub implementation. + +### Available mixins + +| Mixin | Encoding | Use case | +|----------------------|-----------------|------------------------------------| +| `PickleEncoderMixin` | Python pickle | Any Python object, Python-only | +| `LCMEncoderMixin` | LCM binary | Cross-language (C/C++/Python/Go/…) | +| `JpegEncoderMixin` | JPEG compressed | Image data, reduces bandwidth | + +`LCMEncoderMixin` is especially useful: you can use LCM message definitions with *any* transport (not just UDP multicast). See [LCM](/docs/usage/lcm.md) for details. 
+ +### Creating a custom mixin + +```python session=jsonencoder no-result +from dimos.protocol.pubsub.spec import PubSubEncoderMixin +import json + +class JsonEncoderMixin(PubSubEncoderMixin[str, dict, bytes]): + def encode(self, msg: dict, topic: str) -> bytes: + return json.dumps(msg).encode("utf-8") + + def decode(self, msg: bytes, topic: str) -> dict: + return json.loads(msg.decode("utf-8")) +``` + +Combine with a pubsub implementation via multiple inheritance: + +```python session=jsonencoder no-result +from dimos.protocol.pubsub.memory import Memory + +class MyJsonPubSub(JsonEncoderMixin, Memory): + pass +``` + +Swap serialization by changing the mixin: + +```python session=jsonencoder no-result +from dimos.protocol.pubsub.spec import PickleEncoderMixin + +class MyPicklePubSub(PickleEncoderMixin, Memory): + pass +``` + +--- + +## Testing and benchmarks + +### Spec tests + +See [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for the grid tests your new backend should pass. + +### Benchmarks + +Add your backend to benchmarks to compare in context: + +```sh skip +python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py +``` + +--- + +# Available transports + +| Transport | Use case | Cross-process | Network | Notes | +|----------------|-------------------------------------|---------------|---------|--------------------------------------| +| `Memory` | Testing only, single process | No | No | Minimal reference impl | +| `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | +| `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN | +| `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | +| `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | +| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP |
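As a closing illustration of how the mixin pattern composes with a backend, here is a fully self-contained toy version of the same idea (toy classes mirroring the shapes above, not the dimos implementations):

```python
import json
from collections import defaultdict

class ToyBus:
    """Minimal in-process pubsub: publish/subscribe with raw payloads."""
    def __init__(self):
        self.subs = defaultdict(list)

    def publish(self, topic, message):
        for cb in self.subs[topic]:
            cb(message, topic)

    def subscribe(self, topic, callback):
        self.subs[topic].append(callback)
        return lambda: self.subs[topic].remove(callback)  # unsubscribe fn

class JsonEncoderMixin:
    """Encode dicts to bytes on publish, decode back on delivery."""
    def publish(self, topic, message):
        super().publish(topic, json.dumps(message).encode("utf-8"))

    def subscribe(self, topic, callback):
        return super().subscribe(
            topic, lambda raw, t: callback(json.loads(raw.decode("utf-8")), t)
        )

# The mixin sits before the backend in the MRO, wrapping its publish/subscribe
class JsonBus(JsonEncoderMixin, ToyBus):
    pass

bus = JsonBus()
received = []
bus.subscribe("sensor/data", lambda msg, topic: received.append(msg))
bus.publish("sensor/data", {"temperature": 22.5})
print(received)  # [{'temperature': 22.5}]
```

The wire format (bytes on the bus) stays invisible to subscribers, which is exactly what lets the same mixin ride on top of any backend.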