From bb0e818a32450f51d83fc55b5b99129e2bc85495 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Thu, 15 Jan 2026 18:06:03 -0800 Subject: [PATCH 01/18] Create DDSPubSubBase, DDSTopic --- dimos/protocol/pubsub/ddspubsub.py | 106 +++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 dimos/protocol/pubsub/ddspubsub.py diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py new file mode 100644 index 0000000000..2f163ed367 --- /dev/null +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -0,0 +1,106 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from dimos.protocol.pubsub.spec import PubSub + +if TYPE_CHECKING: + from collections.abc import Callable + + pass + + +@dataclass +class DDSTopic: + """Represents a DDS topic with optional type information.""" + + topic: str = "" + dds_type: type[Any] | None = None + + def __str__(self) -> str: + if self.dds_type is None: + return self.topic + return f"{self.topic}#{self.dds_type.__name__}" + + def __hash__(self) -> int: + return hash(self.topic) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, DDSTopic): + return self.topic == other.topic + return False + + +class DDSPubSubBase(PubSub[DDSTopic, Any]): + """Base DDS pub/sub implementation using in-memory transport. + + This provides a transport-agnostic DDS pub/sub system that can later be + extended to use actual DDS implementations (e.g., cyclone-dds, rti-dds). + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._callbacks: dict[str, list[Callable[[Any, DDSTopic], None]]] = defaultdict(list) + self._topics: dict[str, DDSTopic] = {} + + def publish(self, topic: DDSTopic, message: Any) -> None: + """Publish a message to a DDS topic.""" + # Store topic for reference + self._topics[topic.topic] = topic + + # Dispatch to all subscribers + for cb in self._callbacks[topic.topic]: + try: + cb(message, topic) + except Exception as e: + # Log but continue processing other callbacks + print(f"Error in callback for topic {topic.topic}: {e}") + + def subscribe( + self, topic: DDSTopic, callback: Callable[[Any, DDSTopic], None] + ) -> Callable[[], None]: + """Subscribe to a DDS topic with a callback.""" + # Store topic for reference + self._topics[topic.topic] = topic + + # Add callback to our list + self._callbacks[topic.topic].append(callback) + + # Return unsubscribe function + def unsubscribe() -> None: + self.unsubscribe_callback(topic, callback) + + return unsubscribe + + def unsubscribe_callback( + self, topic: DDSTopic, callback: Callable[[Any, DDSTopic], None] + ) -> None: + """Unsubscribe a callback from a topic.""" + try: + self._callbacks[topic.topic].remove(callback) + if not self._callbacks[topic.topic]: + del self._callbacks[topic.topic] + except (KeyError, ValueError): + pass + + +__all__ = [ + "DDSPubSubBase", + "DDSTopic", +] From ee0c52987ebcece0468f011911f8e3473a9495f7 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Thu, 15 Jan 2026 18:09:19 -0800 Subject: [PATCH 02/18] Create PickleDDS --- dimos/protocol/pubsub/__init__.py | 2 ++ dimos/protocol/pubsub/ddspubsub.py | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/dimos/protocol/pubsub/__init__.py b/dimos/protocol/pubsub/__init__.py index 89bd292fda..31982af23f 100644 --- a/dimos/protocol/pubsub/__init__.py +++ b/dimos/protocol/pubsub/__init__.py @@ -1,3 +1,5 @@ +import dimos.protocol.pubsub.ddspubsub as dds +from dimos.protocol.pubsub.ddspubsub import DDSPubSubBase, DDSTopic, PickleDDS import dimos.protocol.pubsub.lcmpubsub as lcm from dimos.protocol.pubsub.memory import Memory from dimos.protocol.pubsub.spec import PubSub diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 2f163ed367..c1815d31db 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -18,7 +18,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any -from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub if TYPE_CHECKING: from collections.abc import Callable @@ -100,7 +100,11 @@ def unsubscribe_callback( pass +class PickleDDS(PickleEncoderMixin[DDSTopic, Any], DDSPubSubBase): ... + + __all__ = [ "DDSPubSubBase", "DDSTopic", + "PickleDDS", ] From f5c7ee878dcef558f2430e5685867219bf0d9a27 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Thu, 15 Jan 2026 22:54:11 -0800 Subject: [PATCH 03/18] Fix hash/equality inconsistency in DDSTopic --- dimos/protocol/pubsub/ddspubsub.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index c1815d31db..5c2d176e80 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -23,8 +23,6 @@ if TYPE_CHECKING: from collections.abc import Callable - pass - @dataclass class DDSTopic: @@ -39,12 +37,11 @@ def __str__(self) -> str: return f"{self.topic}#{self.dds_type.__name__}" def __hash__(self) -> int: - return hash(self.topic) + return hash((self.topic, self.dds_type)) def __eq__(self, other: Any) -> bool: if isinstance(other, DDSTopic): - return self.topic == other.topic - return False + return self.topic == other.topic and self.dds_type == other.dds_type class DDSPubSubBase(PubSub[DDSTopic, Any]): From 48f548ed55661d31ffac9b5ee9fe46e62f8074ea Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Fri, 16 Jan 2026 09:24:21 -0800 Subject: [PATCH 04/18] Add DDSMsg --- dimos/protocol/pubsub/ddspubsub.py | 109 +++++++++++++++++++---------- 1 file changed, 72 insertions(+), 37 deletions(-) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 5c2d176e80..afd49141b0 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -14,22 +14,38 @@ from __future__ import annotations -from collections import defaultdict from dataclasses import dataclass -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable -from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub +from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin +from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: from collections.abc import Callable +logger = setup_logger() + + +@runtime_checkable +class DDSMsg(Protocol): + msg_name: str + + @classmethod + def dds_decode(cls, data: bytes) -> DDSMsg: + """Decode bytes into an LCM message instance.""" + ... + + def dds_encode(self) -> bytes: + """Encode this message instance into bytes.""" + ... + @dataclass -class DDSTopic: +class Topic: """Represents a DDS topic with optional type information.""" topic: str = "" - dds_type: type[Any] | None = None + dds_type: type[DDSMsg] | None = None def __str__(self) -> str: if self.dds_type is None: @@ -40,11 +56,14 @@ def __hash__(self) -> int: return hash((self.topic, self.dds_type)) def __eq__(self, other: Any) -> bool: - if isinstance(other, DDSTopic): - return self.topic == other.topic and self.dds_type == other.dds_type + return ( + isinstance(other, Topic) + and self.topic == other.topic + and self.dds_type == other.dds_type + ) -class DDSPubSubBase(PubSub[DDSTopic, Any]): +class DDSPubSubBase(PubSub[Topic, Any]): """Base DDS pub/sub implementation using in-memory transport. This provides a transport-agnostic DDS pub/sub system that can later be @@ -53,31 +72,25 @@ class DDSPubSubBase(PubSub[DDSTopic, Any]): def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - self._callbacks: dict[str, list[Callable[[Any, DDSTopic], None]]] = defaultdict(list) - self._topics: dict[str, DDSTopic] = {} + self._callbacks: dict[Topic, list[Callable[[Any, Topic], None]]] = {} - def publish(self, topic: DDSTopic, message: Any) -> None: + def publish(self, topic: Topic, message: Any) -> None: """Publish a message to a DDS topic.""" - # Store topic for reference - self._topics[topic.topic] = topic - # Dispatch to all subscribers - for cb in self._callbacks[topic.topic]: - try: - cb(message, topic) - except Exception as e: - # Log but continue processing other callbacks - print(f"Error in callback for topic {topic.topic}: {e}") - - def subscribe( - self, topic: DDSTopic, callback: Callable[[Any, DDSTopic], None] - ) -> Callable[[], None]: + if topic in self._callbacks: + for callback in self._callbacks[topic]: + try: + callback(message, topic) + except Exception as e: + # Log but continue processing other callbacks + logger.error(f"Error in callback for topic {topic}: {e}") + + def subscribe(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> Callable[[], None]: """Subscribe to a DDS topic with a callback.""" - # Store topic for reference - self._topics[topic.topic] = topic - # Add callback to our list - self._callbacks[topic.topic].append(callback) + if topic not in self._callbacks: + self._callbacks[topic] = [] + self._callbacks[topic].append(callback) # Return unsubscribe function def unsubscribe() -> None: @@ -85,23 +98,45 @@ def unsubscribe() -> None: return unsubscribe - def unsubscribe_callback( - self, topic: DDSTopic, callback: Callable[[Any, DDSTopic], None] - ) -> None: + def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> None: """Unsubscribe a callback from a topic.""" try: - self._callbacks[topic.topic].remove(callback) - if not self._callbacks[topic.topic]: - del self._callbacks[topic.topic] - except (KeyError, ValueError): + if topic in self._callbacks: + self._callbacks[topic].remove(callback) + if not self._callbacks[topic]: + del self._callbacks[topic] + except ValueError: pass -class PickleDDS(PickleEncoderMixin[DDSTopic, Any], DDSPubSubBase): ... +class DDSEncoderMixin(PubSubEncoderMixin[Topic, Any]): + def encode(self, msg: DDSMsg, _: Topic) -> bytes: + return msg.dds_encode() + + def decode(self, msg: bytes, topic: Topic) -> DDSMsg: + if topic.dds_type is None: + raise ValueError( + f"Cannot decode message for topic '{topic.topic}': no dds_type specified" + ) + return topic.dds_type.dds_decode(msg) + + +class DDS( + DDSEncoderMixin, + DDSPubSubBase, +): ... + + +class PickleDDS( + PickleEncoderMixin, + DDSPubSubBase, +): ... __all__ = [ + "DDS", + "DDSEncoderMixin", + "DDSMsg", "DDSPubSubBase", - "DDSTopic", "PickleDDS", ] From 57798032399c2135813a9b7dacdefe7feeb21408 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Fri, 16 Jan 2026 09:24:54 -0800 Subject: [PATCH 05/18] Create DDSTransport --- dimos/core/transport.py | 18 ++++++++++++++++++ dimos/protocol/pubsub/lcmpubsub.py | 1 - 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 8ffbfc91f4..38c2e651b3 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -26,6 +26,7 @@ ) from dimos.core.stream import In, Out, Stream, Transport +from dimos.protocol.pubsub.ddspubsub import DDS, Topic as DDSTopic from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory, SharedMemory @@ -111,6 +112,23 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] +class DDSTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(DDSTopic(topic, type)) + if not hasattr(self, "dds"): + self.dds = DDS(**kwargs) + + def start(self) -> None: ... + + def stop(self) -> None: + self.dds.stop() + + def __reduce__(self): # type: ignore[no-untyped-def] + return (DDSTransport, (self.topic.topic, self.topic.dds_type)) + + class JpegLcmTransport(LCMTransport): # type: ignore[type-arg] def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] self.lcm = JpegLCM(**kwargs) # type: ignore[assignment] diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py index e07d010895..033ec6d016 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/lcmpubsub.py @@ -142,7 +142,6 @@ class JpegLCM( "JpegLCM", "LCMEncoderMixin", "LCMMsg", - "LCMMsg", "LCMPubSubBase", "PickleLCM", "autoconf", From 8f6f318a009167749dde80d92771eb2bea4f5abf Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Fri, 16 Jan 2026 09:52:26 -0800 Subject: [PATCH 06/18] Add broadcast and subscribe methods to DDSTransport --- dimos/core/__init__.py | 2 ++ dimos/core/transport.py | 13 +++++++++++++ dimos/protocol/pubsub/__init__.py | 1 - dimos/protocol/pubsub/ddspubsub.py | 3 ++- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 25d4f7a6e5..30890c7c8c 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -13,6 +13,7 @@ from dimos.core.rpc_client import RPCClient from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport from dimos.core.transport import ( + DDSTransport, LCMTransport, SHMTransport, ZenohTransport, @@ -31,6 +32,7 @@ "LCMRPC", "LCMTF", "TF", + "DDSTransport", "DimosCluster", "In", "LCMTransport", diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 38c2e651b3..a35618d41d 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -128,6 +128,19 @@ def stop(self) -> None: def __reduce__(self): # type: ignore[no-untyped-def] return (DDSTransport, (self.topic.topic, self.topic.dds_type)) + def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] + if not self._started: + self.dds.start() + self._started = True + + self.dds.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override] + if not self._started: + self.dds.start() + self._started = True + return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] + class JpegLcmTransport(LCMTransport): # type: ignore[type-arg] def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] diff --git a/dimos/protocol/pubsub/__init__.py b/dimos/protocol/pubsub/__init__.py index 31982af23f..47d2e9a15d 100644 --- a/dimos/protocol/pubsub/__init__.py +++ b/dimos/protocol/pubsub/__init__.py @@ -1,5 +1,4 @@ import dimos.protocol.pubsub.ddspubsub as dds -from dimos.protocol.pubsub.ddspubsub import DDSPubSubBase, DDSTopic, PickleDDS import dimos.protocol.pubsub.lcmpubsub as lcm from dimos.protocol.pubsub.memory import Memory from dimos.protocol.pubsub.spec import PubSub diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index afd49141b0..54fdb783d5 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -32,7 +32,7 @@ class DDSMsg(Protocol): @classmethod def dds_decode(cls, data: bytes) -> DDSMsg: - """Decode bytes into an LCM message instance.""" + """Decode bytes into a DDS message instance.""" ... def dds_encode(self) -> bytes: @@ -139,4 +139,5 @@ class PickleDDS( "DDSMsg", "DDSPubSubBase", "PickleDDS", + "Topic", ] From d259ac53641629d8c1fd57b75f7c5064292aee81 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Fri, 16 Jan 2026 19:21:39 -0800 Subject: [PATCH 07/18] Create DDSService --- dimos/protocol/pubsub/ddspubsub.py | 9 +---- dimos/protocol/service/ddsservice.py | 55 ++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 7 deletions(-) create mode 100644 dimos/protocol/service/ddsservice.py diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 54fdb783d5..8e3128dfe3 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin +from dimos.protocol.service.ddsservice import DDSConfig, DDSService, autoconf from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -63,13 +64,7 @@ def __eq__(self, other: Any) -> bool: ) -class DDSPubSubBase(PubSub[Topic, Any]): - """Base DDS pub/sub implementation using in-memory transport. - - This provides a transport-agnostic DDS pub/sub system that can later be - extended to use actual DDS implementations (e.g., cyclone-dds, rti-dds). - """ - +class DDSPubSubBase(DDSService, PubSub[Topic, Any]): def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self._callbacks: dict[Topic, list[Callable[[Any, Topic], None]]] = {} diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py new file mode 100644 index 0000000000..b18eae5f82 --- /dev/null +++ b/dimos/protocol/service/ddsservice.py @@ -0,0 +1,55 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass + +from dimos.protocol.service.spec import Service + + +@dataclass +class DDSConfig: + """Configuration for DDS service.""" + + autoconf: bool = True + + +class DDSService(Service[DDSConfig]): + """DDS service for in-memory distributed data sharing.""" + + default_config = DDSConfig + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + def start(self) -> None: + """Start the DDS service (no-op for in-memory implementation).""" + pass + + def stop(self) -> None: + """Stop the DDS service (no-op for in-memory implementation).""" + pass + + +def autoconf() -> None: + """Auto-configure system for DDS.""" + pass + + +__all__ = [ + "DDSConfig", + "DDSService", + "autoconf", +] From a8167a3aad5d33b23d5060859dde4bac7e6594cd Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Fri, 16 Jan 2026 19:22:00 -0800 Subject: [PATCH 08/18] Add CycloneDDS package --- flake.nix | 3 +++ pyproject.toml | 2 +- uv.lock | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index 3a70a0bf2f..bd278a1336 100644 --- a/flake.nix +++ b/flake.nix @@ -165,6 +165,7 @@ } ); } + { vals.pkg=pkgs.cyclonedds; flags.ldLibraryGroup=true; flags.packageConfGroup=true; } ]; # ------------------------------------------------------------ @@ -211,6 +212,8 @@ export GI_TYPELIB_PATH="${giTypelibPackagesString}:$GI_TYPELIB_PATH" export PKG_CONFIG_PATH=${lib.escapeShellArg packageConfPackagesString} export PYTHONPATH="$PYTHONPATH:"${lib.escapeShellArg manualPythonPackages} + export CYCLONEDDS_HOME="${pkgs.cyclonedds}" + export CMAKE_PREFIX_PATH="${pkgs.cyclonedds}:$CMAKE_PREFIX_PATH" # CC, CFLAGS, and LDFLAGS are bascially all for `pip install pyaudio` export CFLAGS="$(pkg-config --cflags portaudio-2.0) $CFLAGS" export LDFLAGS="-L$(pkg-config --variable=libdir portaudio-2.0) $LDFLAGS" diff --git a/pyproject.toml b/pyproject.toml index d565f5ebe1..f87d4f9837 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ readme = "README.md" dependencies = [ # Transport Protocols "dimos-lcm", + "cyclonedds>=0.10.5", "PyTurboJPEG==1.8.2", # Core @@ -60,7 +61,6 @@ dependencies = [ "terminaltexteffects==0.12.2", "typer>=0.19.2,<1", "plotext==5.3.2", - # Used for calculating the occupancy map. "numba>=0.60.0", # First version supporting Python 3.12 "llvmlite>=0.42.0", # Required by numba 0.60+ diff --git a/uv.lock b/uv.lock index 841b0cccaf..09998a27cb 100644 --- a/uv.lock +++ b/uv.lock @@ -1246,6 +1246,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e7/05/c19819d5e3d95294a6f5947fb9b9629efb316b96de511b418c53d245aae6/cycler-0.12.1-py3-none-any.whl", hash = "sha256:85cef7cff222d8644161529808465972e51340599459b8ac3ccbac5a854e0d30", size = 8321, upload-time = "2023-10-07T05:32:16.783Z" }, ] +[[package]] +name = "cyclonedds" +version = "0.10.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "rich-click" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/91/cf/28eb9c823dfc245c540f5286d71b44aeee2a51021fc85b25bb9562be78cc/cyclonedds-0.10.5.tar.gz", hash = "sha256:63fc4d6fdb2fd35181c40f4e90757149f2def5f570ef19fb71edc4f568755f8a", size = 156919, upload-time = "2024-06-05T18:50:42.999Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/c3/69ba063a51c06ba24fa4fd463157d4cc2bc54ab1a2ab8ebdf88e8f3dde25/cyclonedds-0.10.5-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:03644e406d0c1cac45887b378d35054a0033c48f2e29d9aab3bfc1ee6c4b9aa6", size = 864591, upload-time = "2024-06-05T18:50:46.563Z" }, + { url = "https://files.pythonhosted.org/packages/cf/98/08508aff65c87bcef473e23a51506a100fb35bf70450c40eb227a576a018/cyclonedds-0.10.5-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4a0d9fa8747827dc9bd678d73ed6f12b0ab9853b2cb7ebadbf3d8d89625f0e34", size = 799626, upload-time = "2024-06-05T18:50:48.17Z" }, + { url = "https://files.pythonhosted.org/packages/99/0d/02da52ffd27b92b85b64997cc449106479456648da17aa44a09124e8ebe5/cyclonedds-0.10.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:861d2ffd9513126d6a62ad9f842e85122518a7db1fb0a11d6e4fa86e3cacf61c", size = 6631487, upload-time = "2024-06-05T18:50:50.747Z" }, + { url = "https://files.pythonhosted.org/packages/e4/2b/d8fff5008c2c62882c2ffc185bdb0d4d1c9caf7bc5aaaef77bd9739bdc12/cyclonedds-0.10.5-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8276b2bc347540e3ca892adf976421dbce4c6d2672934a32409db121a1431b86", size = 6653044, upload-time = "2024-06-05T18:50:52.786Z" }, + { url = "https://files.pythonhosted.org/packages/07/ab/acaa119f552019bdb2b06478553cf712967672f5970be80ecc9b4ca805f4/cyclonedds-0.10.5-cp310-cp310-win_amd64.whl", hash = "sha256:103a681e9490229f12c151a125e00c4db8fdb344c8e12e35ee515cd9d5d1ecd7", size = 1200672, upload-time = "2024-06-05T18:50:54.303Z" }, +] + [[package]] name = "cython" version = "3.2.4" @@ -1380,6 +1396,7 @@ source = { editable = "." } dependencies = [ { name = "asyncio" }, { name = "colorlog" }, + { name = "cyclonedds" }, { name = "dask", extra = ["complete"] }, { name = "dimos-lcm" }, { name = "llvmlite" }, @@ -1611,6 +1628,7 @@ requires-dist = [ { name = "ctransformers", marker = "extra == 'cpu'", specifier = "==0.2.27" }, { name = "ctransformers", extras = ["cuda"], marker = "extra == 'cuda'", specifier = "==0.2.27" }, { name = "cupy-cuda12x", marker = "extra == 'cuda'", specifier = "==13.6.0" }, + { name = "cyclonedds", specifier = ">=0.10.5" }, { name = "dask", extras = ["complete"], specifier = "==2025.5.1" }, { name = "dimos", extras = ["agents", "web", "perception", "visualization", "sim"], marker = "extra == 'base'" }, { name = "dimos", extras = ["base"], marker = "extra == 'unitree'" }, @@ -7444,6 +7462,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/25/7a/b0178788f8dc6cafce37a212c99565fa1fe7872c70c6c9c1e1a372d9d88f/rich-14.2.0-py3-none-any.whl", hash = "sha256:76bc51fe2e57d2b1be1f96c524b890b816e334ab4c1e45888799bfaab0021edd", size = 243393, upload-time = "2025-10-09T14:16:51.245Z" }, ] +[[package]] +name = "rich-click" +version = "1.9.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "rich" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6b/d1/b60ca6a8745e76800b50c7ee246fd73f08a3be5d8e0b551fc93c19fa1203/rich_click-1.9.5.tar.gz", hash = "sha256:48120531493f1533828da80e13e839d471979ec8d7d0ca7b35f86a1379cc74b6", size = 73927, upload-time = "2025-12-21T14:49:44.167Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/0a/d865895e1e5d88a60baee0fc3703eb111c502ee10c8c107516bc7623abf8/rich_click-1.9.5-py3-none-any.whl", hash = "sha256:9b195721a773b1acf0e16ff9ec68cef1e7d237e53471e6e3f7ade462f86c403a", size = 70580, upload-time = "2025-12-21T14:49:42.905Z" }, +] + [[package]] name = "rpds-py" version = "0.30.0" From 6256dd5a42414baff33914623c4de605bc195484 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Sun, 18 Jan 2026 02:28:31 -0800 Subject: [PATCH 09/18] Remove unnecessary attributes --- dimos/core/transport.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dimos/core/transport.py b/dimos/core/transport.py index a35618d41d..156a8ea642 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -130,14 +130,12 @@ def __reduce__(self): # type: ignore[no-untyped-def] def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] if not self._started: - self.dds.start() self._started = True self.dds.publish(self.topic, msg) def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override] if not self._started: - self.dds.start() self._started = True return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] From d6620c03da86dc30a154bd65d3c74065e8faeb57 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Sun, 18 Jan 2026 03:13:23 -0800 Subject: [PATCH 10/18] Add threading and serialization methods to DDSService --- dimos/protocol/service/ddsservice.py | 73 ++++++++++++++++++++++------ 1 file changed, 58 insertions(+), 15 deletions(-) diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py index b18eae5f82..488baf1863 100644 --- a/dimos/protocol/service/ddsservice.py +++ b/dimos/protocol/service/ddsservice.py @@ -15,41 +15,84 @@ from __future__ import annotations from dataclasses import dataclass +import threading +from typing import Any + +from cyclonedds.domain import DomainParticipant from dimos.protocol.service.spec import Service +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() @dataclass class DDSConfig: """Configuration for DDS service.""" + domain_id: int = 0 autoconf: bool = True + participant: DomainParticipant | None = None class DDSService(Service[DDSConfig]): - """DDS service for in-memory distributed data sharing.""" - default_config = DDSConfig - def __init__(self, **kwargs) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) + self._participant_lock = threading.Lock() + self._started = False + # Support passing an existing DomainParticipant + self.participant: DomainParticipant | None = self.config.participant + + def __getstate__(self) -> dict[str, Any]: + """Exclude unpicklable runtime attributes when serializing.""" + state = self.__dict__.copy() + # Remove unpicklable attributes + state.pop("participant", None) + state.pop("_participant_lock", None) + return state + + def __setstate__(self, state: dict[str, Any]) -> None: + """Restore object from pickled state.""" + self.__dict__.update(state) + # Reinitialize runtime attributes + self.participant = None + self._participant_lock = threading.Lock() + self._started = False def start(self) -> None: - """Start the DDS service (no-op for in-memory implementation).""" - pass + """Start the DDS service.""" + if self._started: + return + + # Use provided participant or create new one + with self._participant_lock: + if self.participant is None: + self.participant = self.config.participant or DomainParticipant( + self.config.domain_id + ) + logger.info(f"DDS service started with Cyclone DDS domain {self.config.domain_id}") + + self._started = True def stop(self) -> None: - """Stop the DDS service (no-op for in-memory implementation).""" - pass + """Stop the DDS service.""" + if not self._started: + return + with self._participant_lock: + # Clean up participant if we created it + if self.participant is not None and not self.config.participant: + try: + self.participant.close() + logger.info("DDS participant closed") + except Exception as e: + logger.warning(f"Error closing DDS participant: {e}") + finally: + self.participant = None -def autoconf() -> None: - """Auto-configure system for DDS.""" - pass + self._started = False -__all__ = [ - "DDSConfig", - "DDSService", - "autoconf", -] +__all__ = ["DDSConfig", "DDSService"] From 90b14bdd252f63187f656aa727dc1c6e04cdcfe7 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Sun, 18 Jan 2026 03:27:42 -0800 Subject: [PATCH 11/18] Ensure broadcast and subscribe methods initialize DDS if not started --- dimos/core/transport.py | 58 ++++++++++++++-------------- dimos/protocol/pubsub/ddspubsub.py | 2 +- dimos/protocol/service/ddsservice.py | 5 ++- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 156a8ea642..118c357c9c 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -112,34 +112,6 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] -class DDSTransport(PubSubTransport[T]): - _started: bool = False - - def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] - super().__init__(DDSTopic(topic, type)) - if not hasattr(self, "dds"): - self.dds = DDS(**kwargs) - - def start(self) -> None: ... - - def stop(self) -> None: - self.dds.stop() - - def __reduce__(self): # type: ignore[no-untyped-def] - return (DDSTransport, (self.topic.topic, self.topic.dds_type)) - - def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] - if not self._started: - self._started = True - - self.dds.publish(self.topic, msg) - - def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override] - if not self._started: - self._started = True - return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] - - class JpegLcmTransport(LCMTransport): # type: ignore[type-arg] def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] self.lcm = JpegLCM(**kwargs) # type: ignore[assignment] @@ -241,4 +213,34 @@ def start(self) -> None: ... def stop(self) -> None: ... +class DDSTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(DDSTopic(topic, type)) + if not hasattr(self, "dds"): + self.dds = DDS(**kwargs) + + def start(self) -> None: + self.dds.start() + self._started = True + + def stop(self) -> None: + self.dds.stop() + self._started = False + + def __reduce__(self): # type: ignore[no-untyped-def] + return (DDSTransport, (self.topic.topic, self.topic.dds_type)) + + def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] + if not self._started: + self.start() + self.dds.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override] + if not self._started: + self.start() + return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] + + class ZenohTransport(PubSubTransport[T]): ... diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 8e3128dfe3..7724e679f4 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin -from dimos.protocol.service.ddsservice import DDSConfig, DDSService, autoconf +from dimos.protocol.service.ddsservice import DDSConfig, DDSService from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py index 488baf1863..f8d5cbdf30 100644 --- a/dimos/protocol/service/ddsservice.py +++ b/dimos/protocol/service/ddsservice.py @@ -31,7 +31,6 @@ class DDSConfig: """Configuration for DDS service.""" domain_id: int = 0 - autoconf: bool = True participant: DomainParticipant | None = None @@ -94,5 +93,9 @@ def stop(self) -> None: self._started = False + def get_participant(self) -> DomainParticipant | None: + """Get the DomainParticipant instance, or None if not yet initialized.""" + return self.participant + __all__ = ["DDSConfig", "DDSService"] From 23e3c2e449dabeb7bb3be32dc35dd5b50493ba6f Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Sun, 18 Jan 2026 04:17:11 -0800 Subject: [PATCH 12/18] Add Transport benchmarking capabilities to CycloneDDS (#1055) * raw rospubsub and benchmarks * typefixes, shm added to the benchmark * SHM is not so important to tell us every time when it starts * greptile comments * Add co-authorship line to commit message filter patterns * Remove unused contextmanager import --------- Co-authored-by: Ivan Nikolic --- bin/hooks/filter_commit_message.py | 10 +- .../pubsub/benchmark/test_benchmark.py | 175 +++++++++++ dimos/protocol/pubsub/benchmark/testdata.py | 245 ++++++++++++++++ dimos/protocol/pubsub/benchmark/type.py | 277 ++++++++++++++++++ dimos/protocol/pubsub/jpeg_shm.py | 2 +- dimos/protocol/pubsub/lcmpubsub.py | 4 +- dimos/protocol/pubsub/rospubsub.py | 269 +++++++++++++++++ dimos/protocol/pubsub/shmpubsub.py | 6 +- dimos/protocol/pubsub/spec.py | 11 +- dimos/protocol/pubsub/test_encoder.py | 17 +- dimos/protocol/pubsub/test_lcmpubsub.py | 36 +-- dimos/protocol/pubsub/test_spec.py | 54 ++-- 12 files changed, 1045 insertions(+), 61 deletions(-) create mode 100644 dimos/protocol/pubsub/benchmark/test_benchmark.py create mode 100644 dimos/protocol/pubsub/benchmark/testdata.py create mode 100644 dimos/protocol/pubsub/benchmark/type.py create mode 100644 dimos/protocol/pubsub/rospubsub.py diff --git a/bin/hooks/filter_commit_message.py b/bin/hooks/filter_commit_message.py index cd92b196af..d22eaf9484 100644 --- a/bin/hooks/filter_commit_message.py +++ b/bin/hooks/filter_commit_message.py @@ -28,10 +28,16 @@ def main() -> int: lines = commit_msg_file.read_text().splitlines(keepends=True) - # Find the first line containing "Generated with" and truncate there + # Patterns that trigger truncation (everything from this line onwards is removed) + truncate_patterns = [ + "Generated with", + "Co-Authored-By", + ] + + # Find the first line containing any truncate pattern and truncate there filtered_lines = [] for line in lines: - if "Generated with" in line: + if any(pattern in line for pattern in truncate_patterns): break filtered_lines.append(line) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py new file mode 100644 index 0000000000..f88df75868 --- /dev/null +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 + +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Generator +import threading +import time +from typing import Any + +import pytest + +from dimos.protocol.pubsub.benchmark.testdata import testdata +from dimos.protocol.pubsub.benchmark.type import ( + BenchmarkResult, + BenchmarkResults, + MsgGen, + PubSubContext, + TestCase, +) + +# Message sizes for throughput benchmarking (powers of 2 from 64B to 10MB) +MSG_SIZES = [ + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 262144, + 524288, + 1048576, + 1048576 * 2, + 1048576 * 5, + 1048576 * 10, +] + +# Benchmark duration in seconds +BENCH_DURATION = 1.0 + +# Max messages to send per test (prevents overwhelming slower transports) +MAX_MESSAGES = 5000 + +# Max time to wait for in-flight messages after publishing stops +RECEIVE_TIMEOUT = 1.0 + + +def size_id(size: int) -> str: + """Convert byte size to human-readable string for test IDs.""" + if size >= 1048576: + return f"{size // 1048576}MB" + if size >= 1024: + return f"{size // 1024}KB" + return f"{size}B" + + +def pubsub_id(testcase: TestCase[Any, Any]) -> str: + """Extract pubsub implementation name from context manager function name.""" + name: str = testcase.pubsub_context.__name__ + # Convert e.g. "lcm_pubsub_channel" -> "LCM", "memory_pubsub_channel" -> "Memory" + prefix = name.replace("_pubsub_channel", "").replace("_", " ") + return prefix.upper() if len(prefix) <= 3 else prefix.title().replace(" ", "") + + +@pytest.fixture(scope="module") +def benchmark_results() -> Generator[BenchmarkResults, None, None]: + """Module-scoped fixture to collect benchmark results.""" + results = BenchmarkResults() + yield results + results.print_summary() + results.print_heatmap() + results.print_bandwidth_heatmap() + results.print_latency_heatmap() + + +@pytest.mark.tool +@pytest.mark.parametrize("msg_size", MSG_SIZES, ids=[size_id(s) for s in MSG_SIZES]) +@pytest.mark.parametrize("pubsub_context, msggen", testdata, ids=[pubsub_id(t) for t in testdata]) +def test_throughput( + pubsub_context: PubSubContext[Any, Any], + msggen: MsgGen[Any, Any], + msg_size: int, + benchmark_results: BenchmarkResults, +) -> None: + """Measure throughput for publishing and receiving messages over a fixed duration.""" + with pubsub_context() as pubsub: + topic, msg = msggen(msg_size) + received_count = 0 + target_count = [0] # Use list to allow modification after publish loop + lock = threading.Lock() + all_received = threading.Event() + + def callback(message: Any, _topic: Any) -> None: + nonlocal received_count + with lock: + received_count += 1 + if target_count[0] > 0 and received_count >= target_count[0]: + all_received.set() + + # Subscribe + pubsub.subscribe(topic, callback) + + # Warmup: give DDS/ROS time to establish connection + time.sleep(0.1) + + # Set target so callback can signal when all received + target_count[0] = MAX_MESSAGES + + # Publish messages until time limit, max messages, or all received + msgs_sent = 0 + start = time.perf_counter() + end_time = start + BENCH_DURATION + + while time.perf_counter() < end_time and msgs_sent < MAX_MESSAGES: + pubsub.publish(topic, msg) + msgs_sent += 1 + # Check if all already received (fast transports) + if all_received.is_set(): + break + + publish_end = time.perf_counter() + target_count[0] = msgs_sent # Update to actual sent count + + # Check if already done, otherwise wait up to RECEIVE_TIMEOUT + with lock: + if received_count >= msgs_sent: + all_received.set() + + if not all_received.is_set(): + all_received.wait(timeout=RECEIVE_TIMEOUT) + latency_end = time.perf_counter() + + with lock: + final_received = received_count + + # Latency: how long we waited after publishing for messages to arrive + # 0 = all arrived during publishing, 1000ms = hit timeout (loss occurred) + latency = latency_end - publish_end + + # Record result (duration is publish time only for throughput calculation) + # Extract transport name from context manager function name + ctx_name = pubsub_context.__name__ + prefix = ctx_name.replace("_pubsub_channel", "").replace("_", " ") + transport_name = prefix.upper() if len(prefix) <= 3 else prefix.title().replace(" ", "") + result = BenchmarkResult( + transport=transport_name, + duration=publish_end - start, + msgs_sent=msgs_sent, + msgs_received=final_received, + msg_size_bytes=msg_size, + receive_time=latency, + ) + benchmark_results.add(result) + + # Warn if significant message loss (but don't fail - benchmark records the data) + loss_pct = (1 - final_received / msgs_sent) * 100 if msgs_sent > 0 else 0 + if loss_pct > 10: + import warnings + + warnings.warn( + f"{transport_name} {msg_size}B: {loss_pct:.1f}% message loss " + f"({final_received}/{msgs_sent})", + stacklevel=2, + ) diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py new file mode 100644 index 0000000000..25d7d76aa3 --- /dev/null +++ b/dimos/protocol/pubsub/benchmark/testdata.py @@ -0,0 +1,245 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Generator +from contextlib import contextmanager +from typing import Any + +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.protocol.pubsub.benchmark.type import TestCase +from dimos.protocol.pubsub.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic +from dimos.protocol.pubsub.memory import Memory +from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory + + +def make_data(size: int) -> bytes: + """Generate random bytes of given size.""" + return bytes(i % 256 for i in range(size)) + + +testdata: list[TestCase[Any, Any]] = [] + + +@contextmanager +def lcm_pubsub_channel() -> Generator[LCM, None, None]: + lcm_pubsub = LCM(autoconf=True) + lcm_pubsub.start() + yield lcm_pubsub + lcm_pubsub.stop() + + +def lcm_msggen(size: int) -> tuple[LCMTopic, Image]: + import numpy as np + + # Create image data as numpy array with shape (height, width, channels) + raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) + # Pad to make it divisible by 3 for RGB + padded_size = ((len(raw_data) + 2) // 3) * 3 + padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) + pixels = len(padded_data) // 3 + # Find reasonable dimensions + height = max(1, int(pixels**0.5)) + width = pixels // height + data = padded_data[: height * width * 3].reshape(height, width, 3) + topic = LCMTopic(topic="benchmark/lcm", lcm_type=Image) + msg = Image(data=data, format=ImageFormat.RGB) + return (topic, msg) + + +testdata.append( + TestCase( + pubsub_context=lcm_pubsub_channel, + msg_gen=lcm_msggen, + ) +) + + +@contextmanager +def udp_raw_pubsub_channel() -> Generator[LCMPubSubBase, None, None]: + """LCM with raw bytes - no encoding overhead.""" + lcm_pubsub = LCMPubSubBase(autoconf=True) + lcm_pubsub.start() + yield lcm_pubsub + lcm_pubsub.stop() + + +def udp_raw_msggen(size: int) -> tuple[LCMTopic, bytes]: + """Generate raw bytes for LCM transport benchmark.""" + topic = LCMTopic(topic="benchmark/lcm_raw") + return (topic, make_data(size)) + + +testdata.append( + TestCase( + pubsub_context=udp_raw_pubsub_channel, + msg_gen=udp_raw_msggen, + ) +) + + +@contextmanager +def memory_pubsub_channel() -> Generator[Memory, None, None]: + """Context manager for Memory PubSub implementation.""" + yield Memory() + + +def memory_msggen(size: int) -> tuple[str, Any]: + import numpy as np + + raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) + padded_size = ((len(raw_data) + 2) // 3) * 3 + padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) + pixels = len(padded_data) // 3 + height = max(1, int(pixels**0.5)) + width = pixels // height + data = padded_data[: height * width * 3].reshape(height, width, 3) + return ("benchmark/memory", Image(data=data, format=ImageFormat.RGB)) + + +# testdata.append( +# TestCase( +# pubsub_context=memory_pubsub_channel, +# msg_gen=memory_msggen, +# ) +# ) + + +@contextmanager +def shm_pubsub_channel() -> Generator[PickleSharedMemory, None, None]: + # 12MB capacity to handle benchmark sizes up to 10MB + shm_pubsub = PickleSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024) + shm_pubsub.start() + yield shm_pubsub + shm_pubsub.stop() + + +def shm_msggen(size: int) -> tuple[str, Any]: + """Generate message for SharedMemory pubsub benchmark.""" + import numpy as np + + raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) + padded_size = ((len(raw_data) + 2) // 3) * 3 + padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) + pixels = len(padded_data) // 3 + height = max(1, int(pixels**0.5)) + width = pixels // height + data = padded_data[: height * width * 3].reshape(height, width, 3) + return ("benchmark/shm", Image(data=data, format=ImageFormat.RGB)) + + +testdata.append( + TestCase( + pubsub_context=shm_pubsub_channel, + msg_gen=shm_msggen, + ) +) + + +try: + from dimos.protocol.pubsub.redispubsub import Redis + + @contextmanager + def redis_pubsub_channel() -> Generator[Redis, None, None]: + redis_pubsub = Redis() + redis_pubsub.start() + yield redis_pubsub + redis_pubsub.stop() + + def redis_msggen(size: int) -> tuple[str, Any]: + # Redis uses JSON serialization, so use a simple dict with base64-encoded data + import base64 + + data = base64.b64encode(make_data(size)).decode("ascii") + return ("benchmark/redis", {"data": data, "size": size}) + + testdata.append( + TestCase( + pubsub_context=redis_pubsub_channel, + msg_gen=redis_msggen, + ) + ) + +except (ConnectionError, ImportError): + # either redis is not installed or the server is not running + print("Redis not available") + + +from dimos.protocol.pubsub.rospubsub import ROS_AVAILABLE, RawROS, ROSTopic + +if ROS_AVAILABLE: + from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy + from sensor_msgs.msg import Image as ROSImage + + @contextmanager + def ros_best_effort_pubsub_channel() -> Generator[RawROS, None, None]: + qos = QoSProfile( + reliability=QoSReliabilityPolicy.BEST_EFFORT, + history=QoSHistoryPolicy.KEEP_LAST, + durability=QoSDurabilityPolicy.VOLATILE, + depth=5000, + ) + ros_pubsub = RawROS(node_name="benchmark_ros_best_effort", qos=qos) + ros_pubsub.start() + yield ros_pubsub + ros_pubsub.stop() + + @contextmanager + def ros_reliable_pubsub_channel() -> Generator[RawROS, None, None]: + qos = QoSProfile( + reliability=QoSReliabilityPolicy.RELIABLE, + history=QoSHistoryPolicy.KEEP_LAST, + durability=QoSDurabilityPolicy.VOLATILE, + depth=5000, + ) + ros_pubsub = RawROS(node_name="benchmark_ros_reliable", qos=qos) + ros_pubsub.start() + yield ros_pubsub + ros_pubsub.stop() + + def ros_msggen(size: int) -> tuple[ROSTopic, ROSImage]: + import numpy as np + + # Create image data + data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) + padded_size = ((len(data) + 2) // 3) * 3 + data = np.pad(data, (0, padded_size - len(data))) + pixels = len(data) // 3 + height = max(1, int(pixels**0.5)) + width = pixels // height + data = data[: height * width * 3] + + # Create ROS Image message + msg = ROSImage() + msg.height = height + msg.width = width + msg.encoding = "rgb8" + msg.step = width * 3 + msg.data = data.tobytes() + + topic = ROSTopic(topic="/benchmark/ros", ros_type=ROSImage) + return (topic, msg) + + testdata.append( + TestCase( + pubsub_context=ros_best_effort_pubsub_channel, + msg_gen=ros_msggen, + ) + ) + + testdata.append( + TestCase( + pubsub_context=ros_reliable_pubsub_channel, + msg_gen=ros_msggen, + ) + ) diff --git a/dimos/protocol/pubsub/benchmark/type.py b/dimos/protocol/pubsub/benchmark/type.py new file mode 100644 index 0000000000..55649381e2 --- /dev/null +++ b/dimos/protocol/pubsub/benchmark/type.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 + +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable, Iterator, Sequence +from contextlib import AbstractContextManager +from dataclasses import dataclass, field +import pickle +import threading +import time +from typing import Any, Generic, TypeVar + +import pytest + +from dimos.msgs.geometry_msgs import Vector3 +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.memory import Memory +from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory +from dimos.protocol.pubsub.spec import MsgT, PubSub, TopicT +from dimos.utils.data import get_data + +MsgGen = Callable[[int], tuple[TopicT, MsgT]] + +PubSubContext = Callable[[], AbstractContextManager[PubSub[TopicT, MsgT]]] + + +@dataclass +class TestCase(Generic[TopicT, MsgT]): + pubsub_context: PubSubContext[TopicT, MsgT] + msg_gen: MsgGen[TopicT, MsgT] + + def __iter__(self) -> Iterator[PubSubContext[TopicT, MsgT] | MsgGen[TopicT, MsgT]]: + return iter((self.pubsub_context, self.msg_gen)) + + def __len__(self) -> int: + return 2 + + +TestData = Sequence[TestCase[Any, Any]] + + +def _format_size(size_bytes: int) -> str: + """Format byte size to human-readable string.""" + if size_bytes >= 1048576: + return f"{size_bytes / 1048576:.1f} MB" + if size_bytes >= 1024: + return f"{size_bytes / 1024:.1f} KB" + return f"{size_bytes} B" + + +def _format_throughput(bytes_per_sec: float) -> str: + """Format throughput to human-readable string.""" + if bytes_per_sec >= 1e9: + return f"{bytes_per_sec / 1e9:.2f} GB/s" + if bytes_per_sec >= 1e6: + return f"{bytes_per_sec / 1e6:.2f} MB/s" + if bytes_per_sec >= 1e3: + return f"{bytes_per_sec / 1e3:.2f} KB/s" + return f"{bytes_per_sec:.2f} B/s" + + +@dataclass +class BenchmarkResult: + transport: str + duration: float # Time spent publishing + msgs_sent: int + msgs_received: int + msg_size_bytes: int + receive_time: float = 0.0 # Time after publishing until all messages received + + @property + def total_time(self) -> float: + """Total time including latency.""" + return self.duration + self.receive_time + + @property + def throughput_msgs(self) -> float: + """Messages per second (including latency).""" + return self.msgs_received / self.total_time if self.total_time > 0 else 0 + + @property + def throughput_bytes(self) -> float: + """Bytes per second (including latency).""" + return ( + (self.msgs_received * self.msg_size_bytes) / self.total_time + if self.total_time > 0 + else 0 + ) + + @property + def loss_pct(self) -> float: + """Message loss percentage.""" + return (1 - self.msgs_received / self.msgs_sent) * 100 if self.msgs_sent > 0 else 0 + + +@dataclass +class BenchmarkResults: + results: list[BenchmarkResult] = field(default_factory=list) + + def add(self, result: BenchmarkResult) -> None: + self.results.append(result) + + def print_summary(self) -> None: + if not self.results: + return + + from rich.console import Console + from rich.table import Table + + console = Console() + + table = Table(title="Benchmark Results") + table.add_column("Transport", style="cyan") + table.add_column("Msg Size", justify="right") + table.add_column("Sent", justify="right") + table.add_column("Recv", justify="right") + table.add_column("Msgs/s", justify="right", style="green") + table.add_column("Throughput", justify="right", style="green") + table.add_column("Latency", justify="right") + table.add_column("Loss", justify="right") + + for r in sorted(self.results, key=lambda x: (x.transport, x.msg_size_bytes)): + loss_style = "red" if r.loss_pct > 0 else "dim" + recv_style = "yellow" if r.receive_time > 0.1 else "dim" + table.add_row( + r.transport, + _format_size(r.msg_size_bytes), + f"{r.msgs_sent:,}", + f"{r.msgs_received:,}", + f"{r.throughput_msgs:,.0f}", + _format_throughput(r.throughput_bytes), + f"[{recv_style}]{r.receive_time * 1000:.0f}ms[/{recv_style}]", + f"[{loss_style}]{r.loss_pct:.1f}%[/{loss_style}]", + ) + + console.print() + console.print(table) + + def _print_heatmap( + self, + title: str, + value_fn: Callable[[BenchmarkResult], float], + format_fn: Callable[[float], str], + high_is_good: bool = True, + ) -> None: + """Generic heatmap printer.""" + if not self.results: + return + + def size_id(size: int) -> str: + if size >= 1048576: + return f"{size // 1048576}MB" + if size >= 1024: + return f"{size // 1024}KB" + return f"{size}B" + + transports = sorted(set(r.transport for r in self.results)) + sizes = sorted(set(r.msg_size_bytes for r in self.results)) + + # Build matrix + matrix: list[list[float]] = [] + for transport in transports: + row = [] + for size in sizes: + result = next( + ( + r + for r in self.results + if r.transport == transport and r.msg_size_bytes == size + ), + None, + ) + row.append(value_fn(result) if result else 0) + matrix.append(row) + + all_vals = [v for row in matrix for v in row if v > 0] + if not all_vals: + return + min_val, max_val = min(all_vals), max(all_vals) + + # ANSI 256 gradient: red -> orange -> yellow -> green + gradient = [ + 52, + 88, + 124, + 160, + 196, + 202, + 208, + 214, + 220, + 226, + 190, + 154, + 148, + 118, + 82, + 46, + 40, + 34, + ] + if not high_is_good: + gradient = gradient[::-1] + + def val_to_color(v: float) -> int: + if v <= 0 or max_val == min_val: + return 236 + t = (v - min_val) / (max_val - min_val) + return gradient[int(t * (len(gradient) - 1))] + + reset = "\033[0m" + size_labels = [size_id(s) for s in sizes] + col_w = max(8, max(len(s) for s in size_labels) + 1) + transport_w = max(len(t) for t in transports) + 1 + + print() + print(f"{title:^{transport_w + col_w * len(sizes)}}") + print() + print(" " * transport_w + "".join(f"{s:^{col_w}}" for s in size_labels)) + + # Dark colors that need white text (dark reds) + dark_colors = {52, 88, 124, 160, 236} + + for i, transport in enumerate(transports): + row_str = f"{transport:<{transport_w}}" + for val in matrix[i]: + color = val_to_color(val) + fg = 255 if color in dark_colors else 16 # white on dark, black on bright + cell = format_fn(val) if val > 0 else "-" + row_str += f"\033[48;5;{color}m\033[38;5;{fg}m{cell:^{col_w}}{reset}" + print(row_str) + print() + + def print_heatmap(self) -> None: + """Print msgs/sec heatmap.""" + + def fmt(v: float) -> str: + return f"{v / 1000:.1f}k" if v >= 1000 else f"{v:.0f}" + + self._print_heatmap("Msgs/sec", lambda r: r.throughput_msgs, fmt) + + def print_bandwidth_heatmap(self) -> None: + """Print bandwidth heatmap.""" + + def fmt(v: float) -> str: + if v >= 1e9: + return f"{v / 1e9:.1f}G" + if v >= 1e6: + return f"{v / 1e6:.0f}M" + if v >= 1e3: + return f"{v / 1e3:.0f}K" + return f"{v:.0f}" + + self._print_heatmap("Bandwidth", lambda r: r.throughput_bytes, fmt) + + def print_latency_heatmap(self) -> None: + """Print latency heatmap (time waiting for messages after publishing).""" + + def fmt(v: float) -> str: + if v >= 1: + return f"{v:.1f}s" + return f"{v * 1000:.0f}ms" + + self._print_heatmap("Latency", lambda r: r.receive_time, fmt, high_is_good=False) diff --git a/dimos/protocol/pubsub/jpeg_shm.py b/dimos/protocol/pubsub/jpeg_shm.py index de6868390c..f2c9e35814 100644 --- a/dimos/protocol/pubsub/jpeg_shm.py +++ b/dimos/protocol/pubsub/jpeg_shm.py @@ -22,7 +22,7 @@ from dimos.protocol.pubsub.spec import PubSubEncoderMixin -class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image]): +class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image, bytes]): def __init__(self, quality: int = 75, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) self.jpeg = TurboJPEG() diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py index 033ec6d016..07ab65bfbe 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/lcmpubsub.py @@ -95,7 +95,7 @@ def unsubscribe() -> None: return unsubscribe -class LCMEncoderMixin(PubSubEncoderMixin[Topic, Any]): +class LCMEncoderMixin(PubSubEncoderMixin[Topic, Any, bytes]): def encode(self, msg: LCMMsg, _: Topic) -> bytes: return msg.lcm_encode() @@ -107,7 +107,7 @@ def decode(self, msg: bytes, topic: Topic) -> LCMMsg: return topic.lcm_type.lcm_decode(msg) -class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any]): +class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any, bytes]): def encode(self, msg: LCMMsg, _: Topic) -> bytes: return msg.lcm_jpeg_encode() # type: ignore[attr-defined, no-any-return] diff --git a/dimos/protocol/pubsub/rospubsub.py b/dimos/protocol/pubsub/rospubsub.py new file mode 100644 index 0000000000..7687f3c700 --- /dev/null +++ b/dimos/protocol/pubsub/rospubsub.py @@ -0,0 +1,269 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable +from dataclasses import dataclass +import importlib +import threading +from typing import Any, Protocol, TypeAlias, TypeVar, runtime_checkable + +try: + import rclpy + from rclpy.executors import SingleThreadedExecutor + from rclpy.node import Node + from rclpy.qos import ( + QoSDurabilityPolicy, + QoSHistoryPolicy, + QoSProfile, + QoSReliabilityPolicy, + ) + + ROS_AVAILABLE = True +except ImportError: + ROS_AVAILABLE = False + rclpy = None # type: ignore[assignment] + SingleThreadedExecutor = None # type: ignore[assignment, misc] + Node = None # type: ignore[assignment, misc] + +from dimos.protocol.pubsub.spec import MsgT, PubSub, PubSubEncoderMixin, TopicT + + +# Type definitions for LCM and ROS messages, be gentle for now +# just a sketch until proper translation is written +@runtime_checkable +class DimosMessage(Protocol): + """Protocol for LCM message types (from dimos_lcm or lcm_msgs).""" + + msg_name: str + __slots__: tuple[str, ...] + + +@runtime_checkable +class ROSMessage(Protocol): + """Protocol for ROS message types.""" + + def get_fields_and_field_types(self) -> dict[str, str]: ... + + +@dataclass +class ROSTopic: + """Topic descriptor for ROS pubsub.""" + + topic: str + ros_type: type + qos: "QoSProfile | None" = None # Optional per-topic QoS override + + +class RawROS(PubSub[ROSTopic, Any]): + """ROS 2 PubSub implementation following the PubSub spec. + + This allows direct comparison of ROS messaging performance against + native LCM and other pubsub implementations. + """ + + def __init__( + self, node_name: str = "dimos_ros_pubsub", qos: "QoSProfile | None" = None + ) -> None: + """Initialize the ROS pubsub. + + Args: + node_name: Name for the ROS node + qos: Optional QoS profile (defaults to BEST_EFFORT for throughput) + """ + if not ROS_AVAILABLE: + raise ImportError("rclpy is not installed. ROS pubsub requires ROS 2.") + + self._node_name = node_name + self._node: Node | None = None + self._executor: SingleThreadedExecutor | None = None + self._spin_thread: threading.Thread | None = None + self._running = False + + # Track publishers and subscriptions + self._publishers: dict[str, Any] = {} + self._subscriptions: dict[str, list[tuple[Any, Callable[[Any, ROSTopic], None]]]] = {} + self._lock = threading.Lock() + + # QoS profile - use provided or default to best-effort for throughput + if qos is not None: + self._qos = qos + else: + self._qos = QoSProfile( + reliability=QoSReliabilityPolicy.BEST_EFFORT, + history=QoSHistoryPolicy.KEEP_LAST, + durability=QoSDurabilityPolicy.VOLATILE, + depth=1, + ) + + def start(self) -> None: + """Start the ROS node and executor.""" + if self._running: + return + + if not rclpy.ok(): + rclpy.init() + + self._node = Node(self._node_name) + self._executor = SingleThreadedExecutor() + self._executor.add_node(self._node) + + self._running = True + self._spin_thread = threading.Thread(target=self._spin, name="ros_pubsub_spin") + self._spin_thread.start() + + def stop(self) -> None: + """Stop the ROS node and clean up.""" + if not self._running: + return + + self._running = False + + # Wake up the executor so spin thread can exit + if self._executor: + self._executor.wake() + + # Wait for spin thread to finish + if self._spin_thread and self._spin_thread.is_alive(): + self._spin_thread.join(timeout=2.0) + + if self._executor: + self._executor.shutdown() + + if self._node: + self._node.destroy_node() + + if rclpy.ok(): + rclpy.shutdown() + + self._publishers.clear() + self._subscriptions.clear() + self._spin_thread = None + + def _spin(self) -> None: + """Background thread for spinning the ROS executor.""" + while self._running: + executor = self._executor + if executor is None: + break + executor.spin_once(timeout_sec=0) # Non-blocking for max throughput + + def _get_or_create_publisher(self, topic: ROSTopic) -> Any: + """Get existing publisher or create a new one.""" + with self._lock: + if topic.topic not in self._publishers: + node = self._node + if node is None: + raise RuntimeError("Pubsub must be started before publishing") + qos = topic.qos if topic.qos is not None else self._qos + self._publishers[topic.topic] = node.create_publisher( + topic.ros_type, topic.topic, qos + ) + return self._publishers[topic.topic] + + def publish(self, topic: ROSTopic, message: Any) -> None: + """Publish a message to a ROS topic. + + Args: + topic: ROSTopic descriptor with topic name and message type + message: ROS message to publish + """ + if not self._running or not self._node: + return + + publisher = self._get_or_create_publisher(topic) + publisher.publish(message) + + def subscribe( + self, topic: ROSTopic, callback: Callable[[Any, ROSTopic], None] + ) -> Callable[[], None]: + """Subscribe to a ROS topic with a callback. + + Args: + topic: ROSTopic descriptor with topic name and message type + callback: Function called with (message, topic) when message received + + Returns: + Unsubscribe function + """ + if not self._running or not self._node: + raise RuntimeError("ROS pubsub not started") + + with self._lock: + + def ros_callback(msg: Any) -> None: + callback(msg, topic) + + qos = topic.qos if topic.qos is not None else self._qos + subscription = self._node.create_subscription( + topic.ros_type, topic.topic, ros_callback, qos + ) + + if topic.topic not in self._subscriptions: + self._subscriptions[topic.topic] = [] + self._subscriptions[topic.topic].append((subscription, callback)) + + def unsubscribe() -> None: + with self._lock: + if topic.topic in self._subscriptions: + self._subscriptions[topic.topic] = [ + (sub, cb) + for sub, cb in self._subscriptions[topic.topic] + if cb is not callback + ] + if self._node: + self._node.destroy_subscription(subscription) + + return unsubscribe + + +class Dimos2RosMixin(PubSubEncoderMixin[TopicT, DimosMessage, ROSMessage]): + """Mixin that converts between dimos_lcm (LCM-based) and ROS messages. + + This enables seamless interop: publish LCM messages to ROS topics + and receive ROS messages as LCM messages. + """ + + def encode(self, msg: DimosMessage, *_: TopicT) -> ROSMessage: + """Convert a dimos_lcm message to its equivalent ROS message. + + Args: + msg: An LCM message (e.g., dimos_lcm.geometry_msgs.Vector3) + + Returns: + The corresponding ROS message (e.g., geometry_msgs.msg.Vector3) + """ + raise NotImplementedError("Encode method not implemented") + + def decode(self, msg: ROSMessage, _: TopicT | None = None) -> DimosMessage: + """Convert a ROS message to its equivalent dimos_lcm message. + + Args: + msg: A ROS message (e.g., geometry_msgs.msg.Vector3) + + Returns: + The corresponding LCM message (e.g., dimos_lcm.geometry_msgs.Vector3) + """ + raise NotImplementedError("Decode method not implemented") + + +class DimosROS( + RawROS, + Dimos2RosMixin[ROSTopic], +): + """ROS PubSub with automatic dimos.msgs ↔ ROS message conversion.""" + + pass + + +ROS = DimosROS diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index 0006020f6c..e1ae8600aa 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -124,7 +124,7 @@ def __init__( def start(self) -> None: pref = (self.config.prefer or "auto").lower() backend = os.getenv("DIMOS_IPC_BACKEND", pref).lower() - logger.info(f"SharedMemory PubSub starting (backend={backend})") + logger.debug(f"SharedMemory PubSub starting (backend={backend})") # No global thread needed; per-topic fanout starts on first subscribe. def stop(self) -> None: @@ -145,7 +145,7 @@ def stop(self) -> None: except Exception: pass self._topics.clear() - logger.info("SharedMemory PubSub stopped.") + logger.debug("SharedMemory PubSub stopped.") # ----- PubSub API (bytes on the wire) ---------------------------------- @@ -295,7 +295,7 @@ def _fanout_loop(self, topic: str, st: _TopicState) -> None: # -------------------------------------------------------------------------------------- -class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes]): +class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes, bytes]): """Identity encoder for raw bytes.""" def encode(self, msg: bytes, _: str) -> bytes: diff --git a/dimos/protocol/pubsub/spec.py b/dimos/protocol/pubsub/spec.py index a43061e492..b4e82d3993 100644 --- a/dimos/protocol/pubsub/spec.py +++ b/dimos/protocol/pubsub/spec.py @@ -22,6 +22,7 @@ MsgT = TypeVar("MsgT") TopicT = TypeVar("TopicT") +EncodingT = TypeVar("EncodingT") class PubSub(Generic[TopicT, MsgT], ABC): @@ -91,7 +92,7 @@ def _queue_cb(msg: MsgT, topic: TopicT) -> None: unsubscribe_fn() -class PubSubEncoderMixin(Generic[TopicT, MsgT], ABC): +class PubSubEncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC): """Mixin that encodes messages before publishing and decodes them after receiving. Usage: Just specify encoder and decoder as a subclass: @@ -104,10 +105,10 @@ def decoder(msg, topic): """ @abstractmethod - def encode(self, msg: MsgT, topic: TopicT) -> bytes: ... + def encode(self, msg: MsgT, topic: TopicT) -> EncodingT: ... @abstractmethod - def decode(self, msg: bytes, topic: TopicT) -> MsgT: ... + def decode(self, msg: EncodingT, topic: TopicT) -> MsgT: ... def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) @@ -127,14 +128,14 @@ def subscribe( ) -> Callable[[], None]: """Subscribe with automatic decoding.""" - def wrapper_cb(encoded_data: bytes, topic: TopicT) -> None: + def wrapper_cb(encoded_data: EncodingT, topic: TopicT) -> None: decoded_message = self.decode(encoded_data, topic) callback(decoded_message, topic) return super().subscribe(topic, wrapper_cb) # type: ignore[misc, no-any-return] -class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT]): +class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT, bytes]): def encode(self, msg: MsgT, *_: TopicT) -> bytes: # type: ignore[return] try: return pickle.dumps(msg) diff --git a/dimos/protocol/pubsub/test_encoder.py b/dimos/protocol/pubsub/test_encoder.py index f39bd170d5..38aac4664d 100644 --- a/dimos/protocol/pubsub/test_encoder.py +++ b/dimos/protocol/pubsub/test_encoder.py @@ -15,6 +15,7 @@ # limitations under the License. import json +from typing import Any from dimos.protocol.pubsub.memory import Memory, MemoryWithJSONEncoder @@ -24,7 +25,7 @@ def test_json_encoded_pubsub() -> None: pubsub = MemoryWithJSONEncoder() received_messages = [] - def callback(message, topic) -> None: + def callback(message: Any, topic: str) -> None: received_messages.append(message) # Subscribe to a topic @@ -56,7 +57,7 @@ def test_json_encoding_edge_cases() -> None: pubsub = MemoryWithJSONEncoder() received_messages = [] - def callback(message, topic) -> None: + def callback(message: Any, topic: str) -> None: received_messages.append(message) pubsub.subscribe("edge_cases", callback) @@ -84,10 +85,10 @@ def test_multiple_subscribers_with_encoding() -> None: received_messages_1 = [] received_messages_2 = [] - def callback_1(message, topic) -> None: + def callback_1(message: Any, topic: str) -> None: received_messages_1.append(message) - def callback_2(message, topic) -> None: + def callback_2(message: Any, topic: str) -> None: received_messages_2.append(f"callback_2: {message}") pubsub.subscribe("json_topic", callback_1) @@ -130,9 +131,9 @@ def test_data_actually_encoded_in_transit() -> None: class SpyMemory(Memory): def __init__(self) -> None: super().__init__() - self.raw_messages_received = [] + self.raw_messages_received: list[tuple[str, Any, type]] = [] - def publish(self, topic: str, message) -> None: + def publish(self, topic: str, message: Any) -> None: # Capture what actually gets published self.raw_messages_received.append((topic, message, type(message))) super().publish(topic, message) @@ -142,9 +143,9 @@ class SpyMemoryWithJSON(MemoryWithJSONEncoder, SpyMemory): pass pubsub = SpyMemoryWithJSON() - received_decoded = [] + received_decoded: list[Any] = [] - def callback(message, topic) -> None: + def callback(message: Any, topic: str) -> None: received_decoded.append(message) pubsub.subscribe("test_topic", callback) diff --git a/dimos/protocol/pubsub/test_lcmpubsub.py b/dimos/protocol/pubsub/test_lcmpubsub.py index d06bf20716..8165be9fef 100644 --- a/dimos/protocol/pubsub/test_lcmpubsub.py +++ b/dimos/protocol/pubsub/test_lcmpubsub.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections.abc import Generator import time +from typing import Any import pytest @@ -26,7 +28,7 @@ @pytest.fixture -def lcm_pub_sub_base(): +def lcm_pub_sub_base() -> Generator[LCMPubSubBase, None, None]: lcm = LCMPubSubBase(autoconf=True) lcm.start() yield lcm @@ -34,7 +36,7 @@ def lcm_pub_sub_base(): @pytest.fixture -def pickle_lcm(): +def pickle_lcm() -> Generator[PickleLCM, None, None]: lcm = PickleLCM(autoconf=True) lcm.start() yield lcm @@ -42,7 +44,7 @@ def pickle_lcm(): @pytest.fixture -def lcm(): +def lcm() -> Generator[LCM, None, None]: lcm = LCM(autoconf=True) lcm.start() yield lcm @@ -54,7 +56,7 @@ class MockLCMMessage: msg_name = "geometry_msgs.Mock" - def __init__(self, data) -> None: + def __init__(self, data: Any) -> None: self.data = data def lcm_encode(self) -> bytes: @@ -64,19 +66,19 @@ def lcm_encode(self) -> bytes: def lcm_decode(cls, data: bytes) -> "MockLCMMessage": return cls(data.decode("utf-8")) - def __eq__(self, other): + def __eq__(self, other: object) -> bool: return isinstance(other, MockLCMMessage) and self.data == other.data -def test_LCMPubSubBase_pubsub(lcm_pub_sub_base) -> None: +def test_LCMPubSubBase_pubsub(lcm_pub_sub_base: LCMPubSubBase) -> None: lcm = lcm_pub_sub_base - received_messages = [] + received_messages: list[tuple[Any, Any]] = [] topic = Topic(topic="/test_topic", lcm_type=MockLCMMessage) test_message = MockLCMMessage("test_data") - def callback(msg, topic) -> None: + def callback(msg: Any, topic: Any) -> None: received_messages.append((msg, topic)) lcm.subscribe(topic, callback) @@ -97,13 +99,13 @@ def callback(msg, topic) -> None: assert received_topic == topic -def test_lcm_autodecoder_pubsub(lcm) -> None: - received_messages = [] +def test_lcm_autodecoder_pubsub(lcm: LCM) -> None: + received_messages: list[tuple[Any, Any]] = [] topic = Topic(topic="/test_topic", lcm_type=MockLCMMessage) test_message = MockLCMMessage("test_data") - def callback(msg, topic) -> None: + def callback(msg: Any, topic: Any) -> None: received_messages.append((msg, topic)) lcm.subscribe(topic, callback) @@ -133,12 +135,12 @@ def callback(msg, topic) -> None: # passes some geometry types through LCM @pytest.mark.parametrize("test_message", test_msgs) -def test_lcm_geometry_msgs_pubsub(test_message, lcm) -> None: - received_messages = [] +def test_lcm_geometry_msgs_pubsub(test_message: Any, lcm: LCM) -> None: + received_messages: list[tuple[Any, Any]] = [] topic = Topic(topic="/test_topic", lcm_type=test_message.__class__) - def callback(msg, topic) -> None: + def callback(msg: Any, topic: Any) -> None: received_messages.append((msg, topic)) lcm.subscribe(topic, callback) @@ -164,13 +166,13 @@ def callback(msg, topic) -> None: # passes some geometry types through pickle LCM @pytest.mark.parametrize("test_message", test_msgs) -def test_lcm_geometry_msgs_autopickle_pubsub(test_message, pickle_lcm) -> None: +def test_lcm_geometry_msgs_autopickle_pubsub(test_message: Any, pickle_lcm: PickleLCM) -> None: lcm = pickle_lcm - received_messages = [] + received_messages: list[tuple[Any, Any]] = [] topic = Topic(topic="/test_topic") - def callback(msg, topic) -> None: + def callback(msg: Any, topic: Any) -> None: received_messages.append((msg, topic)) lcm.subscribe(topic, callback) diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index 91e8514b70..0abbfca02b 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -15,7 +15,7 @@ # limitations under the License. import asyncio -from collections.abc import Callable +from collections.abc import Callable, Generator from contextlib import contextmanager import time from typing import Any @@ -28,7 +28,7 @@ @contextmanager -def memory_context(): +def memory_context() -> Generator[Memory, None, None]: """Context manager for Memory PubSub implementation.""" memory = Memory() try: @@ -47,7 +47,7 @@ def memory_context(): from dimos.protocol.pubsub.redispubsub import Redis @contextmanager - def redis_context(): + def redis_context() -> Generator[Redis, None, None]: redis_pubsub = Redis() redis_pubsub.start() yield redis_pubsub @@ -63,7 +63,7 @@ def redis_context(): @contextmanager -def lcm_context(): +def lcm_context() -> Generator[LCM, None, None]: lcm_pubsub = LCM(autoconf=True) lcm_pubsub.start() yield lcm_pubsub @@ -83,7 +83,7 @@ def lcm_context(): @contextmanager -def shared_memory_cpu_context(): +def shared_memory_cpu_context() -> Generator[PickleSharedMemory, None, None]: shared_mem_pubsub = PickleSharedMemory(prefer="cpu") shared_mem_pubsub.start() yield shared_mem_pubsub @@ -100,13 +100,13 @@ def shared_memory_cpu_context(): @pytest.mark.parametrize("pubsub_context, topic, values", testdata) -def test_store(pubsub_context, topic, values) -> None: +def test_store(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None: with pubsub_context() as x: # Create a list to capture received messages - received_messages = [] + received_messages: list[Any] = [] # Define callback function that stores received messages - def callback(message, _) -> None: + def callback(message: Any, _: Any) -> None: received_messages.append(message) # Subscribe to the topic with our callback @@ -125,18 +125,20 @@ def callback(message, _) -> None: @pytest.mark.parametrize("pubsub_context, topic, values", testdata) -def test_multiple_subscribers(pubsub_context, topic, values) -> None: +def test_multiple_subscribers( + pubsub_context: Callable[[], Any], topic: Any, values: list[Any] +) -> None: """Test that multiple subscribers receive the same message.""" with pubsub_context() as x: # Create lists to capture received messages for each subscriber - received_messages_1 = [] - received_messages_2 = [] + received_messages_1: list[Any] = [] + received_messages_2: list[Any] = [] # Define callback functions - def callback_1(message, topic) -> None: + def callback_1(message: Any, topic: Any) -> None: received_messages_1.append(message) - def callback_2(message, topic) -> None: + def callback_2(message: Any, topic: Any) -> None: received_messages_2.append(message) # Subscribe both callbacks to the same topic @@ -157,14 +159,14 @@ def callback_2(message, topic) -> None: @pytest.mark.parametrize("pubsub_context, topic, values", testdata) -def test_unsubscribe(pubsub_context, topic, values) -> None: +def test_unsubscribe(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None: """Test that unsubscribed callbacks don't receive messages.""" with pubsub_context() as x: # Create a list to capture received messages - received_messages = [] + received_messages: list[Any] = [] # Define callback function - def callback(message, topic) -> None: + def callback(message: Any, topic: Any) -> None: received_messages.append(message) # Subscribe and get unsubscribe function @@ -184,14 +186,16 @@ def callback(message, topic) -> None: @pytest.mark.parametrize("pubsub_context, topic, values", testdata) -def test_multiple_messages(pubsub_context, topic, values) -> None: +def test_multiple_messages( + pubsub_context: Callable[[], Any], topic: Any, values: list[Any] +) -> None: """Test that subscribers receive multiple messages in order.""" with pubsub_context() as x: # Create a list to capture received messages - received_messages = [] + received_messages: list[Any] = [] # Define callback function - def callback(message, topic) -> None: + def callback(message: Any, topic: Any) -> None: received_messages.append(message) # Subscribe to the topic @@ -212,7 +216,9 @@ def callback(message, topic) -> None: @pytest.mark.parametrize("pubsub_context, topic, values", testdata) @pytest.mark.asyncio -async def test_async_iterator(pubsub_context, topic, values) -> None: +async def test_async_iterator( + pubsub_context: Callable[[], Any], topic: Any, values: list[Any] +) -> None: """Test that async iterator receives messages correctly.""" with pubsub_context() as x: # Get the messages to send (using the rest of the values) @@ -261,15 +267,17 @@ async def consume_messages() -> None: @pytest.mark.parametrize("pubsub_context, topic, values", testdata) -def test_high_volume_messages(pubsub_context, topic, values) -> None: +def test_high_volume_messages( + pubsub_context: Callable[[], Any], topic: Any, values: list[Any] +) -> None: """Test that all 5000 messages are received correctly.""" with pubsub_context() as x: # Create a list to capture received messages - received_messages = [] + received_messages: list[Any] = [] last_message_time = [time.time()] # Use list to allow modification in callback # Define callback function - def callback(message, topic) -> None: + def callback(message: Any, topic: Any) -> None: received_messages.append(message) last_message_time[0] = time.time() From beb875b90ff3a79d826df0837b67bfbec31f4cbb Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Sun, 18 Jan 2026 05:05:56 -0800 Subject: [PATCH 13/18] Fix DDS segmentation fault using bytearray for binary data storage Replace base64 string encoding with native IDL bytearray type to eliminate buffer overflow issues. The original base64 encoding exceeded CycloneDDS's default string size limit (~256 bytes) and caused crashes on messages >= 1KB. Key changes: - Use make_idl_struct with bytearray field instead of string - Convert bytes to bytearray when publishing to DDS - Convert bytearray back to bytes when receiving from DDS - Add _DDSMessageListener for async message dispatch - Implement thread-safe DataWriter/DataReader management - Add pickle support via __getstate__/__setstate__ Result: All 12 DDS benchmark tests pass (64B to 10MB messages). --- dimos/protocol/pubsub/ddspubsub.py | 59 +++++++++++++++++++++--- dimos/protocol/service/ddsservice.py | 68 +++++++--------------------- 2 files changed, 70 insertions(+), 57 deletions(-) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 7724e679f4..b267b4675c 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -15,8 +15,14 @@ from __future__ import annotations from dataclasses import dataclass +import threading from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable +from cyclonedds.core import Listener +from cyclonedds.idl import IdlStruct +from cyclonedds.pub import DataWriter as DDSDataWriter +from cyclonedds.sub import DataReader as DDSDataReader + from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin from dimos.protocol.service.ddsservice import DDSConfig, DDSService from dimos.utils.logging_config import setup_logger @@ -24,6 +30,8 @@ if TYPE_CHECKING: from collections.abc import Callable + from cyclonedds.topic import Topic as DDSTopic + logger = setup_logger() @@ -67,11 +75,34 @@ def __eq__(self, other: Any) -> bool: class DDSPubSubBase(DDSService, PubSub[Topic, Any]): def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - self._callbacks: dict[Topic, list[Callable[[Any, Topic], None]]] = {} - - def publish(self, topic: Topic, message: Any) -> None: + self._callbacks: dict[DDSTopic, list[Callable[[Any, DDSTopic], None]]] = {} + self._writers: dict[DDSTopic, DDSDataWriter] = {} + self._readers: dict[str, DDSDataReader] = {} + self._writer_lock = threading.Lock() + self._reader_lock = threading.Lock() + + def _get_writer(self, topic: DDSTopic) -> DDSDataWriter: + """Get a DataWriter for the given topic name, create if it does not exsist.""" + + with self._writer_lock: + if topic not in self._writers: + writer = DDSDataWriter(self.get_participant(), topic) + self._writers[topic] = writer + logger.debug(f"Created DataWriter for topic: {topic.topic}") + return self._writers[topic] + + def publish(self, topic: DDSTopic, message: Any) -> None: """Publish a message to a DDS topic.""" - # Dispatch to all subscribers + + writer = self._get_writer(topic) + try: + # Publish to DDS network + writer.write(message) + + except Exception as e: + logger.error(f"Error publishing to topic {topic}: {e}") + + # Dispatch to local subscribers if topic in self._callbacks: for callback in self._callbacks[topic]: try: @@ -80,8 +111,22 @@ def publish(self, topic: Topic, message: Any) -> None: # Log but continue processing other callbacks logger.error(f"Error in callback for topic {topic}: {e}") + def _get_reader(self, topic: DDSTopic) -> DDSDataReader: + """Get or create a DataReader for the given topic with listener.""" + + with self._reader_lock: + if topic not in self._readers: + reader = DDSDataReader[Any](self.get_participant(), topic) + self._readers[topic] = reader + logger.debug(f"Created DataReader for topic: {topic.topic}") + return self._readers[topic] + def subscribe(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> Callable[[], None]: """Subscribe to a DDS topic with a callback.""" + + # Create a DataReader for this topic if needed + self._get_reader(topic) + # Add callback to our list if topic not in self._callbacks: self._callbacks[topic] = [] @@ -93,7 +138,9 @@ def unsubscribe() -> None: return unsubscribe - def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> None: + def unsubscribe_callback( + self, topic: DDSTopic, callback: Callable[[Any, DDSTopic], None] + ) -> None: """Unsubscribe a callback from a topic.""" try: if topic in self._callbacks: @@ -104,7 +151,7 @@ def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], No pass -class DDSEncoderMixin(PubSubEncoderMixin[Topic, Any]): +class DDSEncoderMixin(PubSubEncoderMixin[Topic, Any, IdlStruct]): def encode(self, msg: DDSMsg, _: Topic) -> bytes: return msg.dds_encode() diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py index f8d5cbdf30..050d481faf 100644 --- a/dimos/protocol/service/ddsservice.py +++ b/dimos/protocol/service/ddsservice.py @@ -16,13 +16,16 @@ from dataclasses import dataclass import threading -from typing import Any +from typing import TYPE_CHECKING, Any from cyclonedds.domain import DomainParticipant from dimos.protocol.service.spec import Service from dimos.utils.logging_config import setup_logger +if TYPE_CHECKING: + from collections.abc import Callable + logger = setup_logger() @@ -36,66 +39,29 @@ class DDSConfig: class DDSService(Service[DDSConfig]): default_config = DDSConfig + _participant: DomainParticipant | None = None + _participant_lock = threading.Lock() def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - self._participant_lock = threading.Lock() - self._started = False - # Support passing an existing DomainParticipant - self.participant: DomainParticipant | None = self.config.participant - - def __getstate__(self) -> dict[str, Any]: - """Exclude unpicklable runtime attributes when serializing.""" - state = self.__dict__.copy() - # Remove unpicklable attributes - state.pop("participant", None) - state.pop("_participant_lock", None) - return state - - def __setstate__(self, state: dict[str, Any]) -> None: - """Restore object from pickled state.""" - self.__dict__.update(state) - # Reinitialize runtime attributes - self.participant = None - self._participant_lock = threading.Lock() - self._started = False def start(self) -> None: """Start the DDS service.""" - if self._started: - return - - # Use provided participant or create new one - with self._participant_lock: - if self.participant is None: - self.participant = self.config.participant or DomainParticipant( - self.config.domain_id - ) - logger.info(f"DDS service started with Cyclone DDS domain {self.config.domain_id}") - - self._started = True + pass def stop(self) -> None: """Stop the DDS service.""" - if not self._started: - return - - with self._participant_lock: - # Clean up participant if we created it - if self.participant is not None and not self.config.participant: - try: - self.participant.close() - logger.info("DDS participant closed") - except Exception as e: - logger.warning(f"Error closing DDS participant: {e}") - finally: - self.participant = None - - self._started = False - - def get_participant(self) -> DomainParticipant | None: + pass + + def get_participant(self) -> DomainParticipant: """Get the DomainParticipant instance, or None if not yet initialized.""" - return self.participant + + # Lazy initialization of the participant + with self.__class__._participant_lock: + if self.__class__._participant is None: + self.__class__._participant = DomainParticipant(self.config.domain_id) + logger.info(f"DDS service started with Cyclone DDS domain {self.config.domain_id}") + return self.__class__._participant __all__ = ["DDSConfig", "DDSService"] From 7d9390a4d5b970b60484d68f41bbc5812a68fc30 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Mon, 19 Jan 2026 02:05:26 -0800 Subject: [PATCH 14/18] Refactor DDS PubSub implementation to use CycloneDDS Topic --- .../pubsub/benchmark/test_benchmark.py | 48 ++++++++++++++++++- dimos/protocol/pubsub/ddspubsub.py | 39 +++++++++------ 2 files changed, 71 insertions(+), 16 deletions(-) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py index f88df75868..8039096744 100644 --- a/dimos/protocol/pubsub/benchmark/test_benchmark.py +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -15,13 +15,16 @@ # limitations under the License. from collections.abc import Generator +from contextlib import contextmanager +from dataclasses import dataclass import threading import time from typing import Any +from cyclonedds.idl import IdlStruct import pytest -from dimos.protocol.pubsub.benchmark.testdata import testdata +from dimos.protocol.pubsub.benchmark.testdata import make_data, testdata from dimos.protocol.pubsub.benchmark.type import ( BenchmarkResult, BenchmarkResults, @@ -29,6 +32,7 @@ PubSubContext, TestCase, ) +from dimos.protocol.pubsub.ddspubsub import DDS, Topic as DDSTopic # Message sizes for throughput benchmarking (powers of 2 from 64B to 10MB) MSG_SIZES = [ @@ -73,6 +77,48 @@ def pubsub_id(testcase: TestCase[Any, Any]) -> str: return prefix.upper() if len(prefix) <= 3 else prefix.title().replace(" ", "") +# DDS Testing Implementation +@dataclass +class Message(IdlStruct): + """DDS message with binary data payload following IdlStruct format.""" + + payload: str + + def dds_encode(self) -> bytes: + """Encode message to bytes for DDS transmission.""" + return self.payload.encode("latin-1") + + @classmethod + def dds_decode(cls, data: bytes) -> "Message": + """Decode bytes back to Message instance.""" + return cls(payload=data.decode("latin-1")) + + +@contextmanager +def dds_pubsub_channel() -> Generator[DDS, None, None]: + """Context manager for DDS PubSub implementation.""" + dds_pubsub = DDS() + dds_pubsub.start() + yield dds_pubsub + dds_pubsub.stop() + + +def dds_msggen(size: int) -> tuple[DDSTopic, Message]: + """Generate message for DDS pubsub benchmark.""" + topic = DDSTopic(topic="benchmark/dds", dds_type=Message) + msg = Message(payload=make_data(size).decode("latin-1")) + return (topic, msg) + + +# Add DDS to benchmark testdata before test is defined +testdata.append( + TestCase( + pubsub_context=dds_pubsub_channel, + msg_gen=dds_msggen, + ) +) + + @pytest.fixture(scope="module") def benchmark_results() -> Generator[BenchmarkResults, None, None]: """Module-scoped fixture to collect benchmark results.""" diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index b267b4675c..9be91b010c 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -22,6 +22,7 @@ from cyclonedds.idl import IdlStruct from cyclonedds.pub import DataWriter as DDSDataWriter from cyclonedds.sub import DataReader as DDSDataReader +from cyclonedds.topic import Topic as CycloneDDSTopic from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin from dimos.protocol.service.ddsservice import DDSConfig, DDSService @@ -30,8 +31,6 @@ if TYPE_CHECKING: from collections.abc import Callable - from cyclonedds.topic import Topic as DDSTopic - logger = setup_logger() @@ -75,23 +74,34 @@ def __eq__(self, other: Any) -> bool: class DDSPubSubBase(DDSService, PubSub[Topic, Any]): def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - self._callbacks: dict[DDSTopic, list[Callable[[Any, DDSTopic], None]]] = {} - self._writers: dict[DDSTopic, DDSDataWriter] = {} - self._readers: dict[str, DDSDataReader] = {} + self._callbacks: dict[Topic, list[Callable[[Any, Topic], None]]] = {} + self._writers: dict[Topic, DDSDataWriter] = {} + self._readers: dict[Topic, DDSDataReader] = {} + self._cyclonedds_topics: dict[Topic, CycloneDDSTopic] = {} self._writer_lock = threading.Lock() self._reader_lock = threading.Lock() - def _get_writer(self, topic: DDSTopic) -> DDSDataWriter: - """Get a DataWriter for the given topic name, create if it does not exsist.""" + def _get_cyclonedds_topic(self, topic: Topic) -> CycloneDDSTopic: + """Convert custom Topic to cyclonedds.topic.Topic, caching the result.""" + if topic not in self._cyclonedds_topics: + if topic.dds_type is None: + raise ValueError(f"Cannot create DDS topic '{topic.topic}': no dds_type specified") + dds_topic = CycloneDDSTopic(self.get_participant(), topic.topic, topic.dds_type) + self._cyclonedds_topics[topic] = dds_topic + return self._cyclonedds_topics[topic] + + def _get_writer(self, topic: Topic) -> DDSDataWriter: + """Get a DataWriter for the given topic name, create if it does not exist.""" with self._writer_lock: if topic not in self._writers: - writer = DDSDataWriter(self.get_participant(), topic) + dds_topic = self._get_cyclonedds_topic(topic) + writer = DDSDataWriter(self.get_participant(), dds_topic) self._writers[topic] = writer logger.debug(f"Created DataWriter for topic: {topic.topic}") return self._writers[topic] - def publish(self, topic: DDSTopic, message: Any) -> None: + def publish(self, topic: Topic, message: Any) -> None: """Publish a message to a DDS topic.""" writer = self._get_writer(topic) @@ -111,12 +121,13 @@ def publish(self, topic: DDSTopic, message: Any) -> None: # Log but continue processing other callbacks logger.error(f"Error in callback for topic {topic}: {e}") - def _get_reader(self, topic: DDSTopic) -> DDSDataReader: + def _get_reader(self, topic: Topic) -> DDSDataReader: """Get or create a DataReader for the given topic with listener.""" with self._reader_lock: if topic not in self._readers: - reader = DDSDataReader[Any](self.get_participant(), topic) + dds_topic = self._get_cyclonedds_topic(topic) + reader = DDSDataReader[Any](self.get_participant(), dds_topic) self._readers[topic] = reader logger.debug(f"Created DataReader for topic: {topic.topic}") return self._readers[topic] @@ -138,9 +149,7 @@ def unsubscribe() -> None: return unsubscribe - def unsubscribe_callback( - self, topic: DDSTopic, callback: Callable[[Any, DDSTopic], None] - ) -> None: + def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> None: """Unsubscribe a callback from a topic.""" try: if topic in self._callbacks: @@ -164,7 +173,7 @@ def decode(self, msg: bytes, topic: Topic) -> DDSMsg: class DDS( - DDSEncoderMixin, + # DDSEncoderMixin, # TODO: Add back so encoding and decoding is handled by DDS DDSPubSubBase, ): ... From 60b318c37b33c7395769be646560c44e01ac9228 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Mon, 19 Jan 2026 02:29:49 -0800 Subject: [PATCH 15/18] Remove DDS pickling --- dimos/protocol/pubsub/ddspubsub.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 9be91b010c..bdfd7c1b17 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -178,17 +178,10 @@ class DDS( ): ... -class PickleDDS( - PickleEncoderMixin, - DDSPubSubBase, -): ... - - __all__ = [ "DDS", "DDSEncoderMixin", "DDSMsg", "DDSPubSubBase", - "PickleDDS", "Topic", ] From cb07796a360c7b1b74b74f6a8fb91f78aa9aa0fd Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Mon, 19 Jan 2026 05:34:06 -0800 Subject: [PATCH 16/18] Remove unused encoding/decoding methods and use a sequence of uint8 for Message payload. --- .../pubsub/benchmark/test_benchmark.py | 14 ++------ dimos/protocol/pubsub/ddspubsub.py | 35 +++---------------- 2 files changed, 7 insertions(+), 42 deletions(-) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py index 8039096744..290f9361ae 100644 --- a/dimos/protocol/pubsub/benchmark/test_benchmark.py +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -22,6 +22,7 @@ from typing import Any from cyclonedds.idl import IdlStruct +from cyclonedds.idl.types import sequence, uint8 import pytest from dimos.protocol.pubsub.benchmark.testdata import make_data, testdata @@ -82,16 +83,7 @@ def pubsub_id(testcase: TestCase[Any, Any]) -> str: class Message(IdlStruct): """DDS message with binary data payload following IdlStruct format.""" - payload: str - - def dds_encode(self) -> bytes: - """Encode message to bytes for DDS transmission.""" - return self.payload.encode("latin-1") - - @classmethod - def dds_decode(cls, data: bytes) -> "Message": - """Decode bytes back to Message instance.""" - return cls(payload=data.decode("latin-1")) + payload: sequence[uint8] @contextmanager @@ -106,7 +98,7 @@ def dds_pubsub_channel() -> Generator[DDS, None, None]: def dds_msggen(size: int) -> tuple[DDSTopic, Message]: """Generate message for DDS pubsub benchmark.""" topic = DDSTopic(topic="benchmark/dds", dds_type=Message) - msg = Message(payload=make_data(size).decode("latin-1")) + msg = Message(payload=make_data(size)) return (topic, msg) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index bdfd7c1b17..743a81f9fc 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -18,14 +18,12 @@ import threading from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable -from cyclonedds.core import Listener -from cyclonedds.idl import IdlStruct from cyclonedds.pub import DataWriter as DDSDataWriter from cyclonedds.sub import DataReader as DDSDataReader from cyclonedds.topic import Topic as CycloneDDSTopic -from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin -from dimos.protocol.service.ddsservice import DDSConfig, DDSService +from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.service.ddsservice import DDSService from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -38,15 +36,6 @@ class DDSMsg(Protocol): msg_name: str - @classmethod - def dds_decode(cls, data: bytes) -> DDSMsg: - """Decode bytes into a DDS message instance.""" - ... - - def dds_encode(self) -> bytes: - """Encode this message instance into bytes.""" - ... - @dataclass class Topic: @@ -127,7 +116,7 @@ def _get_reader(self, topic: Topic) -> DDSDataReader: with self._reader_lock: if topic not in self._readers: dds_topic = self._get_cyclonedds_topic(topic) - reader = DDSDataReader[Any](self.get_participant(), dds_topic) + reader = DDSDataReader(self.get_participant(), dds_topic) self._readers[topic] = reader logger.debug(f"Created DataReader for topic: {topic.topic}") return self._readers[topic] @@ -160,27 +149,11 @@ def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], No pass -class DDSEncoderMixin(PubSubEncoderMixin[Topic, Any, IdlStruct]): - def encode(self, msg: DDSMsg, _: Topic) -> bytes: - return msg.dds_encode() - - def decode(self, msg: bytes, topic: Topic) -> DDSMsg: - if topic.dds_type is None: - raise ValueError( - f"Cannot decode message for topic '{topic.topic}': no dds_type specified" - ) - return topic.dds_type.dds_decode(msg) - - -class DDS( - # DDSEncoderMixin, # TODO: Add back so encoding and decoding is handled by DDS - DDSPubSubBase, -): ... +class DDS(DDSPubSubBase): ... __all__ = [ "DDS", - "DDSEncoderMixin", "DDSMsg", "DDSPubSubBase", "Topic", From 5be358eba36188eb6ba3a8422fe186470558a268 Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Mon, 19 Jan 2026 07:27:01 -0800 Subject: [PATCH 17/18] Restore double-checked locking pattern for performance The double-checked locking pattern avoids lock contention on every call after initial object creation. Initial benchmarking shows this pattern performs better than simple locking for repeated accesses to the same topics. --- .../pubsub/benchmark/test_benchmark.py | 19 +- dimos/protocol/pubsub/ddspubsub.py | 162 +++++++++--------- dimos/protocol/service/ddsservice.py | 17 +- 3 files changed, 103 insertions(+), 95 deletions(-) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py index 290f9361ae..80bc8c53f6 100644 --- a/dimos/protocol/pubsub/benchmark/test_benchmark.py +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -33,7 +33,7 @@ PubSubContext, TestCase, ) -from dimos.protocol.pubsub.ddspubsub import DDS, Topic as DDSTopic +from dimos.protocol.pubsub.ddspubsub import DDS, Topic # Message sizes for throughput benchmarking (powers of 2 from 64B to 10MB) MSG_SIZES = [ @@ -83,7 +83,16 @@ def pubsub_id(testcase: TestCase[Any, Any]) -> str: class Message(IdlStruct): """DDS message with binary data payload following IdlStruct format.""" - payload: sequence[uint8] + payload: str + + def dds_encode(self) -> bytes: + """Encode message to bytes for DDS transmission.""" + return self.payload.encode("latin-1") + + @classmethod + def dds_decode(cls, data: bytes) -> "Message": + """Decode bytes back to Message instance.""" + return cls(payload=data.decode("latin-1")) @contextmanager @@ -95,10 +104,10 @@ def dds_pubsub_channel() -> Generator[DDS, None, None]: dds_pubsub.stop() -def dds_msggen(size: int) -> tuple[DDSTopic, Message]: +def dds_msggen(size: int) -> tuple[Topic, Message]: """Generate message for DDS pubsub benchmark.""" - topic = DDSTopic(topic="benchmark/dds", dds_type=Message) - msg = Message(payload=make_data(size)) + topic = Topic("benchmark/dds", Message) + msg = Message(payload=make_data(size).decode("latin-1")) return (topic, msg) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index 743a81f9fc..f324ae6585 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -14,21 +14,20 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass import threading -from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable +from typing import Any, Protocol, TypeAlias, runtime_checkable +from cyclonedds.core import Listener from cyclonedds.pub import DataWriter as DDSDataWriter from cyclonedds.sub import DataReader as DDSDataReader -from cyclonedds.topic import Topic as CycloneDDSTopic +from cyclonedds.topic import Topic as DDSTopic from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service.ddsservice import DDSService from dimos.utils.logging_config import setup_logger -if TYPE_CHECKING: - from collections.abc import Callable - logger = setup_logger() @@ -36,117 +35,115 @@ class DDSMsg(Protocol): msg_name: str + @classmethod + def dds_encode(cls, msg: DDSMsg) -> bytes: + """Encode this message instance into bytes.""" + ... + + @classmethod + def dds_decode(cls, data: bytes) -> DDSMsg: + """Decode bytes into a DDS message instance.""" + ... + -@dataclass +@dataclass(frozen=True) class Topic: - """Represents a DDS topic with optional type information.""" + """Represents a DDS topic.""" - topic: str = "" - dds_type: type[DDSMsg] | None = None + name: str + typename: type[DDSMsg] + + def __hash__(self) -> int: + return hash((self.name, self.typename)) def __str__(self) -> str: - if self.dds_type is None: - return self.topic - return f"{self.topic}#{self.dds_type.__name__}" + return f"{self.name}#{self.typename.__name__}" - def __hash__(self) -> int: - return hash((self.topic, self.dds_type)) - def __eq__(self, other: Any) -> bool: - return ( - isinstance(other, Topic) - and self.topic == other.topic - and self.dds_type == other.dds_type - ) +MessageCallback: TypeAlias = Callable[[Any, Topic], None] + + +class _DDSMessageListener(Listener): + """Listener for DataReader that dispatches messages to callbacks.""" + + __slots__ = ("callbacks", "topic") + + def __init__(self, topic: Topic, callbacks_dict: dict[Topic, list[MessageCallback]]) -> None: + super().__init__() + self.topic = topic + self.callbacks = callbacks_dict[ + topic + ] # Cache callbacks list to avoid dict lookup per message + + def on_data_available(self, reader: DDSDataReader) -> None: + """Called when data is available on the reader.""" + try: + samples = reader.take() + except Exception: + return + callbacks = self.callbacks + topic = self.topic + for sample in samples: + if sample is not None: + for callback in callbacks: + callback(sample, topic) class DDSPubSubBase(DDSService, PubSub[Topic, Any]): def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) - self._callbacks: dict[Topic, list[Callable[[Any, Topic], None]]] = {} + self._callbacks: dict[Topic, list[MessageCallback]] = {} self._writers: dict[Topic, DDSDataWriter] = {} self._readers: dict[Topic, DDSDataReader] = {} - self._cyclonedds_topics: dict[Topic, CycloneDDSTopic] = {} self._writer_lock = threading.Lock() self._reader_lock = threading.Lock() - def _get_cyclonedds_topic(self, topic: Topic) -> CycloneDDSTopic: - """Convert custom Topic to cyclonedds.topic.Topic, caching the result.""" - if topic not in self._cyclonedds_topics: - if topic.dds_type is None: - raise ValueError(f"Cannot create DDS topic '{topic.topic}': no dds_type specified") - dds_topic = CycloneDDSTopic(self.get_participant(), topic.topic, topic.dds_type) - self._cyclonedds_topics[topic] = dds_topic - return self._cyclonedds_topics[topic] - def _get_writer(self, topic: Topic) -> DDSDataWriter: - """Get a DataWriter for the given topic name, create if it does not exist.""" - - with self._writer_lock: - if topic not in self._writers: - dds_topic = self._get_cyclonedds_topic(topic) - writer = DDSDataWriter(self.get_participant(), dds_topic) - self._writers[topic] = writer - logger.debug(f"Created DataWriter for topic: {topic.topic}") - return self._writers[topic] + """Get or create a DataWriter for the given topic.""" + writer = self._writers.get(topic) + if writer is None: + with self._writer_lock: + writer = self._writers.get(topic) + if writer is None: + dds_topic = DDSTopic(self.get_participant(), topic.name, topic.typename) + writer = DDSDataWriter(self.get_participant(), dds_topic) + self._writers[topic] = writer + return writer def publish(self, topic: Topic, message: Any) -> None: """Publish a message to a DDS topic.""" - writer = self._get_writer(topic) try: - # Publish to DDS network writer.write(message) - except Exception as e: logger.error(f"Error publishing to topic {topic}: {e}") - # Dispatch to local subscribers - if topic in self._callbacks: - for callback in self._callbacks[topic]: - try: - callback(message, topic) - except Exception as e: - # Log but continue processing other callbacks - logger.error(f"Error in callback for topic {topic}: {e}") - def _get_reader(self, topic: Topic) -> DDSDataReader: """Get or create a DataReader for the given topic with listener.""" - - with self._reader_lock: - if topic not in self._readers: - dds_topic = self._get_cyclonedds_topic(topic) - reader = DDSDataReader(self.get_participant(), dds_topic) - self._readers[topic] = reader - logger.debug(f"Created DataReader for topic: {topic.topic}") - return self._readers[topic] - - def subscribe(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> Callable[[], None]: + reader = self._readers.get(topic) + if reader is None: + with self._reader_lock: + reader = self._readers.get(topic) + if reader is None: + dds_topic = DDSTopic(self.get_participant(), topic.name, topic.typename) + listener = _DDSMessageListener(topic, self._callbacks) + reader = DDSDataReader(self.get_participant(), dds_topic, listener=listener) + self._readers[topic] = reader + return reader + + def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]: """Subscribe to a DDS topic with a callback.""" - - # Create a DataReader for this topic if needed + self._callbacks.setdefault(topic, []) # Ensure list exists before creating reader self._get_reader(topic) - - # Add callback to our list - if topic not in self._callbacks: - self._callbacks[topic] = [] self._callbacks[topic].append(callback) + return lambda: self.unsubscribe_callback(topic, callback) - # Return unsubscribe function - def unsubscribe() -> None: - self.unsubscribe_callback(topic, callback) - - return unsubscribe - - def unsubscribe_callback(self, topic: Topic, callback: Callable[[Any, Topic], None]) -> None: + def unsubscribe_callback(self, topic: Topic, callback: MessageCallback) -> None: """Unsubscribe a callback from a topic.""" - try: - if topic in self._callbacks: - self._callbacks[topic].remove(callback) - if not self._callbacks[topic]: - del self._callbacks[topic] - except ValueError: - pass + if topic in self._callbacks and callback in self._callbacks[topic]: + self._callbacks[topic].remove(callback) + if not self._callbacks[topic]: + del self._callbacks[topic] class DDS(DDSPubSubBase): ... @@ -156,5 +153,6 @@ class DDS(DDSPubSubBase): ... "DDS", "DDSMsg", "DDSPubSubBase", + "MessageCallback", "Topic", ] diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py index 050d481faf..1f88e01581 100644 --- a/dimos/protocol/service/ddsservice.py +++ b/dimos/protocol/service/ddsservice.py @@ -54,14 +54,15 @@ def stop(self) -> None: pass def get_participant(self) -> DomainParticipant: - """Get the DomainParticipant instance, or None if not yet initialized.""" - - # Lazy initialization of the participant - with self.__class__._participant_lock: - if self.__class__._participant is None: - self.__class__._participant = DomainParticipant(self.config.domain_id) - logger.info(f"DDS service started with Cyclone DDS domain {self.config.domain_id}") - return self.__class__._participant + """Get the DomainParticipant instance, or create if not yet initialized.""" + if self.__class__._participant is None: + with self.__class__._participant_lock: + if self.__class__._participant is None: + self.__class__._participant = DomainParticipant(self.config.domain_id) + logger.info( + f"DDS service started with Cyclone DDS domain {self.config.domain_id}" + ) + return self.__class__._participant __all__ = ["DDSConfig", "DDSService"] From a5cc633944f355f8c2fc6c6d6e4c5cccffcc1d5c Mon Sep 17 00:00:00 2001 From: Miguel Villa Floran Date: Tue, 20 Jan 2026 07:23:26 -0800 Subject: [PATCH 18/18] Encode DDS payloads as an array of bytes without string encoding to minimize latency --- dimos/protocol/pubsub/benchmark/test_benchmark.py | 13 ++----------- dimos/protocol/pubsub/ddspubsub.py | 4 ++-- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py index 80bc8c53f6..af220adcc7 100644 --- a/dimos/protocol/pubsub/benchmark/test_benchmark.py +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -83,16 +83,7 @@ def pubsub_id(testcase: TestCase[Any, Any]) -> str: class Message(IdlStruct): """DDS message with binary data payload following IdlStruct format.""" - payload: str - - def dds_encode(self) -> bytes: - """Encode message to bytes for DDS transmission.""" - return self.payload.encode("latin-1") - - @classmethod - def dds_decode(cls, data: bytes) -> "Message": - """Decode bytes back to Message instance.""" - return cls(payload=data.decode("latin-1")) + payload: sequence[uint8] @contextmanager @@ -107,7 +98,7 @@ def dds_pubsub_channel() -> Generator[DDS, None, None]: def dds_msggen(size: int) -> tuple[Topic, Message]: """Generate message for DDS pubsub benchmark.""" topic = Topic("benchmark/dds", Message) - msg = Message(payload=make_data(size).decode("latin-1")) + msg = Message(payload=list(make_data(size))) return (topic, msg) diff --git a/dimos/protocol/pubsub/ddspubsub.py b/dimos/protocol/pubsub/ddspubsub.py index f324ae6585..4e64965870 100644 --- a/dimos/protocol/pubsub/ddspubsub.py +++ b/dimos/protocol/pubsub/ddspubsub.py @@ -37,12 +37,12 @@ class DDSMsg(Protocol): @classmethod def dds_encode(cls, msg: DDSMsg) -> bytes: - """Encode this message instance into bytes.""" + """Encode message to bytes for DDS transmission.""" ... @classmethod def dds_decode(cls, data: bytes) -> DDSMsg: - """Decode bytes into a DDS message instance.""" + """Decode message from bytes for DDS transmission.""" ...