From d60767daf664911a5a82b2d5d7842b5700253816 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 15:46:19 +0800 Subject: [PATCH 01/13] docs: update transports documentation with encode/decode mixins and module examples - Add encode/decode mixins section with PickleEncoderMixin, LCMEncoderMixin, JpegEncoderMixin - Add working example of deploying modules with shared memory transport - Add ROS and DDS to available transports table - Add doclinks CLI entry point to pyproject.toml - Fix webcam.py to import CameraInfo from dimos.msgs (has with_ts method) instead of dimos_lcm --- dimos/hardware/sensors/camera/webcam.py | 3 +- docs/concepts/transports.md | 379 ++++++++++-------------- pyproject.toml | 1 + 3 files changed, 153 insertions(+), 230 deletions(-) diff --git a/dimos/hardware/sensors/camera/webcam.py b/dimos/hardware/sensors/camera/webcam.py index 54989ca568..51199624fe 100644 --- a/dimos/hardware/sensors/camera/webcam.py +++ b/dimos/hardware/sensors/camera/webcam.py @@ -19,12 +19,11 @@ from typing import Literal import cv2 -from dimos_lcm.sensor_msgs import CameraInfo from reactivex import create from reactivex.observable import Observable from dimos.hardware.sensors.camera.spec import CameraConfig, CameraHardware -from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs import CameraInfo, Image from dimos.msgs.sensor_msgs.Image import ImageFormat from dimos.utils.reactive import backpressure diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 62279b6baf..03cea3e280 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -3,11 +3,29 @@ Transports enable communication between [modules](modules.md) across process boundaries and networks. When modules run in different processes or on different machines, they need a transport layer to exchange messages. -While the interface is called "PubSub", transports aren't limited to traditional pub-sub services. A topic can be anything that identifies a communication channel: an IP address and port, a shared memory segment name, a file path, or a Redis channel. The abstraction is flexible enough to support any communication pattern that can publish and subscribe to named channels. +## Unconventional Transports -## The PubSub Interface +Transports (theoretically) aren't limited to traditional pub-sub services. By implementing just broadcast and subscribe functions on `Transport` at [`core/stream.py`](/dimos/core/stream.py#L83) subclass you can do anything. -At the core of all transports is the `PubSub` abstract class. Any transport implementation must provide two methods: +Your arguments to Transport can be an IP address and port, a shared memory segment name, a file path, or a Redis channel. + +For example rebinding an existing go2 blueprint to use (imagined) TCP transport for lidar (each module that requires lidar data would connect via TCP to this ip. + + +```python skip +from dimos.robot.unitree_webrtc.unitree_go2_blueprints import nav + +# use TCP for lidar data (each module individually would establish a tcp connection) +ros = nav.transports( + {("lidar", PointCloud2): TCPTransport(ip="10.10.10.1",port=1414)} +) +``` + +subscribe() of your TCP transport just needs to return a standard PointCloud2 object, we don't care how you transport, encode or construct it, though all our types provide `lcm_encode` and `lcm_decode` functions for (faster then pickle and language agnostic) binary encoding. for more info on this, check [lcm](/docs/concepts/lcm.md) + +## PubSub Transports + +For now at dimos, we've been using exclusively PubSub protocols for our transports `PubSub` abstract class is what those protocol implementations conform to. ```python session=pubsub_demo ansi=false from dimos.protocol.pubsub.spec import PubSub @@ -18,20 +36,32 @@ print(inspect.getsource(PubSub.publish)) print(inspect.getsource(PubSub.subscribe)) ``` - + ``` -Session process exited unexpectedly: -/home/lesh/coding/dimos/.venv/bin/python3: No module named md_babel_py.session_server - + @abstractmethod + def publish(self, topic: TopicT, message: MsgT) -> None: + """Publish a message to a topic.""" + ... + + @abstractmethod + def subscribe( + self, topic: TopicT, callback: Callable[[MsgT, TopicT], None] + ) -> Callable[[], None]: + """Subscribe to a topic with a callback. returns unsubscribe function""" + ... ``` +So new protocols are very easy to implement. + Key points: - `publish(topic, message)` - Send a message to all subscribers on a topic - `subscribe(topic, callback)` - Register a callback, returns an unsubscribe function +We don't tell you what topic type actually is, it can be a complex configuration object, and we also don't tell you what a message is. it can be bytes, json etc. If you accept `DimosMsg` [`msgs/protocol.py`](/dimos/msgs/protocol.py#L19) you'll be able to transport any dimos msg, but we don't prevent you from implementing bytes only transports, video streams etc + ## Implementing a Simple Transport -The simplest transport is `Memory`, which works within a single process: +The simplest toy transport is `Memory`, which works within a single process, start from there, ```python session=memory_demo ansi=false from dimos.protocol.pubsub.memory import Memory @@ -66,16 +96,83 @@ Received 2 messages: The full implementation is minimal. See [`memory.py`](/dimos/protocol/pubsub/memory.py) for the complete source. +## Encode/Decode Mixins + +Transports often need to serialize messages before sending and deserialize after receiving. The `PubSubEncoderMixin` at [`pubsub/spec.py`](/dimos/protocol/pubsub/spec.py#L95) provides a clean way to add encoding/decoding to any pubsub implementation. + +### Available Mixins + +| Mixin | Encoding | Use Case | +|--------------------------------|-----------------|---------------------------------------| +| `PickleEncoderMixin` | Python pickle | Any Python object, same-language only | +| `LCMEncoderMixin` | LCM binary | Cross-language (C/C++/Python/Java/Go) | +| `JpegEncoderMixin` | JPEG compressed | Image data, reduces bandwidth | + +The `LCMEncoderMixin` is especially powerful because LCM messages encode to compact binary that works across languages. This means you can use LCM message definitions with *any* transport - not just LCM's UDP multicast. See [lcm](/docs/concepts/lcm.md) for details on LCM message types. + +### Creating a Custom Mixin + +Subclass `PubSubEncoderMixin` and implement `encode()` and `decode()`: + +```python session=jsonencoder +from dimos.protocol.pubsub.spec import PubSubEncoderMixin +import json + +class JsonEncoderMixin(PubSubEncoderMixin[str, dict, bytes]): + def encode(self, msg: dict, topic: str) -> bytes: + return json.dumps(msg).encode('utf-8') + + def decode(self, msg: bytes, topic: str) -> dict: + return json.loads(msg.decode('utf-8')) +``` + +Then combine with a pubsub implementation using multiple inheritance: + +```python session=jsonencoder +from dimos.protocol.pubsub import Memory + +class MyJsonPubSub(JsonEncoderMixin, Memory): + pass +``` + +The mixin automatically wraps `publish()` and `subscribe()` to handle encoding/decoding transparently. Your new transport implementation stays the same - just swap the mixin: + +```python session=jsonencoder +from dimos.protocol.pubsub.spec import PickleEncoderMixin + +# Switch to pickle - just change the mixin +class MyPicklePubSub(PickleEncoderMixin, Memory): + pass +``` + +Same transport, different serialization - no code changes needed in the transport itself. + +## Passing tests + +### Spec + +check [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for grid tests for your new protocol, make sure to pass those spec tests + +### Benchmark + +We also have fancy benchmark tests that will tell you your max bandwidth, latency, message throughput etc. + +`python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py` + +![Benchmark results](assets/pubsub_benchmark.png) + ## Available Transports Dimos includes several transport implementations: -| Transport | Use Case | Process Boundary | Network | -|-----------|----------|------------------|---------| -| `Memory` | Testing, single process | No | No | -| `SharedMemory` | Multi-process on same machine | Yes | No | -| `LCM` | Network communication (UDP multicast) | Yes | Yes | -| `Redis` | Network communication via Redis server | Yes | Yes | +| Transport | Use Case | Process Boundary | Network | +|----------------|----------------------------------------|------------------|---------| +| `Memory` | Testing, single process | No | No | +| `SharedMemory` | Multi-process on same machine | Yes | No | +| `LCM` | Network communication (UDP multicast) | Yes | Yes | +| `Redis` | Network communication via Redis server | Yes | Yes | +| `ROS` | ROS 2 topic communication | Yes | Yes | +| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | ### SharedMemory Transport @@ -105,7 +202,7 @@ Received: [{'data': [1, 2, 3]}] ### LCM Transport -For network communication, LCM uses UDP multicast and supports typed messages: +For local network communication, LCM uses UDP multicast and supports typed messages: ```python session=lcm_demo ansi=false from dimos.protocol.pubsub.lcmpubsub import LCM, Topic @@ -138,237 +235,63 @@ Received velocity: x=1.0, y=0.0, z=0.5 - `dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically. - `dimos topic echo /topic TypeName` is the explicit legacy form. -## Encoder Mixins - -Transports can use encoder mixins to serialize messages. The `PubSubEncoderMixin` pattern wraps publish/subscribe to encode/decode automatically: - -```python session=encoder_demo ansi=false -from dimos.protocol.pubsub.spec import PubSubEncoderMixin, PickleEncoderMixin - -# PickleEncoderMixin provides: -# - encode(msg, topic) -> bytes (uses pickle.dumps) -# - decode(bytes, topic) -> msg (uses pickle.loads) - -# Create a transport with pickle encoding by mixing in: -from dimos.protocol.pubsub.memory import Memory - -class PickleMemory(PickleEncoderMixin, Memory): - pass - -bus = PickleMemory() -received = [] -bus.subscribe("data", lambda msg, t: received.append(msg)) -bus.publish("data", {"complex": [1, 2, 3], "nested": {"key": "value"}}) - -print(f"Received: {received[0]}") -``` - - -``` -Received: {'complex': [1, 2, 3], 'nested': {'key': 'value'}} -``` - ## Using Transports with Modules -Modules use the `Transport` wrapper class which adapts `PubSub` to the stream interface. You can set a transport on any module stream: - -```python session=module_transport ansi=false -from dimos.core.transport import pLCMTransport, pSHMTransport - -# Transport wrappers for module streams: -# - pLCMTransport: Pickle-encoded LCM -# - LCMTransport: Native LCM encoding -# - pSHMTransport: Pickle-encoded SharedMemory -# - SHMTransport: Native SharedMemory -# - JpegShmTransport: JPEG-compressed images via SharedMemory -# - JpegLcmTransport: JPEG-compressed images via LCM - -# Example: Set a transport on a module output -# camera.set_transport("color_image", pSHMTransport("camera/color")) -print("Available transport wrappers in dimos.core.transport:") -from dimos.core import transport -print([name for name in dir(transport) if "Transport" in name]) -``` - - -``` -Available transport wrappers in dimos.core.transport: -['JpegLcmTransport', 'JpegShmTransport', 'LCMTransport', 'PubSubTransport', 'SHMTransport', 'ZenohTransport', 'pLCMTransport', 'pSHMTransport'] -``` - -## Testing Custom Transports - -The test suite in [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) uses pytest parametrization to run the same tests against all transport implementations. To add your custom transport to the test grid: - -```python session=test_grid ansi=false -# The test grid pattern from test_spec.py: -test_pattern = """ -from contextlib import contextmanager - -@contextmanager -def my_transport_context(): - transport = MyCustomTransport() - transport.start() - yield transport - transport.stop() - -# Add to testdata list: -testdata.append( - (my_transport_context, "my_topic", ["value1", "value2", "value3"]) -) -""" -print(test_pattern) -``` - - -``` - -from contextlib import contextmanager - -@contextmanager -def my_transport_context(): - transport = MyCustomTransport() - transport.start() - yield transport - transport.stop() - -# Add to testdata list: -testdata.append( - (my_transport_context, "my_topic", ["value1", "value2", "value3"]) -) - -``` - -The test suite validates: -- Basic publish/subscribe -- Multiple subscribers receiving the same message -- Unsubscribe functionality -- Multiple messages in order -- Async iteration -- High-volume message handling (10,000 messages) - -Run the tests with: -```bash -pytest dimos/protocol/pubsub/test_spec.py -v -``` - -## Creating a Custom Transport - -To implement a new transport: +Every module stream can use a different transport. Set `.transport` on the stream before starting the module: -1. **Subclass `PubSub`** and implement `publish()` and `subscribe()` -2. **Add encoding** if needed via `PubSubEncoderMixin` -3. **Create a `Transport` wrapper** by subclassing `PubSubTransport` -4. **Add to the test grid** in `test_spec.py` - -Here's a minimal template: +```python ansi=false +import time -```python session=custom_transport ansi=false -template = ''' -from dimos.protocol.pubsub.spec import PubSub, PickleEncoderMixin -from dimos.core.transport import PubSubTransport +from dimos.core import In, Module, start +from dimos.core.transport import pSHMTransport +from dimos.hardware.sensors.camera.module import CameraModule +from dimos.msgs.sensor_msgs import Image -class MyPubSub(PubSub[str, bytes]): - """Custom pub/sub implementation.""" - def __init__(self): - self._subscribers = {} +# Define a simple listener module +class ImageListener(Module): + image: In[Image] def start(self): - # Initialize connection/resources - pass - - def stop(self): - # Cleanup - pass + super().start() + self.image.subscribe(lambda img: print(f"Received: {img.shape}")) - def publish(self, topic: str, message: bytes) -> None: - # Send message to all subscribers on topic - for cb in self._subscribers.get(topic, []): - cb(message, topic) - def subscribe(self, topic, callback): - # Register callback, return unsubscribe function - if topic not in self._subscribers: - self._subscribers[topic] = [] - self._subscribers[topic].append(callback) +if __name__ == "__main__": + # Start cluster and deploy modules to separate processes + dimos = start(2) - def unsubscribe(): - self._subscribers[topic].remove(callback) - return unsubscribe + camera = dimos.deploy(CameraModule, frequency=2.0) + listener = dimos.deploy(ImageListener) + # Connect via shared memory transport (pSHMTransport uses pickle encoding) + camera.color_image.transport = pSHMTransport("/camera/rgb") + listener.image.transport = pSHMTransport("/camera/rgb") -# With pickle encoding -class MyPicklePubSub(PickleEncoderMixin, MyPubSub): - pass - + # Start both modules + camera.start() + listener.start() -# Transport wrapper for use with modules -class MyTransport(PubSubTransport): - def __init__(self, topic: str): - super().__init__(topic) - self.pubsub = MyPicklePubSub() + time.sleep(2) - def broadcast(self, _, msg): - self.pubsub.publish(self.topic, msg) - - def subscribe(self, callback, selfstream=None): - return self.pubsub.subscribe(self.topic, lambda msg, t: callback(msg)) -''' -print(template) + dimos.stop() ``` ``` +Initialized dimos local cluster with 2 workers, memory limit: auto +2026-01-24T07:44:39.770667Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule +2026-01-24T07:44:39.805460Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=0 +2026-01-24T07:44:39.819562Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener +2026-01-24T07:44:39.849461Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=1 +Received: (480, 640, 3) +Received: (480, 640, 3) +Received: (480, 640, 3) +``` -from dimos.protocol.pubsub.spec import PubSub, PickleEncoderMixin -from dimos.core.transport import PubSubTransport - -class MyPubSub(PubSub[str, bytes]): - """Custom pub/sub implementation.""" - - def __init__(self): - self._subscribers = {} - - def start(self): - # Initialize connection/resources - pass - - def stop(self): - # Cleanup - pass - - def publish(self, topic: str, message: bytes) -> None: - # Send message to all subscribers on topic - for cb in self._subscribers.get(topic, []): - cb(message, topic) - - def subscribe(self, topic, callback): - # Register callback, return unsubscribe function - if topic not in self._subscribers: - self._subscribers[topic] = [] - self._subscribers[topic].append(callback) - - def unsubscribe(): - self._subscribers[topic].remove(callback) - return unsubscribe - - -# With pickle encoding -class MyPicklePubSub(PickleEncoderMixin, MyPubSub): - pass - - -# Transport wrapper for use with modules -class MyTransport(PubSubTransport): - def __init__(self, topic: str): - super().__init__(topic) - self.pubsub = MyPicklePubSub() - - def broadcast(self, _, msg): - self.pubsub.publish(self.topic, msg) - - def subscribe(self, callback, selfstream=None): - return self.pubsub.subscribe(self.topic, lambda msg, t: callback(msg)) +This is useful when you need to: +- Share data between processes on the same machine (SharedMemory) +- Communicate across the network (LCM, Redis) +- Mix transports for different streams (e.g., low-latency for control, high-bandwidth for video) -``` +See [Modules](modules.md) for more on module architecture. diff --git a/pyproject.toml b/pyproject.toml index 5c824bdc16..de6242d746 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ skillspy = "dimos.utils.cli.skillspy.skillspy:main" agentspy = "dimos.utils.cli.agentspy.agentspy:main" humancli = "dimos.utils.cli.human.humanclianim:main" dimos = "dimos.robot.cli.dimos:main" +doclinks = "dimos.utils.docs.doclinks:main" [project.optional-dependencies] misc = [ From dc675a4efcd5bc75125fbe63b885f65d4262db57 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 20:35:20 +0800 Subject: [PATCH 02/13] transports work, blueprints moved, data moved --- docs/{ => api}/data.md | 0 docs/concepts/assets/lcmspy.png | 3 + docs/concepts/assets/pubsub_benchmark.png | 3 + .../concepts/blueprints.md | 0 docs/concepts/transports.md | 334 ++++++++++-------- 5 files changed, 201 insertions(+), 139 deletions(-) rename docs/{ => api}/data.md (100%) create mode 100644 docs/concepts/assets/lcmspy.png create mode 100644 docs/concepts/assets/pubsub_benchmark.png rename dimos/core/README_BLUEPRINTS.md => docs/concepts/blueprints.md (100%) diff --git a/docs/data.md b/docs/api/data.md similarity index 100% rename from docs/data.md rename to docs/api/data.md diff --git a/docs/concepts/assets/lcmspy.png b/docs/concepts/assets/lcmspy.png new file mode 100644 index 0000000000..6e68fde03a --- /dev/null +++ b/docs/concepts/assets/lcmspy.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:91da9ef9f7797cce332da448739e28591f7ecfc0fd674e8b4be973cf28331438 +size 7118 diff --git a/docs/concepts/assets/pubsub_benchmark.png b/docs/concepts/assets/pubsub_benchmark.png new file mode 100644 index 0000000000..759a8b3977 --- /dev/null +++ b/docs/concepts/assets/pubsub_benchmark.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:728484a4358df18ced7b5763a88a962701c2b02b5d319eb9a8b28c6c72d009fe +size 23946 diff --git a/dimos/core/README_BLUEPRINTS.md b/docs/concepts/blueprints.md similarity index 100% rename from dimos/core/README_BLUEPRINTS.md rename to docs/concepts/blueprints.md diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 03cea3e280..9ce9290b00 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -1,15 +1,131 @@ - -# Dimos Transports +# Intro Transports enable communication between [modules](modules.md) across process boundaries and networks. When modules run in different processes or on different machines, they need a transport layer to exchange messages. -## Unconventional Transports +For example each line in this graph is a Transport, each node is a Module. + +![output](assets/go2_agentic.svg) + +Each of these lines can be a different Transport (protocol) that modules use to communicate with each other. Module transports are always unidirectional. +They can be things like HTTP server, redis, ros dds etc. Generally for most deployments we use the same pubsub protocol, usually LCM or shared memory. + +Modules internally don't know or care how their data is transported. They just emit messages and subscribe to messages. It is a job of a transport to reliably deliver those messages. -Transports (theoretically) aren't limited to traditional pub-sub services. By implementing just broadcast and subscribe functions on `Transport` at [`core/stream.py`](/dimos/core/stream.py#L83) subclass you can do anything. +Messages can be anything a transport can transport -Your arguments to Transport can be an IP address and port, a shared memory segment name, a file path, or a Redis channel. +# Using Transports with Blueprints -For example rebinding an existing go2 blueprint to use (imagined) TCP transport for lidar (each module that requires lidar data would connect via TCP to this ip. +See [Blueprints](blueprints.md) for more on blueprints API + +From [`unitree_go2_blueprints.py`](/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py) +Here is an example of rebinding some topics used for Unitree GO2 to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) + +This allows us to view the data with rviz2 + +```python skip +nav = autoconnect( + basic, + voxel_mapper(voxel_size=0.1), + cost_mapper(), + replanning_a_star_planner(), + wavefront_frontier_explorer(), +).global_config(n_dask_workers=6, robot_model="unitree_go2") + +ros = nav.transports( + { + ("lidar", PointCloud2): ROSTransport("lidar", PointCloud2), + ("global_map", PointCloud2): ROSTransport("global_map", PointCloud2), + ("odom", PoseStamped): ROSTransport("odom", PoseStamped), + ("color_image", Image): ROSTransport("color_image", Image), + } +) +``` + +# Using Transports with Modules + +Every module stream can use a different transport. Set `.transport` on the stream before starting the module: + +```python ansi=false +import time + +from dimos.core import In, Module, start +from dimos.core.transport import LCMTransport +from dimos.hardware.sensors.camera.module import CameraModule +from dimos.msgs.sensor_msgs import Image + + +# Define a simple listener module +class ImageListener(Module): + image: In[Image] + + def start(self): + super().start() + self.image.subscribe(lambda img: print(f"Received: {img.shape}")) + + +if __name__ == "__main__": + # Start cluster and deploy modules to separate processes + dimos = start(2) + + camera = dimos.deploy(CameraModule, frequency=2.0) + listener = dimos.deploy(ImageListener) + + # Connect via shared memory transport (pSHMTransport uses pickle encoding) + camera.color_image.transport = LCMTransport("/camera/rgb", Image) + listener.image.connect(camera.color_image) + + # Or we can set manually, these are equivalent. + # Setting manually allows us to implement and run listener in a separate file + # + # listener.image.transport = LCMTransport("/camera/rgb", Image) + + # Start both modules + camera.start() + listener.start() + + time.sleep(2) + + dimos.stop() +``` + + +``` +Initialized dimos local cluster with 2 workers, memory limit: auto +2026-01-24T07:44:39.770667Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule +2026-01-24T07:44:39.805460Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=0 +2026-01-24T07:44:39.819562Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener +2026-01-24T07:44:39.849461Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=1 +Received: (480, 640, 3) +Received: (480, 640, 3) +Received: (480, 640, 3) +``` + +See [Modules](modules.md) for more on module architecture. + +# Inspecting LCM traffic (CLI) + +`lcmspy` shows topic frequency/bandwidth stats. + +![lcmspy](assets/lcmspy.png) + +`dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically. + +```sh +Listening on /camera/rgb (inferring from typed LCM channels like '/camera/rgb#pkg.Msg')... (Ctrl+C to stop) +Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) +Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) +Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) +Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) +Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) +``` + +# Theory + +Transport is implemented by subclassing `Transport` at [`core/stream.py`](/dimos/core/stream.py#L83) and implementing `broadcast` and `subscribe` functions. + +Your init arguments to Transport can be an IP address and port, a shared memory segment name, a file path, or a Redis channel, the way you transfer and encode the data is an implementation detail left to you. You can specify which type of data you are able to transport on a type level. For example Video encoding HTTP transport probaby only takes Image type. TCP channel takes bytes etc. + +Rebinding an existing go2 blueprint to use (imagined) TCP transport for lidar (each module that requires lidar data would connect via TCP to this ip) would look something like this: ```python skip @@ -21,9 +137,11 @@ ros = nav.transports( ) ``` -subscribe() of your TCP transport just needs to return a standard PointCloud2 object, we don't care how you transport, encode or construct it, though all our types provide `lcm_encode` and `lcm_decode` functions for (faster then pickle and language agnostic) binary encoding. for more info on this, check [lcm](/docs/concepts/lcm.md) +subscribe() of your TCP transport needs to return a standard `PointCloud2` object in the case above, we don't care how you transport, encode or construct it, + +If helpful, all our types provide `lcm_encode` and `lcm_decode` functions for (faster then pickle and language agnostic) binary encoding. for more info on this, check [lcm](/docs/concepts/lcm.md) -## PubSub Transports +# Practice (PubSub Transports) For now at dimos, we've been using exclusively PubSub protocols for our transports `PubSub` abstract class is what those protocol implementations conform to. @@ -59,7 +177,67 @@ Key points: We don't tell you what topic type actually is, it can be a complex configuration object, and we also don't tell you what a message is. it can be bytes, json etc. If you accept `DimosMsg` [`msgs/protocol.py`](/dimos/msgs/protocol.py#L19) you'll be able to transport any dimos msg, but we don't prevent you from implementing bytes only transports, video streams etc -## Implementing a Simple Transport +# Using Transports Directly + +## LCM + +LCM is UDP multicast, will work through fast reliable networks on a robot. + +We could easily instantiate and pass messages using a pubsub implementation (though normally you would not do this, and would use module or blueprint level API) + +```python session=lcm_demo ansi=false +from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.msgs.geometry_msgs import Vector3 + +lcm = LCM(autoconf=True) +lcm.start() + +received = [] +topic = Topic(topic="/robot/velocity", Vector3) + +lcm.subscribe(topic, lambda msg, t: received.append(msg)) +lcm.publish(topic, Vector3(1.0, 0.0, 0.5)) + +import time +time.sleep(0.1) + +print(f"Received velocity: x={received[0].x}, y={received[0].y}, z={received[0].z}") +lcm.stop() +``` + + +``` +Received velocity: x=1.0, y=0.0, z=0.5 +``` + +## Shared Memory + +Shared Memory is highest performancce, works only on the same machien as IPC + +```python session=shm_demo ansi=false +from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory + +shm = PickleSharedMemory(prefer="cpu") +shm.start() + +received = [] +shm.subscribe("test/topic", lambda msg, topic: received.append(msg)) +shm.publish("test/topic", {"data": [1, 2, 3]}) + +import time +time.sleep(0.1) # Allow message to propagate + +print(f"Received: {received}") +shm.stop() +``` + + +``` +Received: [{'data': [1, 2, 3]}] +``` + + +# Implementing a Simple Transport The simplest toy transport is `Memory`, which works within a single process, start from there, @@ -96,11 +274,11 @@ Received 2 messages: The full implementation is minimal. See [`memory.py`](/dimos/protocol/pubsub/memory.py) for the complete source. -## Encode/Decode Mixins +# Encode/Decode Mixins Transports often need to serialize messages before sending and deserialize after receiving. The `PubSubEncoderMixin` at [`pubsub/spec.py`](/dimos/protocol/pubsub/spec.py#L95) provides a clean way to add encoding/decoding to any pubsub implementation. -### Available Mixins +## Available Mixins | Mixin | Encoding | Use Case | |--------------------------------|-----------------|---------------------------------------| @@ -110,7 +288,7 @@ Transports often need to serialize messages before sending and deserialize after The `LCMEncoderMixin` is especially powerful because LCM messages encode to compact binary that works across languages. This means you can use LCM message definitions with *any* transport - not just LCM's UDP multicast. See [lcm](/docs/concepts/lcm.md) for details on LCM message types. -### Creating a Custom Mixin +## Creating a Custom Mixin Subclass `PubSubEncoderMixin` and implement `encode()` and `decode()`: @@ -147,13 +325,13 @@ class MyPicklePubSub(PickleEncoderMixin, Memory): Same transport, different serialization - no code changes needed in the transport itself. -## Passing tests +# Passing tests -### Spec +## Spec check [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for grid tests for your new protocol, make sure to pass those spec tests -### Benchmark +## Benchmarks We also have fancy benchmark tests that will tell you your max bandwidth, latency, message throughput etc. @@ -161,137 +339,15 @@ We also have fancy benchmark tests that will tell you your max bandwidth, latenc ![Benchmark results](assets/pubsub_benchmark.png) -## Available Transports +# Available Transports Dimos includes several transport implementations: | Transport | Use Case | Process Boundary | Network | |----------------|----------------------------------------|------------------|---------| -| `Memory` | Testing, single process | No | No | +| `Memory` | Testing only, single process | No | No | | `SharedMemory` | Multi-process on same machine | Yes | No | | `LCM` | Network communication (UDP multicast) | Yes | Yes | | `Redis` | Network communication via Redis server | Yes | Yes | | `ROS` | ROS 2 topic communication | Yes | Yes | | `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | - -### SharedMemory Transport - -For inter-process communication on the same machine, `SharedMemory` provides high-performance message passing: - -```python session=shm_demo ansi=false -from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory - -shm = PickleSharedMemory(prefer="cpu") -shm.start() - -received = [] -shm.subscribe("test/topic", lambda msg, topic: received.append(msg)) -shm.publish("test/topic", {"data": [1, 2, 3]}) - -import time -time.sleep(0.1) # Allow message to propagate - -print(f"Received: {received}") -shm.stop() -``` - - -``` -Received: [{'data': [1, 2, 3]}] -``` - -### LCM Transport - -For local network communication, LCM uses UDP multicast and supports typed messages: - -```python session=lcm_demo ansi=false -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic -from dimos.msgs.geometry_msgs import Vector3 - -lcm = LCM(autoconf=True) -lcm.start() - -received = [] -topic = Topic(topic="/robot/velocity", lcm_type=Vector3) - -lcm.subscribe(topic, lambda msg, t: received.append(msg)) -lcm.publish(topic, Vector3(1.0, 0.0, 0.5)) - -import time -time.sleep(0.1) - -print(f"Received velocity: x={received[0].x}, y={received[0].y}, z={received[0].z}") -lcm.stop() -``` - - -``` -Received velocity: x=1.0, y=0.0, z=0.5 -``` - -### Inspecting LCM traffic (CLI) - -- `dimos lcmspy` shows topic frequency/bandwidth stats. -- `dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically. -- `dimos topic echo /topic TypeName` is the explicit legacy form. - -## Using Transports with Modules - -Every module stream can use a different transport. Set `.transport` on the stream before starting the module: - -```python ansi=false -import time - -from dimos.core import In, Module, start -from dimos.core.transport import pSHMTransport -from dimos.hardware.sensors.camera.module import CameraModule -from dimos.msgs.sensor_msgs import Image - - -# Define a simple listener module -class ImageListener(Module): - image: In[Image] - - def start(self): - super().start() - self.image.subscribe(lambda img: print(f"Received: {img.shape}")) - - -if __name__ == "__main__": - # Start cluster and deploy modules to separate processes - dimos = start(2) - - camera = dimos.deploy(CameraModule, frequency=2.0) - listener = dimos.deploy(ImageListener) - - # Connect via shared memory transport (pSHMTransport uses pickle encoding) - camera.color_image.transport = pSHMTransport("/camera/rgb") - listener.image.transport = pSHMTransport("/camera/rgb") - - # Start both modules - camera.start() - listener.start() - - time.sleep(2) - - dimos.stop() -``` - - -``` -Initialized dimos local cluster with 2 workers, memory limit: auto -2026-01-24T07:44:39.770667Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule -2026-01-24T07:44:39.805460Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=0 -2026-01-24T07:44:39.819562Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener -2026-01-24T07:44:39.849461Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=1 -Received: (480, 640, 3) -Received: (480, 640, 3) -Received: (480, 640, 3) -``` - -This is useful when you need to: -- Share data between processes on the same machine (SharedMemory) -- Communicate across the network (LCM, Redis) -- Mix transports for different streams (e.g., low-latency for control, high-bandwidth for video) - -See [Modules](modules.md) for more on module architecture. From 1af12044da7bcac700be3e51a87666281ba05c6e Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 20:43:52 +0800 Subject: [PATCH 03/13] more transport wording changes --- docs/concepts/transports.md | 38 +++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 9ce9290b00..476f792524 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -1,24 +1,27 @@ # Intro -Transports enable communication between [modules](modules.md) across process boundaries and networks. When modules run in different processes or on different machines, they need a transport layer to exchange messages. +Transports enable communication between [modules](modules.md) across process boundaries and networks. -For example each line in this graph is a Transport, each node is a Module. +Each line in this example graph is a Transport, each node is a Module. ![output](assets/go2_agentic.svg) -Each of these lines can be a different Transport (protocol) that modules use to communicate with each other. Module transports are always unidirectional. -They can be things like HTTP server, redis, ros dds etc. Generally for most deployments we use the same pubsub protocol, usually LCM or shared memory. +Each of these lines can be a different Transport (protocol) that modules use to communicate with each other. + +Module transports are always unidirectional (they have a broadcaster and receiver side) + +Implementation wise these are things like HTTP server, redis, ROS DDS etc. Generally for most deployments we use the same pubsub protocol, usually LCM or shared memory. Modules internally don't know or care how their data is transported. They just emit messages and subscribe to messages. It is a job of a transport to reliably deliver those messages. -Messages can be anything a transport can transport +Messages can be anything a transport can transport, so binary data, images, pointclouds etc. Most of the time these are `dimos.msgs` # Using Transports with Blueprints See [Blueprints](blueprints.md) for more on blueprints API From [`unitree_go2_blueprints.py`](/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py) -Here is an example of rebinding some topics used for Unitree GO2 to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) +Here is an example of rebinding some topics used for Unitree GO2 to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) from the default `LCMTransport` This allows us to view the data with rviz2 @@ -123,7 +126,11 @@ Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28 Transport is implemented by subclassing `Transport` at [`core/stream.py`](/dimos/core/stream.py#L83) and implementing `broadcast` and `subscribe` functions. -Your init arguments to Transport can be an IP address and port, a shared memory segment name, a file path, or a Redis channel, the way you transfer and encode the data is an implementation detail left to you. You can specify which type of data you are able to transport on a type level. For example Video encoding HTTP transport probaby only takes Image type. TCP channel takes bytes etc. +Your init arguments to Transport can be anything you'd like - an IP address and port, a shared memory segment name, a file path, or a Redis channel, + +The way you transfer and encode the data is an implementation detail left to you. You can specify which type of data you are able to transport on a type level. + +For example Video encoding HTTP transport probaby only takes Image type. TCP channel takes bytes etc. Rebinding an existing go2 blueprint to use (imagined) TCP transport for lidar (each module that requires lidar data would connect via TCP to this ip) would look something like this: @@ -143,7 +150,9 @@ If helpful, all our types provide `lcm_encode` and `lcm_decode` functions for (f # Practice (PubSub Transports) -For now at dimos, we've been using exclusively PubSub protocols for our transports `PubSub` abstract class is what those protocol implementations conform to. +For now at dimos, we've been using exclusively PubSub protocols for our transports. + +`PubSub` abstract class is what those protocols implement, it's just `publish(topic, message)` and `subscribe(topic, callback)` functions ```python session=pubsub_demo ansi=false from dimos.protocol.pubsub.spec import PubSub @@ -155,7 +164,7 @@ print(inspect.getsource(PubSub.subscribe)) ``` -``` +```python @abstractmethod def publish(self, topic: TopicT, message: MsgT) -> None: """Publish a message to a topic.""" @@ -171,20 +180,18 @@ print(inspect.getsource(PubSub.subscribe)) So new protocols are very easy to implement. -Key points: -- `publish(topic, message)` - Send a message to all subscribers on a topic -- `subscribe(topic, callback)` - Register a callback, returns an unsubscribe function +We don't tell you what topic type actually is, it can be a complex configuration object, and we also don't tell you what a message is. it can be bytes, json etc. Generally most of our transports we have are made to transport specifically our ros compatible message types (see [LCM](/docs/concepts/lcm.md)) -We don't tell you what topic type actually is, it can be a complex configuration object, and we also don't tell you what a message is. it can be bytes, json etc. If you accept `DimosMsg` [`msgs/protocol.py`](/dimos/msgs/protocol.py#L19) you'll be able to transport any dimos msg, but we don't prevent you from implementing bytes only transports, video streams etc +But we also have generic pickle transports that will send off any python object # Using Transports Directly +We could easily instantiate and pass messages using a pubsub implementation, though normally you would not do this, and would use module or blueprint level API. + ## LCM LCM is UDP multicast, will work through fast reliable networks on a robot. -We could easily instantiate and pass messages using a pubsub implementation (though normally you would not do this, and would use module or blueprint level API) - ```python session=lcm_demo ansi=false from dimos.protocol.pubsub.lcmpubsub import LCM, Topic from dimos.msgs.geometry_msgs import Vector3 @@ -236,7 +243,6 @@ shm.stop() Received: [{'data': [1, 2, 3]}] ``` - # Implementing a Simple Transport The simplest toy transport is `Memory`, which works within a single process, start from there, From e3560881dd06c2f1d7bfe655b42c6ca245614663 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 20:48:58 +0800 Subject: [PATCH 04/13] doc simplification --- docs/concepts/transports.md | 51 +++++++++++++------------------------ 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 476f792524..f55111db8c 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -2,11 +2,9 @@ Transports enable communication between [modules](modules.md) across process boundaries and networks. -Each line in this example graph is a Transport, each node is a Module. +Each line in this graph is a Transport (can be different protocols), each node is a Module: -![output](assets/go2_agentic.svg) - -Each of these lines can be a different Transport (protocol) that modules use to communicate with each other. +![output](assets/go2_basic.svg) Module transports are always unidirectional (they have a broadcaster and receiver side) @@ -16,6 +14,14 @@ Modules internally don't know or care how their data is transported. They just e Messages can be anything a transport can transport, so binary data, images, pointclouds etc. Most of the time these are `dimos.msgs` +# Benchmarks + +Quick view on performance of our transports + +`python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py` + +![Benchmark results](assets/pubsub_benchmark.png) + # Using Transports with Blueprints See [Blueprints](blueprints.md) for more on blueprints API @@ -130,29 +136,15 @@ Your init arguments to Transport can be anything you'd like - an IP address and The way you transfer and encode the data is an implementation detail left to you. You can specify which type of data you are able to transport on a type level. -For example Video encoding HTTP transport probaby only takes Image type. TCP channel takes bytes etc. - -Rebinding an existing go2 blueprint to use (imagined) TCP transport for lidar (each module that requires lidar data would connect via TCP to this ip) would look something like this: - - -```python skip -from dimos.robot.unitree_webrtc.unitree_go2_blueprints import nav - -# use TCP for lidar data (each module individually would establish a tcp connection) -ros = nav.transports( - {("lidar", PointCloud2): TCPTransport(ip="10.10.10.1",port=1414)} -) -``` +For example a video streaming HTTP transport probably only takes Image type, while a TCP channel takes bytes. -subscribe() of your TCP transport needs to return a standard `PointCloud2` object in the case above, we don't care how you transport, encode or construct it, +Your transport's `subscribe()` just needs to return the expected message type - we don't care how you transport, encode, or construct it. If helpful, all our types provide `lcm_encode` and `lcm_decode` functions for (faster then pickle and language agnostic) binary encoding. for more info on this, check [lcm](/docs/concepts/lcm.md) -# Practice (PubSub Transports) +# PubSub Transports -For now at dimos, we've been using exclusively PubSub protocols for our transports. - -`PubSub` abstract class is what those protocols implement, it's just `publish(topic, message)` and `subscribe(topic, callback)` functions +All our transports implement the `PubSub` interface - just `publish(topic, message)` and `subscribe(topic, callback)`: ```python session=pubsub_demo ansi=false from dimos.protocol.pubsub.spec import PubSub @@ -178,15 +170,9 @@ print(inspect.getsource(PubSub.subscribe)) ... ``` -So new protocols are very easy to implement. - -We don't tell you what topic type actually is, it can be a complex configuration object, and we also don't tell you what a message is. it can be bytes, json etc. Generally most of our transports we have are made to transport specifically our ros compatible message types (see [LCM](/docs/concepts/lcm.md)) - -But we also have generic pickle transports that will send off any python object +Topic and message types are flexible - bytes, json, or our ROS-compatible [LCM](/docs/concepts/lcm.md) types. We also have pickle transports for any Python object. -# Using Transports Directly - -We could easily instantiate and pass messages using a pubsub implementation, though normally you would not do this, and would use module or blueprint level API. +You can use pubsub directly (though normally you'd use module/blueprint APIs): ## LCM @@ -219,7 +205,7 @@ Received velocity: x=1.0, y=0.0, z=0.5 ## Shared Memory -Shared Memory is highest performancce, works only on the same machien as IPC +Shared Memory is highest performance, works only on the same machine as IPC ```python session=shm_demo ansi=false from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory @@ -339,11 +325,10 @@ check [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for grid test ## Benchmarks -We also have fancy benchmark tests that will tell you your max bandwidth, latency, message throughput etc. +Make sure to also benchmark your stuff to see it in context `python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py` -![Benchmark results](assets/pubsub_benchmark.png) # Available Transports From 7a8085eeddee9e4f3cfbaa3bd092d3197d24b0bd Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 20:53:31 +0800 Subject: [PATCH 05/13] linking go2_basic svg correctly --- docs/concepts/transports.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index f55111db8c..3aa581304c 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -4,7 +4,7 @@ Transports enable communication between [modules](modules.md) across process bou Each line in this graph is a Transport (can be different protocols), each node is a Module: -![output](assets/go2_basic.svg) +![go2_basic](assets/go2_basic.svg) Module transports are always unidirectional (they have a broadcaster and receiver side) From 98680282f65a441a17926f63867353fb00a75641 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 21:07:40 +0800 Subject: [PATCH 06/13] abstraction layer map --- docs/concepts/assets/abstraction_layers.svg | 20 +++++++++++ docs/concepts/transports.md | 39 +++++++++++++++++++-- 2 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 docs/concepts/assets/abstraction_layers.svg diff --git a/docs/concepts/assets/abstraction_layers.svg b/docs/concepts/assets/abstraction_layers.svg new file mode 100644 index 0000000000..0903cfbf27 --- /dev/null +++ b/docs/concepts/assets/abstraction_layers.svg @@ -0,0 +1,20 @@ + + +Blueprints + + + +Modules + + + +Transports + + + +PubSub +robot configs +camera, nav +LCM, SHM, ROS +pub/sub API + diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 3aa581304c..2410653795 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -22,12 +22,48 @@ Quick view on performance of our transports ![Benchmark results](assets/pubsub_benchmark.png) + +# Abstraction Layers + +
Pikchr + +```pikchr output=assets/abstraction_layers.svg fold +color = white +fill = none +linewid = 0.5in +boxwid = 1.0in +boxht = 0.4in + +# Boxes with labels +B: box "Blueprints" rad 10px +arrow +M: box "Modules" rad 5px +arrow +T: box "Transports" rad 5px +arrow +P: box "PubSub" rad 5px + +# Descriptions below +text "robot configs" at B.s + (0.1, -0.2in) +text "camera, nav" at M.s + (0, -0.2in) +text "LCM, SHM, ROS" at T.s + (0, -0.2in) +text "pub/sub API" at P.s + (0, -0.2in) +``` + +
+ +These are the abstraction layers we'll go through one by one + + +![output](assets/abstraction_layers.svg) + # Using Transports with Blueprints See [Blueprints](blueprints.md) for more on blueprints API From [`unitree_go2_blueprints.py`](/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py) -Here is an example of rebinding some topics used for Unitree GO2 to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) from the default `LCMTransport` + +An example of rebinding some topics used for Unitree GO2 robot from the default `LCMTransport` to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) This allows us to view the data with rviz2 @@ -49,7 +85,6 @@ ros = nav.transports( } ) ``` - # Using Transports with Modules Every module stream can use a different transport. Set `.transport` on the stream before starting the module: From 16ecf618c904704caaaedf249a679f39fbeff174 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 21:20:10 +0800 Subject: [PATCH 07/13] alt transport doc --- docs/concepts/transports.md | 239 ++++++++++++++++++++---------------- 1 file changed, 131 insertions(+), 108 deletions(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 2410653795..14e1d611a6 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -1,29 +1,47 @@ -# Intro +# Transports -Transports enable communication between [modules](modules.md) across process boundaries and networks. +Transports connect **module streams** across **process boundaries** and/or **networks**. -Each line in this graph is a Transport (can be different protocols), each node is a Module: +* **Module**: a running component (e.g., camera, mapping, nav). +* **Stream**: a unidirectional flow of messages owned by a module (one broadcaster → many receivers). +* **Topic**: the name/identifier used by a transport or pubsub backend. +* **Message**: payload carried on a stream (often `dimos.msgs.*`, but can be bytes / images / pointclouds / etc.). + +Each edge in the graph is a **transported stream** (potentially different protocols). Each node is a **module**: ![go2_basic](assets/go2_basic.svg) -Module transports are always unidirectional (they have a broadcaster and receiver side) +## What the transport layer guarantees (and what it doesn’t) + +Modules **don’t** know or care *how* data moves. They just: + +* emit messages (broadcast) +* subscribe to messages (receive) + +A transport is responsible for the mechanics of delivery (IPC, sockets, Redis, ROS 2, etc.). + +**Important:** delivery semantics depend on the backend: -Implementation wise these are things like HTTP server, redis, ROS DDS etc. Generally for most deployments we use the same pubsub protocol, usually LCM or shared memory. +* Some are **best-effort** (e.g., UDP multicast / LCM): loss can happen. +* Some can be **reliable** (e.g., TCP-backed, Redis, some DDS configs) but may add latency/backpressure. -Modules internally don't know or care how their data is transported. They just emit messages and subscribe to messages. It is a job of a transport to reliably deliver those messages. +So: treat the API as uniform, but pick a backend whose semantics match the task. -Messages can be anything a transport can transport, so binary data, images, pointclouds etc. Most of the time these are `dimos.msgs` +--- # Benchmarks -Quick view on performance of our transports +Quick view on performance of our pubsub backends: -`python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py` +```sh skip +python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py +``` ![Benchmark results](assets/pubsub_benchmark.png) +--- -# Abstraction Layers +# Abstraction layers
Pikchr @@ -52,20 +70,22 @@ text "pub/sub API" at P.s + (0, -0.2in)
-These are the abstraction layers we'll go through one by one - ![output](assets/abstraction_layers.svg) -# Using Transports with Blueprints +![output](assets/abstraction_layers.svg) + +We’ll go through these layers top-down. + +--- -See [Blueprints](blueprints.md) for more on blueprints API +# Using transports with blueprints -From [`unitree_go2_blueprints.py`](/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py) +See [Blueprints](blueprints.md) for the blueprint API. -An example of rebinding some topics used for Unitree GO2 robot from the default `LCMTransport` to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) +From [`unitree_go2_blueprints.py`](/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py). -This allows us to view the data with rviz2 +Example: rebind a few streams from the default `LCMTransport` to `ROSTransport` (defined at [`transport.py`](/dimos/core/transport.py#L226)) so you can visualize in **rviz2**. ```python skip nav = autoconnect( @@ -85,9 +105,12 @@ ros = nav.transports( } ) ``` -# Using Transports with Modules -Every module stream can use a different transport. Set `.transport` on the stream before starting the module: +--- + +# Using transports with modules + +Each **stream** on a module can use a different transport. Set `.transport` on the stream **before starting** modules. ```python ansi=false import time @@ -98,7 +121,6 @@ from dimos.hardware.sensors.camera.module import CameraModule from dimos.msgs.sensor_msgs import Image -# Define a simple listener module class ImageListener(Module): image: In[Image] @@ -108,37 +130,32 @@ class ImageListener(Module): if __name__ == "__main__": - # Start cluster and deploy modules to separate processes + # Start local cluster and deploy modules to separate processes dimos = start(2) camera = dimos.deploy(CameraModule, frequency=2.0) listener = dimos.deploy(ImageListener) - # Connect via shared memory transport (pSHMTransport uses pickle encoding) + # Choose a transport for the stream (example: LCM typed channel) camera.color_image.transport = LCMTransport("/camera/rgb", Image) - listener.image.connect(camera.color_image) - # Or we can set manually, these are equivalent. - # Setting manually allows us to implement and run listener in a separate file - # - # listener.image.transport = LCMTransport("/camera/rgb", Image) + # Connect listener input to camera output + listener.image.connect(camera.color_image) - # Start both modules camera.start() listener.start() time.sleep(2) - dimos.stop() ``` ``` Initialized dimos local cluster with 2 workers, memory limit: auto -2026-01-24T07:44:39.770667Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule -2026-01-24T07:44:39.805460Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=0 -2026-01-24T07:44:39.819562Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener -2026-01-24T07:44:39.849461Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=1 +2026-01-24T13:17:50.190559Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule +2026-01-24T13:17:50.218466Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=1 +2026-01-24T13:17:50.229474Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener +2026-01-24T13:17:50.250199Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=0 Received: (480, 640, 3) Received: (480, 640, 3) Received: (480, 640, 3) @@ -146,52 +163,62 @@ Received: (480, 640, 3) See [Modules](modules.md) for more on module architecture. +--- + # Inspecting LCM traffic (CLI) -`lcmspy` shows topic frequency/bandwidth stats. +`lcmspy` shows topic frequency/bandwidth stats: ![lcmspy](assets/lcmspy.png) -`dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically. +`dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically: -```sh +```sh skip Listening on /camera/rgb (inferring from typed LCM channels like '/camera/rgb#pkg.Msg')... (Ctrl+C to stop) Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) -Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) -Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) -Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) -Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) ``` -# Theory +--- -Transport is implemented by subclassing `Transport` at [`core/stream.py`](/dimos/core/stream.py#L83) and implementing `broadcast` and `subscribe` functions. +# Implementing a transport -Your init arguments to Transport can be anything you'd like - an IP address and port, a shared memory segment name, a file path, or a Redis channel, +At the stream layer, a transport is implemented by subclassing `Transport` (see [`core/stream.py`](/dimos/core/stream.py#L83)) and implementing: -The way you transfer and encode the data is an implementation detail left to you. You can specify which type of data you are able to transport on a type level. +* `broadcast(...)` +* `subscribe(...)` -For example a video streaming HTTP transport probably only takes Image type, while a TCP channel takes bytes. +Your `Transport.__init__` args can be anything meaningful for your backend: -Your transport's `subscribe()` just needs to return the expected message type - we don't care how you transport, encode, or construct it. +* `(ip, port)` +* a shared-memory segment name +* a filesystem path +* a Redis channel -If helpful, all our types provide `lcm_encode` and `lcm_decode` functions for (faster then pickle and language agnostic) binary encoding. for more info on this, check [lcm](/docs/concepts/lcm.md) +Encoding is an implementation detail, but we encourage using LCM-compatible message types when possible. -# PubSub Transports +## Encoding helpers -All our transports implement the `PubSub` interface - just `publish(topic, message)` and `subscribe(topic, callback)`: +Many of our message types provide `lcm_encode` / `lcm_decode` for compact, language-agnostic binary encoding (often faster than pickle). For details, see [LCM](/docs/concepts/lcm.md). -```python session=pubsub_demo ansi=false -from dimos.protocol.pubsub.spec import PubSub +--- + +# PubSub transports (practice) + +All our transport backends implement the `PubSub` interface: -# The interface every transport must implement: +* `publish(topic, message)` +* `subscribe(topic, callback) -> unsubscribe` + +```python +from dimos.protocol.pubsub.spec import PubSub import inspect + print(inspect.getsource(PubSub.publish)) print(inspect.getsource(PubSub.subscribe)) ``` -```python +``` @abstractmethod def publish(self, topic: TopicT, message: MsgT) -> None: """Publish a message to a topic.""" @@ -205,15 +232,13 @@ print(inspect.getsource(PubSub.subscribe)) ... ``` -Topic and message types are flexible - bytes, json, or our ROS-compatible [LCM](/docs/concepts/lcm.md) types. We also have pickle transports for any Python object. - -You can use pubsub directly (though normally you'd use module/blueprint APIs): +Topic/message types are flexible: bytes, JSON, or our ROS-compatible [LCM](/docs/concepts/lcm.md) types. We also have pickle-based transports for arbitrary Python objects. -## LCM +## LCM (UDP multicast) -LCM is UDP multicast, will work through fast reliable networks on a robot. +LCM is UDP multicast. It’s very fast on a robot LAN, but it’s **best-effort** (packets can drop). -```python session=lcm_demo ansi=false +```python from dimos.protocol.pubsub.lcmpubsub import LCM, Topic from dimos.msgs.geometry_msgs import Vector3 @@ -221,7 +246,7 @@ lcm = LCM(autoconf=True) lcm.start() received = [] -topic = Topic(topic="/robot/velocity", Vector3) +topic = Topic("/robot/velocity", Vector3) lcm.subscribe(topic, lambda msg, t: received.append(msg)) lcm.publish(topic, Vector3(1.0, 0.0, 0.5)) @@ -238,11 +263,11 @@ lcm.stop() Received velocity: x=1.0, y=0.0, z=0.5 ``` -## Shared Memory +## Shared memory (IPC) -Shared Memory is highest performance, works only on the same machine as IPC +Shared memory is highest performance, but only works on the **same machine**. -```python session=shm_demo ansi=false +```python from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory shm = PickleSharedMemory(prefer="cpu") @@ -253,7 +278,7 @@ shm.subscribe("test/topic", lambda msg, topic: received.append(msg)) shm.publish("test/topic", {"data": [1, 2, 3]}) import time -time.sleep(0.1) # Allow message to propagate +time.sleep(0.1) print(f"Received: {received}") shm.stop() @@ -264,23 +289,20 @@ shm.stop() Received: [{'data': [1, 2, 3]}] ``` -# Implementing a Simple Transport +--- -The simplest toy transport is `Memory`, which works within a single process, start from there, +# A minimal transport: `Memory` -```python session=memory_demo ansi=false +The simplest toy backend is `Memory` (single process). Start from there when implementing a new pubsub backend. + +```python from dimos.protocol.pubsub.memory import Memory -# Create a memory transport bus = Memory() - -# Track received messages received = [] -# Subscribe to a topic unsubscribe = bus.subscribe("sensor/data", lambda msg, topic: received.append(msg)) -# Publish messages bus.publish("sensor/data", {"temperature": 22.5}) bus.publish("sensor/data", {"temperature": 23.0}) @@ -288,7 +310,6 @@ print(f"Received {len(received)} messages:") for msg in received: print(f" {msg}") -# Unsubscribe when done unsubscribe() ``` @@ -299,25 +320,27 @@ Received 2 messages: {'temperature': 23.0} ``` -The full implementation is minimal. See [`memory.py`](/dimos/protocol/pubsub/memory.py) for the complete source. +See [`memory.py`](/dimos/protocol/pubsub/memory.py) for the complete source. + +--- -# Encode/Decode Mixins +# Encode/decode mixins -Transports often need to serialize messages before sending and deserialize after receiving. The `PubSubEncoderMixin` at [`pubsub/spec.py`](/dimos/protocol/pubsub/spec.py#L95) provides a clean way to add encoding/decoding to any pubsub implementation. +Transports often need to serialize messages before sending and deserialize after receiving. -## Available Mixins +`PubSubEncoderMixin` at [`pubsub/spec.py`](/dimos/protocol/pubsub/spec.py#L95) provides a clean way to add encoding/decoding to any pubsub implementation. -| Mixin | Encoding | Use Case | -|--------------------------------|-----------------|---------------------------------------| -| `PickleEncoderMixin` | Python pickle | Any Python object, same-language only | -| `LCMEncoderMixin` | LCM binary | Cross-language (C/C++/Python/Java/Go) | -| `JpegEncoderMixin` | JPEG compressed | Image data, reduces bandwidth | +## Available mixins -The `LCMEncoderMixin` is especially powerful because LCM messages encode to compact binary that works across languages. This means you can use LCM message definitions with *any* transport - not just LCM's UDP multicast. See [lcm](/docs/concepts/lcm.md) for details on LCM message types. +| Mixin | Encoding | Use case | +|----------------------|-----------------|------------------------------------| +| `PickleEncoderMixin` | Python pickle | Any Python object, Python-only | +| `LCMEncoderMixin` | LCM binary | Cross-language (C/C++/Python/Go/…) | +| `JpegEncoderMixin` | JPEG compressed | Image data, reduces bandwidth | -## Creating a Custom Mixin +`LCMEncoderMixin` is especially useful: you can use LCM message definitions with *any* transport (not just UDP multicast). See [LCM](/docs/concepts/lcm.md) for details. -Subclass `PubSubEncoderMixin` and implement `encode()` and `decode()`: +## Creating a custom mixin ```python session=jsonencoder from dimos.protocol.pubsub.spec import PubSubEncoderMixin @@ -325,55 +348,55 @@ import json class JsonEncoderMixin(PubSubEncoderMixin[str, dict, bytes]): def encode(self, msg: dict, topic: str) -> bytes: - return json.dumps(msg).encode('utf-8') + return json.dumps(msg).encode("utf-8") def decode(self, msg: bytes, topic: str) -> dict: - return json.loads(msg.decode('utf-8')) + return json.loads(msg.decode("utf-8")) ``` -Then combine with a pubsub implementation using multiple inheritance: +Combine with a pubsub implementation via multiple inheritance: ```python session=jsonencoder -from dimos.protocol.pubsub import Memory +from dimos.protocol.pubsub.memory import Memory class MyJsonPubSub(JsonEncoderMixin, Memory): pass ``` -The mixin automatically wraps `publish()` and `subscribe()` to handle encoding/decoding transparently. Your new transport implementation stays the same - just swap the mixin: +Swap serialization by changing the mixin: ```python session=jsonencoder from dimos.protocol.pubsub.spec import PickleEncoderMixin -# Switch to pickle - just change the mixin class MyPicklePubSub(PickleEncoderMixin, Memory): pass ``` -Same transport, different serialization - no code changes needed in the transport itself. +--- -# Passing tests +# Testing and benchmarks -## Spec +## Spec tests -check [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for grid tests for your new protocol, make sure to pass those spec tests +See [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for the grid tests your new backend should pass. ## Benchmarks -Make sure to also benchmark your stuff to see it in context - -`python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py` +Add your backend to benchmarks to compare in context: +```sh skip +python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py +``` -# Available Transports +--- -Dimos includes several transport implementations: +# Available transports -| Transport | Use Case | Process Boundary | Network | -|----------------|----------------------------------------|------------------|---------| -| `Memory` | Testing only, single process | No | No | -| `SharedMemory` | Multi-process on same machine | Yes | No | -| `LCM` | Network communication (UDP multicast) | Yes | Yes | -| `Redis` | Network communication via Redis server | Yes | Yes | -| `ROS` | ROS 2 topic communication | Yes | Yes | -| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | +| Transport | Use case | Cross-process | Network | Notes | +|----------------|-------------------------------------|---------------|---------|--------------------------------| +| `Memory` | Testing only, single process | No | No | Minimal reference impl | +| `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | +| `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets | +| `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | +| `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | +| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP | From f37c4f74989deec910baf66b8d3e5d5a399e2d48 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 21:26:08 +0800 Subject: [PATCH 08/13] transports.md wrap --- docs/concepts/transports.md | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 14e1d611a6..b81794fc77 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -73,8 +73,6 @@ text "pub/sub API" at P.s + (0, -0.2in) ![output](assets/abstraction_layers.svg) -![output](assets/abstraction_layers.svg) - We’ll go through these layers top-down. --- @@ -202,9 +200,9 @@ Many of our message types provide `lcm_encode` / `lcm_decode` for compact, langu --- -# PubSub transports (practice) +# PubSub transports -All our transport backends implement the `PubSub` interface: +Even though transport can be anything (TCP connection, unix socket) for now all our transport backends implement the `PubSub` interface. * `publish(topic, message)` * `subscribe(topic, callback) -> unsubscribe` @@ -218,7 +216,7 @@ print(inspect.getsource(PubSub.subscribe)) ``` -``` +```python @abstractmethod def publish(self, topic: TopicT, message: MsgT) -> None: """Publish a message to a topic.""" @@ -237,6 +235,7 @@ Topic/message types are flexible: bytes, JSON, or our ROS-compatible [LCM](/docs ## LCM (UDP multicast) LCM is UDP multicast. It’s very fast on a robot LAN, but it’s **best-effort** (packets can drop). +For local emission it autoconfigures system in a way in which it's more robust and faster then other more common protocols like ROS, DDS ```python from dimos.protocol.pubsub.lcmpubsub import LCM, Topic @@ -392,11 +391,11 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b # Available transports -| Transport | Use case | Cross-process | Network | Notes | -|----------------|-------------------------------------|---------------|---------|--------------------------------| -| `Memory` | Testing only, single process | No | No | Minimal reference impl | -| `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | -| `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets | -| `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | -| `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | -| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP | +| Transport | Use case | Cross-process | Network | Notes | +|----------------|-------------------------------------|---------------|---------|--------------------------------------| +| `Memory` | Testing only, single process | No | No | Minimal reference impl | +| `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | +| `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN | +| `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | +| `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | +| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP | From 271082fcd513348836cd5cc33bfd783b21260948 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 21:28:11 +0800 Subject: [PATCH 09/13] blueprints pulled from dev --- docs/concepts/blueprints.md | 159 ++++++++++++++++++++++++++---------- 1 file changed, 117 insertions(+), 42 deletions(-) diff --git a/docs/concepts/blueprints.md b/docs/concepts/blueprints.md index 0a3e2ceaf5..686aff60d5 100644 --- a/docs/concepts/blueprints.md +++ b/docs/concepts/blueprints.md @@ -6,19 +6,26 @@ You don't typically want to run a single module, so multiple blueprints are hand You create a `ModuleBlueprintSet` from a single module (say `ConnectionModule`) with: -```python +```python session=blueprint-ex1 +from dimos.core.blueprints import create_module_blueprint +from dimos.core import Module, rpc + +class ConnectionModule(Module): + def __init__(self, arg1, arg2, kwarg='value') -> None: + super().__init__() + blueprint = create_module_blueprint(ConnectionModule, 'arg1', 'arg2', kwarg='value') ``` -But the same thing can be acomplished more succinctly as: +But the same thing can be accomplished more succinctly as: -```python +```python session=blueprint-ex1 connection = ConnectionModule.blueprint ``` Now you can create the blueprint with: -```python +```python session=blueprint-ex1 blueprint = connection('arg1', 'arg2', kwarg='value') ``` @@ -26,7 +33,23 @@ blueprint = connection('arg1', 'arg2', kwarg='value') You can link multiple blueprints together with `autoconnect`: -```python +```python session=blueprint-ex1 +from dimos.core.blueprints import autoconnect + +class Module1(Module): + def __init__(self, arg1) -> None: + super().__init__() + +class Module2(Module): + ... + +class Module3(Module): + ... + +module1 = Module1.blueprint +module2 = Module2.blueprint +module3 = Module3.blueprint + blueprint = autoconnect( module1(), module2(), @@ -36,7 +59,16 @@ blueprint = autoconnect( `blueprint` itself is a `ModuleBlueprintSet` so you can link it with other modules: -```python +```python session=blueprint-ex1 +class Module4(Module): + ... + +class Module5(Module): + ... + +module4 = Module4.blueprint +module5 = Module5.blueprint + expanded_blueprint = autoconnect( blueprint, module4(), @@ -50,11 +82,11 @@ Blueprints are frozen data classes, and `autoconnect()` always constructs an exp If the same module appears multiple times in `autoconnect`, the **later blueprint wins** and overrides earlier ones: -```python +```python session=blueprint-ex1 blueprint = autoconnect( - module_a(arg1=1), - module_b(), - module_a(arg1=2), # This one is used, the first is discarded + module1(arg1=1), + module2(), + module1(arg1=2), # This one is used, the first is discarded ) ``` @@ -64,14 +96,20 @@ This is so you can "inherit" from one blueprint but override something you need Imagine you have this code: -```python +```python session=blueprint-ex1 +from functools import partial + +from dimos.core.blueprints import create_module_blueprint, autoconnect +from dimos.core import Module, rpc, Out, In +from dimos.msgs.sensor_msgs import Image + class ModuleA(Module): image: Out[Image] - start_explore: Out[Bool] + start_explore: Out[bool] class ModuleB(Module): image: In[Image] - begin_explore: In[Bool] + begin_explore: In[bool] module_a = partial(create_module_blueprint, ModuleA) module_b = partial(create_module_blueprint, ModuleB) @@ -95,24 +133,37 @@ By default `LCMTransport` is used if the object supports `lcm_encode`. If it doe You can override transports with the `transports` method. It returns a new blueprint in which the override is set. -```python -blueprint = autoconnect(...) -expanded_blueprint = autoconnect(blueprint, ...) -blueprint = blueprint.transports({ +```python session=blueprint-ex1 +from dimos.core.transport import pSHMTransport, pLCMTransport + +base_blueprint = autoconnect( + module1(arg1=1), + module2(), +) +expanded_blueprint = autoconnect( + base_blueprint, + module4(), + module5(), +) +base_blueprint = base_blueprint.transports({ ("image", Image): pSHMTransport( - "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + "/go2/color_image", default_capacity=1920 * 1080 * 3, # 1920x1080 frame x 3 (RGB) x uint8 ), - ("start_explore", Bool): pLCMTransport(), + ("start_explore", bool): pLCMTransport("/start_explore"), }) ``` -Note: `expanded_blueprint` does not get the transport overrides because it's created from the initial value of `blueprint`, not the second. +Note: `expanded_blueprint` does not get the transport overrides because it's created from the initial value of `base_blueprint`, not the second. ## Remapping connections Sometimes you need to rename a connection to match what other modules expect. You can use `remappings` to rename module connections: -```python +```python session=blueprint-ex2 +from dimos.core.blueprints import autoconnect +from dimos.core import Module, rpc, Out, In +from dimos.msgs.sensor_msgs import Image + class ConnectionModule(Module): color_image: Out[Image] # Outputs on 'color_image' @@ -139,12 +190,11 @@ After remapping: If you want to override the topic, you still have to do it manually: -```python -blueprint -.remappings([ +```python session=blueprint-ex2 +from dimos.core.transport import LCMTransport +blueprint.remappings([ (ConnectionModule, 'color_image', 'rgb_image'), -]) -.transports({ +]).transports({ ("rgb_image", Image): LCMTransport("/custom/rgb/image", Image), }) ``` @@ -153,7 +203,10 @@ blueprint Each module can optionally take a `global_config` option in `__init__`. E.g.: -```python +```python session=blueprint-ex3 +from dimos.core import Module, rpc +from dimos.core.global_config import GlobalConfig + class ModuleA(Module): def __init__(self, global_config: GlobalConfig | None = None): @@ -162,15 +215,17 @@ class ModuleA(Module): The config is normally taken from .env or from environment variables. But you can specifically override the values for a specific blueprint: -```python -blueprint = blueprint.global_config(n_dask_workers=8) +```python session=blueprint-ex3 +blueprint = ModuleA.blueprint().global_config(n_dask_workers=8) ``` ## Calling the methods of other modules Imagine you have this code: -```python +```python session=blueprint-ex3 +from dimos.core import Module, rpc + class ModuleA(Module): @rpc @@ -186,7 +241,9 @@ And you want to call `ModuleA.get_time` in `ModuleB.request_the_time`. To do this, you can request a link to the method you want to call in `rpc_calls`. Calling `get_time_rcp` will call the original `ModuleA.get_time`. -```python +```python session=blueprint-ex3 +from dimos.core import Module, rpc + class ModuleB(Module): rpc_calls: list[str] = [ "ModuleA.get_time", @@ -199,8 +256,10 @@ class ModuleB(Module): You can also request multiple methods at a time: -```python -method1_rpc, method2_rpc = self.get_rpc_calls("ModuleX.m1", "ModuleX.m2") +```python session=blueprint-ex3 +class ModuleB(Module): + def request_the_time(self) -> None: + method1_rpc, method2_rpc = self.get_rpc_calls("ModuleX.m1", "ModuleX.m2") ``` ## Alternative RPC calls @@ -209,7 +268,10 @@ There is an alternative way of receiving RPC methods. It is useful when you want You can use it by defining a method like `set__`: -```python +```python session=blueprint-ex3 +from dimos.core import Module, rpc +from dimos.core.rpc_client import RpcCall + class ModuleB(Module): @rpc # Note that it has to be an rpc method. def set_ModuleA_get_time(self, rpc_call: RpcCall) -> None: @@ -228,12 +290,16 @@ In the previous examples, you can only call methods in a module called `ModuleA` You can do so by extracting the common interface as an `ABC` (abstract base class) and linking to the `ABC` instead one particular class. -```python +```python session=blueprint-ex3 +from abc import ABC, abstractmethod +from dimos.core.blueprints import autoconnect +from dimos.core import Module, rpc + class TimeInterface(ABC): @abstractmethod def get_time(self): ... -class ProperTime(TimeInterface): +class ProperTime(Module, TimeInterface): def get_time(self): return "13:00" @@ -254,7 +320,7 @@ class ModuleB(Module): The actual method that you get in `get_time_rpc` depends on which module is deployed. If you deploy `ProperTime`, you get `ProperTime.get_time`: -```python +```python session=blueprint-ex3 blueprint = autoconnect( ProperTime.blueprint(), # get_rpc_calls("TimeInterface.get_time") returns ProperTime.get_time @@ -268,7 +334,13 @@ If both are deployed, the blueprint will throw an error because it's ambiguous. Skills have to be registered with `AgentSpec.register_skills(self)`. -```python +```python session=blueprint-ex4 +from dimos.core import Module, rpc +from dimos.core.skill_module import SkillModule +from dimos.protocol.skill.skill import skill +from dimos.core.rpc_client import RpcCall +from dimos.core.global_config import GlobalConfig + class SomeSkill(Module): @skill @@ -290,7 +362,10 @@ class SomeSkill(Module): Or, you can avoid all of this by inheriting from `SkillModule` which does the above automatically: -```python +```python session=blueprint-ex4 +from dimos.core.skill_module import SkillModule +from dimos.protocol.skill.skill import skill + class SomeSkill(SkillModule): @skill @@ -302,8 +377,8 @@ class SomeSkill(SkillModule): All you have to do to build a blueprint is call: -```python -module_coordinator = blueprint.build(global_config=config) +```python session=blueprint-ex4 +module_coordinator = SomeSkill.blueprint().build(global_config=GlobalConfig()) ``` This returns a `ModuleCoordinator` instance that manages all deployed modules. @@ -312,7 +387,7 @@ This returns a `ModuleCoordinator` instance that manages all deployed modules. You can block the thread until it exits with: -```python +```python session=blueprint-ex4 module_coordinator.loop() ``` From 1e349fdf7540be883cb10d769a112666a19b0f56 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 21:35:05 +0800 Subject: [PATCH 10/13] reintroduced missing data flow svg --- docs/development/assets/get_data_flow.svg | 25 +++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 docs/development/assets/get_data_flow.svg diff --git a/docs/development/assets/get_data_flow.svg b/docs/development/assets/get_data_flow.svg new file mode 100644 index 0000000000..d875e1dadb --- /dev/null +++ b/docs/development/assets/get_data_flow.svg @@ -0,0 +1,25 @@ + + +get_data(name) + + + +Check +data/{name} + + + +Return path + + + +Pull LFS + + + +Decompress + + + +Return path + From 7cbce6afa91ad89d00a67f9a63c2ab0148a5fe6f Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 24 Jan 2026 21:37:44 +0800 Subject: [PATCH 11/13] fixed broken link --- docs/concepts/transports.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index b81794fc77..018fd91cb0 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -9,7 +9,7 @@ Transports connect **module streams** across **process boundaries** and/or **net Each edge in the graph is a **transported stream** (potentially different protocols). Each node is a **module**: -![go2_basic](assets/go2_basic.svg) +![go2_nav](assets/go2_nav.svg) ## What the transport layer guarantees (and what it doesn’t) From 689e49067c88e2424d7515d57a46777574b9aa54 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 25 Jan 2026 14:10:20 +0800 Subject: [PATCH 12/13] titles fix --- docs/concepts/transports.md | 40 ++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 018fd91cb0..7ddc29714f 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -29,7 +29,7 @@ So: treat the API as uniform, but pick a backend whose semantics match the task. --- -# Benchmarks +## Benchmarks Quick view on performance of our pubsub backends: @@ -41,7 +41,7 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b --- -# Abstraction layers +## Abstraction layers
Pikchr @@ -77,7 +77,7 @@ We’ll go through these layers top-down. --- -# Using transports with blueprints +## Using transports with blueprints See [Blueprints](blueprints.md) for the blueprint API. @@ -106,7 +106,7 @@ ros = nav.transports( --- -# Using transports with modules +## Using transports with modules Each **stream** on a module can use a different transport. Set `.transport` on the stream **before starting** modules. @@ -163,7 +163,7 @@ See [Modules](modules.md) for more on module architecture. --- -# Inspecting LCM traffic (CLI) +## Inspecting LCM traffic (CLI) `lcmspy` shows topic frequency/bandwidth stats: @@ -178,7 +178,7 @@ Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28 --- -# Implementing a transport +## Implementing a transport At the stream layer, a transport is implemented by subclassing `Transport` (see [`core/stream.py`](/dimos/core/stream.py#L83)) and implementing: @@ -194,13 +194,13 @@ Your `Transport.__init__` args can be anything meaningful for your backend: Encoding is an implementation detail, but we encourage using LCM-compatible message types when possible. -## Encoding helpers +### Encoding helpers Many of our message types provide `lcm_encode` / `lcm_decode` for compact, language-agnostic binary encoding (often faster than pickle). For details, see [LCM](/docs/concepts/lcm.md). --- -# PubSub transports +## PubSub transports Even though transport can be anything (TCP connection, unix socket) for now all our transport backends implement the `PubSub` interface. @@ -232,7 +232,7 @@ print(inspect.getsource(PubSub.subscribe)) Topic/message types are flexible: bytes, JSON, or our ROS-compatible [LCM](/docs/concepts/lcm.md) types. We also have pickle-based transports for arbitrary Python objects. -## LCM (UDP multicast) +### LCM (UDP multicast) LCM is UDP multicast. It’s very fast on a robot LAN, but it’s **best-effort** (packets can drop). For local emission it autoconfigures system in a way in which it's more robust and faster then other more common protocols like ROS, DDS @@ -262,7 +262,7 @@ lcm.stop() Received velocity: x=1.0, y=0.0, z=0.5 ``` -## Shared memory (IPC) +### Shared memory (IPC) Shared memory is highest performance, but only works on the **same machine**. @@ -290,7 +290,7 @@ Received: [{'data': [1, 2, 3]}] --- -# A minimal transport: `Memory` +## A minimal transport: `Memory` The simplest toy backend is `Memory` (single process). Start from there when implementing a new pubsub backend. @@ -323,13 +323,13 @@ See [`memory.py`](/dimos/protocol/pubsub/memory.py) for the complete source. --- -# Encode/decode mixins +## Encode/decode mixins Transports often need to serialize messages before sending and deserialize after receiving. `PubSubEncoderMixin` at [`pubsub/spec.py`](/dimos/protocol/pubsub/spec.py#L95) provides a clean way to add encoding/decoding to any pubsub implementation. -## Available mixins +### Available mixins | Mixin | Encoding | Use case | |----------------------|-----------------|------------------------------------| @@ -339,9 +339,9 @@ Transports often need to serialize messages before sending and deserialize after `LCMEncoderMixin` is especially useful: you can use LCM message definitions with *any* transport (not just UDP multicast). See [LCM](/docs/concepts/lcm.md) for details. -## Creating a custom mixin +### Creating a custom mixin -```python session=jsonencoder +```python session=jsonencoder no-result from dimos.protocol.pubsub.spec import PubSubEncoderMixin import json @@ -355,7 +355,7 @@ class JsonEncoderMixin(PubSubEncoderMixin[str, dict, bytes]): Combine with a pubsub implementation via multiple inheritance: -```python session=jsonencoder +```python session=jsonencoder no-result from dimos.protocol.pubsub.memory import Memory class MyJsonPubSub(JsonEncoderMixin, Memory): @@ -364,7 +364,7 @@ class MyJsonPubSub(JsonEncoderMixin, Memory): Swap serialization by changing the mixin: -```python session=jsonencoder +```python session=jsonencoder no-result from dimos.protocol.pubsub.spec import PickleEncoderMixin class MyPicklePubSub(PickleEncoderMixin, Memory): @@ -373,13 +373,13 @@ class MyPicklePubSub(PickleEncoderMixin, Memory): --- -# Testing and benchmarks +## Testing and benchmarks -## Spec tests +### Spec tests See [`pubsub/test_spec.py`](/dimos/protocol/pubsub/test_spec.py) for the grid tests your new backend should pass. -## Benchmarks +### Benchmarks Add your backend to benchmarks to compare in context: From d57573cf7bf032263d4d165376f4f4386f396139 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 25 Jan 2026 19:38:50 +0800 Subject: [PATCH 13/13] ci: run pre-commit on all pushes and PRs --- .github/workflows/code-cleanup.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/code-cleanup.yml b/.github/workflows/code-cleanup.yml index 48f6ea281e..7177b3f9d1 100644 --- a/.github/workflows/code-cleanup.yml +++ b/.github/workflows/code-cleanup.yml @@ -1,8 +1,7 @@ name: code-cleanup on: push: - paths-ignore: - - '**.md' + pull_request: permissions: contents: write