diff --git a/README.md b/README.md
index 9a74d63aa7..c2f9992abd 100644
--- a/README.md
+++ b/README.md
@@ -1,502 +1,222 @@
-
-
-
-
-
-
- A simple two-shot PlanningAgent
- |
-
-
- 3rd person POV
- |
-
-
-
-
-# The Dimensional Framework
-*The universal framework for AI-native generalist robotics*
+
+ The Open-Source Framework for Robotic Intelligence
-## What is Dimensional?
-Dimensional is an open-source framework for building agentive generalist robots. DimOS allows off-the-shelf Agents to call tools/functions and read sensor/state data directly from ROS.
+
-The framework enables neurosymbolic orchestration of Agents as generalized spatial reasoners/planners and Robot state/action primitives as functions.
+[](https://discord.gg/8m6HMArf)
+[](https://github.com/dimensionalOS/dimos/stargazers)
+[](https://github.com/dimensionalOS/dimos/fork)
+[](https://github.com/dimensionalOS/dimos/graphs/contributors)
+
+
+
+
+[](https://www.docker.com/)
-The result: cross-embodied *"Dimensional Applications"* exceptional at generalization and robust at symbolic action execution.
+
+ Key Features •
+ How To Use •
+ Contributing • License
+
-## DIMOS x Unitree Go2 (OUT OF DATE)
+
-We are shipping a first look at the DIMOS x Unitree Go2 integration, allowing for off-the-shelf Agents() to "call" Unitree ROS2 Nodes and WebRTC action primitives, including:
+> [!NOTE]
+>
+> **Active Beta: Expect Breaking Changes**
-- Navigation control primitives (move, reverse, spinLeft, spinRight, etc.)
-- WebRTC control primitives (FrontPounce, FrontFlip, FrontJump, etc.)
-- Camera feeds (image_raw, compressed_image, etc.)
-- IMU data
-- State information
-- Lidar / PointCloud primitives
-- Any other generic Unitree ROS2 topics
+# What is Dimensional?
-### Features
+DimOS is both a language-agnostic framework and a Python-first library for robot control. It has optional ROS integration and is designed to let AI agents invoke tools (skills), directly access sensor and state data, and generate complex emergent behaviors.
-- **DimOS Agents**
- - Agent() classes with planning, spatial reasoning, and Robot.Skill() function calling abilities.
- - Integrate with any off-the-shelf hosted or local model: OpenAIAgent, ClaudeAgent, GeminiAgent π§, DeepSeekAgent π§, HuggingFaceRemoteAgent, HuggingFaceLocalAgent, etc.
- - Modular agent architecture for easy extensibility and chaining of Agent output --> Subagents input.
- - Agent spatial / language memory for location grounded reasoning and recall.
+The Python library comes with a rich set of integrations: visualizers, spatial reasoners, planners, simulators (MuJoCo, Isaac Sim, etc.), robot state/action primitives, and more.
-- **DimOS Infrastructure**
- - A reactive data streaming architecture using RxPY to manage real-time video (or other sensor input), outbound commands, and inbound robot state between the DimOS interface, Agents, and ROS2.
- - Robot Command Queue to handle complex multi-step actions to Robot.
- - Simulation bindings (Genesis, Isaacsim, etc.) to test your agentive application before deploying to a physical robot.
+# How do I get started?
-- **DimOS Interface / Development Tools**
- - Local development interface to control your robot, orchestrate agents, visualize camera/lidar streams, and debug your dimensional agentive application.
+### Installation
-## MacOS Installation
+- Linux is supported, with tests performed on Ubuntu 22.04 and 24.04
+- macOS support is in beta; you're welcome to try it, *but expect inconsistent/flaky behavior (rather than errors/crashing)*
+  - Instead of the apt-get command below, run: `brew install gnu-sed gcc portaudio git-lfs libjpeg-turbo python`
```sh
-# Install Nix
-curl --proto '=https' --tlsv1.2 -sSf -L https://install.determinate.systems/nix | sh -s -- install
-
-# clone the repository
-git clone --branch dev --single-branch https://github.com/dimensionalOS/dimos.git
-
-# setup the environment (follow the prompts after nix develop)
-cd dimos
-nix develop
-
-# You should be able to follow the instructions below as well for a more manual installation
-```
-
----
-## Python Installation
-Tested on Ubuntu 22.04/24.04
-
-```bash
-sudo apt install python3-venv
-
-# Clone the repository
-git clone --branch dev --single-branch https://github.com/dimensionalOS/dimos.git
-cd dimos
-
-# Create and activate virtual environment
-python3 -m venv venv
-source venv/bin/activate
-
-sudo apt install portaudio19-dev python3-pyaudio
-
-# Install LFS
-sudo apt install git-lfs
-git lfs install
+sudo apt-get update
+sudo apt-get install -y curl g++ portaudio19-dev git-lfs libturbojpeg python3-dev
+# install uv for python
+curl -LsSf https://astral.sh/uv/install.sh | sh && export PATH="$HOME/.local/bin:$PATH"
+
+# NOTE: the first time you run this, the Rerun window will stay empty/black
+# for a while: the command needs to download the replay file (2.4 GB)
+
+# OPTION 1: install dimos in a virtualenv
+uv venv && . .venv/bin/activate
+uv pip install 'dimos[base,unitree]'
+# replay recorded data to test that the system is working
+dimos --replay run unitree-go2
-# Install torch and torchvision if not already installed
-# Example CUDA 11.7, Pytorch 2.0.1 (replace with your required pytorch version if different)
-pip install torch==2.0.1 torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
+# OPTION 2: if you want to test out dimos without installing run:
+uvx --from 'dimos[base,unitree]' dimos --replay run unitree-go2
```
-#### Install dependencies
-```bash
-# CPU only (reccomended to attempt first)
-pip install -e '.[cpu,dev]'
+
-# CUDA install
-pip install -e '.[cuda,dev]'
+### Usage
-# Copy and configure environment variables
-cp default.env .env
-```
+#### Control a robot in a simulation (no robot required)
-#### Test the install
-```bash
-pytest -s dimos/
-```
+After running the commands below, open http://localhost:7779/command-center to control the robot's movement.
-#### Test Dimensional with a replay UnitreeGo2 stream (no robot required)
-```bash
-dimos --replay run unitree-go2
-```
-
-#### Test Dimensional with a simulated UnitreeGo2 in MuJoCo (no robot required)
-```bash
-pip install -e '.[sim]'
+```sh
export DISPLAY=:1 # Or DISPLAY=:0 if getting GLFW/OpenGL X11 errors
-dimos --simulation run unitree-go2
+# The warp warnings can be safely ignored
+dimos --viewer-backend rerun-web --simulation run unitree-go2
```
-#### Test Dimensional with a real UnitreeGo2 over WebRTC
-```bash
-export ROBOT_IP=192.168.X.XXX # Add the robot IP address
-dimos run unitree-go2
-```
+#### Get it working on a physical robot!
-#### Test Dimensional with a real UnitreeGo2 running Agents
-*OpenAI / Alibaba keys required*
-```bash
-export ROBOT_IP=192.168.X.XXX # Add the robot IP address
-dimos run unitree-go2-agentic
+```sh
+export ROBOT_IP=PUT_YOUR_IP_ADDR_HERE
+dimos --viewer-backend rerun-web run unitree-go2
```
----
-### Agent API keys
+#### Have it controlled by AI!
-Full functionality will require API keys for the following:
+WARNING: This is a demo of the **connection** between AI and robotic control -- not a demo of a super-intelligent AI. Be ready to physically stop your robot before it takes an unsafe action.
-Requirements:
-- OpenAI API key (required for all LLMAgents due to OpenAIEmbeddings)
-- Claude API key (required for ClaudeAgent)
-- Alibaba API key (required for Navigation skills)
-
-These keys can be added to your .env file or exported as environment variables.
-```
+```sh
export OPENAI_API_KEY=
-export CLAUDE_API_KEY=
-export ALIBABA_API_KEY=
-```
-
-### ROS2 Unitree Go2 SDK Installation
-
-#### System Requirements
-- Ubuntu 22.04
-- ROS2 Distros: Iron, Humble, Rolling
-
-See [Unitree Go2 ROS2 SDK](https://github.com/dimensionalOS/go2_ros2_sdk) for additional installation instructions.
-
-```bash
-mkdir -p ros2_ws
-cd ros2_ws
-git clone --recurse-submodules https://github.com/dimensionalOS/go2_ros2_sdk.git src
-sudo apt install ros-$ROS_DISTRO-image-tools
-sudo apt install ros-$ROS_DISTRO-vision-msgs
-
-sudo apt install python3-pip clang portaudio19-dev
-cd src
-pip install -r requirements.txt
-cd ..
-
-# Ensure clean python install before running
-source /opt/ros/$ROS_DISTRO/setup.bash
-rosdep install --from-paths src --ignore-src -r -y
-colcon build
-```
-
-### Run the test application
-
-#### ROS2 Terminal:
-```bash
-# Change path to your Go2 ROS2 SDK installation
-source /ros2_ws/install/setup.bash
-source /opt/ros/$ROS_DISTRO/setup.bash
-
-export ROBOT_IP="robot_ip" #for muliple robots, just split by ,
-export CONN_TYPE="webrtc"
-ros2 launch go2_robot_sdk robot.launch.py
-
-```
-
-#### Python Terminal:
-```bash
-# Change path to your Go2 ROS2 SDK installation
-source /ros2_ws/install/setup.bash
-python tests/run.py
-```
-
-#### DimOS Interface:
-```bash
-cd dimos/web/dimos_interface
-yarn install
-yarn dev # you may need to run sudo if previously built via Docker
-```
-
-### Project Structure (OUT OF DATE)
-
-```
-.
-βββ dimos/
-β βββ agents/ # Agent implementations
-β β βββ memory/ # Memory systems for agents, including semantic memory
-β βββ environment/ # Environment context and sensing
-β βββ hardware/ # Hardware abstraction and interfaces
-β βββ models/ # ML model definitions and implementations
-β β βββ Detic/ # Detic object detection model
-β β βββ depth/ # Depth estimation models
-β β βββ segmentation/ # Image segmentation models
-β βββ perception/ # Computer vision and sensing
-β β βββ detection2d/ # 2D object detection
-β β βββ segmentation/ # Image segmentation pipelines
-β βββ robot/ # Robot control and hardware interface
-β β βββ global_planner/ # Path planning at global scale
-β β βββ local_planner/ # Local navigation planning
-β β βββ unitree/ # Unitree Go2 specific implementations
-β βββ simulation/ # Robot simulation environments
-β β βββ genesis/ # Genesis simulation integration
-β β βββ isaac/ # NVIDIA Isaac Sim integration
-β βββ skills/ # Task-specific robot capabilities
-β β βββ rest/ # REST API based skills
-β βββ stream/ # WebRTC and data streaming
-β β βββ audio/ # Audio streaming components
-β β βββ video_providers/ # Video streaming components
-β βββ types/ # Type definitions and interfaces
-β βββ utils/ # Utility functions and helpers
-β βββ web/ # DimOS development interface and API
-β βββ dimos_interface/ # DimOS web interface
-β βββ websocket_vis/ # Websocket visualizations
-βββ tests/ # Test files
-β βββ genesissim/ # Genesis simulator tests
-β βββ isaacsim/ # Isaac Sim tests
-βββ docker/ # Docker configuration files
- βββ agent/ # Agent service containers
- βββ interface/ # Interface containers
- βββ simulation/ # Simulation environment containers
- βββ unitree/ # Unitree robot specific containers
+dimos --viewer-backend rerun-web run unitree-go2-agentic
```
-## Building
-
-### Simple DimOS Application (OUT OF DATE)
-
-```python
-from dimos.robot.unitree.unitree_go2 import UnitreeGo2
-from dimos.robot.unitree.unitree_skills import MyUnitreeSkills
-from dimos.robot.unitree.unitree_ros_control import UnitreeROSControl
-from dimos.agents_deprecated.agent import OpenAIAgent
-
-# Initialize robot
-robot = UnitreeGo2(ip=robot_ip,
- ros_control=UnitreeROSControl(),
- skills=MyUnitreeSkills())
-
-# Initialize agent
-agent = OpenAIAgent(
- dev_name="UnitreeExecutionAgent",
- input_video_stream=robot.get_ros_video_stream(),
- skills=robot.get_skills(),
- system_query="Jump when you see a human! Front flip when you see a dog!",
- model_name="gpt-4o"
- )
-
-while True: # keep process running
- time.sleep(1)
-```
-
-
-### DimOS Application with Agent chaining (OUT OF DATE)
-
-Let's build a simple DimOS application with Agent chaining. We define a ```planner``` as a ```PlanningAgent``` that takes in user input to devise a complex multi-step plan. This plan is passed step-by-step to an ```executor``` agent that can queue ```AbstractRobotSkill``` commands to the ```ROSCommandQueue```.
-
-Our reactive Pub/Sub data streaming architecture allows for chaining of ```Agent_0 --> Agent_1 --> ... --> Agent_n``` via the ```input_query_stream``` parameter in each which takes an ```Observable``` input from the previous Agent in the chain.
-
-**Via this method you can chain together any number of Agents() to create complex dimensional applications.**
-
-```python
-
-web_interface = RobotWebInterface(port=5555)
-
-robot = UnitreeGo2(ip=robot_ip,
- ros_control=UnitreeROSControl(),
- skills=MyUnitreeSkills())
-
-# Initialize master planning agent
-planner = PlanningAgent(
- dev_name="UnitreePlanningAgent",
- input_query_stream=web_interface.query_stream, # Takes user input from dimOS interface
- skills=robot.get_skills(),
- model_name="gpt-4o",
- )
-
-# Initialize execution agent
-executor = OpenAIAgent(
- dev_name="UnitreeExecutionAgent",
- input_query_stream=planner.get_response_observable(), # Takes planner output as input
- skills=robot.get_skills(),
- model_name="gpt-4o",
- system_query="""
- You are a robot execution agent that can execute tasks on a virtual
- robot. ONLY OUTPUT THE SKILLS TO EXECUTE.
- """
- )
-
-while True: # keep process running
- time.sleep(1)
-```
-
-### Calling Action Primitives (OUT OF DATE)
-
-Call action primitives directly from ```Robot()``` for prototyping and testing.
-
-```python
-robot = UnitreeGo2(ip=robot_ip,)
-
-# Call a Unitree WebRTC action primitive
-robot.webrtc_req(api_id=1016) # "Hello" command
-
-# Call a ROS2 action primitive
-robot.move(distance=1.0, speed=0.5)
-```
-
-### Creating Custom Skills (non-unitree specific)
-
-#### Create basic custom skills by inheriting from ```AbstractRobotSkill``` and implementing the ```__call__``` method.
-
-```python
-class Move(AbstractRobotSkill):
- distance: float = Field(...,description="Distance to reverse in meters")
- def __init__(self, robot: Optional[Robot] = None, **data):
- super().__init__(robot=robot, **data)
- def __call__(self):
- super().__call__()
- return self._robot.move(distance=self.distance)
+After running that, open a new terminal and run the following to start giving instructions to the agent.
+```sh
+# activate the venv in this new terminal
+source .venv/bin/activate
+
+# Note: after running the next command, WAIT for the agent to connect
+# (this will take a while the first time)
+# then tell the agent "explore the room"
+# then tell it to go to something, ex: "go to the door"
+humancli
```
-#### Chain together skills to create recursive skill trees
-
-```python
-class JumpAndFlip(AbstractRobotSkill):
- def __init__(self, robot: Optional[Robot] = None, **data):
- super().__init__(robot=robot, **data)
- def __call__(self):
- super().__call__()
- jump = Jump(robot=self._robot)
- flip = Flip(robot=self._robot)
- return (jump() and flip())
+# How do I use it as a library?
+
+### Simple Camera Activation
+
+Assuming you have a webcam, save the following as a Python file and run it:
+
+```py
+from dimos.core.blueprints import autoconnect
+from dimos.hardware.sensors.camera.module import CameraModule
+
+if __name__ == "__main__":
+ autoconnect(
+ # technically autoconnect is not needed because we only have 1 module
+ CameraModule.blueprint()
+ ).build().loop()
```
-### Integrating Skills with Agents: Single Skills and Skill Libraries
-
-DimOS agents, such as `OpenAIAgent`, can be endowed with capabilities through two primary mechanisms: by providing them with individual skill classes or with comprehensive `SkillLibrary` instances. This design offers flexibility in how robot functionalities are defined and managed within your agent-based applications.
+### Write A Custom Module
-**Agent's `skills` Parameter**
+Let's convert the camera's image to grayscale.
-The `skills` parameter in an agent's constructor is key to this integration:
+```py
+from dimos.core.blueprints import autoconnect
+from dimos.core import In, Out, Module, rpc
+from dimos.hardware.sensors.camera.module import CameraModule
+from dimos.msgs.sensor_msgs import Image
-1. **A Single Skill Class**: This approach is suitable for skills that are relatively self-contained or have straightforward initialization requirements.
- * You pass the skill *class itself* (e.g., `GreeterSkill`) directly to the agent's `skills` parameter.
- * The agent then takes on the responsibility of instantiating this skill when it's invoked. This typically involves the agent providing necessary context to the skill's constructor (`__init__`), such as a `Robot` instance (or any other private instance variable) if the skill requires it.
+from reactivex.disposable import Disposable
-2. **A `SkillLibrary` Instance**: This is the preferred method for managing a collection of skills, especially when skills have dependencies, require specific configurations, or need to share parameters.
- * You first define your custom skill library by inheriting from `SkillLibrary`. Then, you create and configure an *instance* of this library (e.g., `my_lib = EntertainmentSkills(robot=robot_instance)`).
- * This pre-configured `SkillLibrary` instance is then passed to the agent's `skills` parameter. The library itself manages the lifecycle and provision of its contained skills.
+class Listener(Module):
+ # the CameraModule has an Out[Image] named "color_image"
+ # How do we know this? Just print(CameraModule.module_info().outputs)
+ # the name ("color_image") must match the CameraModule's output
+ color_image: In[Image] = None
+ grayscale_image: Out[Image] = None
-**Examples:**
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.count = 0
-#### 1. Using a Single Skill Class with an Agent
+ @rpc
+ def start(self) -> None:
+ super().start()
+ def callback_func(img: Image) -> None:
+ self.count += 1
+ print(f"got frame {self.count}")
+ print(f"img.data.shape: {img.data.shape}")
+ self.grayscale_image.publish(img.to_grayscale())
-First, define your skill. For instance, a `GreeterSkill` that can deliver a configurable greeting:
+ unsubscribe_func = self.color_image.subscribe(callback_func)
+ # the unsubscribe_func will be called when the module is stopped
+ self._disposables.add(Disposable(
+ unsubscribe_func
+ ))
-```python
-class GreeterSkill(AbstractSkill):
- """Greats the user with a friendly message.""" # Gives the agent better context for understanding (the more detailed the better).
+ @rpc
+ def stop(self) -> None:
+ super().stop()
- greeting: str = Field(..., description="The greating message to display.") # The field needed for the calling of the function. Your agent will also pull from the description here to gain better context.
-
- def __init__(self, greeting_message: Optional[str] = None, **data):
- super().__init__(**data)
- if greeting_message:
- self.greeting = greeting_message
- # Any additional skill-specific initialization can go here
-
- def __call__(self):
- super().__call__() # Call parent's method if it contains base logic
- # Implement the logic for the skill
- print(self.greeting)
- return f"Greeting delivered: '{self.greeting}'"
-```
-
-Next, register this skill *class* directly with your agent. The agent can then instantiate it, potentially with specific configurations if your agent or skill supports it (e.g., via default parameters or a more advanced setup).
-
-```python
-agent = OpenAIAgent(
- dev_name="GreetingBot",
- system_query="You are a polite bot. If a user asks for a greeting, use your GreeterSkill.",
- skills=GreeterSkill, # Pass the GreeterSkill CLASS
- # The agent will instantiate GreeterSkill.
- # If the skill had required __init__ args not provided by the agent automatically,
- # this direct class passing might be insufficient without further agent logic
- # or by passing a pre-configured instance (see SkillLibrary example).
- # For simple skills like GreeterSkill with defaults or optional args, this works well.
- model_name="gpt-4o"
-)
+if __name__ == "__main__":
+ autoconnect(
+ Listener.blueprint(),
+ CameraModule.blueprint(),
+ ).build().loop()
```
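The internals of `Image.to_grayscale()` aren't shown here; a common convention for collapsing RGB to a single channel is the BT.601 luminance weighting. A minimal stdlib-only sketch of that idea (the weights, and whether DimOS actually uses them, are assumptions):

```python
# A tiny stand-in for an image-to-grayscale conversion: collapse each RGB
# pixel to a luminance value using BT.601 weights. Whether DimOS's
# Image.to_grayscale() uses these exact weights is an assumption.
def to_grayscale(frame: list[list[tuple[int, int, int]]]) -> list[list[int]]:
    return [
        [round(0.299 * r + 0.587 * g + 0.114 * b) for (r, g, b) in row]
        for row in frame
    ]

frame = [[(255, 0, 0), (255, 255, 255)]]  # one red pixel, one white pixel
print(to_grayscale(frame))  # [[76, 255]]
```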
-In this setup, when the `GreetingBot` agent decides to use the `GreeterSkill`, it will instantiate it. If the `GreeterSkill` were to be instantiated by the agent with a specific `greeting_message`, the agent's design would need to support passing such parameters during skill instantiation.
-
-#### 2. Using a `SkillLibrary` Instance with an Agent
-Define the SkillLibrary and any skills it will manage in its collection:
-```python
-class MovementSkillsLibrary(SkillLibrary):
- """A specialized skill library containing movement and navigation related skills."""
+#### Note: Many More Examples in the [Examples Folder](./examples)
- def __init__(self, robot=None):
- super().__init__()
- self._robot = robot
+### How do custom modules work? (Example breakdown)
- def initialize_skills(self, robot=None):
- """Initialize all movement skills with the robot instance."""
- if robot:
- self._robot = robot
+- Every module represents one process: modules run in parallel (Python multiprocessing). Because of this, **modules should only save/modify data on themselves**. Do not mutate or share global vars inside a module.
+- At the top of this module definition, the In/Out **streams** define a pub-sub system. This module expects *someone somewhere* to give it a color image, and it publishes a grayscale image (that any other module can subscribe to).
+  - Note: if you are a power user thinking "so streams must be statically declared?", the answer is no. There are ways to make dynamic connections, but for type-checking and human sanity, dynamic stream connections sit behind an advanced API and should be used as a last resort.
+- The `autoconnect` ties everything together:
+ - The CameraModule has an output of `color_image`
+ - The Listener has an input of `color_image`
+ - Autoconnect puts them together, and checks that their types are compatible (both are of type `Image`)
+- How can we see what In/Out streams are provided by a module?
+ - Open a python repl (e.g. `python`)
+ - Import the module, ex: `from dimos.hardware.sensors.camera.module import CameraModule`
+ - Print the module outputs: `print(CameraModule.module_info().outputs)`
+ - Print the module inputs: `print(CameraModule.module_info().inputs)`
+ - Print all the information (rpcs, skills, etc): `print(CameraModule.module_info())`
+- What about `@rpc`?
+  - If you want a method to be callable by another module (not just internally), add the `@rpc` decorator AND make sure BOTH the arguments and the return value of the method are JSON-serializable.
+  - RPC methods get called on threads, meaning two RPC methods can be running at the same time. For this reason, Python thread locking is often necessary for data that is written/read during RPC calls.
+  - The start/stop methods always need to be RPCs because they are called externally.
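Since two RPC methods can run at the same time, any state they share needs a lock. The pattern, sketched in plain stdlib Python (the `Counter` class and its method names are illustrative, not part of the DimOS API):

```python
import threading

class Counter:
    """Models a module whose RPC-style methods may run on several threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def increment(self) -> int:
        # Guard the read-modify-write so concurrent callers can't interleave.
        with self._lock:
            self._count += 1
            return self._count

counter = Counter()
threads = [threading.Thread(target=counter.increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.increment())  # 101: all 100 threaded increments landed, plus this call
```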
- if not self._robot:
- raise ValueError("Robot instance is required to initialize skills")
+### Monitoring & Debugging
- # Initialize with all movement-related skills
- self.add(Navigate(robot=self._robot))
- self.add(NavigateToGoal(robot=self._robot))
- self.add(FollowHuman(robot=self._robot))
- self.add(NavigateToObject(robot=self._robot))
- self.add(GetPose(robot=self._robot)) # Position tracking skill
-```
+In addition to rerun logging, DimOS comes with a number of monitoring tools:
+- Run `lcmspy` to see how fast messages are being published on streams.
+- Run `skillspy` to see how skills are being called, how long they are running, which are active, etc.
+- Run `agentspy` to see the agent's status over time.
+- If you suspect there is a bug within DimOS itself, you can enable extreme logging by prefixing the dimos command with `DIMOS_LOG_LEVEL=DEBUG RERUN_SAVE=1 `. Ex: `DIMOS_LOG_LEVEL=DEBUG RERUN_SAVE=1 dimos --replay run unitree-go2`
-Note the addision of initialized skills added to this collection above.
-Proceed to use this skill library in an Agent:
+# How does Dimensional work?
-Finally, in your main application code:
-```python
-# 1. Create an instance of your custom skill library, configured with the robot
-my_movement_skills = MovementSkillsLibrary(robot=robot_instance)
-
-# 2. Pass this library INSTANCE to the agent
-performing_agent = OpenAIAgent(
- dev_name="ShowBot",
- system_query="You are a show robot. Use your skills as directed.",
- skills=my_movement_skills, # Pass the configured SkillLibrary INSTANCE
- model_name="gpt-4o"
-)
-```
-
-### Unitree Test Files
-- **`tests/run_go2_ros.py`**: Tests `UnitreeROSControl(ROSControl)` initialization in `UnitreeGo2(Robot)` via direct function calls `robot.move()` and `robot.webrtc_req()`
-- **`tests/simple_agent_test.py`**: Tests a simple zero-shot class `OpenAIAgent` example
-- **`tests/unitree/test_webrtc_queue.py`**: Tests `ROSCommandQueue` via a 20 back-to-back WebRTC requests to the robot
-- **`tests/test_planning_agent_web_interface.py`**: Tests a simple two-stage `PlanningAgent` chained to an `ExecutionAgent` with backend FastAPI interface.
-- **`tests/test_unitree_agent_queries_fastapi.py`**: Tests a zero-shot `ExecutionAgent` with backend FastAPI interface.
-
-## Documentation
-
-For detailed documentation, please visit our [documentation site](#) (Coming Soon).
-
-## Contributing
+Concepts:
+- [Modules](/docs/concepts/modules.md): The building blocks of DimOS, modules run in parallel and are singleton python classes.
+- [Streams](/docs/api/sensor_streams/index.md): How modules communicate, a Pub / Sub system.
+- [Blueprints](/dimos/core/README_BLUEPRINTS.md): a way to group modules together and define their connections to each other.
+- [RPC](/dimos/core/README_BLUEPRINTS.md#calling-the-methods-of-other-modules): how one module can call a method on another module (arguments get serialized to JSON-like binary data).
+- [Skills](/dimos/core/README_BLUEPRINTS.md#defining-skills): An RPC function, except it can be called by an AI agent (a tool for an AI).
+- Agents: AI that has an objective, access to stream data, and is capable of calling skills as tools.
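The Streams concept above is a publish/subscribe pattern; stripped of transports and processes it reduces to something like the toy sketch below (illustrative only, not the actual DimOS `In`/`Out` implementation):

```python
from typing import Callable

class Stream:
    """Toy pub/sub channel: publishers push, every subscriber callback fires."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[object], None]] = []

    def subscribe(self, callback: Callable[[object], None]) -> Callable[[], None]:
        self._subscribers.append(callback)
        # Return an unsubscribe function, mirroring the Listener example above.
        return lambda: self._subscribers.remove(callback)

    def publish(self, msg: object) -> None:
        for callback in list(self._subscribers):
            callback(msg)

stream = Stream()
received: list[object] = []
unsubscribe = stream.subscribe(received.append)
stream.publish("frame-1")
unsubscribe()
stream.publish("frame-2")  # no longer delivered
print(received)  # ['frame-1']
```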
+
+## Contributing / Building From Source
+
+For development, we optimize for flexibility: whether you love Docker, Nix, or have nothing but **notepad.exe** and a dream, you're good to go. Open the [Development Guide](/docs/development/README.md) for the extra steps to set up a development environment.
We welcome contributions! See our [Bounty List](https://docs.google.com/spreadsheets/d/1tzYTPvhO7Lou21cU6avSWTQOhACl5H8trSvhtYtsk8U/edit?usp=sharing) for open requests for contributions. If you would like to suggest a feature or sponsor a bounty, open an issue.
-## License
-
-This project is licensed under the Apache 2.0 License - see the [LICENSE](LICENSE) file for details.
-
-## Acknowledgments
-
-Huge thanks to!
-- The Roboverse Community and their unitree-specific help. Check out their [Discord](https://discord.gg/HEXNMCNhEh).
-- @abizovnuralem for his work on the [Unitree Go2 ROS2 SDK](https://github.com/abizovnuralem/go2_ros2_sdk) we integrate with for DimOS.
-- @legion1581 for his work on the [Unitree Go2 WebRTC Connect](https://github.com/legion1581/go2_webrtc_connect) from which we've pulled the ```Go2WebRTCConnection``` class and other types for seamless WebRTC-only integration with DimOS.
-- @tfoldi for the webrtc_req integration via Unitree Go2 ROS2 SDK, which allows for seamless usage of Unitree WebRTC control primitives with DimOS.
-
-## Contact
-
-- GitHub Issues: For bug reports and feature requests
-- Email: [build@dimensionalOS.com](mailto:build@dimensionalOS.com)
+# License
-## Known Issues
-- Agent() failure to execute Nav2 action primitives (move, reverse, spinLeft, spinRight) is almost always due to the internal ROS2 collision avoidance, which will sometimes incorrectly display obstacles or be overly sensitive. Look for ```[behavior_server]: Collision Ahead - Exiting DriveOnHeading``` in the ROS logs. Reccomend restarting ROS2 or moving robot from objects to resolve.
-- ```docker-compose up --build``` does not fully initialize the ROS2 environment due to ```std::bad_alloc``` errors. This will occur during continuous docker development if the ```docker-compose down``` is not run consistently before rebuilding and/or you are on a machine with less RAM, as ROS is very memory intensive. Reccomend running to clear your docker cache/images/containers with ```docker system prune``` and rebuild.
+DimOS is licensed under the Apache License, Version 2.0, and will always be free and open source.
diff --git a/bin/hooks/filter_commit_message.py b/bin/hooks/filter_commit_message.py
index cd92b196af..d22eaf9484 100644
--- a/bin/hooks/filter_commit_message.py
+++ b/bin/hooks/filter_commit_message.py
@@ -28,10 +28,16 @@ def main() -> int:
lines = commit_msg_file.read_text().splitlines(keepends=True)
- # Find the first line containing "Generated with" and truncate there
+ # Patterns that trigger truncation (everything from this line onwards is removed)
+ truncate_patterns = [
+ "Generated with",
+ "Co-Authored-By",
+ ]
+
+ # Find the first line containing any truncate pattern and truncate there
filtered_lines = []
for line in lines:
- if "Generated with" in line:
+ if any(pattern in line for pattern in truncate_patterns):
break
filtered_lines.append(line)
diff --git a/data/.lfs/mujoco_sim.tar.gz b/data/.lfs/mujoco_sim.tar.gz
index 57833fbbc6..eda5e6d95c 100644
--- a/data/.lfs/mujoco_sim.tar.gz
+++ b/data/.lfs/mujoco_sim.tar.gz
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
-oid sha256:d178439569ed81dfad05455419dc51da2c52021313b6d7b9259d9e30946db7c6
-size 60186340
+oid sha256:afb7def453cf0a90275ea65aa39a55c91733315a9bc667fdcac54ef6f8bed4c9
+size 70509219
diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py
index 25d4f7a6e5..30890c7c8c 100644
--- a/dimos/core/__init__.py
+++ b/dimos/core/__init__.py
@@ -13,6 +13,7 @@
from dimos.core.rpc_client import RPCClient
from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport
from dimos.core.transport import (
+ DDSTransport,
LCMTransport,
SHMTransport,
ZenohTransport,
@@ -31,6 +32,7 @@
"LCMRPC",
"LCMTF",
"TF",
+ "DDSTransport",
"DimosCluster",
"In",
"LCMTransport",
diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py
index 1fa51629bf..3ae3878d28 100644
--- a/dimos/core/blueprints.py
+++ b/dimos/core/blueprints.py
@@ -392,7 +392,7 @@ def build(
module_coordinator.start_all_modules()
# Compose and send Rerun blueprint from module contributions
- if global_config.viewer_backend.startswith("rerun"):
+ if global_config.rerun_enabled and global_config.viewer_backend.startswith("rerun"):
self._init_rerun_blueprint(module_coordinator)
return module_coordinator
diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py
index bfb553a45d..570b5d43d0 100644
--- a/dimos/core/global_config.py
+++ b/dimos/core/global_config.py
@@ -33,7 +33,7 @@ class GlobalConfig(BaseSettings):
replay: bool = False
rerun_enabled: bool = True
rerun_server_addr: str | None = None
- viewer_backend: ViewerBackend = "rerun-native"
+ viewer_backend: ViewerBackend = "rerun-web"
n_dask_workers: int = 2
memory_limit: str = "auto"
mujoco_camera_position: str | None = None
@@ -44,6 +44,14 @@ class GlobalConfig(BaseSettings):
mujoco_start_pos: str = "-1.0, 1.0"
mujoco_steps_per_frame: int = 7
robot_model: str | None = None
+ # Optional: name of a MuJoCo "bundle" that selects the robot MJCF + policy together.
+ # If set, Dimos MuJoCo sim will prefer:
+ # - data/mujoco_sim/{mujoco_profile}.xml
+ # - data/mujoco_sim/{mujoco_profile}_policy.onnx
+ mujoco_profile: str | None = None
+ # Enable lightweight timing breakdown logs from the MuJoCo subprocess (physics/render/pcd/policy).
+ mujoco_profiler: bool = False
+ mujoco_profiler_interval_s: float = 2.0
robot_width: float = 0.3
robot_rotation_diameter: float = 0.6
planner_strategy: NavigationStrategy = "simple"
diff --git a/dimos/core/transport.py b/dimos/core/transport.py
index 8ffbfc91f4..118c357c9c 100644
--- a/dimos/core/transport.py
+++ b/dimos/core/transport.py
@@ -26,6 +26,7 @@
)
from dimos.core.stream import In, Out, Stream, Transport
+from dimos.protocol.pubsub.ddspubsub import DDS, Topic as DDSTopic
from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory
from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory, SharedMemory
@@ -212,4 +213,34 @@ def start(self) -> None: ...
def stop(self) -> None: ...
+class DDSTransport(PubSubTransport[T]):
+ _started: bool = False
+
+ def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def]
+ super().__init__(DDSTopic(topic, type))
+ if not hasattr(self, "dds"):
+ self.dds = DDS(**kwargs)
+
+ def start(self) -> None:
+ self.dds.start()
+ self._started = True
+
+ def stop(self) -> None:
+ self.dds.stop()
+ self._started = False
+
+ def __reduce__(self): # type: ignore[no-untyped-def]
+ return (DDSTransport, (self.topic.topic, self.topic.dds_type))
+
+ def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
+ if not self._started:
+ self.start()
+ self.dds.publish(self.topic, msg)
+
+ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override]
+ if not self._started:
+ self.start()
+ return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value]
+
+
class ZenohTransport(PubSubTransport[T]): ...
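The `DDSTransport` added above lazily starts its backend on first `broadcast()` or `subscribe()` rather than requiring an explicit `start()`. A minimal sketch of that lazy-start pattern, independent of cyclonedds (`FakeBus` and `LazyTransport` are hypothetical stand-ins, not dimos APIs):

```python
from typing import Any, Callable


class FakeBus:
    """Stand-in for a pubsub backend such as DDS (hypothetical)."""

    def __init__(self) -> None:
        self.started = False
        self.subscribers: list[Callable[[Any], None]] = []

    def start(self) -> None:
        self.started = True

    def publish(self, msg: Any) -> None:
        for cb in self.subscribers:
            cb(msg)


class LazyTransport:
    """Starts the backend on first use, mirroring DDSTransport.broadcast()."""

    def __init__(self) -> None:
        self.bus = FakeBus()
        self._started = False

    def _ensure_started(self) -> None:
        if not self._started:
            self.bus.start()
            self._started = True

    def broadcast(self, msg: Any) -> None:
        self._ensure_started()
        self.bus.publish(msg)

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        self._ensure_started()
        self.bus.subscribers.append(callback)


transport = LazyTransport()
received: list[Any] = []
transport.subscribe(received.append)  # first use starts the bus
transport.broadcast("hello")
print(transport.bus.started, received)  # True ['hello']
```

The trade-off of this design is that a forgotten `start()` can no longer cause a silent no-op publish, at the cost of a small branch on every call.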
diff --git a/dimos/dashboard/rerun_init.py b/dimos/dashboard/rerun_init.py
index 4ccec8209d..4abb910ef1 100644
--- a/dimos/dashboard/rerun_init.py
+++ b/dimos/dashboard/rerun_init.py
@@ -137,6 +137,13 @@ def connect_rerun(
logger.debug("Already connected to Rerun server")
return
+ # Skip if Rerun is disabled globally (even if viewer_backend is still set to rerun-*).
+ if global_config and not global_config.rerun_enabled:
+ logger.debug(
+ "Rerun disabled; skipping connect", viewer_backend=global_config.viewer_backend
+ )
+ return
+
# Skip if foxglove backend selected
if global_config and not global_config.viewer_backend.startswith("rerun"):
logger.debug("Rerun connection skipped", viewer_backend=global_config.viewer_backend)
diff --git a/dimos/protocol/pubsub/__init__.py b/dimos/protocol/pubsub/__init__.py
index 89bd292fda..47d2e9a15d 100644
--- a/dimos/protocol/pubsub/__init__.py
+++ b/dimos/protocol/pubsub/__init__.py
@@ -1,3 +1,4 @@
+import dimos.protocol.pubsub.ddspubsub as dds
import dimos.protocol.pubsub.lcmpubsub as lcm
from dimos.protocol.pubsub.memory import Memory
from dimos.protocol.pubsub.spec import PubSub
diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py
new file mode 100644
index 0000000000..f88df75868
--- /dev/null
+++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python3
+
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections.abc import Generator
+import threading
+import time
+from typing import Any
+
+import pytest
+
+from dimos.protocol.pubsub.benchmark.testdata import testdata
+from dimos.protocol.pubsub.benchmark.type import (
+ BenchmarkResult,
+ BenchmarkResults,
+ MsgGen,
+ PubSubContext,
+ TestCase,
+)
+
+# Message sizes for throughput benchmarking (64 B up to 10 MB)
+MSG_SIZES = [
+ 64,
+ 256,
+ 1024,
+ 4096,
+ 16384,
+ 65536,
+ 262144,
+ 524288,
+ 1048576,
+ 1048576 * 2,
+ 1048576 * 5,
+ 1048576 * 10,
+]
+
+# Benchmark duration in seconds
+BENCH_DURATION = 1.0
+
+# Max messages to send per test (prevents overwhelming slower transports)
+MAX_MESSAGES = 5000
+
+# Max time to wait for in-flight messages after publishing stops
+RECEIVE_TIMEOUT = 1.0
+
+
+def size_id(size: int) -> str:
+ """Convert byte size to human-readable string for test IDs."""
+ if size >= 1048576:
+ return f"{size // 1048576}MB"
+ if size >= 1024:
+ return f"{size // 1024}KB"
+ return f"{size}B"
+
+
+def pubsub_id(testcase: TestCase[Any, Any]) -> str:
+ """Extract pubsub implementation name from context manager function name."""
+ name: str = testcase.pubsub_context.__name__
+ # Convert e.g. "lcm_pubsub_channel" -> "LCM", "memory_pubsub_channel" -> "Memory"
+ prefix = name.replace("_pubsub_channel", "").replace("_", " ")
+ return prefix.upper() if len(prefix) <= 3 else prefix.title().replace(" ", "")
+
+
+@pytest.fixture(scope="module")
+def benchmark_results() -> Generator[BenchmarkResults, None, None]:
+ """Module-scoped fixture to collect benchmark results."""
+ results = BenchmarkResults()
+ yield results
+ results.print_summary()
+ results.print_heatmap()
+ results.print_bandwidth_heatmap()
+ results.print_latency_heatmap()
+
+
+@pytest.mark.tool
+@pytest.mark.parametrize("msg_size", MSG_SIZES, ids=[size_id(s) for s in MSG_SIZES])
+@pytest.mark.parametrize("pubsub_context, msggen", testdata, ids=[pubsub_id(t) for t in testdata])
+def test_throughput(
+ pubsub_context: PubSubContext[Any, Any],
+ msggen: MsgGen[Any, Any],
+ msg_size: int,
+ benchmark_results: BenchmarkResults,
+) -> None:
+ """Measure throughput for publishing and receiving messages over a fixed duration."""
+ with pubsub_context() as pubsub:
+ topic, msg = msggen(msg_size)
+ received_count = 0
+ target_count = [0] # Use list to allow modification after publish loop
+ lock = threading.Lock()
+ all_received = threading.Event()
+
+ def callback(message: Any, _topic: Any) -> None:
+ nonlocal received_count
+ with lock:
+ received_count += 1
+ if target_count[0] > 0 and received_count >= target_count[0]:
+ all_received.set()
+
+ # Subscribe
+ pubsub.subscribe(topic, callback)
+
+ # Warmup: give DDS/ROS time to establish connection
+ time.sleep(0.1)
+
+ # Set target so callback can signal when all received
+ target_count[0] = MAX_MESSAGES
+
+ # Publish messages until time limit, max messages, or all received
+ msgs_sent = 0
+ start = time.perf_counter()
+ end_time = start + BENCH_DURATION
+
+ while time.perf_counter() < end_time and msgs_sent < MAX_MESSAGES:
+ pubsub.publish(topic, msg)
+ msgs_sent += 1
+ # Check if all already received (fast transports)
+ if all_received.is_set():
+ break
+
+ publish_end = time.perf_counter()
+ target_count[0] = msgs_sent # Update to actual sent count
+
+ # Check if already done, otherwise wait up to RECEIVE_TIMEOUT
+ with lock:
+ if received_count >= msgs_sent:
+ all_received.set()
+
+ if not all_received.is_set():
+ all_received.wait(timeout=RECEIVE_TIMEOUT)
+ latency_end = time.perf_counter()
+
+ with lock:
+ final_received = received_count
+
+ # Latency: how long we waited after publishing for messages to arrive
+ # 0 = all arrived during publishing, 1000ms = hit timeout (loss occurred)
+ latency = latency_end - publish_end
+
+ # Record result (duration is publish time only for throughput calculation)
+ # Extract transport name from context manager function name
+ ctx_name = pubsub_context.__name__
+ prefix = ctx_name.replace("_pubsub_channel", "").replace("_", " ")
+ transport_name = prefix.upper() if len(prefix) <= 3 else prefix.title().replace(" ", "")
+ result = BenchmarkResult(
+ transport=transport_name,
+ duration=publish_end - start,
+ msgs_sent=msgs_sent,
+ msgs_received=final_received,
+ msg_size_bytes=msg_size,
+ receive_time=latency,
+ )
+ benchmark_results.add(result)
+
+ # Warn if significant message loss (but don't fail - benchmark records the data)
+ loss_pct = (1 - final_received / msgs_sent) * 100 if msgs_sent > 0 else 0
+ if loss_pct > 10:
+ import warnings
+
+ warnings.warn(
+ f"{transport_name} {msg_size}B: {loss_pct:.1f}% message loss "
+ f"({final_received}/{msgs_sent})",
+ stacklevel=2,
+ )
diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py
new file mode 100644
index 0000000000..e9317557c5
--- /dev/null
+++ b/dimos/protocol/pubsub/benchmark/testdata.py
@@ -0,0 +1,278 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections.abc import Generator
+from contextlib import contextmanager
+from typing import Any
+
+from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
+from dimos.protocol.pubsub.benchmark.type import TestCase
+from dimos.protocol.pubsub.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic
+from dimos.protocol.pubsub.memory import Memory
+from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory
+
+
+def make_data(size: int) -> bytes:
+ """Generate random bytes of given size."""
+ return bytes(i % 256 for i in range(size))
+
+
+testdata: list[TestCase[Any, Any]] = []
+
+
+@contextmanager
+def lcm_pubsub_channel() -> Generator[LCM, None, None]:
+ lcm_pubsub = LCM(autoconf=True)
+ lcm_pubsub.start()
+ yield lcm_pubsub
+ lcm_pubsub.stop()
+
+
+def lcm_msggen(size: int) -> tuple[LCMTopic, Image]:
+ import numpy as np
+
+ # Create image data as numpy array with shape (height, width, channels)
+ raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
+ # Pad to make it divisible by 3 for RGB
+ padded_size = ((len(raw_data) + 2) // 3) * 3
+ padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
+ pixels = len(padded_data) // 3
+ # Find reasonable dimensions
+ height = max(1, int(pixels**0.5))
+ width = pixels // height
+ data = padded_data[: height * width * 3].reshape(height, width, 3)
+ topic = LCMTopic(topic="benchmark/lcm", lcm_type=Image)
+ msg = Image(data=data, format=ImageFormat.RGB)
+ return (topic, msg)
+
+
+testdata.append(
+ TestCase(
+ pubsub_context=lcm_pubsub_channel,
+ msg_gen=lcm_msggen,
+ )
+)
+
+
+@contextmanager
+def udp_raw_pubsub_channel() -> Generator[LCMPubSubBase, None, None]:
+ """LCM with raw bytes - no encoding overhead."""
+ lcm_pubsub = LCMPubSubBase(autoconf=True)
+ lcm_pubsub.start()
+ yield lcm_pubsub
+ lcm_pubsub.stop()
+
+
+def udp_raw_msggen(size: int) -> tuple[LCMTopic, bytes]:
+ """Generate raw bytes for LCM transport benchmark."""
+ topic = LCMTopic(topic="benchmark/lcm_raw")
+ return (topic, make_data(size))
+
+
+testdata.append(
+ TestCase(
+ pubsub_context=udp_raw_pubsub_channel,
+ msg_gen=udp_raw_msggen,
+ )
+)
+
+
+@contextmanager
+def memory_pubsub_channel() -> Generator[Memory, None, None]:
+ """Context manager for Memory PubSub implementation."""
+ yield Memory()
+
+
+def memory_msggen(size: int) -> tuple[str, Any]:
+ import numpy as np
+
+ raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
+ padded_size = ((len(raw_data) + 2) // 3) * 3
+ padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
+ pixels = len(padded_data) // 3
+ height = max(1, int(pixels**0.5))
+ width = pixels // height
+ data = padded_data[: height * width * 3].reshape(height, width, 3)
+ return ("benchmark/memory", Image(data=data, format=ImageFormat.RGB))
+
+
+# testdata.append(
+# TestCase(
+# pubsub_context=memory_pubsub_channel,
+# msg_gen=memory_msggen,
+# )
+# )
+
+
+@contextmanager
+def shm_pubsub_channel() -> Generator[PickleSharedMemory, None, None]:
+ # 12MB capacity to handle benchmark sizes up to 10MB
+ shm_pubsub = PickleSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024)
+ shm_pubsub.start()
+ yield shm_pubsub
+ shm_pubsub.stop()
+
+
+def shm_msggen(size: int) -> tuple[str, Any]:
+ """Generate message for SharedMemory pubsub benchmark."""
+ import numpy as np
+
+ raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
+ padded_size = ((len(raw_data) + 2) // 3) * 3
+ padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
+ pixels = len(padded_data) // 3
+ height = max(1, int(pixels**0.5))
+ width = pixels // height
+ data = padded_data[: height * width * 3].reshape(height, width, 3)
+ return ("benchmark/shm", Image(data=data, format=ImageFormat.RGB))
+
+
+testdata.append(
+ TestCase(
+ pubsub_context=shm_pubsub_channel,
+ msg_gen=shm_msggen,
+ )
+)
+
+
+try:
+ from dimos.protocol.pubsub.redispubsub import Redis
+
+ @contextmanager
+ def redis_pubsub_channel() -> Generator[Redis, None, None]:
+ redis_pubsub = Redis()
+ redis_pubsub.start()
+ yield redis_pubsub
+ redis_pubsub.stop()
+
+ def redis_msggen(size: int) -> tuple[str, Any]:
+ # Redis uses JSON serialization, so use a simple dict with base64-encoded data
+ import base64
+
+ data = base64.b64encode(make_data(size)).decode("ascii")
+ return ("benchmark/redis", {"data": data, "size": size})
+
+ testdata.append(
+ TestCase(
+ pubsub_context=redis_pubsub_channel,
+ msg_gen=redis_msggen,
+ )
+ )
+
+except (ConnectionError, ImportError):
+ # either redis is not installed or the server is not running
+ print("Redis not available")
+
+
+from dimos.protocol.pubsub.rospubsub import ROS_AVAILABLE, RawROS, ROSTopic
+
+try:
+ from cyclonedds.idl import IdlStruct
+
+ from dimos.protocol.pubsub.ddspubsub import DDS, Topic as DDSTopic
+
+ class DDSBenchmarkMessage(IdlStruct):
+ """DDS benchmark message using an IDL 'string' payload (latin-1 encoded bytes)."""
+
+ payload: str
+
+ @contextmanager
+ def dds_pubsub_channel() -> Generator[DDS, None, None]:
+ dds_pubsub = DDS()
+ dds_pubsub.start()
+ yield dds_pubsub
+ dds_pubsub.stop()
+
+ def dds_msggen(size: int) -> tuple[DDSTopic, DDSBenchmarkMessage]:
+ topic = DDSTopic(topic="benchmark/dds", dds_type=DDSBenchmarkMessage)
+        # IdlStruct fields are required dataclass fields; pass payload at construction.
+        msg = DDSBenchmarkMessage(payload=make_data(size).decode("latin-1"))
+ return (topic, msg)
+
+ testdata.append(
+ TestCase(
+ pubsub_context=dds_pubsub_channel,
+ msg_gen=dds_msggen,
+ )
+ )
+except ImportError:
+ print("DDS not available")
+
+
+if ROS_AVAILABLE:
+ from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
+ from sensor_msgs.msg import Image as ROSImage
+
+ @contextmanager
+ def ros_best_effort_pubsub_channel() -> Generator[RawROS, None, None]:
+ qos = QoSProfile(
+ reliability=QoSReliabilityPolicy.BEST_EFFORT,
+ history=QoSHistoryPolicy.KEEP_LAST,
+ durability=QoSDurabilityPolicy.VOLATILE,
+ depth=5000,
+ )
+ ros_pubsub = RawROS(node_name="benchmark_ros_best_effort", qos=qos)
+ ros_pubsub.start()
+ yield ros_pubsub
+ ros_pubsub.stop()
+
+ @contextmanager
+ def ros_reliable_pubsub_channel() -> Generator[RawROS, None, None]:
+ qos = QoSProfile(
+ reliability=QoSReliabilityPolicy.RELIABLE,
+ history=QoSHistoryPolicy.KEEP_LAST,
+ durability=QoSDurabilityPolicy.VOLATILE,
+ depth=5000,
+ )
+ ros_pubsub = RawROS(node_name="benchmark_ros_reliable", qos=qos)
+ ros_pubsub.start()
+ yield ros_pubsub
+ ros_pubsub.stop()
+
+ def ros_msggen(size: int) -> tuple[ROSTopic, ROSImage]:
+ import numpy as np
+
+ # Create image data
+ data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
+ padded_size = ((len(data) + 2) // 3) * 3
+ data = np.pad(data, (0, padded_size - len(data)))
+ pixels = len(data) // 3
+ height = max(1, int(pixels**0.5))
+ width = pixels // height
+ data = data[: height * width * 3]
+
+ # Create ROS Image message
+ msg = ROSImage()
+ msg.height = height
+ msg.width = width
+ msg.encoding = "rgb8"
+ msg.step = width * 3
+ msg.data = data.tobytes()
+
+ topic = ROSTopic(topic="/benchmark/ros", ros_type=ROSImage)
+ return (topic, msg)
+
+ testdata.append(
+ TestCase(
+ pubsub_context=ros_best_effort_pubsub_channel,
+ msg_gen=ros_msggen,
+ )
+ )
+
+ testdata.append(
+ TestCase(
+ pubsub_context=ros_reliable_pubsub_channel,
+ msg_gen=ros_msggen,
+ )
+ )
diff --git a/dimos/protocol/pubsub/benchmark/type.py b/dimos/protocol/pubsub/benchmark/type.py
new file mode 100644
index 0000000000..55649381e2
--- /dev/null
+++ b/dimos/protocol/pubsub/benchmark/type.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python3
+
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections.abc import Callable, Iterator, Sequence
+from contextlib import AbstractContextManager
+from dataclasses import dataclass, field
+from typing import Any, Generic
+
+from dimos.protocol.pubsub.spec import MsgT, PubSub, TopicT
+
+MsgGen = Callable[[int], tuple[TopicT, MsgT]]
+
+PubSubContext = Callable[[], AbstractContextManager[PubSub[TopicT, MsgT]]]
+
+
+@dataclass
+class TestCase(Generic[TopicT, MsgT]):
+ pubsub_context: PubSubContext[TopicT, MsgT]
+ msg_gen: MsgGen[TopicT, MsgT]
+
+ def __iter__(self) -> Iterator[PubSubContext[TopicT, MsgT] | MsgGen[TopicT, MsgT]]:
+ return iter((self.pubsub_context, self.msg_gen))
+
+ def __len__(self) -> int:
+ return 2
+
+
+TestData = Sequence[TestCase[Any, Any]]
+
+
+def _format_size(size_bytes: int) -> str:
+ """Format byte size to human-readable string."""
+ if size_bytes >= 1048576:
+ return f"{size_bytes / 1048576:.1f} MB"
+ if size_bytes >= 1024:
+ return f"{size_bytes / 1024:.1f} KB"
+ return f"{size_bytes} B"
+
+
+def _format_throughput(bytes_per_sec: float) -> str:
+ """Format throughput to human-readable string."""
+ if bytes_per_sec >= 1e9:
+ return f"{bytes_per_sec / 1e9:.2f} GB/s"
+ if bytes_per_sec >= 1e6:
+ return f"{bytes_per_sec / 1e6:.2f} MB/s"
+ if bytes_per_sec >= 1e3:
+ return f"{bytes_per_sec / 1e3:.2f} KB/s"
+ return f"{bytes_per_sec:.2f} B/s"
+
+
+@dataclass
+class BenchmarkResult:
+ transport: str
+ duration: float # Time spent publishing
+ msgs_sent: int
+ msgs_received: int
+ msg_size_bytes: int
+ receive_time: float = 0.0 # Time after publishing until all messages received
+
+ @property
+ def total_time(self) -> float:
+ """Total time including latency."""
+ return self.duration + self.receive_time
+
+ @property
+ def throughput_msgs(self) -> float:
+ """Messages per second (including latency)."""
+ return self.msgs_received / self.total_time if self.total_time > 0 else 0
+
+ @property
+ def throughput_bytes(self) -> float:
+ """Bytes per second (including latency)."""
+ return (
+ (self.msgs_received * self.msg_size_bytes) / self.total_time
+ if self.total_time > 0
+ else 0
+ )
+
+ @property
+ def loss_pct(self) -> float:
+ """Message loss percentage."""
+ return (1 - self.msgs_received / self.msgs_sent) * 100 if self.msgs_sent > 0 else 0
+
+
+@dataclass
+class BenchmarkResults:
+ results: list[BenchmarkResult] = field(default_factory=list)
+
+ def add(self, result: BenchmarkResult) -> None:
+ self.results.append(result)
+
+ def print_summary(self) -> None:
+ if not self.results:
+ return
+
+ from rich.console import Console
+ from rich.table import Table
+
+ console = Console()
+
+ table = Table(title="Benchmark Results")
+ table.add_column("Transport", style="cyan")
+ table.add_column("Msg Size", justify="right")
+ table.add_column("Sent", justify="right")
+ table.add_column("Recv", justify="right")
+ table.add_column("Msgs/s", justify="right", style="green")
+ table.add_column("Throughput", justify="right", style="green")
+ table.add_column("Latency", justify="right")
+ table.add_column("Loss", justify="right")
+
+ for r in sorted(self.results, key=lambda x: (x.transport, x.msg_size_bytes)):
+ loss_style = "red" if r.loss_pct > 0 else "dim"
+ recv_style = "yellow" if r.receive_time > 0.1 else "dim"
+ table.add_row(
+ r.transport,
+ _format_size(r.msg_size_bytes),
+ f"{r.msgs_sent:,}",
+ f"{r.msgs_received:,}",
+ f"{r.throughput_msgs:,.0f}",
+ _format_throughput(r.throughput_bytes),
+ f"[{recv_style}]{r.receive_time * 1000:.0f}ms[/{recv_style}]",
+ f"[{loss_style}]{r.loss_pct:.1f}%[/{loss_style}]",
+ )
+
+ console.print()
+ console.print(table)
+
+ def _print_heatmap(
+ self,
+ title: str,
+ value_fn: Callable[[BenchmarkResult], float],
+ format_fn: Callable[[float], str],
+ high_is_good: bool = True,
+ ) -> None:
+ """Generic heatmap printer."""
+ if not self.results:
+ return
+
+ def size_id(size: int) -> str:
+ if size >= 1048576:
+ return f"{size // 1048576}MB"
+ if size >= 1024:
+ return f"{size // 1024}KB"
+ return f"{size}B"
+
+ transports = sorted(set(r.transport for r in self.results))
+ sizes = sorted(set(r.msg_size_bytes for r in self.results))
+
+ # Build matrix
+ matrix: list[list[float]] = []
+ for transport in transports:
+ row = []
+ for size in sizes:
+ result = next(
+ (
+ r
+ for r in self.results
+ if r.transport == transport and r.msg_size_bytes == size
+ ),
+ None,
+ )
+ row.append(value_fn(result) if result else 0)
+ matrix.append(row)
+
+ all_vals = [v for row in matrix for v in row if v > 0]
+ if not all_vals:
+ return
+ min_val, max_val = min(all_vals), max(all_vals)
+
+ # ANSI 256 gradient: red -> orange -> yellow -> green
+ gradient = [
+ 52,
+ 88,
+ 124,
+ 160,
+ 196,
+ 202,
+ 208,
+ 214,
+ 220,
+ 226,
+ 190,
+ 154,
+ 148,
+ 118,
+ 82,
+ 46,
+ 40,
+ 34,
+ ]
+ if not high_is_good:
+ gradient = gradient[::-1]
+
+ def val_to_color(v: float) -> int:
+ if v <= 0 or max_val == min_val:
+ return 236
+ t = (v - min_val) / (max_val - min_val)
+ return gradient[int(t * (len(gradient) - 1))]
+
+ reset = "\033[0m"
+ size_labels = [size_id(s) for s in sizes]
+ col_w = max(8, max(len(s) for s in size_labels) + 1)
+ transport_w = max(len(t) for t in transports) + 1
+
+ print()
+ print(f"{title:^{transport_w + col_w * len(sizes)}}")
+ print()
+ print(" " * transport_w + "".join(f"{s:^{col_w}}" for s in size_labels))
+
+ # Dark colors that need white text (dark reds)
+ dark_colors = {52, 88, 124, 160, 236}
+
+ for i, transport in enumerate(transports):
+ row_str = f"{transport:<{transport_w}}"
+ for val in matrix[i]:
+ color = val_to_color(val)
+ fg = 255 if color in dark_colors else 16 # white on dark, black on bright
+ cell = format_fn(val) if val > 0 else "-"
+ row_str += f"\033[48;5;{color}m\033[38;5;{fg}m{cell:^{col_w}}{reset}"
+ print(row_str)
+ print()
+
+ def print_heatmap(self) -> None:
+ """Print msgs/sec heatmap."""
+
+ def fmt(v: float) -> str:
+ return f"{v / 1000:.1f}k" if v >= 1000 else f"{v:.0f}"
+
+ self._print_heatmap("Msgs/sec", lambda r: r.throughput_msgs, fmt)
+
+ def print_bandwidth_heatmap(self) -> None:
+ """Print bandwidth heatmap."""
+
+ def fmt(v: float) -> str:
+ if v >= 1e9:
+ return f"{v / 1e9:.1f}G"
+ if v >= 1e6:
+ return f"{v / 1e6:.0f}M"
+ if v >= 1e3:
+ return f"{v / 1e3:.0f}K"
+ return f"{v:.0f}"
+
+ self._print_heatmap("Bandwidth", lambda r: r.throughput_bytes, fmt)
+
+ def print_latency_heatmap(self) -> None:
+ """Print latency heatmap (time waiting for messages after publishing)."""
+
+ def fmt(v: float) -> str:
+ if v >= 1:
+ return f"{v:.1f}s"
+ return f"{v * 1000:.0f}ms"
+
+ self._print_heatmap("Latency", lambda r: r.receive_time, fmt, high_is_good=False)
diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py
new file mode 100644
index 0000000000..bdfd7c1b17
--- /dev/null
+++ b/dimos/protocol/pubsub/ddspubsub.py
@@ -0,0 +1,187 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+import threading
+from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
+
+from cyclonedds.core import Listener
+from cyclonedds.idl import IdlStruct
+from cyclonedds.pub import DataWriter as DDSDataWriter
+from cyclonedds.sub import DataReader as DDSDataReader
+from cyclonedds.topic import Topic as CycloneDDSTopic
+
+from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin
+from dimos.protocol.service.ddsservice import DDSConfig, DDSService
+from dimos.utils.logging_config import setup_logger
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+logger = setup_logger()
+
+
+@runtime_checkable
+class DDSMsg(Protocol):
+ msg_name: str
+
+ @classmethod
+ def dds_decode(cls, data: bytes) -> DDSMsg:
+ """Decode bytes into a DDS message instance."""
+ ...
+
+ def dds_encode(self) -> bytes:
+ """Encode this message instance into bytes."""
+ ...
+
+
+@dataclass
+class Topic:
+ """Represents a DDS topic with optional type information."""
+
+ topic: str = ""
+ dds_type: type[DDSMsg] | None = None
+
+ def __str__(self) -> str:
+ if self.dds_type is None:
+ return self.topic
+ return f"{self.topic}#{self.dds_type.__name__}"
+
+ def __hash__(self) -> int:
+ return hash((self.topic, self.dds_type))
+
+ def __eq__(self, other: Any) -> bool:
+ return (
+ isinstance(other, Topic)
+ and self.topic == other.topic
+ and self.dds_type == other.dds_type
+ )
+
+
+class DDSPubSubBase(DDSService, PubSub[Topic, Any]):
+ def __init__(self, **kwargs: Any) -> None:
+ super().__init__(**kwargs)
+ self._callbacks: dict[Topic, list[Callable[[Any, Topic], None]]] = {}
+ self._writers: dict[Topic, DDSDataWriter] = {}
+ self._readers: dict[Topic, DDSDataReader] = {}
+ self._cyclonedds_topics: dict[Topic, CycloneDDSTopic] = {}
+ self._writer_lock = threading.Lock()
+ self._reader_lock = threading.Lock()
+
+ def _get_cyclonedds_topic(self, topic: Topic) -> CycloneDDSTopic:
+ """Convert custom Topic to cyclonedds.topic.Topic, caching the result."""
+ if topic not in self._cyclonedds_topics:
+ if topic.dds_type is None:
+ raise ValueError(f"Cannot create DDS topic '{topic.topic}': no dds_type specified")
+ dds_topic = CycloneDDSTopic(self.get_participant(), topic.topic, topic.dds_type)
+ self._cyclonedds_topics[topic] = dds_topic
+ return self._cyclonedds_topics[topic]
+
+ def _get_writer(self, topic: Topic) -> DDSDataWriter:
+ """Get a DataWriter for the given topic name, create if it does not exist."""
+
+ with self._writer_lock:
+ if topic not in self._writers:
+ dds_topic = self._get_cyclonedds_topic(topic)
+ writer = DDSDataWriter(self.get_participant(), dds_topic)
+ self._writers[topic] = writer
+ logger.debug(f"Created DataWriter for topic: {topic.topic}")
+ return self._writers[topic]
+
+ def publish(self, topic: Topic, message: Any) -> None:
+ """Publish a message to a DDS topic."""
+
+ writer = self._get_writer(topic)
+ try:
+ # Publish to DDS network
+ writer.write(message)
+
+ except Exception as e:
+ logger.error(f"Error publishing to topic {topic}: {e}")
+
+ # Dispatch to local subscribers
+ if topic in self._callbacks:
+ for callback in self._callbacks[topic]:
+ try:
+ callback(message, topic)
+ except Exception as e:
+ # Log but continue processing other callbacks
+ logger.error(f"Error in callback for topic {topic}: {e}")
+
+ def _get_reader(self, topic: Topic) -> DDSDataReader:
+ """Get or create a DataReader for the given topic with listener."""
+
+ with self._reader_lock:
+ if topic not in self._readers:
+ dds_topic = self._get_cyclonedds_topic(topic)
+ reader = DDSDataReader[Any](self.get_participant(), dds_topic)
+ self._readers[topic] = reader
+ logger.debug(f"Created DataReader for topic: {topic.topic}")
+ return self._readers[topic]
+
+ def subscribe(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> Callable[[], None]:
+ """Subscribe to a DDS topic with a callback."""
+
+ # Create a DataReader for this topic if needed
+ self._get_reader(topic)
+
+ # Add callback to our list
+ if topic not in self._callbacks:
+ self._callbacks[topic] = []
+ self._callbacks[topic].append(callback)
+
+ # Return unsubscribe function
+ def unsubscribe() -> None:
+ self.unsubscribe_callback(topic, callback)
+
+ return unsubscribe
+
+ def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> None:
+ """Unsubscribe a callback from a topic."""
+ try:
+ if topic in self._callbacks:
+ self._callbacks[topic].remove(callback)
+ if not self._callbacks[topic]:
+ del self._callbacks[topic]
+ except ValueError:
+ pass
+
+
+class DDSEncoderMixin(PubSubEncoderMixin[Topic, Any, IdlStruct]):
+ def encode(self, msg: DDSMsg, _: Topic) -> bytes:
+ return msg.dds_encode()
+
+ def decode(self, msg: bytes, topic: Topic) -> DDSMsg:
+ if topic.dds_type is None:
+ raise ValueError(
+ f"Cannot decode message for topic '{topic.topic}': no dds_type specified"
+ )
+ return topic.dds_type.dds_decode(msg)
+
+
+class DDS(
+ # DDSEncoderMixin, # TODO: Add back so encoding and decoding is handled by DDS
+ DDSPubSubBase,
+): ...
+
+
+__all__ = [
+ "DDS",
+ "DDSEncoderMixin",
+ "DDSMsg",
+ "DDSPubSubBase",
+ "Topic",
+]
diff --git a/dimos/protocol/pubsub/jpeg_shm.py b/dimos/protocol/pubsub/jpeg_shm.py
index de6868390c..f2c9e35814 100644
--- a/dimos/protocol/pubsub/jpeg_shm.py
+++ b/dimos/protocol/pubsub/jpeg_shm.py
@@ -22,7 +22,7 @@
from dimos.protocol.pubsub.spec import PubSubEncoderMixin
-class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image]):
+class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image, bytes]):
def __init__(self, quality: int = 75, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(**kwargs)
self.jpeg = TurboJPEG()
diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py
index e07d010895..07ab65bfbe 100644
--- a/dimos/protocol/pubsub/lcmpubsub.py
+++ b/dimos/protocol/pubsub/lcmpubsub.py
@@ -95,7 +95,7 @@ def unsubscribe() -> None:
return unsubscribe
-class LCMEncoderMixin(PubSubEncoderMixin[Topic, Any]):
+class LCMEncoderMixin(PubSubEncoderMixin[Topic, Any, bytes]):
def encode(self, msg: LCMMsg, _: Topic) -> bytes:
return msg.lcm_encode()
@@ -107,7 +107,7 @@ def decode(self, msg: bytes, topic: Topic) -> LCMMsg:
return topic.lcm_type.lcm_decode(msg)
-class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any]):
+class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any, bytes]):
def encode(self, msg: LCMMsg, _: Topic) -> bytes:
return msg.lcm_jpeg_encode() # type: ignore[attr-defined, no-any-return]
@@ -142,7 +142,6 @@ class JpegLCM(
"JpegLCM",
"LCMEncoderMixin",
"LCMMsg",
- "LCMMsg",
"LCMPubSubBase",
"PickleLCM",
"autoconf",
diff --git a/dimos/protocol/pubsub/rospubsub.py b/dimos/protocol/pubsub/rospubsub.py
new file mode 100644
index 0000000000..36547dc60c
--- /dev/null
+++ b/dimos/protocol/pubsub/rospubsub.py
@@ -0,0 +1,269 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections.abc import Callable
+from dataclasses import dataclass
+import threading
+from typing import Any, Protocol, runtime_checkable
+
+try:
+ import rclpy
+ from rclpy.executors import SingleThreadedExecutor
+ from rclpy.node import Node
+ from rclpy.qos import (
+ QoSDurabilityPolicy,
+ QoSHistoryPolicy,
+ QoSProfile,
+ QoSReliabilityPolicy,
+ )
+
+ ROS_AVAILABLE = True
+except ImportError:
+ ROS_AVAILABLE = False
+ rclpy = None # type: ignore[assignment]
+ SingleThreadedExecutor = None # type: ignore[assignment, misc]
+ Node = None # type: ignore[assignment, misc]
+
+from dimos.protocol.pubsub.spec import PubSub, PubSubEncoderMixin, TopicT
+
+
+# Type definitions for LCM and ROS messages. These protocols are a rough
+# sketch and will be tightened once proper message translation is written.
+@runtime_checkable
+class DimosMessage(Protocol):
+ """Protocol for LCM message types (from dimos_lcm or lcm_msgs)."""
+
+ msg_name: str
+ __slots__: tuple[str, ...]
+
+
+@runtime_checkable
+class ROSMessage(Protocol):
+ """Protocol for ROS message types."""
+
+ def get_fields_and_field_types(self) -> dict[str, str]: ...
+
+
+@dataclass
+class ROSTopic:
+ """Topic descriptor for ROS pubsub."""
+
+ topic: str
+ ros_type: type
+ qos: "QoSProfile | None" = None # Optional per-topic QoS override
+
+
+class RawROS(PubSub[ROSTopic, Any]):
+ """ROS 2 PubSub implementation following the PubSub spec.
+
+ This allows direct comparison of ROS messaging performance against
+ native LCM and other pubsub implementations.
+ """
+
+ def __init__(
+ self, node_name: str = "dimos_ros_pubsub", qos: "QoSProfile | None" = None
+ ) -> None:
+ """Initialize the ROS pubsub.
+
+ Args:
+ node_name: Name for the ROS node
+ qos: Optional QoS profile (defaults to BEST_EFFORT for throughput)
+ """
+ if not ROS_AVAILABLE:
+ raise ImportError("rclpy is not installed. ROS pubsub requires ROS 2.")
+
+ self._node_name = node_name
+ self._node: Node | None = None
+ self._executor: SingleThreadedExecutor | None = None
+ self._spin_thread: threading.Thread | None = None
+ self._running = False
+
+ # Track publishers and subscriptions
+ self._publishers: dict[str, Any] = {}
+ self._subscriptions: dict[str, list[tuple[Any, Callable[[Any, ROSTopic], None]]]] = {}
+ self._lock = threading.Lock()
+
+ # QoS profile - use provided or default to best-effort for throughput
+ if qos is not None:
+ self._qos = qos
+ else:
+ self._qos = QoSProfile(
+ reliability=QoSReliabilityPolicy.BEST_EFFORT,
+ history=QoSHistoryPolicy.KEEP_LAST,
+ durability=QoSDurabilityPolicy.VOLATILE,
+ depth=1,
+ )
+
+ def start(self) -> None:
+ """Start the ROS node and executor."""
+ if self._running:
+ return
+
+ if not rclpy.ok():
+ rclpy.init()
+
+ self._node = Node(self._node_name)
+ self._executor = SingleThreadedExecutor()
+ self._executor.add_node(self._node)
+
+ self._running = True
+ self._spin_thread = threading.Thread(target=self._spin, name="ros_pubsub_spin")
+ self._spin_thread.start()
+
+ def stop(self) -> None:
+ """Stop the ROS node and clean up."""
+ if not self._running:
+ return
+
+ self._running = False
+
+ # Wake up the executor so spin thread can exit
+ if self._executor:
+ self._executor.wake()
+
+ # Wait for spin thread to finish
+ if self._spin_thread and self._spin_thread.is_alive():
+ self._spin_thread.join(timeout=2.0)
+
+ if self._executor:
+ self._executor.shutdown()
+
+ if self._node:
+ self._node.destroy_node()
+
+ if rclpy.ok():
+ rclpy.shutdown()
+
+ self._publishers.clear()
+ self._subscriptions.clear()
+ self._spin_thread = None
+
+ def _spin(self) -> None:
+ """Background thread for spinning the ROS executor."""
+ while self._running:
+ executor = self._executor
+ if executor is None:
+ break
+            executor.spin_once(timeout_sec=0)  # Non-blocking poll: minimizes latency at the cost of a busy-wait
+
+ def _get_or_create_publisher(self, topic: ROSTopic) -> Any:
+ """Get existing publisher or create a new one."""
+ with self._lock:
+ if topic.topic not in self._publishers:
+ node = self._node
+ if node is None:
+ raise RuntimeError("Pubsub must be started before publishing")
+ qos = topic.qos if topic.qos is not None else self._qos
+ self._publishers[topic.topic] = node.create_publisher(
+ topic.ros_type, topic.topic, qos
+ )
+ return self._publishers[topic.topic]
+
+ def publish(self, topic: ROSTopic, message: Any) -> None:
+ """Publish a message to a ROS topic.
+
+ Args:
+ topic: ROSTopic descriptor with topic name and message type
+ message: ROS message to publish
+ """
+ if not self._running or not self._node:
+ return
+
+ publisher = self._get_or_create_publisher(topic)
+ publisher.publish(message)
+
+ def subscribe(
+ self, topic: ROSTopic, callback: Callable[[Any, ROSTopic], None]
+ ) -> Callable[[], None]:
+ """Subscribe to a ROS topic with a callback.
+
+ Args:
+ topic: ROSTopic descriptor with topic name and message type
+ callback: Function called with (message, topic) when message received
+
+ Returns:
+ Unsubscribe function
+ """
+ if not self._running or not self._node:
+ raise RuntimeError("ROS pubsub not started")
+
+ with self._lock:
+
+ def ros_callback(msg: Any) -> None:
+ callback(msg, topic)
+
+ qos = topic.qos if topic.qos is not None else self._qos
+ subscription = self._node.create_subscription(
+ topic.ros_type, topic.topic, ros_callback, qos
+ )
+
+ if topic.topic not in self._subscriptions:
+ self._subscriptions[topic.topic] = []
+ self._subscriptions[topic.topic].append((subscription, callback))
+
+ def unsubscribe() -> None:
+ with self._lock:
+ if topic.topic in self._subscriptions:
+ self._subscriptions[topic.topic] = [
+ (sub, cb)
+ for sub, cb in self._subscriptions[topic.topic]
+ if cb is not callback
+ ]
+ if self._node:
+ self._node.destroy_subscription(subscription)
+
+ return unsubscribe
+
+
+class Dimos2RosMixin(PubSubEncoderMixin[TopicT, DimosMessage, ROSMessage]):
+ """Mixin that converts between dimos_lcm (LCM-based) and ROS messages.
+
+ This enables seamless interop: publish LCM messages to ROS topics
+ and receive ROS messages as LCM messages.
+ """
+
+ def encode(self, msg: DimosMessage, *_: TopicT) -> ROSMessage:
+ """Convert a dimos_lcm message to its equivalent ROS message.
+
+ Args:
+ msg: An LCM message (e.g., dimos_lcm.geometry_msgs.Vector3)
+
+ Returns:
+ The corresponding ROS message (e.g., geometry_msgs.msg.Vector3)
+ """
+ raise NotImplementedError("Encode method not implemented")
+
+ def decode(self, msg: ROSMessage, _: TopicT | None = None) -> DimosMessage:
+ """Convert a ROS message to its equivalent dimos_lcm message.
+
+ Args:
+ msg: A ROS message (e.g., geometry_msgs.msg.Vector3)
+
+ Returns:
+ The corresponding LCM message (e.g., dimos_lcm.geometry_msgs.Vector3)
+ """
+ raise NotImplementedError("Decode method not implemented")
+
+
+class DimosROS(
+ Dimos2RosMixin[ROSTopic],
+ RawROS,
+):
+    """ROS PubSub with automatic dimos.msgs <-> ROS message conversion."""
+
+ pass
+
+
+ROS = DimosROS
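For reference, the lock-guarded publisher cache in `RawROS._get_or_create_publisher` can be exercised without ROS installed by swapping in a fake publisher. A minimal sketch of the pattern (the `FakePublisher` and `PublisherCache` names are illustrative, not part of dimos):

```python
import threading


class FakePublisher:
    """Stand-in for an rclpy publisher; just records what was published."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self.sent: list[object] = []

    def publish(self, msg: object) -> None:
        self.sent.append(msg)


class PublisherCache:
    """Lock-guarded, create-on-first-use publisher cache (the RawROS pattern)."""

    def __init__(self) -> None:
        self._publishers: dict[str, FakePublisher] = {}
        self._lock = threading.Lock()

    def get_or_create(self, topic: str) -> FakePublisher:
        with self._lock:
            if topic not in self._publishers:
                self._publishers[topic] = FakePublisher(topic)
            return self._publishers[topic]


cache = PublisherCache()
first = cache.get_or_create("/cmd_vel")
second = cache.get_or_create("/cmd_vel")
assert first is second  # repeated lookups reuse the same publisher
```

Holding the lock around both the lookup and the creation is what makes publishing safe from multiple threads: two concurrent first publishes to the same topic cannot create two publishers.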
diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py
index 0006020f6c..e1ae8600aa 100644
--- a/dimos/protocol/pubsub/shmpubsub.py
+++ b/dimos/protocol/pubsub/shmpubsub.py
@@ -124,7 +124,7 @@ def __init__(
def start(self) -> None:
pref = (self.config.prefer or "auto").lower()
backend = os.getenv("DIMOS_IPC_BACKEND", pref).lower()
- logger.info(f"SharedMemory PubSub starting (backend={backend})")
+ logger.debug(f"SharedMemory PubSub starting (backend={backend})")
# No global thread needed; per-topic fanout starts on first subscribe.
def stop(self) -> None:
@@ -145,7 +145,7 @@ def stop(self) -> None:
except Exception:
pass
self._topics.clear()
- logger.info("SharedMemory PubSub stopped.")
+ logger.debug("SharedMemory PubSub stopped.")
# ----- PubSub API (bytes on the wire) ----------------------------------
@@ -295,7 +295,7 @@ def _fanout_loop(self, topic: str, st: _TopicState) -> None:
# --------------------------------------------------------------------------------------
-class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes]):
+class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes, bytes]):
"""Identity encoder for raw bytes."""
def encode(self, msg: bytes, _: str) -> bytes:
diff --git a/dimos/protocol/pubsub/spec.py b/dimos/protocol/pubsub/spec.py
index a43061e492..b4e82d3993 100644
--- a/dimos/protocol/pubsub/spec.py
+++ b/dimos/protocol/pubsub/spec.py
@@ -22,6 +22,7 @@
MsgT = TypeVar("MsgT")
TopicT = TypeVar("TopicT")
+EncodingT = TypeVar("EncodingT")
class PubSub(Generic[TopicT, MsgT], ABC):
@@ -91,7 +92,7 @@ def _queue_cb(msg: MsgT, topic: TopicT) -> None:
unsubscribe_fn()
-class PubSubEncoderMixin(Generic[TopicT, MsgT], ABC):
+class PubSubEncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC):
"""Mixin that encodes messages before publishing and decodes them after receiving.
Usage: Just specify encoder and decoder as a subclass:
@@ -104,10 +105,10 @@ def decoder(msg, topic):
"""
@abstractmethod
- def encode(self, msg: MsgT, topic: TopicT) -> bytes: ...
+ def encode(self, msg: MsgT, topic: TopicT) -> EncodingT: ...
@abstractmethod
- def decode(self, msg: bytes, topic: TopicT) -> MsgT: ...
+ def decode(self, msg: EncodingT, topic: TopicT) -> MsgT: ...
def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(*args, **kwargs)
@@ -127,14 +128,14 @@ def subscribe(
) -> Callable[[], None]:
"""Subscribe with automatic decoding."""
- def wrapper_cb(encoded_data: bytes, topic: TopicT) -> None:
+ def wrapper_cb(encoded_data: EncodingT, topic: TopicT) -> None:
decoded_message = self.decode(encoded_data, topic)
callback(decoded_message, topic)
return super().subscribe(topic, wrapper_cb) # type: ignore[misc, no-any-return]
-class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT]):
+class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT, bytes]):
def encode(self, msg: MsgT, *_: TopicT) -> bytes: # type: ignore[return]
try:
return pickle.dumps(msg)
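The new `EncodingT` parameter lets an encoder mixin target wire types other than `bytes` (e.g. `ROSMessage` in `Dimos2RosMixin`). A self-contained sketch of the three-parameter pattern using a toy in-memory bus rather than the dimos classes, here with `str` as the wire type; all class names below are illustrative:

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, TypeVar

TopicT = TypeVar("TopicT")
MsgT = TypeVar("MsgT")
EncodingT = TypeVar("EncodingT")


class PubSub(Generic[TopicT, MsgT]):
    """Minimal in-memory pubsub, a stand-in for dimos' PubSub base."""

    def __init__(self) -> None:
        self._subs: dict[Any, list[Callable[[Any, Any], None]]] = {}

    def publish(self, topic: TopicT, message: MsgT) -> None:
        for cb in self._subs.get(topic, []):
            cb(message, topic)

    def subscribe(self, topic: TopicT, callback: Callable[[MsgT, TopicT], None]) -> Callable[[], None]:
        self._subs.setdefault(topic, []).append(callback)
        return lambda: self._subs[topic].remove(callback)


class EncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC):
    """Encode on publish, decode on subscribe; the wire type is EncodingT."""

    @abstractmethod
    def encode(self, msg: MsgT, topic: TopicT) -> EncodingT: ...

    @abstractmethod
    def decode(self, msg: EncodingT, topic: TopicT) -> MsgT: ...

    def publish(self, topic: TopicT, message: MsgT) -> None:
        super().publish(topic, self.encode(message, topic))  # type: ignore[misc]

    def subscribe(self, topic: TopicT, callback: Callable[[MsgT, TopicT], None]) -> Callable[[], None]:
        def wrapper(encoded: EncodingT, t: TopicT) -> None:
            callback(self.decode(encoded, t), t)

        return super().subscribe(topic, wrapper)  # type: ignore[misc]


class JsonMemory(EncoderMixin[str, Any, str], PubSub[str, Any]):
    """JSON strings on the wire: EncodingT is str, not bytes."""

    def encode(self, msg: Any, _: str) -> str:
        return json.dumps(msg)

    def decode(self, msg: str, _: str) -> Any:
        return json.loads(msg)


bus = JsonMemory()
received: list[Any] = []
bus.subscribe("telemetry", lambda m, _t: received.append(m))
bus.publish("telemetry", {"x": 1})
assert received == [{"x": 1}]
```

The MRO does the work: `JsonMemory.publish` resolves to the mixin, which encodes and forwards to the underlying transport via `super()`, so the same transport class can be paired with any wire type just by changing the third type parameter.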
diff --git a/dimos/protocol/pubsub/test_encoder.py b/dimos/protocol/pubsub/test_encoder.py
index f39bd170d5..38aac4664d 100644
--- a/dimos/protocol/pubsub/test_encoder.py
+++ b/dimos/protocol/pubsub/test_encoder.py
@@ -15,6 +15,7 @@
# limitations under the License.
import json
+from typing import Any
from dimos.protocol.pubsub.memory import Memory, MemoryWithJSONEncoder
@@ -24,7 +25,7 @@ def test_json_encoded_pubsub() -> None:
pubsub = MemoryWithJSONEncoder()
received_messages = []
- def callback(message, topic) -> None:
+ def callback(message: Any, topic: str) -> None:
received_messages.append(message)
# Subscribe to a topic
@@ -56,7 +57,7 @@ def test_json_encoding_edge_cases() -> None:
pubsub = MemoryWithJSONEncoder()
received_messages = []
- def callback(message, topic) -> None:
+ def callback(message: Any, topic: str) -> None:
received_messages.append(message)
pubsub.subscribe("edge_cases", callback)
@@ -84,10 +85,10 @@ def test_multiple_subscribers_with_encoding() -> None:
received_messages_1 = []
received_messages_2 = []
- def callback_1(message, topic) -> None:
+ def callback_1(message: Any, topic: str) -> None:
received_messages_1.append(message)
- def callback_2(message, topic) -> None:
+ def callback_2(message: Any, topic: str) -> None:
received_messages_2.append(f"callback_2: {message}")
pubsub.subscribe("json_topic", callback_1)
@@ -130,9 +131,9 @@ def test_data_actually_encoded_in_transit() -> None:
class SpyMemory(Memory):
def __init__(self) -> None:
super().__init__()
- self.raw_messages_received = []
+ self.raw_messages_received: list[tuple[str, Any, type]] = []
- def publish(self, topic: str, message) -> None:
+ def publish(self, topic: str, message: Any) -> None:
# Capture what actually gets published
self.raw_messages_received.append((topic, message, type(message)))
super().publish(topic, message)
@@ -142,9 +143,9 @@ class SpyMemoryWithJSON(MemoryWithJSONEncoder, SpyMemory):
pass
pubsub = SpyMemoryWithJSON()
- received_decoded = []
+ received_decoded: list[Any] = []
- def callback(message, topic) -> None:
+ def callback(message: Any, topic: str) -> None:
received_decoded.append(message)
pubsub.subscribe("test_topic", callback)
diff --git a/dimos/protocol/pubsub/test_lcmpubsub.py b/dimos/protocol/pubsub/test_lcmpubsub.py
index d06bf20716..8165be9fef 100644
--- a/dimos/protocol/pubsub/test_lcmpubsub.py
+++ b/dimos/protocol/pubsub/test_lcmpubsub.py
@@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from collections.abc import Generator
import time
+from typing import Any
import pytest
@@ -26,7 +28,7 @@
@pytest.fixture
-def lcm_pub_sub_base():
+def lcm_pub_sub_base() -> Generator[LCMPubSubBase, None, None]:
lcm = LCMPubSubBase(autoconf=True)
lcm.start()
yield lcm
@@ -34,7 +36,7 @@ def lcm_pub_sub_base():
@pytest.fixture
-def pickle_lcm():
+def pickle_lcm() -> Generator[PickleLCM, None, None]:
lcm = PickleLCM(autoconf=True)
lcm.start()
yield lcm
@@ -42,7 +44,7 @@ def pickle_lcm():
@pytest.fixture
-def lcm():
+def lcm() -> Generator[LCM, None, None]:
lcm = LCM(autoconf=True)
lcm.start()
yield lcm
@@ -54,7 +56,7 @@ class MockLCMMessage:
msg_name = "geometry_msgs.Mock"
- def __init__(self, data) -> None:
+ def __init__(self, data: Any) -> None:
self.data = data
def lcm_encode(self) -> bytes:
@@ -64,19 +66,19 @@ def lcm_encode(self) -> bytes:
def lcm_decode(cls, data: bytes) -> "MockLCMMessage":
return cls(data.decode("utf-8"))
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
return isinstance(other, MockLCMMessage) and self.data == other.data
-def test_LCMPubSubBase_pubsub(lcm_pub_sub_base) -> None:
+def test_LCMPubSubBase_pubsub(lcm_pub_sub_base: LCMPubSubBase) -> None:
lcm = lcm_pub_sub_base
- received_messages = []
+ received_messages: list[tuple[Any, Any]] = []
topic = Topic(topic="/test_topic", lcm_type=MockLCMMessage)
test_message = MockLCMMessage("test_data")
- def callback(msg, topic) -> None:
+ def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))
lcm.subscribe(topic, callback)
@@ -97,13 +99,13 @@ def callback(msg, topic) -> None:
assert received_topic == topic
-def test_lcm_autodecoder_pubsub(lcm) -> None:
- received_messages = []
+def test_lcm_autodecoder_pubsub(lcm: LCM) -> None:
+ received_messages: list[tuple[Any, Any]] = []
topic = Topic(topic="/test_topic", lcm_type=MockLCMMessage)
test_message = MockLCMMessage("test_data")
- def callback(msg, topic) -> None:
+ def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))
lcm.subscribe(topic, callback)
@@ -133,12 +135,12 @@ def callback(msg, topic) -> None:
# passes some geometry types through LCM
@pytest.mark.parametrize("test_message", test_msgs)
-def test_lcm_geometry_msgs_pubsub(test_message, lcm) -> None:
- received_messages = []
+def test_lcm_geometry_msgs_pubsub(test_message: Any, lcm: LCM) -> None:
+ received_messages: list[tuple[Any, Any]] = []
topic = Topic(topic="/test_topic", lcm_type=test_message.__class__)
- def callback(msg, topic) -> None:
+ def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))
lcm.subscribe(topic, callback)
@@ -164,13 +166,13 @@ def callback(msg, topic) -> None:
# passes some geometry types through pickle LCM
@pytest.mark.parametrize("test_message", test_msgs)
-def test_lcm_geometry_msgs_autopickle_pubsub(test_message, pickle_lcm) -> None:
+def test_lcm_geometry_msgs_autopickle_pubsub(test_message: Any, pickle_lcm: PickleLCM) -> None:
lcm = pickle_lcm
- received_messages = []
+ received_messages: list[tuple[Any, Any]] = []
topic = Topic(topic="/test_topic")
- def callback(msg, topic) -> None:
+ def callback(msg: Any, topic: Any) -> None:
received_messages.append((msg, topic))
lcm.subscribe(topic, callback)
diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py
index 91e8514b70..0abbfca02b 100644
--- a/dimos/protocol/pubsub/test_spec.py
+++ b/dimos/protocol/pubsub/test_spec.py
@@ -15,7 +15,7 @@
# limitations under the License.
import asyncio
-from collections.abc import Callable
+from collections.abc import Callable, Generator
from contextlib import contextmanager
import time
from typing import Any
@@ -28,7 +28,7 @@
@contextmanager
-def memory_context():
+def memory_context() -> Generator[Memory, None, None]:
"""Context manager for Memory PubSub implementation."""
memory = Memory()
try:
@@ -47,7 +47,7 @@ def memory_context():
from dimos.protocol.pubsub.redispubsub import Redis
@contextmanager
- def redis_context():
+ def redis_context() -> Generator[Redis, None, None]:
redis_pubsub = Redis()
redis_pubsub.start()
yield redis_pubsub
@@ -63,7 +63,7 @@ def redis_context():
@contextmanager
-def lcm_context():
+def lcm_context() -> Generator[LCM, None, None]:
lcm_pubsub = LCM(autoconf=True)
lcm_pubsub.start()
yield lcm_pubsub
@@ -83,7 +83,7 @@ def lcm_context():
@contextmanager
-def shared_memory_cpu_context():
+def shared_memory_cpu_context() -> Generator[PickleSharedMemory, None, None]:
shared_mem_pubsub = PickleSharedMemory(prefer="cpu")
shared_mem_pubsub.start()
yield shared_mem_pubsub
@@ -100,13 +100,13 @@ def shared_memory_cpu_context():
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
-def test_store(pubsub_context, topic, values) -> None:
+def test_store(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None:
with pubsub_context() as x:
# Create a list to capture received messages
- received_messages = []
+ received_messages: list[Any] = []
# Define callback function that stores received messages
- def callback(message, _) -> None:
+ def callback(message: Any, _: Any) -> None:
received_messages.append(message)
# Subscribe to the topic with our callback
@@ -125,18 +125,20 @@ def callback(message, _) -> None:
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
-def test_multiple_subscribers(pubsub_context, topic, values) -> None:
+def test_multiple_subscribers(
+ pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
+) -> None:
"""Test that multiple subscribers receive the same message."""
with pubsub_context() as x:
# Create lists to capture received messages for each subscriber
- received_messages_1 = []
- received_messages_2 = []
+ received_messages_1: list[Any] = []
+ received_messages_2: list[Any] = []
# Define callback functions
- def callback_1(message, topic) -> None:
+ def callback_1(message: Any, topic: Any) -> None:
received_messages_1.append(message)
- def callback_2(message, topic) -> None:
+ def callback_2(message: Any, topic: Any) -> None:
received_messages_2.append(message)
# Subscribe both callbacks to the same topic
@@ -157,14 +159,14 @@ def callback_2(message, topic) -> None:
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
-def test_unsubscribe(pubsub_context, topic, values) -> None:
+def test_unsubscribe(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None:
"""Test that unsubscribed callbacks don't receive messages."""
with pubsub_context() as x:
# Create a list to capture received messages
- received_messages = []
+ received_messages: list[Any] = []
# Define callback function
- def callback(message, topic) -> None:
+ def callback(message: Any, topic: Any) -> None:
received_messages.append(message)
# Subscribe and get unsubscribe function
@@ -184,14 +186,16 @@ def callback(message, topic) -> None:
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
-def test_multiple_messages(pubsub_context, topic, values) -> None:
+def test_multiple_messages(
+ pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
+) -> None:
"""Test that subscribers receive multiple messages in order."""
with pubsub_context() as x:
# Create a list to capture received messages
- received_messages = []
+ received_messages: list[Any] = []
# Define callback function
- def callback(message, topic) -> None:
+ def callback(message: Any, topic: Any) -> None:
received_messages.append(message)
# Subscribe to the topic
@@ -212,7 +216,9 @@ def callback(message, topic) -> None:
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
@pytest.mark.asyncio
-async def test_async_iterator(pubsub_context, topic, values) -> None:
+async def test_async_iterator(
+ pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
+) -> None:
"""Test that async iterator receives messages correctly."""
with pubsub_context() as x:
# Get the messages to send (using the rest of the values)
@@ -261,15 +267,17 @@ async def consume_messages() -> None:
@pytest.mark.parametrize("pubsub_context, topic, values", testdata)
-def test_high_volume_messages(pubsub_context, topic, values) -> None:
+def test_high_volume_messages(
+ pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
+) -> None:
"""Test that all 5000 messages are received correctly."""
with pubsub_context() as x:
# Create a list to capture received messages
- received_messages = []
+ received_messages: list[Any] = []
last_message_time = [time.time()] # Use list to allow modification in callback
# Define callback function
- def callback(message, topic) -> None:
+ def callback(message: Any, topic: Any) -> None:
received_messages.append(message)
last_message_time[0] = time.time()
diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py
new file mode 100644
index 0000000000..050d481faf
--- /dev/null
+++ b/dimos/protocol/service/ddsservice.py
@@ -0,0 +1,67 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+import threading
+from typing import TYPE_CHECKING, Any
+
+from cyclonedds.domain import DomainParticipant
+
+from dimos.protocol.service.spec import Service
+from dimos.utils.logging_config import setup_logger
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+logger = setup_logger()
+
+
+@dataclass
+class DDSConfig:
+ """Configuration for DDS service."""
+
+ domain_id: int = 0
+ participant: DomainParticipant | None = None
+
+
+class DDSService(Service[DDSConfig]):
+ default_config = DDSConfig
+ _participant: DomainParticipant | None = None
+ _participant_lock = threading.Lock()
+
+ def __init__(self, **kwargs: Any) -> None:
+ super().__init__(**kwargs)
+
+ def start(self) -> None:
+ """Start the DDS service."""
+ pass
+
+ def stop(self) -> None:
+ """Stop the DDS service."""
+ pass
+
+ def get_participant(self) -> DomainParticipant:
+        """Return the shared DomainParticipant, creating it lazily on first use."""
+
+ # Lazy initialization of the participant
+ with self.__class__._participant_lock:
+ if self.__class__._participant is None:
+ self.__class__._participant = DomainParticipant(self.config.domain_id)
+ logger.info(f"DDS service started with Cyclone DDS domain {self.config.domain_id}")
+ return self.__class__._participant
+
+
+__all__ = ["DDSConfig", "DDSService"]
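The process-wide participant sharing used by `DDSService.get_participant` can be sketched without `cyclonedds` by substituting a fake participant; the names below are illustrative stand-ins, not the dimos API:

```python
import threading
from typing import Optional


class FakeParticipant:
    """Stand-in for cyclonedds.domain.DomainParticipant."""

    def __init__(self, domain_id: int) -> None:
        self.domain_id = domain_id


class LazyDDSService:
    """Class-level, lock-guarded lazy singleton (the DDSService pattern)."""

    _participant: Optional[FakeParticipant] = None
    _participant_lock = threading.Lock()

    def __init__(self, domain_id: int = 0) -> None:
        self.domain_id = domain_id

    def get_participant(self) -> FakeParticipant:
        # Lazy init: the first caller creates the participant, everyone
        # else reuses it. The class-level lock makes this thread-safe.
        with self.__class__._participant_lock:
            if self.__class__._participant is None:
                self.__class__._participant = FakeParticipant(self.domain_id)
            return self.__class__._participant


a = LazyDDSService(domain_id=7)
b = LazyDDSService(domain_id=7)
assert a.get_participant() is b.get_participant()  # one participant per process
```

Storing the participant on the class rather than the instance is what gives "one DDS participant per process" regardless of how many service objects are constructed; note it also means the first caller's `domain_id` wins.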
diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py
index ec2db16006..75a6a4d362 100644
--- a/dimos/protocol/service/lcmservice.py
+++ b/dimos/protocol/service/lcmservice.py
@@ -107,7 +107,7 @@ def _set_net_value(commands_needed: list[str], sudo: str, name: str, value: int)
return None
-TARGET_RMEM_SIZE = 2097152 # prev was 67108864
+TARGET_RMEM_SIZE = 67108864
TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS = 8388608
TARGET_MAX_DGRAM_SIZE_MACOS = 65535
@@ -155,7 +155,7 @@ def check_system() -> None:
multicast_commands = check_multicast()
buffer_commands, current_buffer_size = check_buffers()
- # Check multicast first - this is critical
+ # Multicast configuration
if multicast_commands:
logger.error(
"Critical: Multicast configuration required. Please run the following commands:"
@@ -165,7 +165,7 @@ def check_system() -> None:
logger.error("\nThen restart your application.")
sys.exit(1)
- # Buffer configuration is just for performance
+ # Buffer configuration is critical for throughput and packet loss
elif buffer_commands:
if current_buffer_size:
logger.warning(
diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py
index 977a441676..ce686d9e1d 100644
--- a/dimos/protocol/service/test_lcmservice.py
+++ b/dimos/protocol/service/test_lcmservice.py
@@ -183,7 +183,7 @@ def test_check_buffers_all_configured() -> None:
type(
"MockResult",
(),
- {"stdout": "net.core.rmem_default = 16777216", "returncode": 0},
+ {"stdout": "net.core.rmem_default = 67108864", "returncode": 0},
)(),
]
diff --git a/dimos/robot/unitree_webrtc/mujoco_connection.py b/dimos/robot/unitree_webrtc/mujoco_connection.py
index a8e5b5e6d4..6fac68a8f8 100644
--- a/dimos/robot/unitree_webrtc/mujoco_connection.py
+++ b/dimos/robot/unitree_webrtc/mujoco_connection.py
@@ -63,9 +63,11 @@ def __init__(self, global_config: GlobalConfig) -> None:
# Trigger the download of the mujoco_menagerie package. This is so it
# doesn't trigger in the mujoco process where it can time out.
- from mujoco_playground._src import mjx_env
+ # When using a profile bundle, we should not rely on menagerie assets.
+ if not global_config.mujoco_profile:
+ from mujoco_playground._src import mjx_env
- mjx_env.ensure_menagerie_exists()
+ mjx_env.ensure_menagerie_exists()
self.global_config = global_config
self.process: subprocess.Popen[bytes] | None = None
diff --git a/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py b/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py
index c79cee2a18..6e332a97cf 100644
--- a/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py
+++ b/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py
@@ -49,7 +49,15 @@
from dimos.msgs.vision_msgs import Detection2DArray
from dimos.navigation.frontier_exploration import wavefront_frontier_explorer
from dimos.navigation.replanning_a_star.module import replanning_a_star_planner
-from dimos.navigation.rosnav import ros_nav
+
+try:
+ # Optional ROS2 navigation module (lazy import so simulation can run without ROS installed).
+ from dimos.navigation.rosnav import ros_nav # type: ignore[import-untyped]
+
+ HAS_ROS_NAV = True
+except Exception: # pragma: no cover
+ ros_nav = None # type: ignore[assignment]
+ HAS_ROS_NAV = False
from dimos.perception.detection.detectors.person.yolo import YoloPersonDetector
from dimos.perception.detection.module3D import Detection3DModule, detection3d_module
from dimos.perception.detection.moduleDB import ObjectDBModule, detectionDB_module
@@ -64,7 +72,7 @@
from dimos.utils.monitoring import utilization
from dimos.web.websocket_vis.websocket_vis_module import websocket_vis
-_basic_no_nav = (
+_basic_no_nav_hw = (
autoconnect(
camera_module(
transform=Transform(
@@ -113,14 +121,57 @@
)
)
-basic_ros = autoconnect(
- _basic_no_nav,
- g1_connection(),
- ros_nav(),
+# Simulation-friendly base stack: no physical webcam access.
+_basic_no_nav_sim = (
+ autoconnect(
+ voxel_mapper(voxel_size=0.1),
+ cost_mapper(),
+ wavefront_frontier_explorer(),
+ # Visualization
+ websocket_vis(),
+ foxglove_bridge(),
+ )
+ .global_config(n_dask_workers=4, robot_model="unitree_g1")
+ .transports(
+ {
+ # G1 uses Twist for movement commands
+ ("cmd_vel", Twist): LCMTransport("/cmd_vel", Twist),
+ # State estimation from ROS
+ ("state_estimation", Odometry): LCMTransport("/state_estimation", Odometry),
+ # Odometry output from ROSNavigationModule
+ ("odom", PoseStamped): LCMTransport("/odom", PoseStamped),
+ # Navigation module topics from nav_bot
+ ("goal_req", PoseStamped): LCMTransport("/goal_req", PoseStamped),
+ ("goal_active", PoseStamped): LCMTransport("/goal_active", PoseStamped),
+ ("path_active", Path): LCMTransport("/path_active", Path),
+ ("pointcloud", PointCloud2): LCMTransport("/lidar", PointCloud2),
+ ("global_pointcloud", PointCloud2): LCMTransport("/map", PointCloud2),
+ # Original navigation topics for backwards compatibility
+ ("goal_pose", PoseStamped): LCMTransport("/goal_pose", PoseStamped),
+ ("goal_reached", Bool): LCMTransport("/goal_reached", Bool),
+ ("cancel_goal", Bool): LCMTransport("/cancel_goal", Bool),
+ # Camera topics (sim connection may publish these)
+ ("color_image", Image): LCMTransport("/g1/color_image", Image),
+ ("camera_info", CameraInfo): LCMTransport("/g1/camera_info", CameraInfo),
+ }
+ )
)
+if HAS_ROS_NAV and ros_nav is not None:
+ basic_ros = autoconnect(
+ _basic_no_nav_hw,
+ g1_connection(),
+ ros_nav(),
+ )
+else:
+ # ROS-free fallback: keep the same blueprint name, but omit the ROS nav module.
+ basic_ros = autoconnect(
+ _basic_no_nav_hw,
+ g1_connection(),
+ )
+
basic_sim = autoconnect(
- _basic_no_nav,
+ _basic_no_nav_sim,
g1_sim_connection(),
replanning_a_star_planner(),
)
diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py
index be57ad14a7..be53702412 100644
--- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py
+++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py
@@ -93,7 +93,7 @@
nav = autoconnect(
basic,
- voxel_mapper(voxel_size=0.05),
+ voxel_mapper(voxel_size=0.1),
cost_mapper(),
replanning_a_star_planner(),
wavefront_frontier_explorer(),
diff --git a/dimos/simulation/mujoco/model.py b/dimos/simulation/mujoco/model.py
index de533521da..9c6543569d 100644
--- a/dimos/simulation/mujoco/model.py
+++ b/dimos/simulation/mujoco/model.py
@@ -15,6 +15,7 @@
# limitations under the License.
+import json
from pathlib import Path
import xml.etree.ElementTree as ET
@@ -27,7 +28,12 @@
from dimos.mapping.occupancy.extrude_occupancy import generate_mujoco_scene
from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid
from dimos.simulation.mujoco.input_controller import InputController
-from dimos.simulation.mujoco.policy import G1OnnxController, Go1OnnxController, OnnxController
+from dimos.simulation.mujoco.policy import (
+ G1OnnxController,
+ Go1OnnxController,
+ MjlabVelocityOnnxController,
+ OnnxController,
+)
from dimos.utils.data import get_data
@@ -35,26 +41,79 @@ def _get_data_dir() -> epath.Path:
return epath.Path(str(get_data("mujoco_sim")))
-def get_assets() -> dict[str, bytes]:
+def _bundle_dir(profile: str) -> epath.Path:
+ return _get_data_dir() / profile
+
+
+def _bundle_model_path(profile: str) -> epath.Path:
+ # New bundle layout: data/mujoco_sim//model.xml
+ return _bundle_dir(profile) / "model.xml"
+
+
+def _bundle_policy_path(profile: str) -> epath.Path:
+ # New bundle layout: data/mujoco_sim//policy.onnx
+ return _bundle_dir(profile) / "policy.onnx"
+
+
+def _legacy_profile_xml_path(profile: str) -> epath.Path:
+ # Legacy layout (older integration): data/mujoco_sim/.xml
+ return _get_data_dir() / f"{profile}.xml"
+
+
+def _legacy_profile_policy_path(profile: str) -> epath.Path:
+ # Legacy layout (older integration): data/mujoco_sim/_policy.onnx
+ return _get_data_dir() / f"{profile}_policy.onnx"
+
+
+def load_bundle_json(profile: str) -> dict[str, object] | None:
+ """Load optional bundle.json for a MuJoCo profile.
+
+ The MuJoCo subprocess uses this to resolve profile-specific camera names.
+ """
+ cfg_path = _bundle_dir(profile) / "bundle.json"
+ if not cfg_path.exists():
+ return None
+ with cfg_path.open("r", encoding="utf-8") as f:
+ return json.load(f)
+
+
+def get_assets(*, profile: str | None = None) -> dict[str, bytes]:
data_dir = _get_data_dir()
# Assets used from https://sketchfab.com/3d-models/mersus-office-8714be387bcd406898b2615f7dae3a47
# Created by Ryan Cassidy and Coleman Costello
assets: dict[str, bytes] = {}
+
+ # Add all top-level XMLs. Keys must match include paths like "unitree_go1.xml".
mjx_env.update_assets(assets, data_dir, "*.xml")
- mjx_env.update_assets(assets, data_dir / "scene_office1/textures", "*.png")
- mjx_env.update_assets(assets, data_dir / "scene_office1/office_split", "*.obj")
- mjx_env.update_assets(assets, mjx_env.MENAGERIE_PATH / "unitree_go1" / "assets")
- mjx_env.update_assets(assets, mjx_env.MENAGERIE_PATH / "unitree_g1" / "assets")
+
+ # Scene assets are referenced with explicit paths (e.g. "scene_office1/office_split/*.obj")
+ # after we rewrite the scene XML in get_model_xml(). Load them with path-prefixed keys.
+ fs_root = Path(str(data_dir))
+ for p in (fs_root / "scene_office1/textures").glob("*.png"):
+ assets[f"scene_office1/textures/{p.name}"] = p.read_bytes()
+ for p in (fs_root / "scene_office1/office_split").glob("*.obj"):
+ assets[f"scene_office1/office_split/{p.name}"] = p.read_bytes()
+
+ if profile:
+ # Bundle-scoped assets: keep the sim fully self-contained when a profile is used.
+ # Include model.xml and anything under assets/ (meshes/textures).
+ for p in (fs_root / profile).rglob("*"):
+ if p.is_file():
+ assets[p.relative_to(fs_root).as_posix()] = p.read_bytes()
+ else:
+ mjx_env.update_assets(assets, mjx_env.MENAGERIE_PATH / "unitree_go1" / "assets")
+ mjx_env.update_assets(assets, mjx_env.MENAGERIE_PATH / "unitree_g1" / "assets")
return assets
def load_model(
- input_device: InputController, robot: str, scene_xml: str
+ input_device: InputController, robot: str, scene_xml: str, *, profile: str | None = None
) -> tuple[mujoco.MjModel, mujoco.MjData]:
mujoco.set_mjcb_control(None)
- xml_string = get_model_xml(robot, scene_xml)
- model = mujoco.MjModel.from_xml_string(xml_string, assets=get_assets())
+ include_name = profile or robot
+ xml_string = get_model_xml(robot=robot, scene_xml=scene_xml, profile=profile)
+ model = mujoco.MjModel.from_xml_string(xml_string, assets=get_assets(profile=profile))
data = mujoco.MjData(model)
mujoco.mj_resetDataKeyframe(model, data, 0)
@@ -69,9 +128,21 @@ def load_model(
n_substeps = round(ctrl_dt / sim_dt)
model.opt.timestep = sim_dt
+ # Resolve policy path. Prefer new bundle layout; fall back to legacy flat files.
+ if profile and _bundle_policy_path(profile).exists():
+ policy_path = _bundle_policy_path(profile).as_posix()
+ else:
+        policy_path = _legacy_profile_policy_path(include_name).as_posix()
+
+ # Default joint angles used for legacy controllers and as a shape reference.
+ # Some MJLab-exported bundles use "init_state" and may not provide "home".
+ if model.nkey > 0:
+ default_qpos = np.array(model.key_qpos[0, 7:], dtype=np.float32)
+ else:
+ default_qpos = np.array(data.qpos[7:], dtype=np.float32)
params = {
- "policy_path": (_get_data_dir() / f"{robot}_policy.onnx").as_posix(),
- "default_angles": np.array(model.keyframe("home").qpos[7:]),
+ "policy_path": policy_path,
+ "default_angles": default_qpos,
"n_substeps": n_substeps,
"action_scale": 0.5,
"input_controller": input_device,
@@ -82,7 +153,11 @@ def load_model(
case "unitree_go1":
policy: OnnxController = Go1OnnxController(**params)
case "unitree_g1":
- policy = G1OnnxController(**params, drift_compensation=[-0.18, 0.0, -0.09])
+ # Select controller by profile/bundle name when provided.
+ if include_name == "unitree_g1_mjlab":
+ policy = MjlabVelocityOnnxController(**params)
+ else:
+ policy = G1OnnxController(**params, drift_compensation=[-0.18, 0.0, -0.09])
case _:
raise ValueError(f"Unknown robot policy: {robot}")
@@ -91,10 +166,42 @@ def load_model(
return model, data
-def get_model_xml(robot: str, scene_xml: str) -> str:
+def get_model_xml(*, robot: str, scene_xml: str, profile: str | None = None) -> str:
root = ET.fromstring(scene_xml)
- root.set("model", f"{robot}_scene")
- root.insert(0, ET.Element("include", file=f"{robot}.xml"))
+ root.set("model", f"{(profile or robot)}_scene")
+
+ # The office scene config uses a global compiler meshdir/texturedir.
+ # When we include a robot MJCF (e.g. Unitree GO1) that references meshes like "trunk.stl"
+ # without a directory prefix, MuJoCo incorrectly resolves them relative to the scene meshdir
+ # and fails to load. Fix by rewriting scene asset file paths to be explicit and clearing
+ # meshdir/texturedir so they can't leak into the included robot model.
+ compiler = root.find("compiler")
+ if compiler is not None:
+ meshdir = compiler.get("meshdir")
+ texturedir = compiler.get("texturedir")
+
+ if meshdir:
+ for mesh in root.findall("./asset/mesh"):
+ f = mesh.get("file")
+ if f and "/" not in f and "\\" not in f:
+ mesh.set("file", f"{meshdir}/{f}")
+ compiler.attrib.pop("meshdir", None)
+
+ if texturedir:
+ for tex in root.findall("./asset/texture"):
+ f = tex.get("file")
+ if f and "/" not in f and "\\" not in f:
+ tex.set("file", f"{texturedir}/{f}")
+ compiler.attrib.pop("texturedir", None)
+
+ # Resolve robot include file path.
+    # Prefer new bundle layout: <profile>/model.xml
+ if profile and _bundle_model_path(profile).exists():
+ include_file = f"{profile}/model.xml"
+ else:
+        # Legacy behavior: include {profile or robot}.xml from the data/mujoco_sim root.
+ include_file = f"{profile or robot}.xml"
+ root.insert(0, ET.Element("include", file=include_file))
# Ensure visual/map element exists with znear and zfar
visual = root.find("visual")
diff --git a/dimos/simulation/mujoco/mujoco_process.py b/dimos/simulation/mujoco/mujoco_process.py
index 2363a8abd3..d5275749e0 100755
--- a/dimos/simulation/mujoco/mujoco_process.py
+++ b/dimos/simulation/mujoco/mujoco_process.py
@@ -40,7 +40,7 @@
VIDEO_WIDTH,
)
from dimos.simulation.mujoco.depth_camera import depth_image_to_point_cloud
-from dimos.simulation.mujoco.model import load_model, load_scene_xml
+from dimos.simulation.mujoco.model import load_bundle_json, load_model, load_scene_xml
from dimos.simulation.mujoco.shared_memory import ShmReader
from dimos.utils.logging_config import setup_logger
@@ -76,7 +76,15 @@ def _run_simulation(config: GlobalConfig, shm: ShmReader) -> None:
robot_name = "unitree_go1"
controller = MockController(shm)
- model, data = load_model(controller, robot=robot_name, scene_xml=load_scene_xml(config))
+ # Only use a MuJoCo profile bundle when explicitly requested.
+    # Otherwise fall back to the legacy behavior (menagerie assets + a top-level {robot}.xml include).
+ profile = config.mujoco_profile
+ model, data = load_model(
+ controller,
+ robot=robot_name,
+ scene_xml=load_scene_xml(config),
+ profile=profile,
+ )
if model is None or data is None:
raise ValueError("Failed to load MuJoCo model: model or data is None")
@@ -95,15 +103,52 @@ def _run_simulation(config: GlobalConfig, shm: ShmReader) -> None:
mujoco.mj_forward(model, data)
- camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, "head_camera")
- lidar_camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, "lidar_front_camera")
- lidar_left_camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, "lidar_left_camera")
- lidar_right_camera_id = mujoco.mj_name2id(
- model, mujoco.mjtObj.mjOBJ_CAMERA, "lidar_right_camera"
-    )
+    # Camera naming can differ per profile bundle. If bundle.json exists, use it.
+    bundle_cfg = load_bundle_json(profile) if profile else None
+
+    def _camera_name(key: str, default: str) -> str:
+        """Resolve a camera name from bundle.json, falling back to the default."""
+        if bundle_cfg and key in bundle_cfg:
+            return str(bundle_cfg[key])
+        return default
+
+    rgb_cam_name = _camera_name("rgb_camera", "head_camera")
+    lidar_front_name = _camera_name("lidar_front_camera", "lidar_front_camera")
+    lidar_left_name = _camera_name("lidar_left_camera", "lidar_left_camera")
+    lidar_right_name = _camera_name("lidar_right_camera", "lidar_right_camera")
+
+ camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, rgb_cam_name)
+ lidar_camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, lidar_front_name)
+ lidar_left_camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, lidar_left_name)
+ lidar_right_camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, lidar_right_name)
shm.signal_ready()
+ # Lightweight profiler: log rolling averages of time spent in the MuJoCo subprocess.
+ # Includes physics stepping, rendering, pointcloud conversion, SHM writes, and policy inference.
+ profiler_enabled = bool(getattr(config, "mujoco_profiler", False))
+ profiler_interval_s = float(getattr(config, "mujoco_profiler_interval_s", 2.0))
+ if profiler_enabled:
+ from dimos.simulation.mujoco import policy as mujoco_policy
+
+ mujoco_policy.set_mujoco_profiler_enabled(True)
+ logger.info(
+ "MuJoCo profiler enabled",
+ interval_s=profiler_interval_s,
+ video_fps=VIDEO_FPS,
+ lidar_fps=LIDAR_FPS,
+ steps_per_frame=config.mujoco_steps_per_frame,
+ )
+
with viewer.launch_passive(model, data, show_left_ui=False, show_right_ui=False) as m_viewer:
camera_size = (VIDEO_WIDTH, VIDEO_HEIGHT)
@@ -131,32 +176,57 @@ def _run_simulation(config: GlobalConfig, shm: ShmReader) -> None:
m_viewer.cam.azimuth = config.mujoco_camera_position_float[4]
m_viewer.cam.elevation = config.mujoco_camera_position_float[5]
+ # Profiler accumulators (seconds).
+ acc_frames = 0
+ acc_step_s = 0.0
+ acc_sync_s = 0.0
+ acc_odom_s = 0.0
+ acc_rgb_render_s = 0.0
+ acc_rgb_shm_s = 0.0
+ acc_depth_render_s = 0.0
+ acc_depth_shm_s = 0.0
+ acc_pcd_s = 0.0
+ acc_lidar_shm_s = 0.0
+ next_report_t = time.perf_counter() + profiler_interval_s
+
while m_viewer.is_running() and not shm.should_stop():
step_start = time.time()
# Step simulation
+ t0 = time.perf_counter()
for _ in range(config.mujoco_steps_per_frame):
mujoco.mj_step(model, data)
+ acc_step_s += time.perf_counter() - t0
+ t0 = time.perf_counter()
m_viewer.sync()
+ acc_sync_s += time.perf_counter() - t0
# Always update odometry
+ t0 = time.perf_counter()
pos = data.qpos[0:3].copy()
quat = data.qpos[3:7].copy() # (w, x, y, z)
shm.write_odom(pos, quat, time.time())
+ acc_odom_s += time.perf_counter() - t0
current_time = time.time()
# Video rendering
if current_time - last_video_time >= video_interval:
+ t0 = time.perf_counter()
rgb_renderer.update_scene(data, camera=camera_id, scene_option=scene_option)
pixels = rgb_renderer.render()
+ acc_rgb_render_s += time.perf_counter() - t0
+ t0 = time.perf_counter()
shm.write_video(pixels)
+ acc_rgb_shm_s += time.perf_counter() - t0
last_video_time = current_time
# Lidar/depth rendering
if current_time - last_lidar_time >= lidar_interval:
# Render all depth cameras
+ t0 = time.perf_counter()
depth_renderer.update_scene(data, camera=lidar_camera_id, scene_option=scene_option)
depth_front = depth_renderer.render()
@@ -169,10 +239,14 @@ def _run_simulation(config: GlobalConfig, shm: ShmReader) -> None:
data, camera=lidar_right_camera_id, scene_option=scene_option
)
depth_right = depth_right_renderer.render()
+ acc_depth_render_s += time.perf_counter() - t0
+ t0 = time.perf_counter()
shm.write_depth(depth_front, depth_left, depth_right)
+ acc_depth_shm_s += time.perf_counter() - t0
# Process depth images into lidar message
+ t0 = time.perf_counter()
all_points = []
cameras_data = [
(
@@ -211,7 +285,12 @@ def _run_simulation(config: GlobalConfig, shm: ShmReader) -> None:
origin=Vector3(pos[0], pos[1], pos[2]),
resolution=LIDAR_RESOLUTION,
)
+ acc_pcd_s += time.perf_counter() - t0
+ t0 = time.perf_counter()
shm.write_lidar(lidar_msg)
+ acc_lidar_shm_s += time.perf_counter() - t0
+ else:
+ acc_pcd_s += time.perf_counter() - t0
last_lidar_time = current_time
@@ -220,6 +299,51 @@ def _run_simulation(config: GlobalConfig, shm: ShmReader) -> None:
if time_until_next_step > 0:
time.sleep(time_until_next_step)
+ if profiler_enabled:
+ acc_frames += 1
+ now = time.perf_counter()
+ if now >= next_report_t:
+ from dimos.simulation.mujoco import policy as mujoco_policy
+
+ pol = mujoco_policy.get_mujoco_profiler_and_reset()
+ calls = int(pol.get("control_calls", 0))
+ ctrl_ms = (float(pol.get("control_total_s", 0.0)) * 1000.0) / max(calls, 1)
+ obs_ms = (float(pol.get("obs_total_s", 0.0)) * 1000.0) / max(calls, 1)
+ onnx_ms = (float(pol.get("onnx_total_s", 0.0)) * 1000.0) / max(calls, 1)
+
+ def per_frame_ms(total_s: float) -> float:
+ return (total_s * 1000.0) / max(acc_frames, 1)
+
+ logger.info(
+ "MuJoCo perf (avg ms/frame)",
+ frames=acc_frames,
+ physics_ms=per_frame_ms(acc_step_s),
+ viewer_sync_ms=per_frame_ms(acc_sync_s),
+ odom_ms=per_frame_ms(acc_odom_s),
+ rgb_render_ms=per_frame_ms(acc_rgb_render_s),
+ rgb_shm_ms=per_frame_ms(acc_rgb_shm_s),
+ depth_render_ms=per_frame_ms(acc_depth_render_s),
+ depth_shm_ms=per_frame_ms(acc_depth_shm_s),
+ pcd_ms=per_frame_ms(acc_pcd_s),
+ lidar_shm_ms=per_frame_ms(acc_lidar_shm_s),
+ ctrl_calls=calls,
+ ctrl_total_ms_per_call=ctrl_ms,
+ ctrl_obs_ms_per_call=obs_ms,
+ ctrl_onnx_ms_per_call=onnx_ms,
+ )
+
+ acc_frames = 0
+ acc_step_s = 0.0
+ acc_sync_s = 0.0
+ acc_odom_s = 0.0
+ acc_rgb_render_s = 0.0
+ acc_rgb_shm_s = 0.0
+ acc_depth_render_s = 0.0
+ acc_depth_shm_s = 0.0
+ acc_pcd_s = 0.0
+ acc_lidar_shm_s = 0.0
+ next_report_t = now + profiler_interval_s
+
if __name__ == "__main__":
diff --git a/dimos/simulation/mujoco/policy.py b/dimos/simulation/mujoco/policy.py
index 212c7ac60a..7797101bea 100644
--- a/dimos/simulation/mujoco/policy.py
+++ b/dimos/simulation/mujoco/policy.py
@@ -16,6 +16,7 @@
from abc import ABC, abstractmethod
+import time
from typing import Any
import mujoco
@@ -27,6 +28,41 @@
logger = setup_logger()
+_MUJOCO_PROFILER_ENABLED = False
+_MUJOCO_PROF: dict[str, float | int] = {
+ "control_calls": 0,
+ "control_total_s": 0.0,
+ "obs_total_s": 0.0,
+ "onnx_total_s": 0.0,
+}
+
+
+def set_mujoco_profiler_enabled(enabled: bool) -> None:
+ global _MUJOCO_PROFILER_ENABLED
+ _MUJOCO_PROFILER_ENABLED = enabled
+
+
+def get_mujoco_profiler_and_reset() -> dict[str, float | int]:
+ global _MUJOCO_PROF
+ out = dict(_MUJOCO_PROF)
+ _MUJOCO_PROF = {
+ "control_calls": 0,
+ "control_total_s": 0.0,
+ "obs_total_s": 0.0,
+ "onnx_total_s": 0.0,
+ }
+ return out
+
+
+def _parse_csv_floats(s: str) -> np.ndarray[Any, Any]:
+ parts = [p.strip() for p in s.split(",") if p.strip()]
+ return np.array([float(p) for p in parts], dtype=np.float32)
+
+
+def _parse_csv_strings(s: str) -> list[str]:
+ # Metadata is written as comma-separated values. Joint names do not contain commas.
+ return [p.strip() for p in s.split(",") if p.strip()]
+
class OnnxController(ABC):
def __init__(
@@ -39,9 +75,27 @@ def __init__(
ctrl_dt: float | None = None,
drift_compensation: list[float] | None = None,
) -> None:
- self._output_names = ["continuous_actions"]
+ # Load policy. Prefer available providers (CPU-only by default, GPU if installed).
self._policy = ort.InferenceSession(policy_path, providers=ort.get_available_providers())
- logger.info(f"Loaded policy: {policy_path} with providers: {self._policy.get_providers()}")
+
+ # Support multiple exporter conventions. Prefer "continuous_actions" (older exports),
+ # then "actions" (RSL-RL/IsaacLab default), and fall back to the first output.
+ outputs = [o.name for o in self._policy.get_outputs()]
+ if "continuous_actions" in outputs:
+ self._output_names = ["continuous_actions"]
+ elif "actions" in outputs:
+ self._output_names = ["actions"]
+ elif outputs:
+ self._output_names = [outputs[0]]
+ else:
+ raise ValueError(f"ONNX policy has no outputs: {policy_path}")
+
+ logger.info(
+ "Loaded policy",
+ policy_path=policy_path,
+ providers=self._policy.get_providers(),
+ outputs=self._output_names,
+ )
self._action_scale = action_scale
self._default_angles = default_angles
@@ -59,7 +113,26 @@ def get_obs(self, model: mujoco.MjModel, data: mujoco.MjData) -> np.ndarray[Any,
def get_control(self, model: mujoco.MjModel, data: mujoco.MjData) -> None:
self._counter += 1
- if self._counter % self._n_substeps == 0:
+ if self._counter % self._n_substeps != 0:
+ return
+
+ if _MUJOCO_PROFILER_ENABLED:
+ t0 = time.perf_counter()
+ obs = self.get_obs(model, data)
+ t_obs = time.perf_counter()
+ onnx_input = {"obs": obs.reshape(1, -1)}
+ onnx_pred = self._policy.run(self._output_names, onnx_input)[0][0]
+ t_onnx = time.perf_counter()
+ self._last_action = onnx_pred.copy()
+ data.ctrl[:] = onnx_pred * self._action_scale + self._default_angles
+ self._post_control_update()
+ t1 = time.perf_counter()
+
+ _MUJOCO_PROF["control_calls"] = int(_MUJOCO_PROF["control_calls"]) + 1
+ _MUJOCO_PROF["control_total_s"] = float(_MUJOCO_PROF["control_total_s"]) + (t1 - t0)
+ _MUJOCO_PROF["obs_total_s"] = float(_MUJOCO_PROF["obs_total_s"]) + (t_obs - t0)
+ _MUJOCO_PROF["onnx_total_s"] = float(_MUJOCO_PROF["onnx_total_s"]) + (t_onnx - t_obs)
+ else:
obs = self.get_obs(model, data)
onnx_input = {"obs": obs.reshape(1, -1)}
onnx_pred = self._policy.run(self._output_names, onnx_input)[0][0]
@@ -149,3 +222,189 @@ def get_obs(self, model: mujoco.MjModel, data: mujoco.MjData) -> np.ndarray[Any,
def _post_control_update(self) -> None:
phase_tp1 = self._phase + self._phase_dt
self._phase = np.fmod(phase_tp1 + np.pi, 2 * np.pi) - np.pi
+
+
+class MjlabVelocityOnnxController(OnnxController):
+ """MJLab velocity-policy interface for G1.
+
+ Matches MJLab `Velocity` policy observation layout (99 dims):
+ [imu_lin_vel(3), imu_ang_vel(3), projected_gravity(3),
+ joint_pos_rel(N), joint_vel_rel(N), last_action(N), command(3)]
+ """
+
+ def __init__(
+ self,
+ policy_path: str,
+ default_angles: np.ndarray[Any, Any],
+ n_substeps: int,
+ action_scale: float,
+ input_controller: InputController,
+ ctrl_dt: float | None = None,
+ *,
+ imu_site_name: str = "imu_in_pelvis",
+ gravity_vec_w: np.ndarray[Any, Any] | None = None,
+ ) -> None:
+ del ctrl_dt # Unused (kept for compatibility with existing load_model params).
+ super().__init__(
+ policy_path=policy_path,
+ default_angles=default_angles,
+ n_substeps=n_substeps,
+ action_scale=action_scale,
+ input_controller=input_controller,
+ )
+ self._imu_site_name = imu_site_name
+ self._gravity_w = (
+ gravity_vec_w
+ if gravity_vec_w is not None
+ else np.array([0.0, 0.0, -1.0], dtype=np.float32)
+ )
+
+ # MJLab exporter can embed metadata required to reproduce the exact action contract.
+ meta = self._policy.get_modelmeta().custom_metadata_map
+ self._policy_joint_names: list[str] | None = None
+ self._default_joint_pos_policy: np.ndarray[Any, Any] | None = None
+ self._action_scale_policy: np.ndarray[Any, Any] | None = None
+ if "joint_names" in meta and "default_joint_pos" in meta and "action_scale" in meta:
+ self._policy_joint_names = _parse_csv_strings(meta["joint_names"])
+ self._default_joint_pos_policy = _parse_csv_floats(meta["default_joint_pos"])
+ self._action_scale_policy = _parse_csv_floats(meta["action_scale"])
+
+ # Lazy-built mappings (depend on the loaded MuJoCo model).
+ self._policy_qpos_adr: np.ndarray[Any, Any] | None = None
+ self._policy_qvel_adr: np.ndarray[Any, Any] | None = None
+ self._ctrl_policy_idx: np.ndarray[Any, Any] | None = None
+
+ def get_obs(self, model: mujoco.MjModel, data: mujoco.MjData) -> np.ndarray[Any, Any]:
+ self._ensure_mappings(model)
+ # Linear/angular velocity sensors (names match MJLab G1 XML).
+ linvel = data.sensor("imu_lin_vel").data
+ gyro = data.sensor("imu_ang_vel").data
+
+ # Projected gravity in body frame using IMU site orientation.
+ imu_xmat = data.site_xmat[model.site(self._imu_site_name).id].reshape(3, 3)
+ gravity_b = imu_xmat.T @ self._gravity_w
+
+ assert self._default_joint_pos_policy is not None
+ assert self._policy_qpos_adr is not None
+ assert self._policy_qvel_adr is not None
+
+ # Joint state relative to MJLab default, in MJLab joint order.
+ joint_pos = data.qpos[self._policy_qpos_adr]
+ joint_angles = joint_pos - self._default_joint_pos_policy
+ joint_velocities = data.qvel[self._policy_qvel_adr]
+
+ obs = np.hstack(
+ [
+ linvel,
+ gyro,
+ gravity_b,
+ joint_angles,
+ joint_velocities,
+ self._last_action,
+ self._input_controller.get_command(),
+ ]
+ )
+ return obs.astype(np.float32)
+
+ def _ensure_mappings(self, model: mujoco.MjModel) -> None:
+ """Build joint/actuator mappings to exactly match MJLab joint ordering."""
+ if self._policy_joint_names is None:
+ raise ValueError(
+ "MJLab profile requires ONNX metadata keys: joint_names, default_joint_pos, action_scale."
+ )
+ if self._policy_qpos_adr is not None:
+ return
+
+ names = self._policy_joint_names
+ default = self._default_joint_pos_policy
+ scale = self._action_scale_policy
+ assert default is not None and scale is not None
+
+ if len(names) != len(default) or len(names) != len(scale):
+ raise ValueError(
+ f"ONNX metadata size mismatch: len(joint_names)={len(names)}, "
+ f"len(default_joint_pos)={len(default)}, len(action_scale)={len(scale)}"
+ )
+
+ # Build qpos/qvel address arrays in policy joint order.
+ qpos_adr: list[int] = []
+ qvel_adr: list[int] = []
+ for jname in names:
+ jid = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, jname)
+ if jid < 0:
+ raise ValueError(f"Policy joint '{jname}' not found in MuJoCo model joints.")
+ qpos_adr.append(int(model.jnt_qposadr[jid]))
+ qvel_adr.append(int(model.jnt_dofadr[jid]))
+ self._policy_qpos_adr = np.array(qpos_adr, dtype=np.int32)
+ self._policy_qvel_adr = np.array(qvel_adr, dtype=np.int32)
+
+ # Build actuator ctrl->policy index mapping so actions are applied to the correct joints.
+ index_by_name = {n: i for i, n in enumerate(names)}
+ ctrl_policy_idx: list[int] = []
+ for act_id in range(model.nu):
+ # For joint actuators, actuator_trnid[act_id, 0] is the joint id.
+ jid = int(model.actuator_trnid[act_id, 0])
+ jname = mujoco.mj_id2name(model, mujoco.mjtObj.mjOBJ_JOINT, jid)
+ if jname is None or jname not in index_by_name:
+ raise ValueError(
+ f"Actuator {act_id} joint '{jname}' not present in policy joint_names metadata."
+ )
+ ctrl_policy_idx.append(index_by_name[jname])
+ self._ctrl_policy_idx = np.array(ctrl_policy_idx, dtype=np.int32)
+
+ def get_control(self, model: mujoco.MjModel, data: mujoco.MjData) -> None:
+ """Override: apply MJLab action contract (name-based remap + per-joint scaling)."""
+ self._counter += 1
+ if self._counter % self._n_substeps != 0:
+ return
+
+ if _MUJOCO_PROFILER_ENABLED:
+ t0 = time.perf_counter()
+
+ self._ensure_mappings(model)
+ assert self._ctrl_policy_idx is not None
+ assert self._default_joint_pos_policy is not None
+ assert self._action_scale_policy is not None
+
+ obs = self.get_obs(model, data)
+ t_obs = time.perf_counter()
+ onnx_input = {"obs": obs.reshape(1, -1)}
+ onnx_pred = self._policy.run(self._output_names, onnx_input)[0][0].astype(np.float32)
+ t_onnx = time.perf_counter()
+
+ # Store last action in MJLab policy order (for next observation).
+ self._last_action = onnx_pred.copy()
+
+ # Apply control in MuJoCo actuator order.
+ idx = self._ctrl_policy_idx
+ targets = (
+ self._default_joint_pos_policy[idx]
+ + onnx_pred[idx] * self._action_scale_policy[idx]
+ )
+ data.ctrl[:] = targets
+ t1 = time.perf_counter()
+
+ _MUJOCO_PROF["control_calls"] = int(_MUJOCO_PROF["control_calls"]) + 1
+ _MUJOCO_PROF["control_total_s"] = float(_MUJOCO_PROF["control_total_s"]) + (t1 - t0)
+ _MUJOCO_PROF["obs_total_s"] = float(_MUJOCO_PROF["obs_total_s"]) + (t_obs - t0)
+ _MUJOCO_PROF["onnx_total_s"] = float(_MUJOCO_PROF["onnx_total_s"]) + (t_onnx - t_obs)
+ else:
+ self._ensure_mappings(model)
+ assert self._ctrl_policy_idx is not None
+ assert self._default_joint_pos_policy is not None
+ assert self._action_scale_policy is not None
+
+ obs = self.get_obs(model, data)
+ onnx_input = {"obs": obs.reshape(1, -1)}
+ onnx_pred = self._policy.run(self._output_names, onnx_input)[0][0].astype(np.float32)
+
+ # Store last action in MJLab policy order (for next observation).
+ self._last_action = onnx_pred.copy()
+
+ # Apply control in MuJoCo actuator order.
+ idx = self._ctrl_policy_idx
+ targets = (
+ self._default_joint_pos_policy[idx]
+ + onnx_pred[idx] * self._action_scale_policy[idx]
+ )
+ data.ctrl[:] = targets
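The ONNX metadata contract the MJLab controller relies on (comma-separated `joint_names`, `default_joint_pos`, and `action_scale` entries in `custom_metadata_map`) can be exercised without a model file. The parsing below mirrors the `_parse_csv_*` helpers in the diff; the metadata values are hypothetical:

```python
import numpy as np


def parse_csv_floats(s: str) -> np.ndarray:
    # Metadata is written as comma-separated values; ignore empty fields.
    return np.array([float(p) for p in s.split(",") if p.strip()], dtype=np.float32)


def parse_csv_strings(s: str) -> list[str]:
    # Joint names do not contain commas, so a plain split is safe.
    return [p.strip() for p in s.split(",") if p.strip()]


# Hypothetical metadata, shaped like sess.get_modelmeta().custom_metadata_map.
meta = {
    "joint_names": "left_hip_pitch_joint, left_knee_joint, right_hip_pitch_joint",
    "default_joint_pos": "-0.1, 0.3, -0.1",
    "action_scale": "0.25, 0.25, 0.25",
}
joint_names = parse_csv_strings(meta["joint_names"])
default_joint_pos = parse_csv_floats(meta["default_joint_pos"])
action_scale = parse_csv_floats(meta["action_scale"])
# The same size check _ensure_mappings enforces before building joint mappings.
assert len(joint_names) == len(default_joint_pos) == len(action_scale)
```

Keeping per-joint defaults and scales in the exported model, rather than hard-coding them in the simulator, is what lets the controller remap actions by joint name instead of assuming MuJoCo actuator order.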
diff --git a/dimos/web/templates/rerun_dashboard.html b/dimos/web/templates/rerun_dashboard.html
index 9917d9d2af..f0792079e3 100644
--- a/dimos/web/templates/rerun_dashboard.html
+++ b/dimos/web/templates/rerun_dashboard.html
@@ -6,15 +6,78 @@
* { margin: 0; padding: 0; box-sizing: border-box; }
html, body { width: 100%; height: 100%; overflow: hidden; }
body { background: #0a0a0f; font-family: -apple-system, system-ui, sans-serif; }
+ :root { --command-center-width: max(30vw, 35rem); }
.container { display: flex; width: 100%; height: 100%; }
- .rerun { flex: 1; border: none; }
- .command-center { width: 400px; border: none; border-left: 1px solid #333; }
+ .command-center {
+ width: var(--command-center-width);
+ min-width: 16rem;
+ border: none;
+ border-right: 1px solid #333;
+ }
+ .rerun { flex: 1 1 auto; border: none; min-width: 0; }
+ .divider {
+ width: 6px;
+ background: linear-gradient(180deg, #202530 0%, #141824 100%);
+ cursor: col-resize;
+ border-left: 1px solid #0f1016;
+ border-right: 1px solid #0f1016;
+ }
+ .divider:hover { background: #2a3140; }
+ .divider.dragging { background: #3a4458; }
+ body.dragging { user-select: none; cursor: col-resize; }
+