Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ yolo11n.pt
# symlink one of .envrc.* if you'd like to use
.envrc
.claude
**/CLAUDE.md
.direnv/

/logs
Expand Down
4 changes: 2 additions & 2 deletions dimos/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def teardown(self, worker) -> None: # type: ignore[no-untyped-def]
import sys

if "cupy" in sys.modules:
import cupy as cp # type: ignore[import-not-found]
import cupy as cp # type: ignore[import-not-found, import-untyped]

# Clear memory pools
mempool = cp.get_default_memory_pool()
Expand Down Expand Up @@ -175,7 +175,7 @@ def close_all() -> None:
try:
import gc

from dimos.protocol.pubsub import shmpubsub
from dimos.protocol.pubsub.impl import shmpubsub

for obj in gc.get_objects():
if isinstance(obj, shmpubsub.SharedMemoryPubSubBase):
Expand Down
10 changes: 5 additions & 5 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@

from dimos.core.stream import In, Out, Stream, Transport
from dimos.msgs.protocol import DimosMsg
from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory
from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.rospubsub import DimosROS, ROSTopic
from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, PickleSharedMemory
from dimos.protocol.pubsub.impl.jpeg_shm import JpegSharedMemory
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic
from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -112,7 +112,7 @@ def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: # type: ignore[assignment, override]
if not self._started:
self.start()
return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value]
return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value, arg-type]


class JpegLcmTransport(LCMTransport): # type: ignore[type-arg]
Expand Down
5 changes: 3 additions & 2 deletions dimos/e2e_tests/lcm_spy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

import lcm

from dimos.msgs import DimosMsg
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.protocol.service.lcmservice import LCMMsg, LCMService
from dimos.protocol.service.lcmservice import LCMService


class LcmSpy(LCMService):
Expand Down Expand Up @@ -155,7 +156,7 @@ def listener(msg: bytes) -> None:
def wait_for_message_result(
self,
topic: str,
type: type[LCMMsg],
type: type[DimosMsg],
predicate: Callable[[Any], bool],
fail_message: str,
timeout: float = 30.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import glob
import os

from contact_graspnet_pytorch import config_utils # type: ignore[import-not-found]
from contact_graspnet_pytorch.contact_grasp_estimator import ( # type: ignore[import-not-found]
from contact_graspnet_pytorch import config_utils # type: ignore[import-not-found, import-untyped]
from contact_graspnet_pytorch.contact_grasp_estimator import ( # type: ignore[import-not-found, import-untyped]
GraspEstimator,
)
from contact_graspnet_pytorch.data import ( # type: ignore[import-not-found]
from contact_graspnet_pytorch.data import ( # type: ignore[import-not-found, import-untyped]
load_available_input_data,
)
import numpy as np
Expand Down
3 changes: 2 additions & 1 deletion dimos/msgs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dimos.msgs.helpers import resolve_msg_type
from dimos.msgs.protocol import DimosMsg

__all__ = ["DimosMsg"]
__all__ = ["DimosMsg", "resolve_msg_type"]
53 changes: 53 additions & 0 deletions dimos/msgs/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from functools import lru_cache
import importlib
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from dimos.msgs import DimosMsg


@lru_cache(maxsize=256)
def resolve_msg_type(type_name: str) -> type[DimosMsg] | None:
"""Resolve a message type name to its class.

Args:
type_name: Type name in format "module.ClassName" (e.g., "geometry_msgs.Vector3")

Returns:
The message class or None if not found.
"""
try:
module_name, class_name = type_name.rsplit(".", 1)
except ValueError:
return None

# Try different import paths
import_paths = [
f"dimos.msgs.{module_name}",
f"dimos_lcm.{module_name}",
]

for path in import_paths:
try:
module = importlib.import_module(path)
return getattr(module, class_name) # type: ignore[no-any-return]
except (ImportError, AttributeError):
continue

return None
2 changes: 1 addition & 1 deletion dimos/msgs/nav_msgs/test_OccupancyGrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from dimos.msgs.geometry_msgs import Pose
from dimos.msgs.nav_msgs import OccupancyGrid
from dimos.msgs.sensor_msgs import PointCloud2
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic
from dimos.utils.data import get_data


Expand Down
2 changes: 1 addition & 1 deletion dimos/msgs/sensor_msgs/Image.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)

try:
import cupy as cp # type: ignore[import-not-found]
import cupy as cp # type: ignore[import-not-found, import-untyped]
except Exception:
cp = None

Expand Down
4 changes: 2 additions & 2 deletions dimos/msgs/sensor_msgs/image_impls/AbstractImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import rerun as rr

try:
import cupy as cp # type: ignore[import-not-found]
import cupy as cp # type: ignore[import-not-found, import-untyped]

HAS_CUDA = True
except Exception: # pragma: no cover - optional dependency
Expand All @@ -35,7 +35,7 @@
# NVRTC defaults to C++11; libcu++ in recent CUDA requires at least C++17.
if HAS_CUDA:
try:
import cupy.cuda.compiler as _cupy_compiler # type: ignore[import-not-found]
import cupy.cuda.compiler as _cupy_compiler # type: ignore[import-not-found, import-untyped]

if not getattr(_cupy_compiler, "_dimos_force_cxx17", False):
_orig_compile_using_nvrtc = _cupy_compiler.compile_using_nvrtc
Expand Down
4 changes: 2 additions & 2 deletions dimos/msgs/sensor_msgs/image_impls/CudaImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
)

try:
import cupy as cp # type: ignore[import-not-found]
from cupyx.scipy import ( # type: ignore[import-not-found]
import cupy as cp # type: ignore[import-not-found, import-untyped]
from cupyx.scipy import ( # type: ignore[import-not-found, import-untyped]
ndimage as cndimage,
signal as csignal,
)
Expand Down
2 changes: 1 addition & 1 deletion dimos/msgs/tf2_msgs/test_TFMessage_lcmpub.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from dimos.msgs.geometry_msgs import Quaternion, Transform, Vector3
from dimos.msgs.tf2_msgs import TFMessage
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic


# Publishes a series of transforms representing a robot kinematic chain
Expand Down
2 changes: 1 addition & 1 deletion dimos/perception/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@

# Optional CuPy support
try: # pragma: no cover - optional dependency
import cupy as cp # type: ignore[import-not-found]
import cupy as cp # type: ignore[import-not-found, import-untyped]

_HAS_CUDA = True
except Exception: # pragma: no cover - optional dependency
Expand Down
4 changes: 2 additions & 2 deletions dimos/protocol/pubsub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dimos.protocol.pubsub.lcmpubsub as lcm
from dimos.protocol.pubsub.memory import Memory
import dimos.protocol.pubsub.impl.lcmpubsub as lcm
from dimos.protocol.pubsub.impl.memory import Memory
from dimos.protocol.pubsub.spec import PubSub

__all__ = [
Expand Down
20 changes: 15 additions & 5 deletions dimos/protocol/pubsub/benchmark/testdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
from dimos.protocol.pubsub.benchmark.type import Case
from dimos.protocol.pubsub.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic
from dimos.protocol.pubsub.memory import Memory
from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, LCMSharedMemory, PickleSharedMemory
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic
from dimos.protocol.pubsub.impl.memory import Memory
from dimos.protocol.pubsub.impl.shmpubsub import (
BytesSharedMemory,
LCMSharedMemory,
PickleSharedMemory,
)


def make_data_bytes(size: int) -> bytes:
Expand Down Expand Up @@ -171,7 +175,7 @@ def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]:


try:
from dimos.protocol.pubsub.redispubsub import Redis
from dimos.protocol.pubsub.impl.redispubsub import Redis

@contextmanager
def redis_pubsub_channel() -> Generator[Redis, None, None]:
Expand Down Expand Up @@ -199,7 +203,13 @@ def redis_msggen(size: int) -> tuple[str, Any]:
print("Redis not available")


from dimos.protocol.pubsub.rospubsub import ROS_AVAILABLE, DimosROS, RawROS, RawROSTopic, ROSTopic
from dimos.protocol.pubsub.impl.rospubsub import (
ROS_AVAILABLE,
DimosROS,
RawROS,
RawROSTopic,
ROSTopic,
)

if ROS_AVAILABLE:
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
Expand Down
7 changes: 6 additions & 1 deletion dimos/protocol/pubsub/benchmark/type.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,9 @@ def fmt(v: float) -> str:
return f"{v:.1f}s"
return f"{v * 1000:.0f}ms"

self._print_heatmap("Latency", lambda r: r.receive_time, fmt, high_is_good=False)
self._print_heatmap(
"Latency",
lambda r: r.receive_time,
fmt,
high_is_good=False,
)
97 changes: 97 additions & 0 deletions dimos/protocol/pubsub/bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bridge utilities for connecting pubsub systems."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Generic, Protocol, TypeVar

from dimos.protocol.service.spec import Service

if TYPE_CHECKING:
from collections.abc import Callable

from dimos.protocol.pubsub.spec import AllPubSub, PubSub


TopicT = TypeVar("TopicT")
MsgT = TypeVar("MsgT")
TopicFrom = TypeVar("TopicFrom")
TopicTo = TypeVar("TopicTo")
MsgFrom = TypeVar("MsgFrom")
MsgTo = TypeVar("MsgTo")


class Translator(Protocol[TopicFrom, TopicTo, MsgFrom, MsgTo]): # type: ignore[misc]
"""Protocol for translating topics and messages between pubsub systems."""

def topic(self, topic: TopicFrom) -> TopicTo:
"""Translate a topic from source to destination format."""
...

def msg(self, msg: MsgFrom) -> MsgTo:
"""Translate a message from source to destination format."""
...


def bridge(
pubsub1: AllPubSub[TopicFrom, MsgFrom],
pubsub2: PubSub[TopicTo, MsgTo],
translator: Translator[TopicFrom, TopicTo, MsgFrom, MsgTo],
# optionally we can override subscribe_all
# and only bridge a specific part of the pubsub tree
topic_from: TopicFrom | None = None,
) -> Callable[[], None]:
def pass_msg(msg: MsgFrom, topic: TopicFrom) -> None:
pubsub2.publish(translator.topic(topic), translator.msg(msg))

# Bridge only specific messages from pubsub1 to pubsub2
if topic_from:
return pubsub1.subscribe(topic_from, pass_msg)

# Bridge all messages from pubsub1 to pubsub2
return pubsub1.subscribe_all(pass_msg)


@dataclass
class BridgeConfig(Generic[TopicFrom, TopicTo, MsgFrom, MsgTo]):
"""Configuration for a one-way bridge."""

source: AllPubSub[TopicFrom, MsgFrom]
destination: PubSub[TopicTo, MsgTo]
translator: Translator[TopicFrom, TopicTo, MsgFrom, MsgTo]
subscribe_topic: TopicFrom | None = None


class Bridge(Service[BridgeConfig[TopicFrom, TopicTo, MsgFrom, MsgTo]]):
"""Service that bridges messages from one pubsub to another."""

_unsubscribe: Callable[[], None] | None = None

def start(self) -> None:
super().start()
self._unsubscribe = bridge(
self.config.source,
self.config.destination,
self.config.translator,
self.config.subscribe_topic,
)

def stop(self) -> None:
if self._unsubscribe:
self._unsubscribe()
self._unsubscribe = None
super().stop()
Loading
Loading