From 62bb2a4f5105bfea3d554f0bf788a7558abcf32f Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 14:58:43 +0800 Subject: [PATCH 01/18] refactor(pubsub): reorganize module structure and extract encoders - Move implementations to impl/ subdirectory (lcmpubsub, memory, shmpubsub, etc.) - Extract encoder mixins to encoders.py (PubSubEncoderMixin, PickleEncoderMixin, LCMEncoderMixin, JpegEncoderMixin) - Add AllPubSub and DiscoveryPubSub mixins with complementary default implementations - Add GlobPubSub and RegexPubSub marker classes with docstring examples - Update imports across codebase - Add CLAUDE.md to .gitignore --- .gitignore | 1 + dimos/core/transport.py | 8 +- dimos/msgs/nav_msgs/test_OccupancyGrid.py | 2 +- dimos/msgs/tf2_msgs/test_TFMessage_lcmpub.py | 2 +- dimos/protocol/pubsub/__init__.py | 4 +- dimos/protocol/pubsub/benchmark/testdata.py | 20 +- dimos/protocol/pubsub/encoders.py | 126 ++++++++++++ dimos/protocol/pubsub/impl/__init__.py | 2 + dimos/protocol/pubsub/{ => impl}/jpeg_shm.py | 4 +- dimos/protocol/pubsub/{ => impl}/lcmpubsub.py | 62 +++--- dimos/protocol/pubsub/{ => impl}/memory.py | 3 +- .../protocol/pubsub/{ => impl}/redispubsub.py | 0 dimos/protocol/pubsub/{ => impl}/rospubsub.py | 2 +- .../pubsub/{ => impl}/rospubsub_conversion.py | 2 +- dimos/protocol/pubsub/{ => impl}/shmpubsub.py | 7 +- .../pubsub/{ => impl}/test_lcmpubsub.py | 2 +- .../pubsub/{ => impl}/test_rospubsub.py | 2 +- dimos/protocol/pubsub/spec.py | 183 +++++++++++------- dimos/protocol/pubsub/test_encoder.py | 2 +- dimos/protocol/pubsub/test_spec.py | 10 +- dimos/protocol/rpc/pubsubrpc.py | 4 +- dimos/protocol/rpc/redisrpc.py | 2 +- dimos/protocol/skill/comms.py | 2 +- dimos/protocol/tf/tf.py | 2 +- dimos/robot/cli/topic.py | 2 +- dimos/robot/ros_bridge.py | 2 +- dimos/robot/test_ros_bridge.py | 2 +- dimos/utils/cli/agentspy/agentspy.py | 2 +- dimos/utils/cli/agentspy/demo_agentspy.py | 2 +- dimos/utils/cli/lcmspy/test_lcmspy.py | 2 +- docs/concepts/transports.md | 2 +- 31 files changed, 336 insertions(+), 132 deletions(-) create mode 100644 dimos/protocol/pubsub/encoders.py create mode 100644 dimos/protocol/pubsub/impl/__init__.py rename dimos/protocol/pubsub/{ => impl}/jpeg_shm.py (92%) rename dimos/protocol/pubsub/{ => impl}/lcmpubsub.py (65%) rename dimos/protocol/pubsub/{ => impl}/memory.py (94%) rename dimos/protocol/pubsub/{ => impl}/redispubsub.py (100%) rename dimos/protocol/pubsub/{ => impl}/rospubsub.py (99%) rename dimos/protocol/pubsub/{ => impl}/rospubsub_conversion.py (99%) rename dimos/protocol/pubsub/{ => impl}/shmpubsub.py (98%) rename dimos/protocol/pubsub/{ => impl}/test_lcmpubsub.py (99%) rename dimos/protocol/pubsub/{ => impl}/test_rospubsub.py (99%) diff --git a/.gitignore b/.gitignore index e52d08ba32..e7bcd25310 100644 --- a/.gitignore +++ b/.gitignore @@ -54,6 +54,7 @@ yolo11n.pt # symlink one of .envrc.* if you'd like to use .envrc .claude +**/CLAUDE.md .direnv/ /logs diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 4c1b19ee2e..2e856b5e67 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -27,10 +27,10 @@ from dimos.core.stream import In, Out, Stream, Transport from dimos.msgs.protocol import DimosMsg -from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory -from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic -from dimos.protocol.pubsub.rospubsub import DimosROS, ROSTopic -from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, PickleSharedMemory +from dimos.protocol.pubsub.impl.jpeg_shm import JpegSharedMemory +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic +from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic +from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory if TYPE_CHECKING: from collections.abc import Callable diff --git a/dimos/msgs/nav_msgs/test_OccupancyGrid.py b/dimos/msgs/nav_msgs/test_OccupancyGrid.py index 262a872c68..29ef196de8 100644 --- a/dimos/msgs/nav_msgs/test_OccupancyGrid.py +++ b/dimos/msgs/nav_msgs/test_OccupancyGrid.py @@ -26,7 +26,7 @@ from dimos.msgs.geometry_msgs import Pose from dimos.msgs.nav_msgs import OccupancyGrid from dimos.msgs.sensor_msgs import PointCloud2 -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic from dimos.utils.data import get_data diff --git a/dimos/msgs/tf2_msgs/test_TFMessage_lcmpub.py b/dimos/msgs/tf2_msgs/test_TFMessage_lcmpub.py index 0846f91ee6..396d796193 100644 --- a/dimos/msgs/tf2_msgs/test_TFMessage_lcmpub.py +++ b/dimos/msgs/tf2_msgs/test_TFMessage_lcmpub.py @@ -18,7 +18,7 @@ from dimos.msgs.geometry_msgs import Quaternion, Transform, Vector3 from dimos.msgs.tf2_msgs import TFMessage -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic # Publishes a series of transforms representing a robot kinematic chain diff --git a/dimos/protocol/pubsub/__init__.py b/dimos/protocol/pubsub/__init__.py index 89bd292fda..7914d606e5 100644 --- a/dimos/protocol/pubsub/__init__.py +++ b/dimos/protocol/pubsub/__init__.py @@ -1,3 +1,3 @@ -import dimos.protocol.pubsub.lcmpubsub as lcm -from dimos.protocol.pubsub.memory import Memory +import dimos.protocol.pubsub.impl.lcmpubsub as lcm +from dimos.protocol.pubsub.impl.memory import Memory from dimos.protocol.pubsub.spec import PubSub diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py index 80fc2e3307..bc0941e50b 100644 --- a/dimos/protocol/pubsub/benchmark/testdata.py +++ b/dimos/protocol/pubsub/benchmark/testdata.py @@ -20,9 +20,13 @@ from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.protocol.pubsub.benchmark.type import Case -from dimos.protocol.pubsub.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic -from dimos.protocol.pubsub.memory import Memory -from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, LCMSharedMemory, PickleSharedMemory +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic +from dimos.protocol.pubsub.impl.memory import Memory +from dimos.protocol.pubsub.impl.shmpubsub import ( + BytesSharedMemory, + LCMSharedMemory, + PickleSharedMemory, +) def make_data_bytes(size: int) -> bytes: @@ -171,7 +175,7 @@ def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]: try: - from dimos.protocol.pubsub.redispubsub import Redis + from dimos.protocol.pubsub.impl.redispubsub import Redis @contextmanager def redis_pubsub_channel() -> Generator[Redis, None, None]: @@ -199,7 +203,13 @@ def redis_msggen(size: int) -> tuple[str, Any]: print("Redis not available") -from dimos.protocol.pubsub.rospubsub import ROS_AVAILABLE, DimosROS, RawROS, RawROSTopic, ROSTopic +from dimos.protocol.pubsub.impl.rospubsub import ( + ROS_AVAILABLE, + DimosROS, + RawROS, + RawROSTopic, + ROSTopic, +) if ROS_AVAILABLE: from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy diff --git a/dimos/protocol/pubsub/encoders.py b/dimos/protocol/pubsub/encoders.py new file mode 100644 index 0000000000..a375674bae --- /dev/null +++ b/dimos/protocol/pubsub/encoders.py @@ -0,0 +1,126 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Encoder mixins for PubSub implementations.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +import pickle +from typing import TYPE_CHECKING, Generic, TypeVar + +if TYPE_CHECKING: + from collections.abc import Callable + +TopicT = TypeVar("TopicT") +MsgT = TypeVar("MsgT") +EncodingT = TypeVar("EncodingT") + + +class PubSubEncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC): + """Mixin that encodes messages before publishing and decodes them after receiving. + + This will override publish and subscribe methods to add encoding/decoding. + + Usage: Just specify encoder and decoder as a subclass: + + class MyPubSubWithJSON(PubSubEncoderMixin, MyPubSub): + def encoder(msg, topic): + json.dumps(msg).encode('utf-8') + def decoder(msg, topic): + data: json.loads(data.decode('utf-8')) + """ + + @abstractmethod + def encode(self, msg: MsgT, topic: TopicT) -> EncodingT: ... + + @abstractmethod + def decode(self, msg: EncodingT, topic: TopicT) -> MsgT: ... + + def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(*args, **kwargs) + self._encode_callback_map: dict = {} # type: ignore[type-arg] + + def publish(self, topic: TopicT, message: MsgT) -> None: + """Encode the message and publish it.""" + if getattr(self, "_stop_event", None) is not None and self._stop_event.is_set(): # type: ignore[attr-defined] + return + encoded_message = self.encode(message, topic) + if encoded_message is None: + return + super().publish(topic, encoded_message) # type: ignore[misc] + + def subscribe( + self, topic: TopicT, callback: Callable[[MsgT, TopicT], None] + ) -> Callable[[], None]: + """Subscribe with automatic decoding.""" + + def wrapper_cb(encoded_data: EncodingT, topic: TopicT) -> None: + decoded_message = self.decode(encoded_data, topic) + callback(decoded_message, topic) + + return super().subscribe(topic, wrapper_cb) # type: ignore[misc, no-any-return] + + +class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT, bytes]): + """Encoder mixin that uses pickle for serialization.""" + + def encode(self, msg: MsgT, *_: TopicT) -> bytes: # type: ignore[return] + try: + return pickle.dumps(msg) + except Exception as e: + print("Pickle encoding error:", e) + import traceback + + traceback.print_exc() + print("Tried to pickle:", msg) + + def decode(self, msg: bytes, _: TopicT) -> MsgT: + return pickle.loads(msg) # type: ignore[no-any-return] + + +from typing import Any, Protocol + + +class TypedTopic(Protocol): + """Protocol for topics that carry type information for decoding.""" + + @property + def lcm_type(self) -> type | None: ... + + +class LCMEncoderMixin(PubSubEncoderMixin[TopicT, Any, bytes]): + """Encoder mixin for DimosMsg using LCM binary encoding.""" + + def encode(self, msg: Any, _: TopicT) -> bytes: + return msg.lcm_encode() # type: ignore[no-any-return] + + def decode(self, msg: bytes, topic: TopicT) -> Any: + lcm_type = getattr(topic, "lcm_type", None) + if lcm_type is None: + raise ValueError("Cannot decode: topic has no lcm_type") + return lcm_type.lcm_decode(msg) + + +class JpegEncoderMixin(PubSubEncoderMixin[TopicT, Any, bytes]): + """Encoder mixin for DimosMsg using JPEG encoding (for images).""" + + def encode(self, msg: Any, _: TopicT) -> bytes: + return msg.lcm_jpeg_encode() # type: ignore[no-any-return] + + def decode(self, msg: bytes, topic: TopicT) -> Any: + lcm_type = getattr(topic, "lcm_type", None) + if lcm_type is None: + raise ValueError("Cannot decode: topic has no lcm_type") + return lcm_type.lcm_jpeg_decode(msg) diff --git a/dimos/protocol/pubsub/impl/__init__.py b/dimos/protocol/pubsub/impl/__init__.py new file mode 100644 index 0000000000..5512277de6 --- /dev/null +++ b/dimos/protocol/pubsub/impl/__init__.py @@ -0,0 +1,2 @@ +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, PickleLCM +from dimos.protocol.pubsub.impl.memory import Memory diff --git a/dimos/protocol/pubsub/jpeg_shm.py b/dimos/protocol/pubsub/impl/jpeg_shm.py similarity index 92% rename from dimos/protocol/pubsub/jpeg_shm.py rename to dimos/protocol/pubsub/impl/jpeg_shm.py index f2c9e35814..919275f07d 100644 --- a/dimos/protocol/pubsub/jpeg_shm.py +++ b/dimos/protocol/pubsub/impl/jpeg_shm.py @@ -18,8 +18,8 @@ from dimos.msgs.sensor_msgs.Image import Image from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ImageFormat -from dimos.protocol.pubsub.shmpubsub import SharedMemoryPubSubBase -from dimos.protocol.pubsub.spec import PubSubEncoderMixin +from dimos.protocol.pubsub.encoders import PubSubEncoderMixin +from dimos.protocol.pubsub.impl.shmpubsub import SharedMemoryPubSubBase class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image, bytes]): diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py similarity index 65% rename from dimos/protocol/pubsub/lcmpubsub.py rename to dimos/protocol/pubsub/impl/lcmpubsub.py index 471b8d6076..30ad4dc05a 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -17,7 +17,13 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any -from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin +from dimos.protocol.pubsub.encoders import ( + JpegEncoderMixin, + LCMEncoderMixin, + PickleEncoderMixin, +) +from dimos.protocol.pubsub.patterns import RegexSubscribable +from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service.lcmservice import ( LCMConfig, LCMService, @@ -27,6 +33,7 @@ if TYPE_CHECKING: from collections.abc import Callable + import re import threading from dimos.msgs import DimosMsg @@ -45,7 +52,13 @@ def __str__(self) -> str: return f"{self.topic}#{self.lcm_type.msg_name}" -class LCMPubSubBase(LCMService, PubSub[Topic, Any]): +class LCMPubSubBase(RegexSubscribable[bytes, str], LCMService, PubSub[Topic, Any]): + """LCM-based PubSub with native regex subscription support. + + LCM natively supports regex patterns in subscribe(), so we implement + RegexSubscribable directly without needing discovery-based fallback. + """ + default_config = LCMConfig _stop_event: threading.Event _thread: threading.Thread | None @@ -86,33 +99,36 @@ def unsubscribe() -> None: return unsubscribe + def subscribe_regex( + self, + pattern: re.Pattern[str], + callback: Callable[[bytes, str], Any], + ) -> Callable[[], None]: + """Subscribe to channels matching a regex pattern. -class LCMEncoderMixin(PubSubEncoderMixin[Topic, Any, bytes]): - def encode(self, msg: DimosMsg, _: Topic) -> bytes: - return msg.lcm_encode() - - def decode(self, msg: bytes, topic: Topic) -> DimosMsg: - if topic.lcm_type is None: - raise ValueError( - f"Cannot decode message for topic '{topic.topic}': no lcm_type specified" - ) - return topic.lcm_type.lcm_decode(msg) + Note: Callback receives raw bytes and the channel string (not Topic), + since we don't know the message type for pattern subscriptions. + """ + if self.l is None: + logger.error("Tried to subscribe after LCM was closed") + return lambda: None + # LCM subscribe accepts regex string directly + lcm_subscription = self.l.subscribe( + pattern.pattern, lambda channel, msg: callback(msg, channel) + ) + lcm_subscription.set_queue_capacity(10000) -class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any, bytes]): - def encode(self, msg: DimosMsg, _: Topic) -> bytes: - return msg.lcm_jpeg_encode() # type: ignore[attr-defined, no-any-return] + def unsubscribe() -> None: + if self.l is None: + return + self.l.unsubscribe(lcm_subscription) - def decode(self, msg: bytes, topic: Topic) -> DimosMsg: - if topic.lcm_type is None: - raise ValueError( - f"Cannot decode message for topic '{topic.topic}': no lcm_type specified" - ) - return topic.lcm_type.lcm_jpeg_decode(msg) # type: ignore[attr-defined, no-any-return] + return unsubscribe class LCM( - LCMEncoderMixin, + LCMEncoderMixin, # type: ignore[type-arg] LCMPubSubBase, ): ... @@ -124,7 +140,7 @@ class PickleLCM( class JpegLCM( - JpegEncoderMixin, + JpegEncoderMixin, # type: ignore[type-arg] LCMPubSubBase, ): ... diff --git a/dimos/protocol/pubsub/memory.py b/dimos/protocol/pubsub/impl/memory.py similarity index 94% rename from dimos/protocol/pubsub/memory.py rename to dimos/protocol/pubsub/impl/memory.py index e46fc10500..3425a5ee3d 100644 --- a/dimos/protocol/pubsub/memory.py +++ b/dimos/protocol/pubsub/impl/memory.py @@ -17,7 +17,8 @@ from typing import Any from dimos.protocol import encode -from dimos.protocol.pubsub.spec import PubSub, PubSubEncoderMixin +from dimos.protocol.pubsub.encoders import PubSubEncoderMixin +from dimos.protocol.pubsub.spec import PubSub class Memory(PubSub[str, Any]): diff --git a/dimos/protocol/pubsub/redispubsub.py b/dimos/protocol/pubsub/impl/redispubsub.py similarity index 100% rename from dimos/protocol/pubsub/redispubsub.py rename to dimos/protocol/pubsub/impl/redispubsub.py diff --git a/dimos/protocol/pubsub/rospubsub.py b/dimos/protocol/pubsub/impl/rospubsub.py similarity index 99% rename from dimos/protocol/pubsub/rospubsub.py rename to dimos/protocol/pubsub/impl/rospubsub.py index fdb64aa257..c9de67bdaa 100644 --- a/dimos/protocol/pubsub/rospubsub.py +++ b/dimos/protocol/pubsub/impl/rospubsub.py @@ -38,7 +38,7 @@ import uuid from dimos.msgs import DimosMsg -from dimos.protocol.pubsub.rospubsub_conversion import ( +from dimos.protocol.pubsub.impl.rospubsub_conversion import ( derive_ros_type, dimos_to_ros, ros_to_dimos, diff --git a/dimos/protocol/pubsub/rospubsub_conversion.py b/dimos/protocol/pubsub/impl/rospubsub_conversion.py similarity index 99% rename from dimos/protocol/pubsub/rospubsub_conversion.py rename to dimos/protocol/pubsub/impl/rospubsub_conversion.py index 18181d76b3..275033a5ac 100644 --- a/dimos/protocol/pubsub/rospubsub_conversion.py +++ b/dimos/protocol/pubsub/impl/rospubsub_conversion.py @@ -30,7 +30,7 @@ if TYPE_CHECKING: from dimos.msgs import DimosMsg - from dimos.protocol.pubsub.rospubsub import ROSMessage + from dimos.protocol.pubsub.impl.rospubsub import ROSMessage # Complex types that need LCM roundtrip (explicit list) diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/impl/shmpubsub.py similarity index 98% rename from dimos/protocol/pubsub/shmpubsub.py rename to dimos/protocol/pubsub/impl/shmpubsub.py index faeb485f70..416e257281 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/impl/shmpubsub.py @@ -32,9 +32,10 @@ import numpy as np import numpy.typing as npt -from dimos.protocol.pubsub.lcmpubsub import LCMEncoderMixin, Topic +from dimos.protocol.pubsub.encoders import LCMEncoderMixin, PickleEncoderMixin +from dimos.protocol.pubsub.impl.lcmpubsub import Topic from dimos.protocol.pubsub.shm.ipc_factory import CpuShmChannel -from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub +from dimos.protocol.pubsub.spec import PubSub from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -336,7 +337,7 @@ def reconfigure(self, topic: Topic, *, capacity: int) -> dict: # type: ignore[t class LCMSharedMemory( - LCMEncoderMixin, + LCMEncoderMixin[Topic], LCMSharedMemoryPubSubBase, ): """SharedMemory pubsub that uses LCM binary encoding (no pickle overhead).""" diff --git a/dimos/protocol/pubsub/test_lcmpubsub.py b/dimos/protocol/pubsub/impl/test_lcmpubsub.py similarity index 99% rename from dimos/protocol/pubsub/test_lcmpubsub.py rename to dimos/protocol/pubsub/impl/test_lcmpubsub.py index 8165be9fef..9467e6a4cc 100644 --- a/dimos/protocol/pubsub/test_lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/test_lcmpubsub.py @@ -19,7 +19,7 @@ import pytest from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 -from dimos.protocol.pubsub.lcmpubsub import ( +from dimos.protocol.pubsub.impl.lcmpubsub import ( LCM, LCMPubSubBase, PickleLCM, diff --git a/dimos/protocol/pubsub/test_rospubsub.py b/dimos/protocol/pubsub/impl/test_rospubsub.py similarity index 99% rename from dimos/protocol/pubsub/test_rospubsub.py rename to dimos/protocol/pubsub/impl/test_rospubsub.py index 3a3a020586..6cf49c37b2 100644 --- a/dimos/protocol/pubsub/test_rospubsub.py +++ b/dimos/protocol/pubsub/impl/test_rospubsub.py @@ -23,7 +23,7 @@ from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 -from dimos.protocol.pubsub.rospubsub import DimosROS, ROSTopic +from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic # Add msg_name to LCM PointStamped for testing nested message conversion PointStamped.msg_name = "geometry_msgs.PointStamped" diff --git a/dimos/protocol/pubsub/spec.py b/dimos/protocol/pubsub/spec.py index b4e82d3993..7c4650dbb8 100644 --- a/dimos/protocol/pubsub/spec.py +++ b/dimos/protocol/pubsub/spec.py @@ -17,28 +17,16 @@ from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager from dataclasses import dataclass -import pickle from typing import Any, Generic, TypeVar MsgT = TypeVar("MsgT") TopicT = TypeVar("TopicT") -EncodingT = TypeVar("EncodingT") -class PubSub(Generic[TopicT, MsgT], ABC): - """Abstract base class for pub/sub implementations with sugar methods.""" - - @abstractmethod - def publish(self, topic: TopicT, message: MsgT) -> None: - """Publish a message to a topic.""" - ... - - @abstractmethod - def subscribe( - self, topic: TopicT, callback: Callable[[MsgT, TopicT], None] - ) -> Callable[[], None]: - """Subscribe to a topic with a callback. returns unsubscribe function""" - ... +class PubSubBaseMixin(Generic[TopicT, MsgT]): + """Mixin class providing sugar methods for PubSub implementations. + Depends on the basic publish and subscribe methods being implemented. + """ @dataclass(slots=True) class _Subscription: @@ -50,101 +38,160 @@ class _Subscription: def unsubscribe(self) -> None: self._unsubscribe_fn() - # context-manager helper - def __enter__(self): # type: ignore[no-untyped-def] + def __enter__(self) -> "PubSubBaseMixin._Subscription": return self - def __exit__(self, *exc) -> None: # type: ignore[no-untyped-def] + def __exit__(self, *exc: Any) -> None: self.unsubscribe() - # public helper: returns disposable object def sub(self, topic: TopicT, cb: Callable[[MsgT, TopicT], None]) -> "_Subscription": - unsubscribe_fn = self.subscribe(topic, cb) - return self._Subscription(self, topic, cb, unsubscribe_fn) + unsubscribe_fn = self.subscribe(topic, cb) # type: ignore[attr-defined] + return self._Subscription(self, topic, cb, unsubscribe_fn) # type: ignore[arg-type] - # async iterator async def aiter(self, topic: TopicT, *, max_pending: int | None = None) -> AsyncIterator[MsgT]: q: asyncio.Queue[MsgT] = asyncio.Queue(maxsize=max_pending or 0) def _cb(msg: MsgT, topic: TopicT) -> None: q.put_nowait(msg) - unsubscribe_fn = self.subscribe(topic, _cb) + unsubscribe_fn = self.subscribe(topic, _cb) # type: ignore[attr-defined] try: while True: yield await q.get() finally: unsubscribe_fn() - # async context manager returning a queue - @asynccontextmanager - async def queue(self, topic: TopicT, *, max_pending: int | None = None): # type: ignore[no-untyped-def] + async def queue( + self, topic: TopicT, *, max_pending: int | None = None + ) -> AsyncIterator[asyncio.Queue[MsgT]]: q: asyncio.Queue[MsgT] = asyncio.Queue(maxsize=max_pending or 0) def _queue_cb(msg: MsgT, topic: TopicT) -> None: q.put_nowait(msg) - unsubscribe_fn = self.subscribe(topic, _queue_cb) + unsubscribe_fn = self.subscribe(topic, _queue_cb) # type: ignore[attr-defined] try: yield q finally: unsubscribe_fn() -class PubSubEncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC): - """Mixin that encodes messages before publishing and decodes them after receiving. +class PubSub(PubSubBaseMixin[TopicT, MsgT], ABC): + """Abstract base class for pub/sub implementations with sugar methods.""" - Usage: Just specify encoder and decoder as a subclass: + @abstractmethod + def publish(self, topic: TopicT, message: MsgT) -> None: + """Publish a message to a topic.""" + ... - class MyPubSubWithJSON(PubSubEncoderMixin, MyPubSub): - def encoder(msg, topic): - json.dumps(msg).encode('utf-8') - def decoder(msg, topic): - data: json.loads(data.decode('utf-8')) + @abstractmethod + def subscribe( + self, topic: TopicT, callback: Callable[[MsgT, TopicT], None] + ) -> Callable[[], None]: + """Subscribe to a topic with a callback. returns unsubscribe function""" + ... + + +# AllPubSub and DiscoveryPubSub are complementary mixins: +# +# AllPubsub supports subscribing to all topics (Redis, LCM, MQTT) +# DiscoveryPubSub supports discovering new topics (ROS) +# +# These capabilities are orthogonal but they can implement one another. +# Implementations should subclass whichever matches their native capability. +# The other method will be synthesized automatically. +# +# - AllPubSub: Native support for subscribing to all topics at once. +# Provides a default subscribe_new_topics() by tracking seen topics. +# +# - DiscoveryPubSub: Native support for discovering new topics as they appear. +# Provides a default subscribe_all() by subscribing to each discovered topic. + + +class AllPubSub(PubSub[TopicT, MsgT], ABC): + """Mixin for PubSub that supports subscribing to all topics. + + Subclass from this if you support native subscribe-all (e.g. MQTT #, Redis *). + Provides a default subscribe_new_topics() implementation. """ @abstractmethod - def encode(self, msg: MsgT, topic: TopicT) -> EncodingT: ... + def subscribe_all(self, callback: Callable[[MsgT, TopicT], Any]) -> Callable[[], None]: + """Subscribe to all topics.""" + ... + + def subscribe_new_topics(self, callback: Callable[[TopicT], Any]) -> Callable[[], None]: + """Discover new topics by tracking seen topics from subscribe_all.""" + seen: set[TopicT] = set() + + def on_msg(msg: MsgT, topic: TopicT) -> None: + if topic not in seen: + seen.add(topic) + callback(topic) + + return self.subscribe_all(on_msg) + + +class DiscoveryPubSub(PubSub[TopicT, MsgT], ABC): + """Mixin for PubSub that supports discovery of topics. + + Subclass from this if you support topic discovery (e.g. MQTT, Redis, NATS, RabbitMQ). + """ @abstractmethod - def decode(self, msg: EncodingT, topic: TopicT) -> MsgT: ... + def subscribe_new_topics(self, callback: Callable[[TopicT], Any]) -> Callable[[], None]: + """Get notified when new topics are discovered.""" + ... - def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] - super().__init__(*args, **kwargs) - self._encode_callback_map: dict = {} # type: ignore[type-arg] + def subscribe_all(self, callback: Callable[[MsgT, TopicT], Any]) -> Callable[[], None]: + """Subscribe to all topics by subscribing to each discovered topic.""" + subscriptions: list[Callable[[], None]] = [] - def publish(self, topic: TopicT, message: MsgT) -> None: - """Encode the message and publish it.""" - if getattr(self, "_stop_event", None) is not None and self._stop_event.is_set(): # type: ignore[attr-defined] - return - encoded_message = self.encode(message, topic) - if encoded_message is None: - return - super().publish(topic, encoded_message) # type: ignore[misc] + def on_new_topic(topic: TopicT) -> None: + unsub = self.subscribe(topic, callback) + subscriptions.append(unsub) - def subscribe( - self, topic: TopicT, callback: Callable[[MsgT, TopicT], None] - ) -> Callable[[], None]: - """Subscribe with automatic decoding.""" + discovery_unsub = self.subscribe_new_topics(on_new_topic) - def wrapper_cb(encoded_data: EncodingT, topic: TopicT) -> None: - decoded_message = self.decode(encoded_data, topic) - callback(decoded_message, topic) + def unsubscribe_all() -> None: + discovery_unsub() + for unsub in subscriptions: + unsub() - return super().subscribe(topic, wrapper_cb) # type: ignore[misc, no-any-return] + return unsubscribe_all -class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT, bytes]): - def encode(self, msg: MsgT, *_: TopicT) -> bytes: # type: ignore[return] - try: - return pickle.dumps(msg) - except Exception as e: - print("Pickle encoding error:", e) - import traceback +class GlobPubSub(AllPubSub[TopicT, MsgT]): + """Mixin only used for specifying that the PubSub uses glob-style topic matching. + + Subclass from this if you support glob style matching (e.g. MQTT, Redis, NATS, RabbitMQ). + + Examples: + - "sensor/*" matches "sensor/temp", "sensor/humidity" (single level) + - "robot/**" matches "robot/arm/joint1", "robot/camera/rgb" (multi-level) + """ + + @abstractmethod + def subscribe_all(self, callback: Callable[[MsgT, TopicT], None]) -> Callable[[], None]: + raise NotImplementedError("Not Implemented") - traceback.print_exc() - print("Tried to pickle:", msg) + ... + + +class RegexPubSub(AllPubSub[TopicT, MsgT]): + """Mixin only used for specifying that the PubSub uses regex-style topic matching. + + Subclass from this if you support regex pattern matching (e.g. LCM, ZeroMQ, Zenoh). + + Examples: + - "sensor/.*" matches "sensor/temp", "sensor/humidity" + - "robot/(arm|leg)/.*" matches "robot/arm/joint1", "robot/leg/motor2" + - "device/[0-9]+" matches "device/1", "device/42", "device/123" + """ + + @abstractmethod + def subscribe_all(self, callback: Callable[[MsgT, TopicT], None]) -> Callable[[], None]: + raise NotImplementedError("Not Implemented") - def decode(self, msg: bytes, _: TopicT) -> MsgT: - return pickle.loads(msg) # type: ignore[no-any-return] + ... diff --git a/dimos/protocol/pubsub/test_encoder.py b/dimos/protocol/pubsub/test_encoder.py index 38aac4664d..dec1e42972 100644 --- a/dimos/protocol/pubsub/test_encoder.py +++ b/dimos/protocol/pubsub/test_encoder.py @@ -17,7 +17,7 @@ import json from typing import Any -from dimos.protocol.pubsub.memory import Memory, MemoryWithJSONEncoder +from dimos.protocol.pubsub.impl.memory import Memory, MemoryWithJSONEncoder def test_json_encoded_pubsub() -> None: diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index 357e1dfa1e..0bdfa62628 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -23,8 +23,8 @@ import pytest from dimos.msgs.geometry_msgs import Vector3 -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic -from dimos.protocol.pubsub.memory import Memory +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.memory import Memory @contextmanager @@ -44,7 +44,7 @@ def memory_context() -> Generator[Memory, None, None]: ] try: - from dimos.protocol.pubsub.redispubsub import Redis + from dimos.protocol.pubsub.impl.redispubsub import Redis @contextmanager def redis_context() -> Generator[Redis, None, None]: @@ -70,7 +70,7 @@ def redis_context() -> Generator[Redis, None, None]: QoSReliabilityPolicy, ) - from dimos.protocol.pubsub.rospubsub import RawROS, RawROSTopic + from dimos.protocol.pubsub.impl.rospubsub import RawROS, RawROSTopic # Use RELIABLE QoS with larger depth for testing _test_qos = QoSProfile( @@ -124,7 +124,7 @@ def lcm_context() -> Generator[LCM, None, None]: ) -from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory +from dimos.protocol.pubsub.impl.shmpubsub import PickleSharedMemory @contextmanager diff --git a/dimos/protocol/rpc/pubsubrpc.py b/dimos/protocol/rpc/pubsubrpc.py index 05df80aec0..394f1afc15 100644 --- a/dimos/protocol/rpc/pubsubrpc.py +++ b/dimos/protocol/rpc/pubsubrpc.py @@ -28,8 +28,8 @@ ) from dimos.constants import LCM_MAX_CHANNEL_NAME_LENGTH -from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic -from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic +from dimos.protocol.pubsub.impl.shmpubsub import PickleSharedMemory from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.rpc.rpc_utils import deserialize_exception, serialize_exception from dimos.protocol.rpc.spec import Args, RPCSpec diff --git a/dimos/protocol/rpc/redisrpc.py b/dimos/protocol/rpc/redisrpc.py index aa8a5b87c5..32c3794bf4 100644 --- a/dimos/protocol/rpc/redisrpc.py +++ b/dimos/protocol/rpc/redisrpc.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dimos.protocol.pubsub.redispubsub import Redis +from dimos.protocol.pubsub.impl.redispubsub import Redis from dimos.protocol.rpc.pubsubrpc import PubSubRPCMixin diff --git a/dimos/protocol/skill/comms.py b/dimos/protocol/skill/comms.py index 0720140b79..8af78e8f5d 100644 --- a/dimos/protocol/skill/comms.py +++ b/dimos/protocol/skill/comms.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Generic, TypeVar -from dimos.protocol.pubsub.lcmpubsub import PickleLCM +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM from dimos.protocol.service import Service # type: ignore[attr-defined] from dimos.protocol.skill.type import SkillMsg diff --git a/dimos/protocol/tf/tf.py b/dimos/protocol/tf/tf.py index 3688b013cf..44d5303a05 100644 --- a/dimos/protocol/tf/tf.py +++ b/dimos/protocol/tf/tf.py @@ -22,7 +22,7 @@ from dimos.msgs.geometry_msgs import Transform from dimos.msgs.tf2_msgs import TFMessage -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service.lcmservice import Service # type: ignore[attr-defined] from dimos.types.timestamped import TimestampedCollection diff --git a/dimos/robot/cli/topic.py b/dimos/robot/cli/topic.py index 582099c4b6..1f7ada4f28 100644 --- a/dimos/robot/cli/topic.py +++ b/dimos/robot/cli/topic.py @@ -21,7 +21,7 @@ import typer from dimos.core.transport import LCMTransport, pLCMTransport -from dimos.protocol.pubsub.lcmpubsub import LCMPubSubBase +from dimos.protocol.pubsub.impl.lcmpubsub import LCMPubSubBase _modules_to_try = [ "dimos.msgs.geometry_msgs", diff --git a/dimos/robot/ros_bridge.py b/dimos/robot/ros_bridge.py index 48d201ca32..015b69393e 100644 --- a/dimos/robot/ros_bridge.py +++ b/dimos/robot/ros_bridge.py @@ -37,7 +37,7 @@ QoSDurabilityPolicy = None # type: ignore[assignment, misc] from dimos.core.resource import Resource -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic from dimos.utils.logging_config import setup_logger logger = setup_logger(level=logging.INFO) diff --git a/dimos/robot/test_ros_bridge.py b/dimos/robot/test_ros_bridge.py index cf7b2ac0cf..e31be24176 100644 --- a/dimos/robot/test_ros_bridge.py +++ b/dimos/robot/test_ros_bridge.py @@ -37,7 +37,7 @@ from dimos.msgs.geometry_msgs import TwistStamped from dimos.msgs.sensor_msgs import PointCloud2 from dimos.msgs.tf2_msgs import TFMessage -from dimos.protocol.pubsub.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic from dimos.robot.ros_bridge import BridgeDirection, ROSBridge diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py index 52760cb2da..a0ee43a62d 100644 --- a/dimos/utils/cli/agentspy/agentspy.py +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -29,7 +29,7 @@ from textual.binding import Binding from textual.widgets import Footer, RichLog -from dimos.protocol.pubsub.lcmpubsub import PickleLCM +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM from dimos.utils.cli import theme # Type alias for all message types we might receive diff --git a/dimos/utils/cli/agentspy/demo_agentspy.py b/dimos/utils/cli/agentspy/demo_agentspy.py index c747ab65f6..5229295038 100755 --- a/dimos/utils/cli/agentspy/demo_agentspy.py +++ b/dimos/utils/cli/agentspy/demo_agentspy.py @@ -25,7 +25,7 @@ ) from dimos.protocol.pubsub import lcm # type: ignore[attr-defined] -from dimos.protocol.pubsub.lcmpubsub import PickleLCM +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM def test_publish_messages() -> None: diff --git a/dimos/utils/cli/lcmspy/test_lcmspy.py b/dimos/utils/cli/lcmspy/test_lcmspy.py index 3016a723fe..14877bbb9a 100644 --- a/dimos/utils/cli/lcmspy/test_lcmspy.py +++ b/dimos/utils/cli/lcmspy/test_lcmspy.py @@ -16,7 +16,7 @@ import pytest -from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic from dimos.utils.cli.lcmspy.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy, Topic as TopicSpy diff --git a/docs/concepts/transports.md b/docs/concepts/transports.md index 7ddc29714f..faac9a2ec5 100644 --- a/docs/concepts/transports.md +++ b/docs/concepts/transports.md @@ -319,7 +319,7 @@ Received 2 messages: {'temperature': 23.0} ``` -See [`memory.py`](/dimos/protocol/pubsub/memory.py) for the complete source. +See [`memory.py`](/dimos/protocol/pubsub/impl/memory.py) for the complete source. --- From 4567f6aaef65368d81f70eadd1b74160e2c897f1 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 15:05:20 +0800 Subject: [PATCH 02/18] small fix to pass CI tests --- dimos/protocol/pubsub/impl/lcmpubsub.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index 30ad4dc05a..853af1fb95 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -22,7 +22,6 @@ LCMEncoderMixin, PickleEncoderMixin, ) -from dimos.protocol.pubsub.patterns import RegexSubscribable from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service.lcmservice import ( LCMConfig, @@ -52,7 +51,7 @@ def __str__(self) -> str: return f"{self.topic}#{self.lcm_type.msg_name}" -class LCMPubSubBase(RegexSubscribable[bytes, str], LCMService, PubSub[Topic, Any]): +class LCMPubSubBase(LCMService, PubSub[Topic, Any]): """LCM-based PubSub with native regex subscription support. LCM natively supports regex patterns in subscribe(), so we implement From 750c46246e513dc501b585e5e5610d0c5323cf0b Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 15:32:00 +0800 Subject: [PATCH 03/18] fix: update shmpubsub import path after impl/ reorganization --- dimos/core/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index b56fe74f4f..b7b52b8836 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -174,7 +174,7 @@ def close_all() -> None: try: import gc - from dimos.protocol.pubsub import shmpubsub + from dimos.protocol.pubsub.impl import shmpubsub for obj in gc.get_objects(): if isinstance(obj, shmpubsub.SharedMemoryPubSubBase): From 213ccdb982cf632fde6ec148a23b6765b39a2249 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 15:33:52 +0800 Subject: [PATCH 04/18] lcm class cleanup --- dimos/protocol/pubsub/impl/lcmpubsub.py | 29 ------------------------- 1 file changed, 29 deletions(-) diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index 853af1fb95..cacb1c39c9 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -61,11 +61,9 @@ class LCMPubSubBase(LCMService, PubSub[Topic, Any]): default_config = LCMConfig _stop_event: threading.Event _thread: threading.Thread | None - _callbacks: dict[str, list[Callable[[Any], None]]] def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) - self._callbacks = {} def publish(self, topic: Topic, message: bytes) -> None: """Publish a message to the specified channel.""" @@ -98,33 +96,6 @@ def unsubscribe() -> None: return unsubscribe - def subscribe_regex( - self, - pattern: re.Pattern[str], - callback: Callable[[bytes, str], Any], - ) -> Callable[[], None]: - """Subscribe to channels matching a regex pattern. - - Note: Callback receives raw bytes and the channel string (not Topic), - since we don't know the message type for pattern subscriptions. - """ - if self.l is None: - logger.error("Tried to subscribe after LCM was closed") - return lambda: None - - # LCM subscribe accepts regex string directly - lcm_subscription = self.l.subscribe( - pattern.pattern, lambda channel, msg: callback(msg, channel) - ) - lcm_subscription.set_queue_capacity(10000) - - def unsubscribe() -> None: - if self.l is None: - return - self.l.unsubscribe(lcm_subscription) - - return unsubscribe - class LCM( LCMEncoderMixin, # type: ignore[type-arg] From d9295ae41559de5ba9882e28c641ced942e5955d Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 16:03:50 +0800 Subject: [PATCH 05/18] encoder typing fixes --- dimos/protocol/pubsub/encoders.py | 64 ++++++++++++------------------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/dimos/protocol/pubsub/encoders.py b/dimos/protocol/pubsub/encoders.py index a375674bae..62abf77579 100644 --- a/dimos/protocol/pubsub/encoders.py +++ b/dimos/protocol/pubsub/encoders.py @@ -18,7 +18,10 @@ from abc import ABC, abstractmethod import pickle -from typing import TYPE_CHECKING, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast + +from dimos.msgs import DimosMsg +from dimos.msgs.sensor_msgs import Image if TYPE_CHECKING: from collections.abc import Callable @@ -32,6 +35,7 @@ class PubSubEncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC): """Mixin that encodes messages before publishing and decodes them after receiving. This will override publish and subscribe methods to add encoding/decoding. + Must be mixed with a class implementing PubSubProtocol[TopicT, EncodingT]. Usage: Just specify encoder and decoder as a subclass: @@ -42,20 +46,19 @@ def decoder(msg, topic): data: json.loads(data.decode('utf-8')) """ + # Declare expected methods from PubSubProtocol for type checking + if TYPE_CHECKING: + _base_publish: Callable[[TopicT, EncodingT], None] + _base_subscribe: Callable[[TopicT, Callable[[EncodingT, TopicT], None]], Callable[[], None]] + @abstractmethod def encode(self, msg: MsgT, topic: TopicT) -> EncodingT: ... @abstractmethod def decode(self, msg: EncodingT, topic: TopicT) -> MsgT: ... - def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] - super().__init__(*args, **kwargs) - self._encode_callback_map: dict = {} # type: ignore[type-arg] - def publish(self, topic: TopicT, message: MsgT) -> None: """Encode the message and publish it.""" - if getattr(self, "_stop_event", None) is not None and self._stop_event.is_set(): # type: ignore[attr-defined] - return encoded_message = self.encode(message, topic) if encoded_message is None: return @@ -70,57 +73,40 @@ def wrapper_cb(encoded_data: EncodingT, topic: TopicT) -> None: decoded_message = self.decode(encoded_data, topic) callback(decoded_message, topic) - return super().subscribe(topic, wrapper_cb) # type: ignore[misc, no-any-return] + return cast("Callable[[], None]", super().subscribe(topic, wrapper_cb)) # type: ignore[misc] class PickleEncoderMixin(PubSubEncoderMixin[TopicT, MsgT, bytes]): - """Encoder mixin that uses pickle for serialization.""" - - def encode(self, msg: MsgT, *_: TopicT) -> bytes: # type: ignore[return] - try: - return pickle.dumps(msg) - except Exception as e: - print("Pickle encoding error:", e) - import traceback + """Encoder mixin that uses pickle for serialization. Works with any Python object.""" - traceback.print_exc() - print("Tried to pickle:", msg) + def encode(self, msg: MsgT, _: TopicT) -> bytes: + return pickle.dumps(msg) def decode(self, msg: bytes, _: TopicT) -> MsgT: - return pickle.loads(msg) # type: ignore[no-any-return] - - -from typing import Any, Protocol - - -class TypedTopic(Protocol): - """Protocol for topics that carry type information for decoding.""" - - @property - def lcm_type(self) -> type | None: ... + return cast("MsgT", pickle.loads(msg)) -class LCMEncoderMixin(PubSubEncoderMixin[TopicT, Any, bytes]): +class LCMEncoderMixin(PubSubEncoderMixin[TopicT, DimosMsg, bytes]): """Encoder mixin for DimosMsg using LCM binary encoding.""" - def encode(self, msg: Any, _: TopicT) -> bytes: - return msg.lcm_encode() # type: ignore[no-any-return] + def encode(self, msg: DimosMsg, _: TopicT) -> bytes: + return msg.lcm_encode() - def decode(self, msg: bytes, topic: TopicT) -> Any: + def decode(self, msg: bytes, topic: TopicT) -> DimosMsg: lcm_type = getattr(topic, "lcm_type", None) if lcm_type is None: raise ValueError("Cannot decode: topic has no lcm_type") - return lcm_type.lcm_decode(msg) + return cast("DimosMsg", lcm_type.lcm_decode(msg)) -class JpegEncoderMixin(PubSubEncoderMixin[TopicT, Any, bytes]): +class JpegEncoderMixin(PubSubEncoderMixin[TopicT, Image, bytes]): """Encoder mixin for DimosMsg using JPEG encoding (for images).""" - def encode(self, msg: Any, _: TopicT) -> bytes: - return msg.lcm_jpeg_encode() # type: ignore[no-any-return] + def encode(self, msg: Image, _: TopicT) -> bytes: + return msg.lcm_jpeg_encode() - def decode(self, msg: bytes, topic: TopicT) -> Any: + def decode(self, msg: bytes, topic: TopicT) -> Image: lcm_type = getattr(topic, "lcm_type", None) if lcm_type is None: raise ValueError("Cannot decode: topic has no lcm_type") - return lcm_type.lcm_jpeg_decode(msg) + return cast("Image", lcm_type.lcm_jpeg_decode(msg)) From 978ad35fcce999dffb736506792ac47033e21d0c Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 16:50:41 +0800 Subject: [PATCH 06/18] feat: add Glob and regex pattern support to LCM Topic subscriptions - Add Glob class for glob-style pattern matching (*, **, ?) - Topic now accepts str, re.Pattern, or Glob for flexible subscription patterns - Pattern subscriptions return the actual matched channel in callback - Remove duplicate LCMMsg and Topic from lcmservice.py (use DimosMsg) - Add PubSubProtocol for structural typing - Add tests for regex and glob pattern subscriptions --- dimos/e2e_tests/lcm_spy.py | 5 +- dimos/protocol/pubsub/impl/lcmpubsub.py | 106 ++++++++++++++++--- dimos/protocol/pubsub/impl/test_lcm_regex.py | 102 ++++++++++++++++++ dimos/protocol/pubsub/spec.py | 3 +- dimos/protocol/service/lcmservice.py | 33 ++---- dimos/protocol/service/test_lcmservice.py | 2 +- 6 files changed, 205 insertions(+), 46 deletions(-) create mode 100644 dimos/protocol/pubsub/impl/test_lcm_regex.py diff --git a/dimos/e2e_tests/lcm_spy.py b/dimos/e2e_tests/lcm_spy.py index de0864dcd2..9efed09d5e 100644 --- a/dimos/e2e_tests/lcm_spy.py +++ b/dimos/e2e_tests/lcm_spy.py @@ -22,8 +22,9 @@ import lcm +from dimos.msgs import DimosMsg from dimos.msgs.geometry_msgs import PoseStamped -from dimos.protocol.service.lcmservice import LCMMsg, LCMService +from dimos.protocol.service.lcmservice import LCMService class LcmSpy(LCMService): @@ -155,7 +156,7 @@ def listener(msg: bytes) -> None: def wait_for_message_result( self, topic: str, - type: type[LCMMsg], + type: type[DimosMsg], predicate: Callable[[Any], bool], fail_message: str, timeout: float = 30.0, diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index cacb1c39c9..ec16cf53f3 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -15,6 +15,7 @@ from __future__ import annotations from dataclasses import dataclass +import re from typing import TYPE_CHECKING, Any from dimos.protocol.pubsub.encoders import ( @@ -23,16 +24,11 @@ PickleEncoderMixin, ) from dimos.protocol.pubsub.spec import PubSub -from dimos.protocol.service.lcmservice import ( - LCMConfig, - LCMService, - autoconf, -) +from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: from collections.abc import Callable - import re import threading from dimos.msgs import DimosMsg @@ -40,15 +36,83 @@ logger = setup_logger() +class Glob: + """Glob pattern that compiles to regex for LCM subscriptions. + + Supports: + * - matches any characters except / + ** - matches any characters including / + ? - matches single character + + Example: + Topic(topic=Glob("/sensor/*")) # matches /sensor/temp, /sensor/humidity + Topic(topic=Glob("/robot/**")) # matches /robot/arm/joint1, /robot/leg/motor + """ + + def __init__(self, pattern: str) -> None: + self._glob = pattern + self._regex = self._compile(pattern) + + @staticmethod + def _compile(pattern: str) -> str: + """Convert glob pattern to regex.""" + result = [] + i = 0 + while i < len(pattern): + c = pattern[i] + if c == "*": + if i + 1 < len(pattern) and pattern[i + 1] == "*": + result.append(".*") + i += 2 + else: + result.append("[^/]*") + i += 1 + elif c == "?": + result.append(".") + i += 1 + elif c in r"\^$.|+[]{}()": + result.append("\\" + c) + i += 1 + else: + result.append(c) + i += 1 + return "".join(result) + + @property + def pattern(self) -> str: + """Return the regex pattern string.""" + return self._regex + + @property + def glob(self) -> str: + """Return the original glob pattern.""" + return self._glob + + def __repr__(self) -> str: + return f"Glob({self._glob!r})" + + @dataclass class Topic: - topic: str = "" + topic: str | re.Pattern[str] | Glob lcm_type: type[DimosMsg] | None = None + @property + def is_pattern(self) -> bool: + return isinstance(self.topic, re.Pattern | Glob) + + @property + def pattern(self) -> str: + if isinstance(self.topic, re.Pattern): + return self.topic.pattern + if isinstance(self.topic, Glob): + return self.topic.pattern + return self.topic + def __str__(self) -> str: if self.lcm_type is None: - return self.topic - return f"{self.topic}#{self.lcm_type.msg_name}" + return self.pattern + return f"{self.pattern}#{self.lcm_type.msg_name}" class LCMPubSubBase(LCMService, PubSub[Topic, Any]): @@ -62,19 +126,17 @@ class LCMPubSubBase(LCMService, PubSub[Topic, Any]): _stop_event: threading.Event _thread: threading.Thread | None - def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] - super().__init__(**kwargs) - - def publish(self, topic: Topic, message: bytes) -> None: + def publish(self, topic: Topic | str, message: bytes) -> None: """Publish a message to the specified channel.""" if self.l is None: logger.error("Tried to publish after LCM was closed") return - self.l.publish(str(topic), message) + topic_str = str(topic) if isinstance(topic, Topic) else topic + self.l.publish(topic_str, message) def subscribe( - self, topic: Topic, callback: Callable[[bytes, Topic], Any] + self, topic: Topic | str, callback: Callable[[bytes, Topic | str], Any] ) -> Callable[[], None]: if self.l is None: logger.error("Tried to subscribe after LCM was closed") @@ -84,7 +146,17 @@ def noop() -> None: return noop - lcm_subscription = self.l.subscribe(str(topic), lambda _, msg: callback(msg, topic)) + # Handle both Topic objects and raw strings + if isinstance(topic, Topic) and topic.is_pattern: + + def handler(channel: str, msg: bytes) -> None: + matched_topic = Topic(topic=channel, lcm_type=topic.lcm_type) + callback(msg, matched_topic) + + lcm_subscription = self.l.subscribe(str(topic), handler) + else: + topic_str = str(topic) if isinstance(topic, Topic) else topic + lcm_subscription = self.l.subscribe(topic_str, lambda _, msg: callback(msg, topic)) # Set queue capacity to 10000 to handle high-volume bursts lcm_subscription.set_queue_capacity(10000) @@ -117,9 +189,11 @@ class JpegLCM( __all__ = [ "LCM", + "Glob", "JpegLCM", "LCMEncoderMixin", "LCMPubSubBase", "PickleLCM", + "Topic", "autoconf", ] diff --git a/dimos/protocol/pubsub/impl/test_lcm_regex.py b/dimos/protocol/pubsub/impl/test_lcm_regex.py new file mode 100644 index 0000000000..17e3009d83 --- /dev/null +++ b/dimos/protocol/pubsub/impl/test_lcm_regex.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for LCM regex subscription support.""" + +from collections.abc import Generator +import time + +import pytest + +from dimos.protocol.pubsub.impl.lcmpubsub import Glob, LCMPubSubBase, Topic + + +@pytest.fixture +def lcm() -> Generator[LCMPubSubBase, None, None]: + lcm = LCMPubSubBase(autoconf=True) + lcm.start() + yield lcm + lcm.stop() + + +def test_subscribe_regex_via_topic(lcm: LCMPubSubBase) -> None: + """Test that regex pattern in Topic matches multiple channels and returns actual topic.""" + import re + + received: list[tuple[bytes, Topic]] = [] + + # Use re.compile() to indicate this is a pattern subscription + pattern_topic = Topic(topic=re.compile(r"/sensor/.*")) + lcm.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + + lcm.publish(Topic("/sensor/temp"), b"temp_data") + lcm.publish(Topic("/sensor/humidity"), b"humidity_data") + lcm.publish(Topic("/other/topic"), b"should_not_match") + + time.sleep(0.1) + + assert len(received) == 2 + + # Check we received the actual matched topics, not the pattern + topics = {r[1].topic for r in received} + assert "/sensor/temp" in topics + assert "/sensor/humidity" in topics + + # Check data + data = {r[0] for r in received} + assert b"temp_data" in data + assert b"humidity_data" in data + + +def test_subscribe_glob_via_topic(lcm: LCMPubSubBase) -> None: + """Test that Glob pattern in Topic matches channels using glob syntax.""" + received: list[tuple[bytes, Topic]] = [] + + # Use Glob for glob-style pattern matching + pattern_topic = Topic(topic=Glob("/sensor/*")) + lcm.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + + lcm.publish(Topic("/sensor/temp"), b"temp_data") + lcm.publish(Topic("/sensor/humidity"), b"humidity_data") + lcm.publish(Topic("/sensor/nested/deep"), b"should_not_match_single_star") + lcm.publish(Topic("/other/topic"), b"should_not_match") + + time.sleep(0.1) + + assert len(received) == 2 + topics = {r[1].topic for r in received} + assert "/sensor/temp" in topics + assert "/sensor/humidity" in topics + + +def test_subscribe_glob_doublestar(lcm: LCMPubSubBase) -> None: + """Test that ** in Glob matches nested paths.""" + received: list[tuple[bytes, Topic]] = [] + + pattern_topic = Topic(topic=Glob("/robot/**")) + lcm.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + + lcm.publish(Topic("/robot/arm"), b"arm") + lcm.publish(Topic("/robot/arm/joint1"), b"joint1") + lcm.publish(Topic("/robot/leg/motor/speed"), b"speed") + lcm.publish(Topic("/sensor/temp"), b"should_not_match") + + time.sleep(0.1) + + assert len(received) == 3 + topics = {r[1].topic for r in received} + assert "/robot/arm" in topics + assert "/robot/arm/joint1" in topics + assert "/robot/leg/motor/speed" in topics diff --git a/dimos/protocol/pubsub/spec.py b/dimos/protocol/pubsub/spec.py index 7c4650dbb8..8064e94a11 100644 --- a/dimos/protocol/pubsub/spec.py +++ b/dimos/protocol/pubsub/spec.py @@ -133,6 +133,7 @@ def on_msg(msg: MsgT, topic: TopicT) -> None: return self.subscribe_all(on_msg) +# this is for ros class DiscoveryPubSub(PubSub[TopicT, MsgT], ABC): """Mixin for PubSub that supports discovery of topics. @@ -179,7 +180,7 @@ def subscribe_all(self, callback: Callable[[MsgT, TopicT], None]) -> Callable[[] ... -class RegexPubSub(AllPubSub[TopicT, MsgT]): +class RegexPubSub(GlobPubSub[TopicT, MsgT]): """Mixin only used for specifying that the PubSub uses regex-style topic matching. Subclass from this if you support regex pattern matching (e.g. LCM, ZeroMQ, Zenoh). diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index cf0a0647d8..0515f3c346 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -20,10 +20,13 @@ import platform import threading import traceback -from typing import Protocol, runtime_checkable +from typing import TYPE_CHECKING import lcm +if TYPE_CHECKING: + from dimos.msgs import DimosMsg + from dimos.protocol.service.spec import Service from dimos.protocol.service.system_configurator import ( BufferConfiguratorLinux, @@ -79,34 +82,12 @@ def __post_init__(self) -> None: self.url = _DEFAULT_LCM_URL -@runtime_checkable -class LCMMsg(Protocol): - msg_name: str - - @classmethod - def lcm_decode(cls, data: bytes) -> LCMMsg: - """Decode bytes into an LCM message instance.""" - ... - - def lcm_encode(self) -> bytes: - """Encode this message instance into bytes.""" - ... - - -@dataclass -class Topic: - topic: str = "" - lcm_type: type[LCMMsg] | None = None - - def __str__(self) -> str: - if self.lcm_type is None: - return self.topic - return f"{self.topic}#{self.lcm_type.msg_name}" - - _LCM_LOOP_TIMEOUT = 50 +# this class just sets up cpp LCM instance +# and runs its handle loop in a thread +# higher order stuff is done by pubsub/impl/lcmpubsub.py class LCMService(Service[LCMConfig]): default_config = LCMConfig l: lcm.LCM | None diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index fe6522ecd6..4231302426 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -16,11 +16,11 @@ import time from unittest.mock import MagicMock, patch +from dimos.protocol.pubsub.impl.lcmpubsub import Topic from dimos.protocol.service.lcmservice import ( _DEFAULT_LCM_URL, LCMConfig, LCMService, - Topic, autoconf, ) from dimos.protocol.service.system_configurator import ( From b0e95f227560f0aecc34ce7a997619aa760af139 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 17:05:36 +0800 Subject: [PATCH 07/18] feat: add subscribe_all and Topic.from_channel_str for typed pattern subscriptions - Add subscribe_all() method to LCMPubSubBase for subscribing to all topics - Add Topic.from_channel_str() factory method to parse channel strings with embedded type info - Channel format: /topic#module.ClassName enables automatic type extraction - Add test for subscribe_all with typed message decoding --- dimos/protocol/pubsub/impl/lcmpubsub.py | 32 ++++++++++-- dimos/protocol/pubsub/impl/test_lcm_regex.py | 51 +++++++++++++++++++- dimos/protocol/pubsub/spec.py | 39 +-------------- 3 files changed, 79 insertions(+), 43 deletions(-) diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index ec16cf53f3..8d67e8ee24 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -23,7 +23,7 @@ LCMEncoderMixin, PickleEncoderMixin, ) -from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.pubsub.spec import AllPubSub, PubSub from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf from dimos.utils.logging_config import setup_logger @@ -114,8 +114,30 @@ def __str__(self) -> str: return self.pattern return f"{self.pattern}#{self.lcm_type.msg_name}" + @staticmethod + def from_channel_str(channel: str, default_lcm_type: type[DimosMsg] | None = None) -> Topic: + """Create Topic from channel string. + + Channel format: /topic#module.ClassName + Falls back to default_lcm_type if type cannot be parsed. + """ + if "#" not in channel: + return Topic(topic=channel, lcm_type=default_lcm_type) + + topic_str, type_name = channel.rsplit("#", 1) + try: + # type_name format: "geometry_msgs.Vector3" + module_name, class_name = type_name.rsplit(".", 1) + import importlib -class LCMPubSubBase(LCMService, PubSub[Topic, Any]): + module = importlib.import_module(f"dimos.msgs.{module_name}") + lcm_type = getattr(module, class_name) + return Topic(topic=topic_str, lcm_type=lcm_type) + except (ValueError, ImportError, AttributeError): + return Topic(topic=topic_str, lcm_type=default_lcm_type) + + +class LCMPubSubBase(LCMService, AllPubSub[Topic, Any]): """LCM-based PubSub with native regex subscription support. LCM natively supports regex patterns in subscribe(), so we implement @@ -135,6 +157,9 @@ def publish(self, topic: Topic | str, message: bytes) -> None: topic_str = str(topic) if isinstance(topic, Topic) else topic self.l.publish(topic_str, message) + def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]: + return self.subscribe(Topic(re.compile(".*")), callback) + def subscribe( self, topic: Topic | str, callback: Callable[[bytes, Topic | str], Any] ) -> Callable[[], None]: @@ -150,8 +175,7 @@ def noop() -> None: if isinstance(topic, Topic) and topic.is_pattern: def handler(channel: str, msg: bytes) -> None: - matched_topic = Topic(topic=channel, lcm_type=topic.lcm_type) - callback(msg, matched_topic) + callback(msg, Topic.from_channel_str(channel, topic.lcm_type)) lcm_subscription = self.l.subscribe(str(topic), handler) else: diff --git a/dimos/protocol/pubsub/impl/test_lcm_regex.py b/dimos/protocol/pubsub/impl/test_lcm_regex.py index 17e3009d83..559ebc1dee 100644 --- a/dimos/protocol/pubsub/impl/test_lcm_regex.py +++ b/dimos/protocol/pubsub/impl/test_lcm_regex.py @@ -20,7 +20,8 @@ import pytest -from dimos.protocol.pubsub.impl.lcmpubsub import Glob, LCMPubSubBase, Topic +from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Glob, LCMPubSubBase, Topic @pytest.fixture @@ -100,3 +101,51 @@ def test_subscribe_glob_doublestar(lcm: LCMPubSubBase) -> None: assert "/robot/arm" in topics assert "/robot/arm/joint1" in topics assert "/robot/leg/motor/speed" in topics + + +@pytest.fixture +def lcm_typed() -> Generator[LCM, None, None]: + lcm = LCM(autoconf=True) + lcm.start() + yield lcm + lcm.stop() + + +def test_subscribe_all_with_typed_messages(lcm_typed: LCM) -> None: + """Test that subscribe_all receives correctly typed and decoded messages.""" + from typing import Any + + received: list[tuple[Any, Topic]] = [] + + lcm_typed.subscribe_all(lambda msg, topic: received.append((msg, topic))) + + # Publish typed messages to different topics + vec = Vector3(1.0, 2.0, 3.0) + quat = Quaternion(0.0, 0.0, 0.0, 1.0) + pose = Pose(vec, quat) + + lcm_typed.publish(Topic("/sensor/position", lcm_type=Vector3), vec) + lcm_typed.publish(Topic("/sensor/orientation", lcm_type=Quaternion), quat) + lcm_typed.publish(Topic("/robot/pose", lcm_type=Pose), pose) + + time.sleep(0.1) + + assert len(received) == 3 + + # Check topics are correct (str(topic) includes type info: /topic#module.ClassName) + topics = {str(r[1]) for r in received} + assert "/sensor/position#geometry_msgs.Vector3" in topics + assert "/sensor/orientation#geometry_msgs.Quaternion" in topics + assert "/robot/pose#geometry_msgs.Pose" in topics + + # Check types and values are correctly decoded + for msg, topic in received: + if "position" in topic.pattern: + assert isinstance(msg, Vector3) + assert msg == vec + elif "orientation" in topic.pattern: + assert isinstance(msg, Quaternion) + assert msg == quat + elif "pose" in topic.pattern: + assert isinstance(msg, Pose) + assert msg == pose diff --git a/dimos/protocol/pubsub/spec.py b/dimos/protocol/pubsub/spec.py index 8064e94a11..389439b433 100644 --- a/dimos/protocol/pubsub/spec.py +++ b/dimos/protocol/pubsub/spec.py @@ -107,8 +107,6 @@ def subscribe( # # - DiscoveryPubSub: Native support for discovering new topics as they appear. # Provides a default subscribe_all() by subscribing to each discovered topic. - - class AllPubSub(PubSub[TopicT, MsgT], ABC): """Mixin for PubSub that supports subscribing to all topics. @@ -133,7 +131,7 @@ def on_msg(msg: MsgT, topic: TopicT) -> None: return self.subscribe_all(on_msg) -# this is for ros +# This is for ros for now class DiscoveryPubSub(PubSub[TopicT, MsgT], ABC): """Mixin for PubSub that supports discovery of topics. @@ -161,38 +159,3 @@ def unsubscribe_all() -> None: unsub() return unsubscribe_all - - -class GlobPubSub(AllPubSub[TopicT, MsgT]): - """Mixin only used for specifying that the PubSub uses glob-style topic matching. - - Subclass from this if you support glob style matching (e.g. MQTT, Redis, NATS, RabbitMQ). - - Examples: - - "sensor/*" matches "sensor/temp", "sensor/humidity" (single level) - - "robot/**" matches "robot/arm/joint1", "robot/camera/rgb" (multi-level) - """ - - @abstractmethod - def subscribe_all(self, callback: Callable[[MsgT, TopicT], None]) -> Callable[[], None]: - raise NotImplementedError("Not Implemented") - - ... - - -class RegexPubSub(GlobPubSub[TopicT, MsgT]): - """Mixin only used for specifying that the PubSub uses regex-style topic matching. - - Subclass from this if you support regex pattern matching (e.g. LCM, ZeroMQ, Zenoh). - - Examples: - - "sensor/.*" matches "sensor/temp", "sensor/humidity" - - "robot/(arm|leg)/.*" matches "robot/arm/joint1", "robot/leg/motor2" - - "device/[0-9]+" matches "device/1", "device/42", "device/123" - """ - - @abstractmethod - def subscribe_all(self, callback: Callable[[MsgT, TopicT], None]) -> Callable[[], None]: - raise NotImplementedError("Not Implemented") - - ... From 496d7ea7147b3a167e4455ae4dd733f389661270 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 17:08:05 +0800 Subject: [PATCH 08/18] refactor: extract resolve_msg_type to dimos.msgs.helpers Move message type resolution logic to a dedicated helper module with lru_cache for performance. Supports fallback to dimos_lcm module path. --- dimos/msgs/__init__.py | 3 +- dimos/msgs/helpers.py | 53 +++++++++++++++++++++++++ dimos/protocol/pubsub/impl/lcmpubsub.py | 14 ++----- 3 files changed, 59 insertions(+), 11 deletions(-) create mode 100644 dimos/msgs/helpers.py diff --git a/dimos/msgs/__init__.py b/dimos/msgs/__init__.py index b2bcabab01..4395dbcc51 100644 --- a/dimos/msgs/__init__.py +++ b/dimos/msgs/__init__.py @@ -1,3 +1,4 @@ +from dimos.msgs.helpers import resolve_msg_type from dimos.msgs.protocol import DimosMsg -__all__ = ["DimosMsg"] +__all__ = ["DimosMsg", "resolve_msg_type"] diff --git a/dimos/msgs/helpers.py b/dimos/msgs/helpers.py new file mode 100644 index 0000000000..3a6041e287 --- /dev/null +++ b/dimos/msgs/helpers.py @@ -0,0 +1,53 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from functools import lru_cache +import importlib +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from dimos.msgs import DimosMsg + + +@lru_cache(maxsize=256) +def resolve_msg_type(type_name: str) -> type[DimosMsg] | None: + """Resolve a message type name to its class. + + Args: + type_name: Type name in format "module.ClassName" (e.g., "geometry_msgs.Vector3") + + Returns: + The message class or None if not found. + """ + try: + module_name, class_name = type_name.rsplit(".", 1) + except ValueError: + return None + + # Try different import paths + import_paths = [ + f"dimos.msgs.{module_name}", + f"dimos_lcm.{module_name}", + ] + + for path in import_paths: + try: + module = importlib.import_module(path) + return getattr(module, class_name) + except (ImportError, AttributeError): + continue + + return None diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index 8d67e8ee24..ae746b51b4 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -121,20 +121,14 @@ def from_channel_str(channel: str, default_lcm_type: type[DimosMsg] | None = Non Channel format: /topic#module.ClassName Falls back to default_lcm_type if type cannot be parsed. """ + from dimos.msgs import resolve_msg_type + if "#" not in channel: return Topic(topic=channel, lcm_type=default_lcm_type) topic_str, type_name = channel.rsplit("#", 1) - try: - # type_name format: "geometry_msgs.Vector3" - module_name, class_name = type_name.rsplit(".", 1) - import importlib - - module = importlib.import_module(f"dimos.msgs.{module_name}") - lcm_type = getattr(module, class_name) - return Topic(topic=topic_str, lcm_type=lcm_type) - except (ValueError, ImportError, AttributeError): - return Topic(topic=topic_str, lcm_type=default_lcm_type) + lcm_type = resolve_msg_type(type_name) + return Topic(topic=topic_str, lcm_type=lcm_type or default_lcm_type) class LCMPubSubBase(LCMService, AllPubSub[Topic, Any]): From 4e3fa637e763936b30b773b29e41488287b4e54b Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 17:09:49 +0800 Subject: [PATCH 09/18] lcm pattern sub test rename --- .../{test_lcm_regex.py => test_lcmpubsub_patternsub.py} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename dimos/protocol/pubsub/impl/{test_lcm_regex.py => test_lcmpubsub_patternsub.py} (95%) diff --git a/dimos/protocol/pubsub/impl/test_lcm_regex.py b/dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py similarity index 95% rename from dimos/protocol/pubsub/impl/test_lcm_regex.py rename to dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py index 559ebc1dee..35445c5256 100644 --- a/dimos/protocol/pubsub/impl/test_lcm_regex.py +++ b/dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py @@ -124,9 +124,9 @@ def test_subscribe_all_with_typed_messages(lcm_typed: LCM) -> None: quat = Quaternion(0.0, 0.0, 0.0, 1.0) pose = Pose(vec, quat) - lcm_typed.publish(Topic("/sensor/position", lcm_type=Vector3), vec) - lcm_typed.publish(Topic("/sensor/orientation", lcm_type=Quaternion), quat) - lcm_typed.publish(Topic("/robot/pose", lcm_type=Pose), pose) + lcm_typed.publish(Topic("/sensor/position", Vector3), vec) + lcm_typed.publish(Topic("/sensor/orientation", Quaternion), quat) + lcm_typed.publish(Topic("/robot/pose", Pose), pose) time.sleep(0.1) From d077c6774a96e252e256082687b833fcf00a59cf Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 18:36:36 +0800 Subject: [PATCH 10/18] pattern sub tests and docs --- dimos/protocol/pubsub/impl/lcmpubsub.py | 6 +- .../pubsub/impl/test_lcmpubsub_patternsub.py | 151 ----------- dimos/protocol/pubsub/test_pattern_sub.py | 242 ++++++++++++++++++ docs/development/grid_testing.md | 116 +++++++++ 4 files changed, 363 insertions(+), 152 deletions(-) delete mode 100644 dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py create mode 100644 dimos/protocol/pubsub/test_pattern_sub.py create mode 100644 docs/development/grid_testing.md diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index ae746b51b4..ecfe609260 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -171,7 +171,11 @@ def noop() -> None: def handler(channel: str, msg: bytes) -> None: callback(msg, Topic.from_channel_str(channel, topic.lcm_type)) - lcm_subscription = self.l.subscribe(str(topic), handler) + pattern_str = str(topic) + if not pattern_str.endswith("*"): + pattern_str = f"{pattern_str}(#.*)?" + + lcm_subscription = self.l.subscribe(pattern_str, handler) else: topic_str = str(topic) if isinstance(topic, Topic) else topic lcm_subscription = self.l.subscribe(topic_str, lambda _, msg: callback(msg, topic)) diff --git a/dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py b/dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py deleted file mode 100644 index 35445c5256..0000000000 --- a/dimos/protocol/pubsub/impl/test_lcmpubsub_patternsub.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Tests for LCM regex subscription support.""" - -from collections.abc import Generator -import time - -import pytest - -from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 -from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Glob, LCMPubSubBase, Topic - - -@pytest.fixture -def lcm() -> Generator[LCMPubSubBase, None, None]: - lcm = LCMPubSubBase(autoconf=True) - lcm.start() - yield lcm - lcm.stop() - - -def test_subscribe_regex_via_topic(lcm: LCMPubSubBase) -> None: - """Test that regex pattern in Topic matches multiple channels and returns actual topic.""" - import re - - received: list[tuple[bytes, Topic]] = [] - - # Use re.compile() to indicate this is a pattern subscription - pattern_topic = Topic(topic=re.compile(r"/sensor/.*")) - lcm.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) - - lcm.publish(Topic("/sensor/temp"), b"temp_data") - lcm.publish(Topic("/sensor/humidity"), b"humidity_data") - lcm.publish(Topic("/other/topic"), b"should_not_match") - - time.sleep(0.1) - - assert len(received) == 2 - - # Check we received the actual matched topics, not the pattern - topics = {r[1].topic for r in received} - assert "/sensor/temp" in topics - assert "/sensor/humidity" in topics - - # Check data - data = {r[0] for r in received} - assert b"temp_data" in data - assert b"humidity_data" in data - - -def test_subscribe_glob_via_topic(lcm: LCMPubSubBase) -> None: - """Test that Glob pattern in Topic matches channels using glob syntax.""" - received: list[tuple[bytes, Topic]] = [] - - # Use Glob for glob-style pattern matching - pattern_topic = Topic(topic=Glob("/sensor/*")) - lcm.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) - - lcm.publish(Topic("/sensor/temp"), b"temp_data") - lcm.publish(Topic("/sensor/humidity"), b"humidity_data") - lcm.publish(Topic("/sensor/nested/deep"), b"should_not_match_single_star") - lcm.publish(Topic("/other/topic"), b"should_not_match") - - time.sleep(0.1) - - assert len(received) == 2 - topics = {r[1].topic for r in received} - assert "/sensor/temp" in topics - assert "/sensor/humidity" in topics - - -def test_subscribe_glob_doublestar(lcm: LCMPubSubBase) -> None: - """Test that ** in Glob matches nested paths.""" - received: list[tuple[bytes, Topic]] = [] - - pattern_topic = Topic(topic=Glob("/robot/**")) - lcm.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) - - lcm.publish(Topic("/robot/arm"), b"arm") - lcm.publish(Topic("/robot/arm/joint1"), b"joint1") - lcm.publish(Topic("/robot/leg/motor/speed"), b"speed") - lcm.publish(Topic("/sensor/temp"), b"should_not_match") - - time.sleep(0.1) - - assert len(received) == 3 - topics = {r[1].topic for r in received} - assert "/robot/arm" in topics - assert "/robot/arm/joint1" in topics - assert "/robot/leg/motor/speed" in topics - - -@pytest.fixture -def lcm_typed() -> Generator[LCM, None, None]: - lcm = LCM(autoconf=True) - lcm.start() - yield lcm - lcm.stop() - - -def test_subscribe_all_with_typed_messages(lcm_typed: LCM) -> None: - """Test that subscribe_all receives correctly typed and decoded messages.""" - from typing import Any - - received: list[tuple[Any, Topic]] = [] - - lcm_typed.subscribe_all(lambda msg, topic: received.append((msg, topic))) - - # Publish typed messages to different topics - vec = Vector3(1.0, 2.0, 3.0) - quat = Quaternion(0.0, 0.0, 0.0, 1.0) - pose = Pose(vec, quat) - - lcm_typed.publish(Topic("/sensor/position", Vector3), vec) - lcm_typed.publish(Topic("/sensor/orientation", Quaternion), quat) - lcm_typed.publish(Topic("/robot/pose", Pose), pose) - - time.sleep(0.1) - - assert len(received) == 3 - - # Check topics are correct (str(topic) includes type info: /topic#module.ClassName) - topics = {str(r[1]) for r in received} - assert "/sensor/position#geometry_msgs.Vector3" in topics - assert "/sensor/orientation#geometry_msgs.Quaternion" in topics - assert "/robot/pose#geometry_msgs.Pose" in topics - - # Check types and values are correctly decoded - for msg, topic in received: - if "position" in topic.pattern: - assert isinstance(msg, Vector3) - assert msg == vec - elif "orientation" in topic.pattern: - assert isinstance(msg, Quaternion) - assert msg == quat - elif "pose" in topic.pattern: - assert isinstance(msg, Pose) - assert msg == pose diff --git a/dimos/protocol/pubsub/test_pattern_sub.py b/dimos/protocol/pubsub/test_pattern_sub.py new file mode 100644 index 0000000000..d8b6b98687 --- /dev/null +++ b/dimos/protocol/pubsub/test_pattern_sub.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Grid tests for subscribe_all pattern subscriptions.""" + +from collections.abc import Callable, Generator +from contextlib import AbstractContextManager, contextmanager +from dataclasses import dataclass, field +import re +import time +from typing import Any, Generic, TypeVar + +import pytest + +from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 +from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Glob, LCMPubSubBase, Topic +from dimos.protocol.pubsub.spec import AllPubSub, PubSub + +TopicT = TypeVar("TopicT") +MsgT = TypeVar("MsgT") + +# Type alias for (publisher, subscriber) tuple +PubSubPair = tuple[PubSub[TopicT, MsgT], AllPubSub[TopicT, MsgT]] + + +@dataclass +class Case(Generic[TopicT, MsgT]): + """Test case for grid testing pubsub implementations.""" + + name: str + pubsub_context: Callable[[], AbstractContextManager[PubSubPair[TopicT, MsgT]]] + topic_values: list[tuple[TopicT, MsgT]] + tags: set[str] = field(default_factory=set) + # Pattern tests: (pattern_topic, {indices of topic_values that should match}) + glob_patterns: list[tuple[TopicT, set[int]]] = field(default_factory=list) + regex_patterns: list[tuple[TopicT, set[int]]] = field(default_factory=list) + + +# --- Context managers --- + + +@contextmanager +def lcm_typed_context() -> Generator[tuple[LCM, LCM], None, None]: + pub = LCM(autoconf=True) + sub = LCM(autoconf=True) + pub.start() + sub.start() + yield pub, sub + pub.stop() + sub.stop() + + +@contextmanager +def lcm_bytes_context() -> Generator[tuple[LCMPubSubBase, LCMPubSubBase], None, None]: + pub = LCMPubSubBase(autoconf=True) + sub = LCMPubSubBase(autoconf=True) + pub.start() + sub.start() + yield pub, sub + pub.stop() + sub.stop() + + +# --- Test cases --- + +testcases: list[Case[Any, Any]] = [ + Case( + name="lcm_typed", + pubsub_context=lcm_typed_context, + topic_values=[ + (Topic("/sensor/position", Vector3), Vector3(1, 2, 3)), + (Topic("/sensor/orientation", Quaternion), Quaternion(0, 0, 0, 1)), + (Topic("/robot/arm", Pose), Pose(Vector3(4, 5, 6), Quaternion(0, 0, 0, 1))), + ], + tags={"all", "glob", "regex"}, + glob_patterns=[ + (Topic(topic=Glob("/sensor/*")), {0, 1}), + (Topic(topic=Glob("/**/arm")), {2}), + (Topic(topic=Glob("/**")), {0, 1, 2}), + ], + regex_patterns=[ + (Topic(re.compile(r"/sensor/.*")), {0, 1}), + (Topic(re.compile(r".*/arm"), Pose), {2}), + (Topic(re.compile(r".*/arm")), {2}), + (Topic(re.compile(r".*/arm#geometry.*")), {2}), + ], + ), + Case( + name="lcm_bytes", + pubsub_context=lcm_bytes_context, + topic_values=[ + (Topic("/sensor/temp"), b"temp"), + (Topic("/sensor/humidity"), b"humidity"), + (Topic("/robot/arm"), b"arm"), + ], + tags={"all", "glob", "regex"}, + glob_patterns=[ + (Topic(topic=Glob("/sensor/*")), {0, 1}), + (Topic(topic=Glob("/**/arm")), {2}), + (Topic(topic=Glob("/**")), {0, 1, 2}), + ], + regex_patterns=[ + (Topic(re.compile(r"/sensor/.*")), {0, 1}), + (Topic(re.compile(r".*/arm")), {2}), + ], + ), +] + +# Build filtered lists for parametrize +all_cases = [c for c in testcases if "all" in c.tags] +glob_cases = [c for c in testcases if "glob" in c.tags] +regex_cases = [c for c in testcases if "regex" in c.tags] + + +@pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name) +def test_subscribe_all_receives_all_topics(tc: Case[Any, Any]) -> None: + """Test that subscribe_all receives messages from all topics.""" + received: list[tuple[Any, Any]] = [] + + with tc.pubsub_context() as (pub, sub): + sub.subscribe_all(lambda msg, topic: received.append((msg, topic))) + + for topic, value in tc.topic_values: + pub.publish(topic, value) + + time.sleep(0.1) + + assert len(received) == len(tc.topic_values) + + # Verify all messages were received + received_msgs = [r[0] for r in received] + expected_msgs = [v for _, v in tc.topic_values] + for expected in expected_msgs: + assert expected in received_msgs + + +@pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name) +def test_subscribe_all_unsubscribe(tc: Case[Any, Any]) -> None: + """Test that unsubscribe stops receiving messages.""" + received: list[tuple[Any, Any]] = [] + topic, value = tc.topic_values[0] + + with tc.pubsub_context() as (pub, sub): + unsub = sub.subscribe_all(lambda msg, topic: received.append((msg, topic))) + + pub.publish(topic, value) + time.sleep(0.1) + assert len(received) == 1 + + unsub() + + pub.publish(topic, value) + time.sleep(0.1) + assert len(received) == 1 # No new messages + + +@pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name) +def test_subscribe_all_with_regular_subscribe(tc: Case[Any, Any]) -> None: + """Test that subscribe_all coexists with regular subscriptions.""" + all_received: list[tuple[Any, Any]] = [] + specific_received: list[tuple[Any, Any]] = [] + topic1, value1 = tc.topic_values[0] + topic2, value2 = tc.topic_values[1] + + with tc.pubsub_context() as (pub, sub): + sub.subscribe_all(lambda msg, topic: all_received.append((msg, topic))) + sub.subscribe(topic1, lambda msg, topic: specific_received.append((msg, topic))) + + pub.publish(topic1, value1) + pub.publish(topic2, value2) + time.sleep(0.1) + + # subscribe_all gets both + assert len(all_received) == 2 + + # specific subscription gets only topic1 + assert len(specific_received) == 1 + assert specific_received[0][0] == value1 + + +@pytest.mark.parametrize("tc", glob_cases, ids=lambda c: c.name) +def test_subscribe_glob(tc: Case[Any, Any]) -> None: + """Test that glob pattern subscriptions receive only matching topics.""" + for pattern_topic, expected_indices in tc.glob_patterns: + received: list[tuple[Any, Any]] = [] + + with tc.pubsub_context() as (pub, sub): + sub.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + + for topic, value in tc.topic_values: + pub.publish(topic, value) + + time.sleep(0.1) + + assert len(received) == len(expected_indices), ( + f"Expected {len(expected_indices)} messages for pattern {pattern_topic}, " + f"got {len(received)}" + ) + + # Verify we received the expected messages + expected_msgs = [tc.topic_values[i][1] for i in expected_indices] + received_msgs = [r[0] for r in received] + for expected in expected_msgs: + assert expected in received_msgs + + +@pytest.mark.parametrize("tc", regex_cases, ids=lambda c: c.name) +def test_subscribe_regex(tc: Case[Any, Any]) -> None: + """Test that regex pattern subscriptions receive only matching topics.""" + for pattern_topic, expected_indices in tc.regex_patterns: + received: list[tuple[Any, Any]] = [] + + with tc.pubsub_context() as (pub, sub): + sub.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + + for topic, value in tc.topic_values: + pub.publish(topic, value) + + time.sleep(0.1) + + assert len(received) == len(expected_indices), ( + f"Expected {len(expected_indices)} messages for pattern {pattern_topic}, " + f"got {len(received)}" + ) + + # Verify we received the expected messages + expected_msgs = [tc.topic_values[i][1] for i in expected_indices] + received_msgs = [r[0] for r in received] + for expected in expected_msgs: + assert expected in received_msgs diff --git a/docs/development/grid_testing.md b/docs/development/grid_testing.md new file mode 100644 index 0000000000..e5daab7b32 --- /dev/null +++ b/docs/development/grid_testing.md @@ -0,0 +1,116 @@ +# Grid Testing Strategy + +Grid tests run the same test logic across multiple implementations or configurations using pytest's parametrize feature. + +## Case Type Pattern + +Define a `Case` dataclass that holds everything needed to run tests against a specific implementation: + +```python +from collections.abc import Callable, Iterator +from contextlib import AbstractContextManager +from dataclasses import dataclass, field +from typing import Any, Generic + +@dataclass +class Case(Generic[TopicT, MsgT]): + name: str # For pytest id + pubsub_context: Callable[[], AbstractContextManager[...]] # Context manager factory + topic_values: list[tuple[TopicT, MsgT]] # Pre-generated test data (always 3 pairs) + tags: set[str] = field(default_factory=set) # Capability tags for filtering + + def __iter__(self) -> Iterator[Any]: + """Makes Case work with pytest.parametrize unpacking.""" + return iter((self.pubsub_context, self.topic_values)) +``` + +## Capability Tags + +Use tags to indicate what features each implementation supports: + +```python +testcases = [ + Case( + name="lcm_typed", + pubsub_context=lcm_typed_context, + topic_values=[...], + tags={"all", "glob", "regex"}, # LCM supports all pattern types + ), + Case( + name="shm_pickle", + pubsub_context=shm_context, + topic_values=[...], + tags={"all"}, # SharedMemory only supports subscribe_all + ), +] +``` + +## Filtered Test Lists + +Build separate lists for each capability to use with parametrize: + +```python +all_cases = [c for c in testcases if "all" in c.tags] +glob_cases = [c for c in testcases if "glob" in c.tags] +regex_cases = [c for c in testcases if "regex" in c.tags] +``` + +## Test Functions + +Use the filtered lists in parametrize decorators: + +```python +@pytest.mark.parametrize("case", all_cases, ids=lambda c: c.name) +def test_subscribe_all(case: Case) -> None: + with case.pubsub_context() as pubsub: + # Test logic using case.topic_values + ... + +@pytest.mark.parametrize("case", glob_cases, ids=lambda c: c.name) +def test_subscribe_glob(case: Case) -> None: + if not glob_cases: + pytest.skip("no implementations support glob") + with case.pubsub_context() as pubsub: + ... +``` + +## Context Managers + +Each implementation provides a context manager factory: + +```python +@contextmanager +def lcm_typed_context() -> Generator[LCM, None, None]: + lcm = LCM(autoconf=True) + lcm.start() + yield lcm + lcm.stop() +``` + +## Test Data Guidelines + +- Always provide exactly 3 topic/value pairs for consistency +- For typed implementations, use different types per topic to verify type handling +- For bytes implementations, use simple distinguishable byte strings + +```python +# Typed test data - different types per topic +typed_topic_values = [ + (Topic("/sensor/position", Vector3), Vector3(1, 2, 3)), + (Topic("/sensor/orientation", Quaternion), Quaternion(0, 0, 0, 1)), + (Topic("/robot/pose", Pose), Pose(...)), +] + +# Bytes test data +bytes_topic_values = [ + (Topic("/topic1"), b"msg1"), + (Topic("/topic2"), b"msg2"), + (Topic("/topic3"), b"msg3"), +] +``` + +## Examples + +- `dimos/protocol/pubsub/test_spec.py` - Basic pubsub operations +- `dimos/protocol/pubsub/test_subscribe_all.py` - Pattern subscriptions +- `dimos/protocol/pubsub/benchmark/testdata.py` - Benchmark cases From 3a0c0897c185bceb8a63a4f5a703ff41ad5ce8e8 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 18:41:50 +0800 Subject: [PATCH 11/18] bridge type spec --- dimos/protocol/pubsub/bridge.py | 96 +++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 dimos/protocol/pubsub/bridge.py diff --git a/dimos/protocol/pubsub/bridge.py b/dimos/protocol/pubsub/bridge.py new file mode 100644 index 0000000000..daa1e83f24 --- /dev/null +++ b/dimos/protocol/pubsub/bridge.py @@ -0,0 +1,96 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Bridge utilities for connecting pubsub systems.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Generic, Protocol, TypeVar + +from dimos.protocol.service.spec import Configurable, Service + +if TYPE_CHECKING: + from collections.abc import Callable + + from dimos.protocol.pubsub.spec import AllPubSub, PubSub + + +TopicT = TypeVar("TopicT") +MsgT = TypeVar("MsgT") +TopicFrom = TypeVar("TopicFrom") +TopicTo = TypeVar("TopicTo") +MsgFrom = TypeVar("MsgFrom") +MsgTo = TypeVar("MsgTo") + + +class Translator(Protocol[TopicFrom, TopicTo, MsgFrom, MsgTo]): # type: ignore[misc] + """Protocol for translating topics and messages between pubsub systems.""" + + def topic(self, topic: TopicFrom) -> TopicTo: + """Translate a topic from source to destination format.""" + ... + + def msg(self, msg: MsgFrom) -> MsgTo: + """Translate a message from source to destination format.""" + ... + + +def bridge( + pubsub1: AllPubSub[MsgFrom, TopicFrom], + pubsub2: PubSub[TopicTo, MsgTo], + translator: Translator[TopicFrom, TopicTo, MsgFrom, MsgTo], + # optionally we can override subscribe_all + # and only bridge a specific part of the pubsub tree + topic_from: TopicFrom | None = None, +) -> Callable[[], None]: + def pass_msg(msg: MsgTo, topic: TopicTo) -> None: + return pubsub2.publish(translator.topic(topic), translator.msg(msg)) + + # Bridge only specific messages from pubsub1 to pubsub2 + if topic_from: + return pubsub1.subscribe(topic_from, pass_msg) + + # Bridge all messages from pubsub1 to pubsub2 + return pubsub1.subscribe_all(pass_msg) + + +@dataclass +class BridgeConfig(Generic[TopicFrom, TopicTo, MsgFrom, MsgTo]): + """Configuration for a one-way bridge.""" + + source: AllPubSub[MsgFrom, TopicFrom] + destination: PubSub[TopicTo, MsgTo] + translator: Translator[TopicFrom, TopicTo, MsgFrom, MsgTo] + subscribe_topic: TopicFrom | None = None + + +class Bridge(Service[BridgeConfig[TopicFrom, TopicTo, MsgFrom, MsgTo]]): + """Service that bridges messages from one pubsub to another.""" + + _unsubscribe: Callable[[], None] | None = None + + def start(self) -> None: + super().start() + self._unsubscribe = bridge( + self.config.source, + self.config.destination, + self.config.translator, + ) + + def stop(self) -> None: + if self._unsubscribe: + self._unsubscribe() + self._unsubscribe = None + super().stop() From c8bdc335bdc9fe75d981db8873ed570bcdb0d4c7 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 20:05:32 +0800 Subject: [PATCH 12/18] fix: bridge.py type annotations and missing subscribe_topic param - Fix AllPubSub type parameter order (TopicFrom, MsgFrom not MsgFrom, TopicFrom) - Fix pass_msg callback signature to match spec (MsgFrom, TopicFrom) - Pass subscribe_topic config to bridge() function --- dimos/protocol/pubsub/bridge.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dimos/protocol/pubsub/bridge.py b/dimos/protocol/pubsub/bridge.py index daa1e83f24..403f8713e3 100644 --- a/dimos/protocol/pubsub/bridge.py +++ b/dimos/protocol/pubsub/bridge.py @@ -48,15 +48,15 @@ def msg(self, msg: MsgFrom) -> MsgTo: def bridge( - pubsub1: AllPubSub[MsgFrom, TopicFrom], + pubsub1: AllPubSub[TopicFrom, MsgFrom], pubsub2: PubSub[TopicTo, MsgTo], translator: Translator[TopicFrom, TopicTo, MsgFrom, MsgTo], # optionally we can override subscribe_all # and only bridge a specific part of the pubsub tree topic_from: TopicFrom | None = None, ) -> Callable[[], None]: - def pass_msg(msg: MsgTo, topic: TopicTo) -> None: - return pubsub2.publish(translator.topic(topic), translator.msg(msg)) + def pass_msg(msg: MsgFrom, topic: TopicFrom) -> None: + pubsub2.publish(translator.topic(topic), translator.msg(msg)) # Bridge only specific messages from pubsub1 to pubsub2 if topic_from: @@ -70,7 +70,7 @@ def pass_msg(msg: MsgTo, topic: TopicTo) -> None: class BridgeConfig(Generic[TopicFrom, TopicTo, MsgFrom, MsgTo]): """Configuration for a one-way bridge.""" - source: AllPubSub[MsgFrom, TopicFrom] + source: AllPubSub[TopicFrom, MsgFrom] destination: PubSub[TopicTo, MsgTo] translator: Translator[TopicFrom, TopicTo, MsgFrom, MsgTo] subscribe_topic: TopicFrom | None = None @@ -87,6 +87,7 @@ def start(self) -> None: self.config.source, self.config.destination, self.config.translator, + self.config.subscribe_topic, ) def stop(self) -> None: From 1bcb6c733f8f35a05a49ab968719f40579282946 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 20:12:16 +0800 Subject: [PATCH 13/18] fix: resolve mypy type errors across pubsub modules - helpers.py: Add type: ignore for getattr Any return - lcmpubsub.py: Add type: ignore for callback type variance and mixin incompatibility - shmpubsub.py: Add type: ignore for mixin incompatibility - transport.py: Add arg-type to existing type: ignore --- dimos/core/transport.py | 2 +- dimos/msgs/helpers.py | 2 +- dimos/protocol/pubsub/impl/lcmpubsub.py | 10 +++++----- dimos/protocol/pubsub/impl/shmpubsub.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 2e856b5e67..6605b13f3d 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -112,7 +112,7 @@ def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override] if not self._started: self.start() - return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value] + return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value, arg-type] class JpegLcmTransport(LCMTransport): # type: ignore[type-arg] diff --git a/dimos/msgs/helpers.py b/dimos/msgs/helpers.py index 3a6041e287..8464ec4ab1 100644 --- a/dimos/msgs/helpers.py +++ b/dimos/msgs/helpers.py @@ -46,7 +46,7 @@ def resolve_msg_type(type_name: str) -> type[DimosMsg] | None: for path in import_paths: try: module = importlib.import_module(path) - return getattr(module, class_name) + return getattr(module, class_name) # type: ignore[no-any-return] except (ImportError, AttributeError): continue diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index ecfe609260..f0679ea009 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -152,9 +152,9 @@ def publish(self, topic: Topic | str, message: bytes) -> None: self.l.publish(topic_str, message) def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]: - return self.subscribe(Topic(re.compile(".*")), callback) + return self.subscribe(Topic(re.compile(".*")), callback) # type: ignore[arg-type] - def subscribe( + def subscribe( # type: ignore[override] self, topic: Topic | str, callback: Callable[[bytes, Topic | str], Any] ) -> Callable[[], None]: if self.l is None: @@ -191,19 +191,19 @@ def unsubscribe() -> None: return unsubscribe -class LCM( +class LCM( # type: ignore[misc] LCMEncoderMixin, # type: ignore[type-arg] LCMPubSubBase, ): ... -class PickleLCM( +class PickleLCM( # type: ignore[misc] PickleEncoderMixin, # type: ignore[type-arg] LCMPubSubBase, ): ... -class JpegLCM( +class JpegLCM( # type: ignore[misc] JpegEncoderMixin, # type: ignore[type-arg] LCMPubSubBase, ): ... diff --git a/dimos/protocol/pubsub/impl/shmpubsub.py b/dimos/protocol/pubsub/impl/shmpubsub.py index 416e257281..3389a54b4c 100644 --- a/dimos/protocol/pubsub/impl/shmpubsub.py +++ b/dimos/protocol/pubsub/impl/shmpubsub.py @@ -336,7 +336,7 @@ def reconfigure(self, topic: Topic, *, capacity: int) -> dict: # type: ignore[t return self._shm.reconfigure(str(topic), capacity=capacity) -class LCMSharedMemory( +class LCMSharedMemory( # type: ignore[misc] LCMEncoderMixin[Topic], LCMSharedMemoryPubSubBase, ): From 46ce9ee0f947cdc13f9337a823c2a262ba12ff74 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 20:33:20 +0800 Subject: [PATCH 14/18] pattern sub test fix --- dimos/protocol/pubsub/encoders.py | 2 +- dimos/protocol/pubsub/test_pattern_sub.py | 73 +++++++++++------------ 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/dimos/protocol/pubsub/encoders.py b/dimos/protocol/pubsub/encoders.py index 62abf77579..ca4ddeac17 100644 --- a/dimos/protocol/pubsub/encoders.py +++ b/dimos/protocol/pubsub/encoders.py @@ -95,7 +95,7 @@ def encode(self, msg: DimosMsg, _: TopicT) -> bytes: def decode(self, msg: bytes, topic: TopicT) -> DimosMsg: lcm_type = getattr(topic, "lcm_type", None) if lcm_type is None: - raise ValueError("Cannot decode: topic has no lcm_type") + return None return cast("DimosMsg", lcm_type.lcm_decode(msg)) diff --git a/dimos/protocol/pubsub/test_pattern_sub.py b/dimos/protocol/pubsub/test_pattern_sub.py index d8b6b98687..fd06993173 100644 --- a/dimos/protocol/pubsub/test_pattern_sub.py +++ b/dimos/protocol/pubsub/test_pattern_sub.py @@ -48,72 +48,71 @@ class Case(Generic[TopicT, MsgT]): regex_patterns: list[tuple[TopicT, set[int]]] = field(default_factory=list) -# --- Context managers --- - - @contextmanager def lcm_typed_context() -> Generator[tuple[LCM, LCM], None, None]: pub = LCM(autoconf=True) - sub = LCM(autoconf=True) + sub = LCM(autoconf=False) pub.start() sub.start() - yield pub, sub - pub.stop() - sub.stop() + try: + yield pub, sub + finally: + pub.stop() + sub.stop() @contextmanager def lcm_bytes_context() -> Generator[tuple[LCMPubSubBase, LCMPubSubBase], None, None]: pub = LCMPubSubBase(autoconf=True) - sub = LCMPubSubBase(autoconf=True) + sub = LCMPubSubBase(autoconf=False) pub.start() sub.start() - yield pub, sub - pub.stop() - sub.stop() - + try: + yield pub, sub + finally: + pub.stop() + sub.stop() -# --- Test cases --- testcases: list[Case[Any, Any]] = [ Case( name="lcm_typed", pubsub_context=lcm_typed_context, topic_values=[ - (Topic("/sensor/position", Vector3), Vector3(1, 2, 3)), - (Topic("/sensor/orientation", Quaternion), Quaternion(0, 0, 0, 1)), - (Topic("/robot/arm", Pose), Pose(Vector3(4, 5, 6), Quaternion(0, 0, 0, 1))), + (Topic("/pattern_sub/sensor/position", Vector3), Vector3(1, 2, 3)), + (Topic("/pattern_sub/sensor/orientation", Quaternion), Quaternion(0, 0, 0, 1)), + (Topic("/pattern_sub/robot/arm", Pose), Pose(Vector3(4, 5, 6), Quaternion(0, 0, 0, 1))), ], tags={"all", "glob", "regex"}, glob_patterns=[ - (Topic(topic=Glob("/sensor/*")), {0, 1}), - (Topic(topic=Glob("/**/arm")), {2}), - (Topic(topic=Glob("/**")), {0, 1, 2}), + (Topic(topic=Glob("/pattern_sub/sensor/*")), {0, 1}), + (Topic(topic=Glob("/pattern_sub/**/arm")), {2}), + (Topic(topic=Glob("/pattern_sub/**")), {0, 1, 2}), ], regex_patterns=[ - (Topic(re.compile(r"/sensor/.*")), {0, 1}), - (Topic(re.compile(r".*/arm"), Pose), {2}), - (Topic(re.compile(r".*/arm")), {2}), - (Topic(re.compile(r".*/arm#geometry.*")), {2}), + (Topic(re.compile(r"/pattern_sub/sensor/.*")), {0, 1}), + (Topic(re.compile(r"/pattern_sub.*/arm"), Pose), {2}), + (Topic(re.compile(r"/pattern_sub.*/arm")), {2}), + (Topic(re.compile(r"/pattern_sub.*/arm#geometry.*")), {2}), ], ), Case( name="lcm_bytes", pubsub_context=lcm_bytes_context, topic_values=[ - (Topic("/sensor/temp"), b"temp"), - (Topic("/sensor/humidity"), b"humidity"), - (Topic("/robot/arm"), b"arm"), + (Topic("/pattern_sub/sensor/temp"), b"temp"), + (Topic("/pattern_sub/sensor/humidity"), b"humidity"), + (Topic("/pattern_sub/robot/arm"), b"arm"), ], tags={"all", "glob", "regex"}, glob_patterns=[ - (Topic(topic=Glob("/sensor/*")), {0, 1}), - (Topic(topic=Glob("/**/arm")), {2}), - (Topic(topic=Glob("/**")), {0, 1, 2}), + (Topic(topic=Glob("/pattern_sub/sensor/*")), {0, 1}), + (Topic(topic=Glob("/pattern_sub/**/arm")), {2}), + (Topic(topic=Glob("/pattern_sub/**")), {0, 1, 2}), ], regex_patterns=[ - (Topic(re.compile(r"/sensor/.*")), {0, 1}), - (Topic(re.compile(r".*/arm")), {2}), + (Topic(re.compile(r"/pattern_sub/sensor/.*")), {0, 1}), + (Topic(re.compile(r"/pattern_sub.*/arm")), {2}), ], ), ] @@ -135,7 +134,7 @@ def test_subscribe_all_receives_all_topics(tc: Case[Any, Any]) -> None: for topic, value in tc.topic_values: pub.publish(topic, value) - time.sleep(0.1) + time.sleep(0.01) assert len(received) == len(tc.topic_values) @@ -156,13 +155,13 @@ def test_subscribe_all_unsubscribe(tc: Case[Any, Any]) -> None: unsub = sub.subscribe_all(lambda msg, topic: received.append((msg, topic))) pub.publish(topic, value) - time.sleep(0.1) + time.sleep(0.01) assert len(received) == 1 unsub() pub.publish(topic, value) - time.sleep(0.1) + time.sleep(0.01) assert len(received) == 1 # No new messages @@ -180,7 +179,7 @@ def test_subscribe_all_with_regular_subscribe(tc: Case[Any, Any]) -> None: pub.publish(topic1, value1) pub.publish(topic2, value2) - time.sleep(0.1) + time.sleep(0.01) # subscribe_all gets both assert len(all_received) == 2 @@ -202,7 +201,7 @@ def test_subscribe_glob(tc: Case[Any, Any]) -> None: for topic, value in tc.topic_values: pub.publish(topic, value) - time.sleep(0.1) + time.sleep(0.01) assert len(received) == len(expected_indices), ( f"Expected {len(expected_indices)} messages for pattern {pattern_topic}, " @@ -228,7 +227,7 @@ def test_subscribe_regex(tc: Case[Any, Any]) -> None: for topic, value in tc.topic_values: pub.publish(topic, value) - time.sleep(0.1) + time.sleep(0.01) assert len(received) == len(expected_indices), ( f"Expected {len(expected_indices)} messages for pattern {pattern_topic}, " From 3a4f03ae08ef7fc9ed0e2f72483ace7ee7151bd5 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 26 Jan 2026 20:50:32 +0800 Subject: [PATCH 15/18] pattern tests fixed --- dimos/protocol/pubsub/test_pattern_sub.py | 59 +++++++++++++++-------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/dimos/protocol/pubsub/test_pattern_sub.py b/dimos/protocol/pubsub/test_pattern_sub.py index fd06993173..1c2f125e41 100644 --- a/dimos/protocol/pubsub/test_pattern_sub.py +++ b/dimos/protocol/pubsub/test_pattern_sub.py @@ -79,40 +79,40 @@ def lcm_bytes_context() -> Generator[tuple[LCMPubSubBase, LCMPubSubBase], None, name="lcm_typed", pubsub_context=lcm_typed_context, topic_values=[ - (Topic("/pattern_sub/sensor/position", Vector3), Vector3(1, 2, 3)), - (Topic("/pattern_sub/sensor/orientation", Quaternion), Quaternion(0, 0, 0, 1)), - (Topic("/pattern_sub/robot/arm", Pose), Pose(Vector3(4, 5, 6), Quaternion(0, 0, 0, 1))), + (Topic("/sensor/position", Vector3), Vector3(1, 2, 3)), + (Topic("/sensor/orientation", Quaternion), Quaternion(0, 0, 0, 1)), + (Topic("/robot/arm", Pose), Pose(Vector3(4, 5, 6), Quaternion(0, 0, 0, 1))), ], tags={"all", "glob", "regex"}, glob_patterns=[ - (Topic(topic=Glob("/pattern_sub/sensor/*")), {0, 1}), - (Topic(topic=Glob("/pattern_sub/**/arm")), {2}), - (Topic(topic=Glob("/pattern_sub/**")), {0, 1, 2}), + (Topic(topic=Glob("/sensor/*")), {0, 1}), + (Topic(topic=Glob("/**/arm")), {2}), + (Topic(topic=Glob("/**")), {0, 1, 2}), ], regex_patterns=[ - (Topic(re.compile(r"/pattern_sub/sensor/.*")), {0, 1}), - (Topic(re.compile(r"/pattern_sub.*/arm"), Pose), {2}), - (Topic(re.compile(r"/pattern_sub.*/arm")), {2}), - (Topic(re.compile(r"/pattern_sub.*/arm#geometry.*")), {2}), + (Topic(re.compile(r"/sensor/.*")), {0, 1}), + (Topic(re.compile(r".*/arm"), Pose), {2}), + (Topic(re.compile(r".*/arm")), {2}), + (Topic(re.compile(r".*/arm#geometry.*")), {2}), ], ), Case( name="lcm_bytes", pubsub_context=lcm_bytes_context, topic_values=[ - (Topic("/pattern_sub/sensor/temp"), b"temp"), - (Topic("/pattern_sub/sensor/humidity"), b"humidity"), - (Topic("/pattern_sub/robot/arm"), b"arm"), + (Topic("/sensor/temp"), b"temp"), + (Topic("/sensor/humidity"), b"humidity"), + (Topic("/robot/arm"), b"arm"), ], tags={"all", "glob", "regex"}, glob_patterns=[ - (Topic(topic=Glob("/pattern_sub/sensor/*")), {0, 1}), - (Topic(topic=Glob("/pattern_sub/**/arm")), {2}), - (Topic(topic=Glob("/pattern_sub/**")), {0, 1, 2}), + (Topic(topic=Glob("/sensor/*")), {0, 1}), + (Topic(topic=Glob("/**/arm")), {2}), + (Topic(topic=Glob("/**")), {0, 1, 2}), ], regex_patterns=[ - (Topic(re.compile(r"/pattern_sub/sensor/.*")), {0, 1}), - (Topic(re.compile(r"/pattern_sub.*/arm")), {2}), + (Topic(re.compile(r"/sensor/.*")), {0, 1}), + (Topic(re.compile(r".*/arm")), {2}), ], ), ] @@ -123,13 +123,25 @@ def lcm_bytes_context() -> Generator[tuple[LCMPubSubBase, LCMPubSubBase], None, regex_cases = [c for c in testcases if "regex" in c.tags] +def _topic_matches_prefix(topic: Any, prefix: str = "/") -> bool: + """Check if topic string starts with prefix. + + LCM uses UDP multicast, so messages from other tests running in parallel + can leak into subscribe_all callbacks. We filter to only our test topics. + """ + topic_str = str(topic.topic if hasattr(topic, "topic") else topic) + return topic_str.startswith(prefix) + + @pytest.mark.parametrize("tc", all_cases, ids=lambda c: c.name) def test_subscribe_all_receives_all_topics(tc: Case[Any, Any]) -> None: """Test that subscribe_all receives messages from all topics.""" received: list[tuple[Any, Any]] = [] with tc.pubsub_context() as (pub, sub): + # Filter to only our test topics (LCM multicast can leak from parallel tests) sub.subscribe_all(lambda msg, topic: received.append((msg, topic))) + time.sleep(0.01) # Allow subscription to be ready for topic, value in tc.topic_values: pub.publish(topic, value) @@ -153,6 +165,7 @@ def test_subscribe_all_unsubscribe(tc: Case[Any, Any]) -> None: with tc.pubsub_context() as (pub, sub): unsub = sub.subscribe_all(lambda msg, topic: received.append((msg, topic))) + time.sleep(0.01) # Allow subscription to be ready pub.publish(topic, value) time.sleep(0.01) @@ -174,8 +187,13 @@ def test_subscribe_all_with_regular_subscribe(tc: Case[Any, Any]) -> None: topic2, value2 = tc.topic_values[1] with tc.pubsub_context() as (pub, sub): - sub.subscribe_all(lambda msg, topic: all_received.append((msg, topic))) + sub.subscribe_all( + lambda msg, topic: all_received.append((msg, topic)) + if _topic_matches_prefix(topic) + else None + ) sub.subscribe(topic1, lambda msg, topic: specific_received.append((msg, topic))) + time.sleep(0.01) # Allow subscriptions to be ready pub.publish(topic1, value1) pub.publish(topic2, value2) @@ -197,6 +215,7 @@ def test_subscribe_glob(tc: Case[Any, Any]) -> None: with tc.pubsub_context() as (pub, sub): sub.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + time.sleep(0.01) # Allow subscription to be ready for topic, value in tc.topic_values: pub.publish(topic, value) @@ -224,6 +243,8 @@ def test_subscribe_regex(tc: Case[Any, Any]) -> None: with tc.pubsub_context() as (pub, sub): sub.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + time.sleep(0.01) + for topic, value in tc.topic_values: pub.publish(topic, value) From 7db71762665cf854d35a0d2b3e76c7d6a9a26d03 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Thu, 29 Jan 2026 16:09:49 +0800 Subject: [PATCH 16/18] fix: add import-untyped to mypy ignore comments for cupy/contact_graspnet These third-party libraries are installed but missing py.typed markers, causing mypy to raise import-untyped errors in addition to import-not-found. --- dimos/core/__init__.py | 2 +- .../manipulation/contact_graspnet_pytorch/inference.py | 6 +++--- dimos/msgs/sensor_msgs/Image.py | 2 +- dimos/msgs/sensor_msgs/image_impls/AbstractImage.py | 4 ++-- dimos/msgs/sensor_msgs/image_impls/CudaImage.py | 4 ++-- dimos/perception/common/utils.py | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index ed6888632b..979c201189 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -69,7 +69,7 @@ def teardown(self, worker) -> None: # type: ignore[no-untyped-def] import sys if "cupy" in sys.modules: - import cupy as cp # type: ignore[import-not-found] + import cupy as cp # type: ignore[import-not-found, import-untyped] # Clear memory pools mempool = cp.get_default_memory_pool() diff --git a/dimos/models/manipulation/contact_graspnet_pytorch/inference.py b/dimos/models/manipulation/contact_graspnet_pytorch/inference.py index 76bb377869..b86f61b7fe 100644 --- a/dimos/models/manipulation/contact_graspnet_pytorch/inference.py +++ b/dimos/models/manipulation/contact_graspnet_pytorch/inference.py @@ -2,11 +2,11 @@ import glob import os -from contact_graspnet_pytorch import config_utils # type: ignore[import-not-found] -from contact_graspnet_pytorch.contact_grasp_estimator import ( # type: ignore[import-not-found] +from contact_graspnet_pytorch import config_utils # type: ignore[import-not-found, import-untyped] +from contact_graspnet_pytorch.contact_grasp_estimator import ( # type: ignore[import-not-found, import-untyped] GraspEstimator, ) -from contact_graspnet_pytorch.data import ( # type: ignore[import-not-found] +from contact_graspnet_pytorch.data import ( # type: ignore[import-not-found, import-untyped] load_available_input_data, ) import numpy as np diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index de3e7abeca..a9c842f1ab 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -47,7 +47,7 @@ ) try: - import cupy as cp # type: ignore[import-not-found] + import cupy as cp # type: ignore[import-not-found, import-untyped] except Exception: cp = None diff --git a/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py b/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py index b71c5476fc..9cff0e8bd4 100644 --- a/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py +++ b/dimos/msgs/sensor_msgs/image_impls/AbstractImage.py @@ -25,7 +25,7 @@ import rerun as rr try: - import cupy as cp # type: ignore[import-not-found] + import cupy as cp # type: ignore[import-not-found, import-untyped] HAS_CUDA = True except Exception: # pragma: no cover - optional dependency @@ -35,7 +35,7 @@ # NVRTC defaults to C++11; libcu++ in recent CUDA requires at least C++17. if HAS_CUDA: try: - import cupy.cuda.compiler as _cupy_compiler # type: ignore[import-not-found] + import cupy.cuda.compiler as _cupy_compiler # type: ignore[import-not-found, import-untyped] if not getattr(_cupy_compiler, "_dimos_force_cxx17", False): _orig_compile_using_nvrtc = _cupy_compiler.compile_using_nvrtc diff --git a/dimos/msgs/sensor_msgs/image_impls/CudaImage.py b/dimos/msgs/sensor_msgs/image_impls/CudaImage.py index be48699347..2c58071b85 100644 --- a/dimos/msgs/sensor_msgs/image_impls/CudaImage.py +++ b/dimos/msgs/sensor_msgs/image_impls/CudaImage.py @@ -30,8 +30,8 @@ ) try: - import cupy as cp # type: ignore[import-not-found] - from cupyx.scipy import ( # type: ignore[import-not-found] + import cupy as cp # type: ignore[import-not-found, import-untyped] + from cupyx.scipy import ( # type: ignore[import-not-found, import-untyped] ndimage as cndimage, signal as csignal, ) diff --git a/dimos/perception/common/utils.py b/dimos/perception/common/utils.py index a590daf0a3..76d27f4897 100644 --- a/dimos/perception/common/utils.py +++ b/dimos/perception/common/utils.py @@ -79,7 +79,7 @@ # Optional CuPy support try: # pragma: no cover - optional dependency - import cupy as cp # type: ignore[import-not-found] + import cupy as cp # type: ignore[import-not-found, import-untyped] _HAS_CUDA = True except Exception: # pragma: no cover - optional dependency From 07144d83333e2b21a832178bcb7b67b51582078c Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Thu, 29 Jan 2026 16:42:34 +0800 Subject: [PATCH 17/18] fix: add DecodingError and LCM_SELF_TEST filtering to pubsub encoders - Add DecodingError exception for skipping messages in decode() - Add LCMTopicProto protocol for type-safe LCM topic handling - Filter LCM_SELF_TEST topic in both encoders and lcmpubsub handler - Fix type annotations in shmpubsub and ros_bridge --- dimos/protocol/pubsub/benchmark/type.py | 7 +++- dimos/protocol/pubsub/encoders.py | 48 ++++++++++++++++--------- dimos/protocol/pubsub/impl/lcmpubsub.py | 4 ++- dimos/protocol/pubsub/impl/shmpubsub.py | 2 +- dimos/robot/ros_bridge.py | 4 +-- 5 files changed, 44 insertions(+), 21 deletions(-) diff --git a/dimos/protocol/pubsub/benchmark/type.py b/dimos/protocol/pubsub/benchmark/type.py index 79101df9c5..91a9ac8c11 100644 --- a/dimos/protocol/pubsub/benchmark/type.py +++ b/dimos/protocol/pubsub/benchmark/type.py @@ -263,4 +263,9 @@ def fmt(v: float) -> str: return f"{v:.1f}s" return f"{v * 1000:.0f}ms" - self._print_heatmap("Latency", lambda r: r.receive_time, fmt, high_is_good=False) + self._print_heatmap( + "Latency", + lambda r: r.receive_time, + fmt, + high_is_good=False, + ) diff --git a/dimos/protocol/pubsub/encoders.py b/dimos/protocol/pubsub/encoders.py index ca4ddeac17..886856af4f 100644 --- a/dimos/protocol/pubsub/encoders.py +++ b/dimos/protocol/pubsub/encoders.py @@ -18,7 +18,7 @@ from abc import ABC, abstractmethod import pickle -from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast +from typing import TYPE_CHECKING, Generic, Protocol, TypeVar, cast from dimos.msgs import DimosMsg from dimos.msgs.sensor_msgs import Image @@ -31,6 +31,12 @@ EncodingT = TypeVar("EncodingT") +class DecodingError(Exception): + """Raised by decode() to skip a message without calling the callback.""" + + pass + + class PubSubEncoderMixin(Generic[TopicT, MsgT, EncodingT], ABC): """Mixin that encodes messages before publishing and decodes them after receiving. @@ -70,7 +76,10 @@ def subscribe( """Subscribe with automatic decoding.""" def wrapper_cb(encoded_data: EncodingT, topic: TopicT) -> None: - decoded_message = self.decode(encoded_data, topic) + try: + decoded_message = self.decode(encoded_data, topic) + except DecodingError: + return callback(decoded_message, topic) return cast("Callable[[], None]", super().subscribe(topic, wrapper_cb)) # type: ignore[misc] @@ -86,27 +95,34 @@ def decode(self, msg: bytes, _: TopicT) -> MsgT: return cast("MsgT", pickle.loads(msg)) -class LCMEncoderMixin(PubSubEncoderMixin[TopicT, DimosMsg, bytes]): +class LCMTopicProto(Protocol): + """Protocol for topics usable with LCM encoders.""" + + topic: str # At decode time, always concrete string + lcm_type: type[DimosMsg] | None + + +class LCMEncoderMixin(PubSubEncoderMixin[LCMTopicProto, DimosMsg, bytes]): """Encoder mixin for DimosMsg using LCM binary encoding.""" - def encode(self, msg: DimosMsg, _: TopicT) -> bytes: + def encode(self, msg: DimosMsg, _: LCMTopicProto) -> bytes: return msg.lcm_encode() - def decode(self, msg: bytes, topic: TopicT) -> DimosMsg: - lcm_type = getattr(topic, "lcm_type", None) - if lcm_type is None: - return None - return cast("DimosMsg", lcm_type.lcm_decode(msg)) + def decode(self, msg: bytes, topic: LCMTopicProto) -> DimosMsg: + if topic.lcm_type is None: + raise DecodingError(f"Cannot decode: topic {topic.topic!r} has no lcm_type") + return topic.lcm_type.lcm_decode(msg) -class JpegEncoderMixin(PubSubEncoderMixin[TopicT, Image, bytes]): +class JpegEncoderMixin(PubSubEncoderMixin[LCMTopicProto, Image, bytes]): """Encoder mixin for DimosMsg using JPEG encoding (for images).""" - def encode(self, msg: Image, _: TopicT) -> bytes: + def encode(self, msg: Image, _: LCMTopicProto) -> bytes: return msg.lcm_jpeg_encode() - def decode(self, msg: bytes, topic: TopicT) -> Image: - lcm_type = getattr(topic, "lcm_type", None) - if lcm_type is None: - raise ValueError("Cannot decode: topic has no lcm_type") - return cast("Image", lcm_type.lcm_jpeg_decode(msg)) + def decode(self, msg: bytes, topic: LCMTopicProto) -> Image: + if topic.topic == "LCM_SELF_TEST": + raise DecodingError("Ignoring LCM_SELF_TEST topic") + if topic.lcm_type is None: + raise DecodingError(f"Cannot decode: topic {topic.topic!r} has no lcm_type") + return cast("type[Image]", topic.lcm_type).lcm_jpeg_decode(msg) diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index f0679ea009..652b506bfa 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -23,7 +23,7 @@ LCMEncoderMixin, PickleEncoderMixin, ) -from dimos.protocol.pubsub.spec import AllPubSub, PubSub +from dimos.protocol.pubsub.spec import AllPubSub from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf from dimos.utils.logging_config import setup_logger @@ -169,6 +169,8 @@ def noop() -> None: if isinstance(topic, Topic) and topic.is_pattern: def handler(channel: str, msg: bytes) -> None: + if channel == "LCM_SELF_TEST": + return callback(msg, Topic.from_channel_str(channel, topic.lcm_type)) pattern_str = str(topic) diff --git a/dimos/protocol/pubsub/impl/shmpubsub.py b/dimos/protocol/pubsub/impl/shmpubsub.py index 3389a54b4c..db0a91e579 100644 --- a/dimos/protocol/pubsub/impl/shmpubsub.py +++ b/dimos/protocol/pubsub/impl/shmpubsub.py @@ -337,7 +337,7 @@ def reconfigure(self, topic: Topic, *, capacity: int) -> dict: # type: ignore[t class LCMSharedMemory( # type: ignore[misc] - LCMEncoderMixin[Topic], + LCMEncoderMixin, LCMSharedMemoryPubSubBase, ): """SharedMemory pubsub that uses LCM binary encoding (no pickle overhead).""" diff --git a/dimos/robot/ros_bridge.py b/dimos/robot/ros_bridge.py index 015b69393e..6e3e78542c 100644 --- a/dimos/robot/ros_bridge.py +++ b/dimos/robot/ros_bridge.py @@ -157,7 +157,7 @@ def ros_callback(msg) -> None: # type: ignore[no-untyped-def] def dimos_callback(msg, _topic) -> None: # type: ignore[no-untyped-def] self._dimos_to_ros(msg, ros_publisher, topic_name) - dimos_subscription = self.lcm.subscribe(dimos_topic, dimos_callback) + dimos_subscription = self.lcm.subscribe(dimos_topic, dimos_callback) # type: ignore[arg-type] logger.info(f" DIMOS → ROS: Subscribing to DIMOS topic {dimos_topic_name}") else: raise ValueError(f"Invalid bridge direction: {direction}") @@ -196,7 +196,7 @@ def _ros_to_dimos( topic_name: Name of the topic for tracking """ dimos_msg = dimos_type.from_ros_msg(ros_msg) # type: ignore[attr-defined] - self.lcm.publish(dimos_topic, dimos_msg) + self.lcm.publish(dimos_topic, dimos_msg) # type: ignore[arg-type] def _dimos_to_ros(self, dimos_msg: Any, ros_publisher, _topic_name: str) -> None: # type: ignore[no-untyped-def] """Convert DIMOS message to ROS and publish. From 9f1ffc7c015713917f1602ff1f2f47c677b91a94 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Thu, 29 Jan 2026 16:57:15 +0800 Subject: [PATCH 18/18] fix: resolve ruff warnings for explicit re-exports and loop variable binding - Use explicit re-export syntax in impl/__init__.py - Bind loop variable in lambda default argument to fix B023 --- dimos/protocol/pubsub/bridge.py | 2 +- dimos/protocol/pubsub/impl/__init__.py | 8 ++++++-- dimos/protocol/pubsub/test_pattern_sub.py | 4 ++-- dimos/protocol/service/lcmservice.py | 4 ---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dimos/protocol/pubsub/bridge.py b/dimos/protocol/pubsub/bridge.py index 403f8713e3..f312caed7b 100644 --- a/dimos/protocol/pubsub/bridge.py +++ b/dimos/protocol/pubsub/bridge.py @@ -19,7 +19,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Generic, Protocol, TypeVar -from dimos.protocol.service.spec import Configurable, Service +from dimos.protocol.service.spec import Service if TYPE_CHECKING: from collections.abc import Callable diff --git a/dimos/protocol/pubsub/impl/__init__.py b/dimos/protocol/pubsub/impl/__init__.py index 5512277de6..63a5bfa6d6 100644 --- a/dimos/protocol/pubsub/impl/__init__.py +++ b/dimos/protocol/pubsub/impl/__init__.py @@ -1,2 +1,6 @@ -from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, PickleLCM -from dimos.protocol.pubsub.impl.memory import Memory +from dimos.protocol.pubsub.impl.lcmpubsub import ( + LCM as LCM, + LCMPubSubBase as LCMPubSubBase, + PickleLCM as PickleLCM, +) +from dimos.protocol.pubsub.impl.memory import Memory as Memory diff --git a/dimos/protocol/pubsub/test_pattern_sub.py b/dimos/protocol/pubsub/test_pattern_sub.py index 1c2f125e41..2706e09b66 100644 --- a/dimos/protocol/pubsub/test_pattern_sub.py +++ b/dimos/protocol/pubsub/test_pattern_sub.py @@ -214,7 +214,7 @@ def test_subscribe_glob(tc: Case[Any, Any]) -> None: received: list[tuple[Any, Any]] = [] with tc.pubsub_context() as (pub, sub): - sub.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + sub.subscribe(pattern_topic, lambda msg, topic, r=received: r.append((msg, topic))) time.sleep(0.01) # Allow subscription to be ready for topic, value in tc.topic_values: @@ -241,7 +241,7 @@ def test_subscribe_regex(tc: Case[Any, Any]) -> None: received: list[tuple[Any, Any]] = [] with tc.pubsub_context() as (pub, sub): - sub.subscribe(pattern_topic, lambda msg, topic: received.append((msg, topic))) + sub.subscribe(pattern_topic, lambda msg, topic, r=received: r.append((msg, topic))) time.sleep(0.01) diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 0515f3c346..4655780fb3 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -20,13 +20,9 @@ import platform import threading import traceback -from typing import TYPE_CHECKING import lcm -if TYPE_CHECKING: - from dimos.msgs import DimosMsg - from dimos.protocol.service.spec import Service from dimos.protocol.service.system_configurator import ( BufferConfiguratorLinux,