diff --git a/dimos/core/transport.py b/dimos/core/transport.py index fe26b5a93f..2586706feb 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -14,19 +14,23 @@ from __future__ import annotations -from typing import Any, TypeVar - -import dimos.core.colors as colors - -T = TypeVar("T") - +import threading from typing import ( TYPE_CHECKING, + Any, TypeVar, ) +import dimos.core.colors as colors from dimos.core.stream import In, Out, Stream, Transport from dimos.msgs.protocol import DimosMsg + +try: + import cyclonedds as _cyclonedds # noqa: F401 + + DDS_AVAILABLE = True +except ImportError: + DDS_AVAILABLE = False from dimos.protocol.pubsub.impl.jpeg_shm import JpegSharedMemory from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic @@ -278,4 +282,41 @@ def stop(self) -> None: self._ros = None +if DDS_AVAILABLE: + from dimos.protocol.pubsub.impl.ddspubsub import DDS, Topic as DDSTopic + + class DDSTransport(PubSubTransport[T]): + def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(DDSTopic(topic, type)) + self.dds = DDS(**kwargs) + self._started: bool = False + self._start_lock = threading.RLock() + + def start(self) -> None: + with self._start_lock: + if not self._started: + self.dds.start() + self._started = True + + def stop(self) -> None: + with self._start_lock: + if self._started: + self.dds.stop() + self._started = False + + def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] + with self._start_lock: + if not self._started: + self.start() + self.dds.publish(self.topic, msg) + + def subscribe( + self, callback: Callable[[T], None], selfstream: Stream[T] | None = None + ) -> Callable[[], None]: + with self._start_lock: + if not self._started: + self.start() + return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) + + class ZenohTransport(PubSubTransport[T]): ... diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py index 244d09f105..ad604131e0 100644 --- a/dimos/protocol/pubsub/benchmark/testdata.py +++ b/dimos/protocol/pubsub/benchmark/testdata.py @@ -14,12 +14,20 @@ from collections.abc import Generator from contextlib import contextmanager +from dataclasses import dataclass from typing import TYPE_CHECKING, Any import numpy as np from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.protocol.pubsub.benchmark.type import Case + +try: + import cyclonedds as _cyclonedds # noqa: F401 + + DDS_AVAILABLE = True +except ImportError: + DDS_AVAILABLE = False from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic from dimos.protocol.pubsub.impl.memory import Memory from dimos.protocol.pubsub.impl.shmpubsub import ( @@ -173,6 +181,67 @@ def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]: ) ) +if DDS_AVAILABLE: + from cyclonedds.idl import IdlStruct + from cyclonedds.idl.types import sequence, uint8 + from cyclonedds.qos import Policy, Qos + + from dimos.protocol.pubsub.impl.ddspubsub import ( + DDS, + Topic as DDSTopic, + ) + + @dataclass + class DDSBenchmarkData(IdlStruct): # type: ignore[misc] + """DDS message type for benchmarking with variable-size byte payload.""" + + data: sequence[uint8] # type: ignore[type-arg] + + @contextmanager + def dds_high_throughput_pubsub_channel() -> Generator[DDS, None, None]: + """DDS with high-throughput QoS preset.""" + HIGH_THROUGHPUT_QOS = Qos( + Policy.Reliability.BestEffort, + Policy.History.KeepLast(depth=1), + Policy.Durability.Volatile, + ) + dds_pubsub = DDS(qos=HIGH_THROUGHPUT_QOS) + dds_pubsub.start() + yield dds_pubsub + dds_pubsub.stop() + + @contextmanager + def dds_reliable_pubsub_channel() -> Generator[DDS, None, None]: + """DDS with reliable QoS preset.""" + RELIABLE_QOS = Qos( + Policy.Reliability.Reliable(max_blocking_time=0), + Policy.History.KeepLast(depth=5000), + Policy.Durability.Volatile, + ) + dds_pubsub = DDS(qos=RELIABLE_QOS) + dds_pubsub.start() + yield dds_pubsub + dds_pubsub.stop() + + def dds_msggen(size: int) -> tuple[DDSTopic, DDSBenchmarkData]: + """Generate DDS message for benchmark.""" + topic = DDSTopic(name="benchmark/dds", data_type=DDSBenchmarkData) + return (topic, DDSBenchmarkData(data=list(make_data_bytes(size)))) # type: ignore[arg-type] + + testcases.append( + Case( + pubsub_context=dds_high_throughput_pubsub_channel, + msg_gen=dds_msggen, + ) + ) + + testcases.append( + Case( + pubsub_context=dds_reliable_pubsub_channel, + msg_gen=dds_msggen, + ) + ) + try: from dimos.protocol.pubsub.impl.redispubsub import Redis diff --git a/dimos/protocol/pubsub/impl/ddspubsub.py b/dimos/protocol/pubsub/impl/ddspubsub.py new file mode 100644 index 0000000000..1e6dc36296 --- /dev/null +++ b/dimos/protocol/pubsub/impl/ddspubsub.py @@ -0,0 +1,161 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +import threading +from typing import TYPE_CHECKING, Any, TypeAlias + +from cyclonedds.core import Listener +from cyclonedds.pub import DataWriter as DDSDataWriter +from cyclonedds.qos import Policy, Qos +from cyclonedds.sub import DataReader as DDSDataReader +from cyclonedds.topic import Topic as DDSTopic + +from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.service.ddsservice import DDSService +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from cyclonedds.idl import IdlStruct + +logger = setup_logger() + + +@dataclass(frozen=True) +class Topic: + """Represents a DDS topic.""" + + name: str + data_type: type[IdlStruct] + + def __str__(self) -> str: + return f"{self.name}#{self.data_type.__name__}" + + +MessageCallback: TypeAlias = Callable[[Any, Topic], None] + + +class _DDSMessageListener(Listener): # type: ignore[misc] + """Listener for DataReader that dispatches messages to callbacks.""" + + __slots__ = ("_callbacks", "_lock", "_topic") + + def __init__(self, topic: Topic) -> None: + super().__init__() # type: ignore[no-untyped-call] + self._topic = topic + self._callbacks: tuple[MessageCallback, ...] = () + self._lock = threading.Lock() + + def add_callback(self, callback: MessageCallback) -> None: + """Add a callback to the listener.""" + with self._lock: + self._callbacks = (*self._callbacks, callback) + + def remove_callback(self, callback: MessageCallback) -> None: + """Remove a callback from the listener.""" + with self._lock: + self._callbacks = tuple(cb for cb in self._callbacks if cb is not callback) + + def on_data_available(self, reader: DDSDataReader[Any]) -> None: + """Called when data is available on the reader.""" + try: + samples = reader.take() + except Exception as e: + logger.error(f"Error reading from topic {self._topic}: {e}", exc_info=True) + return + for sample in samples: + if sample is not None: + for callback in self._callbacks: + try: + callback(sample, self._topic) + except Exception as e: + logger.error(f"Callback error on topic {self._topic}: {e}", exc_info=True) + + +class DDS(DDSService, PubSub[Topic, Any]): + def __init__(self, qos: Qos | None = None, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._qos = qos + self._writers: dict[Topic, DDSDataWriter[Any]] = {} + self._writer_lock = threading.Lock() + self._readers: dict[Topic, DDSDataReader[Any]] = {} + self._reader_lock = threading.Lock() + self._listeners: dict[Topic, _DDSMessageListener] = {} + + @property + def qos(self) -> Qos | None: + """Get the QoS settings.""" + return self._qos + + def _get_writer(self, topic: Topic) -> DDSDataWriter[Any]: + """Get or create a DataWriter for the given topic.""" + with self._writer_lock: + if topic not in self._writers: + dds_topic = DDSTopic(self.participant, topic.name, topic.data_type) + self._writers[topic] = DDSDataWriter(self.participant, dds_topic, qos=self._qos) + return self._writers[topic] + + def publish(self, topic: Topic, message: Any) -> None: + """Publish a message to a DDS topic.""" + writer = self._get_writer(topic) + try: + writer.write(message) + except Exception as e: + logger.error(f"Error publishing to topic {topic}: {e}", exc_info=True) + + def _get_listener(self, topic: Topic) -> _DDSMessageListener: + """Get or create a listener and reader for the given topic.""" + with self._reader_lock: + if topic not in self._readers: + dds_topic = DDSTopic(self.participant, topic.name, topic.data_type) + listener = _DDSMessageListener(topic) + self._readers[topic] = DDSDataReader( + self.participant, dds_topic, qos=self._qos, listener=listener + ) + self._listeners[topic] = listener + return self._listeners[topic] + + def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]: + """Subscribe to a DDS topic with a callback.""" + listener = self._get_listener(topic) + listener.add_callback(callback) + return lambda: self._unsubscribe_callback(topic, callback) + + def _unsubscribe_callback(self, topic: Topic, callback: MessageCallback) -> None: + """Unsubscribe a callback from a topic.""" + with self._reader_lock: + listener = self._listeners.get(topic) + if listener: + listener.remove_callback(callback) + + def stop(self) -> None: + """Stop the DDS service and clean up resources.""" + with self._reader_lock: + self._readers.clear() + self._listeners.clear() + with self._writer_lock: + self._writers.clear() + super().stop() + + +__all__ = [ + "DDS", + "MessageCallback", + "Policy", + "Qos", + "Topic", +] diff --git a/dimos/protocol/service/__init__.py b/dimos/protocol/service/__init__.py index bcafa6af4a..fb9df08ca9 100644 --- a/dimos/protocol/service/__init__.py +++ b/dimos/protocol/service/__init__.py @@ -1,4 +1,4 @@ -from dimos.protocol.service.lcmservice import LCMService as LCMService +from dimos.protocol.service.lcmservice import LCMService from dimos.protocol.service.spec import Configurable as Configurable, Service as Service __all__ = [ diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py new file mode 100644 index 0000000000..6ed04c07ad --- /dev/null +++ b/dimos/protocol/service/ddsservice.py @@ -0,0 +1,80 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass +import threading +from typing import TYPE_CHECKING, Any + +try: + from cyclonedds.domain import DomainParticipant + + DDS_AVAILABLE = True +except ImportError: + DDS_AVAILABLE = False + DomainParticipant = None # type: ignore[assignment, misc] + +from dimos.protocol.service.spec import Service +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from cyclonedds.qos import Qos + +logger = setup_logger() + +_participants: dict[int, DomainParticipant] = {} +_participants_lock = threading.Lock() + + +@dataclass +class DDSConfig: + """Configuration for DDS service.""" + + domain_id: int = 0 + qos: Qos | None = None + + +class DDSService(Service[DDSConfig]): + default_config = DDSConfig + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + def start(self) -> None: + """Start the DDS service.""" + domain_id = self.config.domain_id + with _participants_lock: + if domain_id not in _participants: + _participants[domain_id] = DomainParticipant(domain_id) + logger.info(f"DDS service started with Cyclone DDS domain {domain_id}") + super().start() + + def stop(self) -> None: + """Stop the DDS service.""" + super().stop() + + @property + def participant(self) -> DomainParticipant: + """Get the DomainParticipant instance for this service's domain.""" + domain_id = self.config.domain_id + if domain_id not in _participants: + raise RuntimeError(f"DomainParticipant not initialized for domain {domain_id}") + return _participants[domain_id] + + +__all__ = [ + "DDSConfig", + "DDSService", +] diff --git a/docker/python/Dockerfile b/docker/python/Dockerfile index b85404f51a..1572bbc3ba 100644 --- a/docker/python/Dockerfile +++ b/docker/python/Dockerfile @@ -2,8 +2,7 @@ ARG FROM_IMAGE=ghcr.io/dimensionalos/ros:dev FROM ${FROM_IMAGE} # Install basic requirements -RUN apt-get update -RUN apt-get install -y \ +RUN apt-get update && apt-get install -y \ python-is-python3 \ curl \ gnupg2 \ @@ -13,7 +12,7 @@ RUN apt-get install -y \ portaudio19-dev \ git \ mesa-utils \ - libgl1-mesa-glx \ + libgl1 \ libgl1-mesa-dri \ software-properties-common \ libxcb1-dev \ diff --git a/docker/ros/Dockerfile b/docker/ros/Dockerfile index 2eccd643c0..5a0d16656a 100644 --- a/docker/ros/Dockerfile +++ b/docker/ros/Dockerfile @@ -14,8 +14,7 @@ ENV LANG=en_US.UTF-8 ENV ROS_DISTRO=humble # Install basic requirements -RUN apt-get update -RUN apt-get install -y \ +RUN apt-get update && apt-get install -y \ curl \ gnupg2 \ lsb-release \ @@ -24,7 +23,7 @@ RUN apt-get install -y \ portaudio19-dev \ git \ mesa-utils \ - libgl1-mesa-glx \ + libgl1 \ libgl1-mesa-dri \ software-properties-common \ libxcb1-dev \ diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index e4b62b01ce..371cb0c2a3 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -148,6 +148,7 @@ if __name__ == "__main__": ``` + ``` Initialized dimos local cluster with 2 workers, memory limit: auto 2026-01-24T13:17:50.190559Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule @@ -288,6 +289,41 @@ shm.stop() Received: [{'data': [1, 2, 3]}] ``` +### DDS Transport + +For network communication, DDS uses the Data Distribution Service (DDS) protocol: + +```python session=dds_demo ansi=false +from dataclasses import dataclass +from cyclonedds.idl import IdlStruct + +from dimos.protocol.pubsub.impl.ddspubsub import DDS, Topic + +@dataclass +class SensorReading(IdlStruct): + value: float + +dds = DDS() +dds.start() + +received = [] +sensor_topic = Topic(name="sensors/temperature", data_type=SensorReading) + +dds.subscribe(sensor_topic, lambda msg, t: received.append(msg)) +dds.publish(sensor_topic, SensorReading(value=22.5)) + +import time +time.sleep(0.1) + +print(f"Received: {received}") +dds.stop() +``` + + +``` +Received: [SensorReading(value=22.5)] +``` + --- ## A minimal transport: `Memory` diff --git a/docs/development/README.md b/docs/development/README.md index 0e5a4a2021..e574be9d8f 100644 --- a/docs/development/README.md +++ b/docs/development/README.md @@ -71,6 +71,35 @@ uv add git+https://github.com/openai/CLIP.git uv add git+https://github.com/facebookresearch/detectron2.git ``` +### Optional: DDS Transport Support + +The `dds` extra provides DDS (Data Distribution Service) transport support via [Eclipse Cyclone DDS](https://cyclonedds.io/docs/cyclonedds-python/latest/). This requires installing system libraries before the Python package can be built. + +**Ubuntu/Debian:** + +```bash +# Install the CycloneDDS development library +sudo apt install cyclonedds-dev + +# Create a compatibility directory structure +# (required because Ubuntu's multiarch layout doesn't match the expected CMake layout) +sudo mkdir -p /opt/cyclonedds/{lib,bin,include} +sudo ln -sf /usr/lib/x86_64-linux-gnu/libddsc.so* /opt/cyclonedds/lib/ +sudo ln -sf /usr/lib/x86_64-linux-gnu/libcycloneddsidl.so* /opt/cyclonedds/lib/ +sudo ln -sf /usr/bin/idlc /opt/cyclonedds/bin/ +sudo ln -sf /usr/bin/ddsperf /opt/cyclonedds/bin/ +sudo ln -sf /usr/include/dds /opt/cyclonedds/include/ + +# Install with the dds extra +CYCLONEDDS_HOME=/opt/cyclonedds uv pip install -e '.[dds]' +``` + +To install all extras including DDS: + +```bash +CYCLONEDDS_HOME=/opt/cyclonedds uv sync --extra dds +``` +