Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1429cad
Add cyclonedds package
Kaweees Feb 3, 2026
0538ab7
CI code cleanup
Kaweees Feb 3, 2026
6eb1286
Create DDS transport
Kaweees Feb 3, 2026
b003a0c
Fix broken DDS import path in transports documentation
Kaweees Feb 3, 2026
bb2adf8
Add threading import to transport module
Kaweees Feb 3, 2026
f98ee87
Refactor DDS service to support multiple DomainParticipants
Kaweees Feb 3, 2026
eaad360
Improve DDSTransport type hinting
Kaweees Feb 4, 2026
870f450
Update type hint for DDSTransport.subscribe method
Kaweees Feb 4, 2026
0246eae
Move cyclonedds to separate index
Kaweees Feb 4, 2026
f83eebf
Move HIGH_THROUGHPUT_QOS and RELIABLE_QOS into testdata
Kaweees Feb 6, 2026
978f62f
Make DDS an optional transport and update installation documentation
Kaweees Feb 6, 2026
af5af9f
Documentation nitpick
Kaweees Feb 6, 2026
f189b8f
Don't execute ddspubsub in transport testcases if dds is missing.
Kaweees Feb 6, 2026
f0991a6
Add cyclonedds to mypy type checking overrides
Kaweees Feb 10, 2026
8c68e31
Merge branch 'dev' into miguel/dds_transport
Kaweees Feb 10, 2026
7ff72f6
Refactor: type ignore for DDSBenchmarkData and _DDSMessageListener
Kaweees Feb 10, 2026
34820bf
Refactor: combine apt-get update and install commands to prevent stal…
Kaweees Feb 10, 2026
88e0c35
Refactor: Update Dockerfiles to replace 'libgl1-mesa-glx' with 'libgl…
Kaweees Feb 10, 2026
ffb2726
pin langchain for now
paul-nechifor Feb 11, 2026
5ca9949
resolve merge
spomichter Feb 13, 2026
d689796
uv lock
spomichter Feb 13, 2026
2ddf6cf
resolve merge conflicts
spomichter Feb 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@

from __future__ import annotations

from typing import Any, TypeVar

import dimos.core.colors as colors

T = TypeVar("T")

import threading
from typing import (
TYPE_CHECKING,
Any,
TypeVar,
)

import dimos.core.colors as colors
from dimos.core.stream import In, Out, Stream, Transport
from dimos.msgs.protocol import DimosMsg

try:
import cyclonedds as _cyclonedds # noqa: F401

DDS_AVAILABLE = True
except ImportError:
DDS_AVAILABLE = False
from dimos.protocol.pubsub.impl.jpeg_shm import JpegSharedMemory
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic
Expand Down Expand Up @@ -278,4 +282,41 @@ def stop(self) -> None:
self._ros = None


if DDS_AVAILABLE:
from dimos.protocol.pubsub.impl.ddspubsub import DDS, Topic as DDSTopic

class DDSTransport(PubSubTransport[T]):
def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(DDSTopic(topic, type))
self.dds = DDS(**kwargs)
self._started: bool = False
self._start_lock = threading.RLock()

def start(self) -> None:
with self._start_lock:
if not self._started:
self.dds.start()
self._started = True

def stop(self) -> None:
with self._start_lock:
if self._started:
self.dds.stop()
self._started = False

def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
with self._start_lock:
if not self._started:
self.start()
self.dds.publish(self.topic, msg)

def subscribe(
self, callback: Callable[[T], None], selfstream: Stream[T] | None = None
) -> Callable[[], None]:
with self._start_lock:
if not self._started:
self.start()
return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))


class ZenohTransport(PubSubTransport[T]): ...
69 changes: 69 additions & 0 deletions dimos/protocol/pubsub/benchmark/testdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@

from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import numpy as np

from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
from dimos.protocol.pubsub.benchmark.type import Case

try:
import cyclonedds as _cyclonedds # noqa: F401

DDS_AVAILABLE = True
except ImportError:
DDS_AVAILABLE = False
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic
from dimos.protocol.pubsub.impl.memory import Memory
from dimos.protocol.pubsub.impl.shmpubsub import (
Expand Down Expand Up @@ -173,6 +181,67 @@ def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]:
)
)

if DDS_AVAILABLE:
from cyclonedds.idl import IdlStruct
from cyclonedds.idl.types import sequence, uint8
from cyclonedds.qos import Policy, Qos

from dimos.protocol.pubsub.impl.ddspubsub import (
DDS,
Topic as DDSTopic,
)

@dataclass
class DDSBenchmarkData(IdlStruct): # type: ignore[misc]
"""DDS message type for benchmarking with variable-size byte payload."""

data: sequence[uint8] # type: ignore[type-arg]

@contextmanager
def dds_high_throughput_pubsub_channel() -> Generator[DDS, None, None]:
"""DDS with high-throughput QoS preset."""
HIGH_THROUGHPUT_QOS = Qos(
Policy.Reliability.BestEffort,
Policy.History.KeepLast(depth=1),
Policy.Durability.Volatile,
)
dds_pubsub = DDS(qos=HIGH_THROUGHPUT_QOS)
dds_pubsub.start()
yield dds_pubsub
dds_pubsub.stop()

@contextmanager
def dds_reliable_pubsub_channel() -> Generator[DDS, None, None]:
"""DDS with reliable QoS preset."""
RELIABLE_QOS = Qos(
Policy.Reliability.Reliable(max_blocking_time=0),
Policy.History.KeepLast(depth=5000),
Policy.Durability.Volatile,
)
dds_pubsub = DDS(qos=RELIABLE_QOS)
dds_pubsub.start()
yield dds_pubsub
dds_pubsub.stop()

def dds_msggen(size: int) -> tuple[DDSTopic, DDSBenchmarkData]:
"""Generate DDS message for benchmark."""
topic = DDSTopic(name="benchmark/dds", data_type=DDSBenchmarkData)
return (topic, DDSBenchmarkData(data=list(make_data_bytes(size)))) # type: ignore[arg-type]

testcases.append(
Case(
pubsub_context=dds_high_throughput_pubsub_channel,
msg_gen=dds_msggen,
)
)

testcases.append(
Case(
pubsub_context=dds_reliable_pubsub_channel,
msg_gen=dds_msggen,
)
)


try:
from dimos.protocol.pubsub.impl.redispubsub import Redis
Expand Down
161 changes: 161 additions & 0 deletions dimos/protocol/pubsub/impl/ddspubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
import threading
from typing import TYPE_CHECKING, Any, TypeAlias

from cyclonedds.core import Listener
from cyclonedds.pub import DataWriter as DDSDataWriter
from cyclonedds.qos import Policy, Qos
from cyclonedds.sub import DataReader as DDSDataReader
from cyclonedds.topic import Topic as DDSTopic

from dimos.protocol.pubsub.spec import PubSub
from dimos.protocol.service.ddsservice import DDSService
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from cyclonedds.idl import IdlStruct

logger = setup_logger()


@dataclass(frozen=True)
class Topic:
"""Represents a DDS topic."""

name: str
data_type: type[IdlStruct]

def __str__(self) -> str:
return f"{self.name}#{self.data_type.__name__}"


MessageCallback: TypeAlias = Callable[[Any, Topic], None]


class _DDSMessageListener(Listener): # type: ignore[misc]
"""Listener for DataReader that dispatches messages to callbacks."""

__slots__ = ("_callbacks", "_lock", "_topic")

def __init__(self, topic: Topic) -> None:
super().__init__() # type: ignore[no-untyped-call]
self._topic = topic
self._callbacks: tuple[MessageCallback, ...] = ()
self._lock = threading.Lock()

def add_callback(self, callback: MessageCallback) -> None:
"""Add a callback to the listener."""
with self._lock:
self._callbacks = (*self._callbacks, callback)

def remove_callback(self, callback: MessageCallback) -> None:
"""Remove a callback from the listener."""
with self._lock:
self._callbacks = tuple(cb for cb in self._callbacks if cb is not callback)

def on_data_available(self, reader: DDSDataReader[Any]) -> None:
"""Called when data is available on the reader."""
try:
samples = reader.take()
except Exception as e:
logger.error(f"Error reading from topic {self._topic}: {e}", exc_info=True)
return
for sample in samples:
if sample is not None:
for callback in self._callbacks:
try:
callback(sample, self._topic)
except Exception as e:
logger.error(f"Callback error on topic {self._topic}: {e}", exc_info=True)


class DDS(DDSService, PubSub[Topic, Any]):
def __init__(self, qos: Qos | None = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._qos = qos
self._writers: dict[Topic, DDSDataWriter[Any]] = {}
self._writer_lock = threading.Lock()
self._readers: dict[Topic, DDSDataReader[Any]] = {}
self._reader_lock = threading.Lock()
self._listeners: dict[Topic, _DDSMessageListener] = {}

@property
def qos(self) -> Qos | None:
"""Get the QoS settings."""
return self._qos

def _get_writer(self, topic: Topic) -> DDSDataWriter[Any]:
"""Get or create a DataWriter for the given topic."""
with self._writer_lock:
if topic not in self._writers:
dds_topic = DDSTopic(self.participant, topic.name, topic.data_type)
self._writers[topic] = DDSDataWriter(self.participant, dds_topic, qos=self._qos)
return self._writers[topic]

def publish(self, topic: Topic, message: Any) -> None:
"""Publish a message to a DDS topic."""
writer = self._get_writer(topic)
try:
writer.write(message)
except Exception as e:
logger.error(f"Error publishing to topic {topic}: {e}", exc_info=True)

def _get_listener(self, topic: Topic) -> _DDSMessageListener:
"""Get or create a listener and reader for the given topic."""
with self._reader_lock:
if topic not in self._readers:
dds_topic = DDSTopic(self.participant, topic.name, topic.data_type)
listener = _DDSMessageListener(topic)
self._readers[topic] = DDSDataReader(
self.participant, dds_topic, qos=self._qos, listener=listener
)
self._listeners[topic] = listener
return self._listeners[topic]

def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
"""Subscribe to a DDS topic with a callback."""
listener = self._get_listener(topic)
listener.add_callback(callback)
return lambda: self._unsubscribe_callback(topic, callback)

def _unsubscribe_callback(self, topic: Topic, callback: MessageCallback) -> None:
"""Unsubscribe a callback from a topic."""
with self._reader_lock:
listener = self._listeners.get(topic)
if listener:
listener.remove_callback(callback)

def stop(self) -> None:
"""Stop the DDS service and clean up resources."""
with self._reader_lock:
self._readers.clear()
self._listeners.clear()
with self._writer_lock:
self._writers.clear()
super().stop()


__all__ = [
"DDS",
"MessageCallback",
"Policy",
"Qos",
"Topic",
]
2 changes: 1 addition & 1 deletion dimos/protocol/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dimos.protocol.service.lcmservice import LCMService as LCMService
from dimos.protocol.service.lcmservice import LCMService
from dimos.protocol.service.spec import Configurable as Configurable, Service as Service

__all__ = [
Expand Down
Loading
Loading