Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
bb0e818
Create DDSPubSubBase, DDSTopic
Kaweees Jan 16, 2026
ee0c529
Create PickleDDS
Kaweees Jan 16, 2026
f5c7ee8
Fix hash/equality inconsistency in DDSTopic
Kaweees Jan 16, 2026
48f548e
Add DDSMsg
Kaweees Jan 16, 2026
5779803
Create DDSTransport
Kaweees Jan 16, 2026
8f6f318
Add broadcast and subscribe methods to DDSTransport
Kaweees Jan 16, 2026
d259ac5
Create DDSService
Kaweees Jan 17, 2026
a8167a3
Add CycloneDDS package
Kaweees Jan 17, 2026
6256dd5
Remove unnecessary attributes
Kaweees Jan 18, 2026
d6620c0
Add threading and serialization methods to DDSService
Kaweees Jan 18, 2026
90b14bd
Ensure broadcast and subscribe methods initialize DDS if not started
Kaweees Jan 18, 2026
23e3c2e
Add Transport benchmarking capabilities to CycloneDDS (#1055)
Kaweees Jan 18, 2026
beb875b
Fix DDS segmentation fault using bytearray for binary data storage
Kaweees Jan 18, 2026
7d9390a
Refactor DDS PubSub implementation to use CycloneDDS Topic
Kaweees Jan 19, 2026
60b318c
Remove DDS pickling
Kaweees Jan 19, 2026
cb07796
Remove unused encoding/decoding methods and use a sequence of uint8 f…
Kaweees Jan 19, 2026
5be358e
Restore double-checked locking pattern for performance
Kaweees Jan 19, 2026
a5cc633
Encode DDS payloads as an array of bytes without string encoding to m…
Kaweees Jan 20, 2026
c130f64
Consolidate locking logic and remove redundant checks
Kaweees Jan 28, 2026
6cdc21f
Remove DDS transport pickling method
Kaweees Jan 28, 2026
eaa6eb4
Removed unused DDSMsg class, use IdlStruct instead
Kaweees Jan 28, 2026
ef9e5fb
Enhance error handling in DDS message listener
Kaweees Jan 28, 2026
3842d48
Add threading lock to DDSTransport for safe start checks
Kaweees Jan 28, 2026
7853979
Thread safe reader creation after callback is registered
Kaweees Jan 28, 2026
dbfee76
Add DDS transport documentation and example usage
Kaweees Jan 28, 2026
61c4fae
Move DDS benchmarks to testdata.py
Kaweees Jan 28, 2026
b02301c
Move _participant singleton from a class variable to a module-level v…
Kaweees Jan 28, 2026
9f949f6
Remove list casting for dds message benchmarks
Kaweees Jan 29, 2026
aefec92
Merge remote-tracking branch 'origin/dev' into miguel/dds_transport
Kaweees Jan 29, 2026
94686a9
CI code cleanup
Kaweees Jan 29, 2026
22ba09c
Add qos configutation option for DDS transport
Kaweees Jan 29, 2026
201a734
Update buffer sizes in system configurator
Kaweees Jan 29, 2026
fcc4d13
Add high-throughput and reliable QoS presets for DDS PubSub
Kaweees Jan 29, 2026
bf6196f
Add back DDS testcases generation
Kaweees Jan 29, 2026
8da904d
Rename typename to data_type and use participant property
Kaweees Feb 1, 2026
2a4bc90
Fix DDS Transport documentation example
Kaweees Feb 1, 2026
0e5c2f5
Add locks to dds start/stop
Kaweees Feb 1, 2026
cfea1d0
CI code cleanup
Kaweees Feb 1, 2026
1347ac2
Add thread safety to _DDSMessageListener.
Kaweees Feb 1, 2026
f9fdd1a
Update dimos/protocol/pubsub/ddspubsub.py
Kaweees Feb 2, 2026
225eb68
Update dimos/protocol/pubsub/ddspubsub.py
Kaweees Feb 2, 2026
9f763ce
Update dimos/protocol/pubsub/ddspubsub.py
Kaweees Feb 2, 2026
79060be
Refactor DDS transport and service classes for improved thread safety…
Kaweees Feb 2, 2026
7e06cb1
CI code cleanup
Kaweees Feb 2, 2026
2a5bfa3
Merge branch 'dev' into miguel/dds_transport
Kaweees Feb 2, 2026
cbf2b92
CI code cleanup
Kaweees Feb 2, 2026
ba0adcd
Remove duplicate imports
Kaweees Feb 2, 2026
d720b2c
Improve resource cleanup for DDS class
Kaweees Feb 2, 2026
7ebfe9b
Remove extra imports
Kaweees Feb 2, 2026
5031a51
Enhance type hints
Kaweees Feb 3, 2026
f3b7352
Refactor DDS transport methods to ensure proper start/stop logic
Kaweees Feb 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dimos/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dimos.core.rpc_client import RPCClient
from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport
from dimos.core.transport import (
DDSTransport,
LCMTransport,
SHMTransport,
ZenohTransport,
Expand All @@ -36,6 +37,7 @@
"LCMRPC",
"LCMTF",
"TF",
"DDSTransport",
"DimosCluster",
"In",
"LCMTransport",
Expand Down
34 changes: 34 additions & 0 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import threading
from typing import Any, TypeVar

import dimos.core.colors as colors
Expand All @@ -27,6 +28,7 @@

from dimos.core.stream import In, Out, Stream, Transport
from dimos.msgs.protocol import DimosMsg
from dimos.protocol.pubsub.ddspubsub import DDS, Topic as DDSTopic
from dimos.protocol.pubsub.impl.jpeg_shm import JpegSharedMemory
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic
Expand Down Expand Up @@ -258,4 +260,36 @@ def stop(self) -> None:
self._ros = None


class DDSTransport(PubSubTransport[T]):
def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(DDSTopic(topic, type))
self.dds = DDS(**kwargs)
self._started: bool = False
self._start_lock = threading.RLock()

def start(self) -> None:
with self._start_lock:
if not self._started:
self.dds.start()
self._started = True

def stop(self) -> None:
with self._start_lock:
if self._started:
self.dds.stop()
self._started = False

def broadcast(self, _, msg) -> None:
with self._start_lock:
if not self._started:
self.start()
self.dds.publish(self.topic, msg)

def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override]
with self._start_lock:
if not self._started:
self.start()
return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value]


class ZenohTransport(PubSubTransport[T]): ...
2 changes: 2 additions & 0 deletions dimos/protocol/pubsub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import dimos.protocol.pubsub.ddspubsub as dds
import dimos.protocol.pubsub.impl.lcmpubsub as lcm
from dimos.protocol.pubsub.impl.memory import Memory
from dimos.protocol.pubsub.spec import PubSub

__all__ = [
"Memory",
"PubSub",
"dds",
"lcm",
]
55 changes: 55 additions & 0 deletions dimos/protocol/pubsub/benchmark/testdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@

from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any

from cyclonedds.idl import IdlStruct
from cyclonedds.idl.types import sequence, uint8
import numpy as np

from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
from dimos.protocol.pubsub.benchmark.type import Case
from dimos.protocol.pubsub.ddspubsub import (
DDS,
HIGH_THROUGHPUT_QOS,
RELIABLE_QOS,
Topic as DDSTopic,
)
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic
from dimos.protocol.pubsub.impl.memory import Memory
from dimos.protocol.pubsub.impl.shmpubsub import (
Expand Down Expand Up @@ -174,6 +183,52 @@ def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]:
)


@dataclass
class DDSBenchmarkData(IdlStruct):
"""DDS message type for benchmarking with variable-size byte payload."""

data: sequence[uint8] # type: ignore[type-arg]


@contextmanager
def dds_high_throughput_pubsub_channel() -> Generator[DDS, None, None]:
"""DDS with HIGH_THROUGHPUT_QOS - BestEffort, optimized for speed."""
dds_pubsub = DDS(qos=HIGH_THROUGHPUT_QOS)
dds_pubsub.start()
yield dds_pubsub
dds_pubsub.stop()


@contextmanager
def dds_reliable_pubsub_channel() -> Generator[DDS, None, None]:
"""DDS with RELIABLE_QOS - guaranteed delivery."""
dds_pubsub = DDS(qos=RELIABLE_QOS)
dds_pubsub.start()
yield dds_pubsub
dds_pubsub.stop()


def dds_msggen(size: int) -> tuple[DDSTopic, DDSBenchmarkData]:
"""Generate DDS message for benchmark."""
topic = DDSTopic(name="benchmark/dds", data_type=DDSBenchmarkData)
return (topic, DDSBenchmarkData(data=list(make_data_bytes(size))))


testcases.append(
Case(
pubsub_context=dds_high_throughput_pubsub_channel,
msg_gen=dds_msggen,
)
)

testcases.append(
Case(
pubsub_context=dds_reliable_pubsub_channel,
msg_gen=dds_msggen,
)
)


try:
from dimos.protocol.pubsub.impl.redispubsub import Redis

Expand Down
178 changes: 178 additions & 0 deletions dimos/protocol/pubsub/ddspubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
import threading
from typing import TYPE_CHECKING, Any, TypeAlias

from cyclonedds.core import Listener
from cyclonedds.pub import DataWriter as DDSDataWriter
from cyclonedds.qos import Policy, Qos
from cyclonedds.sub import DataReader as DDSDataReader
from cyclonedds.topic import Topic as DDSTopic

from dimos.protocol.pubsub.spec import PubSub
from dimos.protocol.service.ddsservice import DDSService
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from cyclonedds.idl import IdlStruct

logger = setup_logger()


# High-throughput QoS preset
HIGH_THROUGHPUT_QOS = Qos(
Policy.Reliability.BestEffort,
Policy.History.KeepLast(depth=1),
Policy.Durability.Volatile,
)

# Reliable QoS preset
RELIABLE_QOS = Qos(
Policy.Reliability.Reliable(max_blocking_time=0),
Policy.History.KeepLast(depth=5000),
Policy.Durability.Volatile,
)


@dataclass(frozen=True)
class Topic:
"""Represents a DDS topic."""

name: str
data_type: type[IdlStruct]

def __str__(self) -> str:
return f"{self.name}#{self.data_type.__name__}"


MessageCallback: TypeAlias = Callable[[Any, Topic], None]


class _DDSMessageListener(Listener):
"""Listener for DataReader that dispatches messages to callbacks."""

__slots__ = ("_callbacks", "_lock", "_topic")

def __init__(self, topic: Topic) -> None:
super().__init__()
self._topic = topic
self._callbacks: tuple[MessageCallback, ...] = ()
self._lock = threading.Lock()

def add_callback(self, callback: MessageCallback) -> None:
"""Add a callback to the listener."""
with self._lock:
self._callbacks = (*self._callbacks, callback)

def remove_callback(self, callback: MessageCallback) -> None:
"""Remove a callback from the listener."""
with self._lock:
self._callbacks = tuple(cb for cb in self._callbacks if cb is not callback)

def on_data_available(self, reader: DDSDataReader[Any]) -> None:
"""Called when data is available on the reader."""
try:
samples = reader.take()
except Exception as e:
logger.error(f"Error reading from topic {self._topic}: {e}", exc_info=True)
return
for sample in samples:
if sample is not None:
for callback in self._callbacks:
try:
callback(sample, self._topic)
except Exception as e:
logger.error(f"Callback error on topic {self._topic}: {e}", exc_info=True)


class DDS(DDSService, PubSub[Topic, Any]):
def __init__(self, qos: Qos | None = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._qos = qos
self._writers: dict[Topic, DDSDataWriter[Any]] = {}
self._writer_lock = threading.Lock()
self._readers: dict[Topic, DDSDataReader[Any]] = {}
self._reader_lock = threading.Lock()
self._listeners: dict[Topic, _DDSMessageListener] = {}

@property
def qos(self) -> Qos | None:
"""Get the QoS settings."""
return self._qos

def _get_writer(self, topic: Topic) -> DDSDataWriter[Any]:
"""Get or create a DataWriter for the given topic."""
with self._writer_lock:
if topic not in self._writers:
dds_topic = DDSTopic(self.participant, topic.name, topic.data_type)
self._writers[topic] = DDSDataWriter(self.participant, dds_topic, qos=self._qos)
return self._writers[topic]

def publish(self, topic: Topic, message: Any) -> None:
"""Publish a message to a DDS topic."""
writer = self._get_writer(topic)
try:
writer.write(message)
except Exception as e:
logger.error(f"Error publishing to topic {topic}: {e}", exc_info=True)

def _get_listener(self, topic: Topic) -> _DDSMessageListener:
"""Get or create a listener and reader for the given topic."""
with self._reader_lock:
if topic not in self._readers:
dds_topic = DDSTopic(self.participant, topic.name, topic.data_type)
listener = _DDSMessageListener(topic)
self._readers[topic] = DDSDataReader(
self.participant, dds_topic, qos=self._qos, listener=listener
)
self._listeners[topic] = listener
return self._listeners[topic]

def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
"""Subscribe to a DDS topic with a callback."""
listener = self._get_listener(topic)
listener.add_callback(callback)
return lambda: self._unsubscribe_callback(topic, callback)

def _unsubscribe_callback(self, topic: Topic, callback: MessageCallback) -> None:
"""Unsubscribe a callback from a topic."""
with self._reader_lock:
listener = self._listeners.get(topic)
if listener:
listener.remove_callback(callback)

def stop(self) -> None:
"""Stop the DDS service and clean up resources."""
with self._reader_lock:
self._readers.clear()
self._listeners.clear()
with self._writer_lock:
self._writers.clear()
super().stop()


__all__ = [
"DDS",
"HIGH_THROUGHPUT_QOS",
"RELIABLE_QOS",
"MessageCallback",
"Policy",
"Qos",
"Topic",
]
Loading
Loading