From db8d183d187b0a0c74f0b9b477b4cb6d9e8abf8a Mon Sep 17 00:00:00 2001 From: Matthew Daiter Date: Sat, 30 Aug 2025 12:05:26 +0000 Subject: [PATCH 01/11] CPU shared memory transports --- dimos/core/__init__.py | 2 +- dimos/core/transport.py | 48 ++++ dimos/msgs/sensor_msgs/Image.py | 16 ++ dimos/protocol/pubsub/shm/ipc_factory.py | 272 ++++++++++++++++++ dimos/protocol/pubsub/shmpubsub.py | 323 ++++++++++++++++++++++ dimos/protocol/pubsub/test_spec.py | 21 ++ dimos/robot/unitree_webrtc/unitree_go2.py | 8 +- onnx/metric3d_vit_small.onnx | 3 + 8 files changed, 688 insertions(+), 5 deletions(-) create mode 100644 dimos/protocol/pubsub/shm/ipc_factory.py create mode 100644 dimos/protocol/pubsub/shmpubsub.py create mode 100644 onnx/metric3d_vit_small.onnx diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 0b7755e2e3..67a495bb28 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -10,7 +10,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleBase from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport -from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport +from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport, SHMTransport, pSHMTransport from dimos.protocol.rpc.lcmrpc import LCMRPC from dimos.protocol.rpc.spec import RPCSpec from dimos.protocol.tf import LCMTF, TF, PubSubTF, TFConfig, TFSpec diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 8874482e0a..0729cfb50e 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -40,6 +40,7 @@ from dimos.core.stream import In, RemoteIn, Transport from dimos.protocol.pubsub.lcmpubsub import LCM, PickleLCM from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic +from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory T = TypeVar("T") @@ -105,6 +106,53 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> self._started = True return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) +class pSHMTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, **kwargs): + super().__init__(topic) + self.shm = PickleSharedMemory(**kwargs) + + def __reduce__(self): + return (pSHMTransport, (self.topic,)) + + def broadcast(self, _, msg): + if not self._started: + self.shm.start() + self._started = True + + self.shm.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: + if not self._started: + self.shm.start() + self._started = True + return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) + + +class SHMTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, **kwargs): + super().__init__(topic) + self.shm = SharedMemory(**kwargs) + + def __reduce__(self): + return (SHMTransport, (self.topic,)) + + def broadcast(self, _, msg): + if not self._started: + self.shm.start() + self._started = True + + self.shm.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: + if not self._started: + self.shm.start() + self._started = True + return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) + class DaskTransport(Transport[T]): subscribers: List[Callable[[T], None]] diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index fb57cfcd3e..5f48780ad4 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -343,6 +343,22 @@ def lcm_decode(cls, data: bytes, **kwargs) -> "Image": **kwargs, ) + def as_memoryview(self) -> memoryview: + """ + Return a memoryview of the image data for CPU shared memory transport. + """ + if isinstance(self.data, (bytes, bytearray)): + return memoryview(self.data) + elif isinstance(self.data, np.ndarray): + return memoryview(self.data.tobytes()) + else: + raise TypeError(f"Unsupported data type {type(self.data)}") + + @classmethod + def from_memoryview(cls, mem: memoryview, width: int, height: int, format: "ImageFormat"): + """Reconstruct an Image from a CPU memoryview (SharedMemory buffer).""" + return cls(bytes(mem), width=width, height=height, format=format) + def _get_row_step(self) -> int: """Calculate row step (bytes per row).""" bytes_per_pixel = self._get_bytes_per_pixel() diff --git a/dimos/protocol/pubsub/shm/ipc_factory.py b/dimos/protocol/pubsub/shm/ipc_factory.py new file mode 100644 index 0000000000..348522e2b0 --- /dev/null +++ b/dimos/protocol/pubsub/shm/ipc_factory.py @@ -0,0 +1,272 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# frame_ipc.py +# Python 3.9+ +import base64 +import time +from abc import ABC, abstractmethod +import os +from typing import Optional, Tuple + +import numpy as np +from multiprocessing.shared_memory import SharedMemory +from multiprocessing.managers import SharedMemoryManager + +_UNLINK_ON_GC = os.getenv("DIMOS_IPC_UNLINK_ON_GC", "0").lower() not in ("0", "false", "no") + + +def _open_shm_with_retry(name: str) -> SharedMemory: + tries = int(os.getenv("DIMOS_IPC_ATTACH_RETRIES", "40")) # ~40 tries + base_ms = float(os.getenv("DIMOS_IPC_ATTACH_BACKOFF_MS", "5")) # 5 ms + cap_ms = float(os.getenv("DIMOS_IPC_ATTACH_BACKOFF_CAP_MS", "200")) # 200 ms + last = None + for i in range(tries): + try: + return SharedMemory(name=name) + except FileNotFoundError as e: + last = e + # exponential backoff, capped + time.sleep(min((base_ms * (2**i)), cap_ms) / 1000.0) + raise FileNotFoundError(f"SHM not found after {tries} retries: {name}") from last + + +def _sanitize_shm_name(name: str) -> str: + # Python's SharedMemory expects names like 'psm_abc', without leading '/' + return name.lstrip("/") if isinstance(name, str) else name + +# --------------------------- +# 1) Abstract interface +# --------------------------- + + +class FrameChannel(ABC): + """Single-slot 'freshest frame' IPC channel with a tiny control block. + - Double-buffered to avoid torn reads. + - Descriptor is JSON-safe; attach() reconstructs in another process. + """ + + @property + @abstractmethod + def device(self) -> str: # "cpu" or "cuda" + ... + + @property + @abstractmethod + def shape(self) -> tuple: ... + + @property + @abstractmethod + def dtype(self) -> np.dtype: ... + + @abstractmethod + def publish(self, frame) -> None: + """Write into inactive buffer, then flip visible index (write control last).""" + ... + + @abstractmethod + def read(self, last_seq: int = -1, require_new: bool = True): + """Return (seq:int, ts_ns:int, view-or-None).""" + ... + + @abstractmethod + def descriptor(self) -> dict: + """Tiny JSON-safe descriptor (names/handles/shape/dtype/device).""" + ... + + @classmethod + @abstractmethod + def attach(cls, desc: dict) -> "FrameChannel": + """Attach in another process.""" + ... + + @abstractmethod + def close(self) -> None: + """Detach resources (owner also unlinks manager if applicable).""" + ... + + +from multiprocessing.shared_memory import SharedMemory +import weakref, os + + +def _safe_unlink(name): + try: + shm = SharedMemory(name=name) + shm.unlink() + except FileNotFoundError: + pass + except Exception: + pass + + +# --------------------------- +# 2) CPU shared-memory backend +# --------------------------- + + +class CpuShmChannel(FrameChannel): + def __init__(self, shape, dtype=np.uint8): + self._shape = tuple(shape) + self._dtype = np.dtype(dtype) + self._nbytes = int(self._dtype.itemsize * np.prod(self._shape)) + + # Create two buffers back-to-back + tiny control block + self._shm_data = SharedMemory(create=True, size=2 * self._nbytes) + self._shm_ctrl = SharedMemory(create=True, size=24) + self._ctrl = np.ndarray((3,), dtype=np.int64, buffer=self._shm_ctrl.buf) + self._ctrl[:] = 0 + + # Owner-only finalizers (in case close() isn’t called) + self._finalizer_data = ( + weakref.finalize(self, _safe_unlink, self._shm_data.name) if _UNLINK_ON_GC else None + ) + self._finalizer_ctrl = ( + weakref.finalize(self, _safe_unlink, self._shm_ctrl.name) if _UNLINK_ON_GC else None + ) + + @property + def device(self): + return "cpu" + + @property + def shape(self): + return self._shape + + @property + def dtype(self): + return self._dtype + + def publish(self, frame): + assert isinstance(frame, np.ndarray) + assert frame.shape == self._shape and frame.dtype == self._dtype + active = int(self._ctrl[2]) + inactive = 1 - active + view = np.ndarray( + self._shape, + dtype=self._dtype, + buffer=self._shm_data.buf, + offset=inactive * self._nbytes, + ) + np.copyto(view, frame, casting="no") + ts = np.int64(time.time_ns()) + # Publish order: ts -> idx -> seq + self._ctrl[1] = ts + self._ctrl[2] = inactive + self._ctrl[0] += 1 + + def read(self, last_seq: int = -1, require_new=True): + for _ in range(3): + seq1 = int(self._ctrl[0]) + idx = int(self._ctrl[2]) + ts = int(self._ctrl[1]) + view = np.ndarray( + self._shape, dtype=self._dtype, buffer=self._shm_data.buf, offset=idx * self._nbytes + ) + if seq1 == int(self._ctrl[0]): + if require_new and seq1 == last_seq: + return seq1, ts, None + return seq1, ts, view + return last_seq, 0, None + + def descriptor(self): + return { + "kind": "cpu", + "shape": self._shape, + "dtype": self._dtype.str, + "nbytes": self._nbytes, + "data_name": self._shm_data.name, + "ctrl_name": self._shm_ctrl.name, + } + + @classmethod + def attach(cls, desc): + obj = object.__new__(cls) + obj._shape = tuple(desc["shape"]) + obj._dtype = np.dtype(desc["dtype"]) + obj._nbytes = int(desc["nbytes"]) + data_name = desc["data_name"] + ctrl_name = desc["ctrl_name"] + try: + obj._shm_data = _open_shm_with_retry(data_name) + obj._shm_ctrl = _open_shm_with_retry(ctrl_name) + except FileNotFoundError as e: + raise FileNotFoundError( + f"CPU IPC attach failed: control/data SHM not found " + f"(ctrl='{ctrl_name}', data='{data_name}'). " + f"Ensure the writer is running on the same host and the channel is alive." + ) from e + obj._ctrl = np.ndarray((3,), dtype=np.int64, buffer=obj._shm_ctrl.buf) + # attachments don’t own/unlink + obj._finalizer_data = obj._finalizer_ctrl = None + return obj + + def close(self): + if getattr(self, "_is_owner", False): + try: + self._shm_ctrl.close() + finally: + try: + _safe_unlink(self._shm_ctrl.name) + except: + pass + if hasattr(self, "_shm_data"): + try: + self._shm_data.close() + finally: + try: + _safe_unlink(self._shm_data.name) + except: + pass + return + # readers: just close handles + try: + self._shm_ctrl.close() + except: + pass + try: + self._shm_data.close() + except: + pass + + +# --------------------------- +# 3) Factories +# --------------------------- + + +class CPU_IPC_Factory: + """Creates/attaches CPU shared-memory channels.""" + + @staticmethod + def create(shape, dtype=np.uint8) -> CpuShmChannel: + return CpuShmChannel(shape, dtype=dtype) + + @staticmethod + def attach(desc: dict) -> CpuShmChannel: + assert desc.get("kind") == "cpu", "Descriptor kind mismatch" + return CpuShmChannel.attach(desc) + +# --------------------------- +# 4) Runtime selector +# --------------------------- + + +def make_frame_channel( + shape, dtype=np.uint8, prefer: str = "auto", device: int = 0 +) -> FrameChannel: + """Choose CUDA IPC if available (or requested), otherwise CPU SHM.""" + # TODO: Implement the CUDA version of creating this factory + return CPU_IPC_Factory.create(shape, dtype=dtype) + diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py new file mode 100644 index 0000000000..6b7f96e944 --- /dev/null +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# --------------------------------------------------------------------------- +# SharedMemory Pub/Sub over unified IPC channels (CPU/CUDA) +# --------------------------------------------------------------------------- + +from __future__ import annotations + +import os +import struct +import threading +import time +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional + +import numpy as np + +from dimos.protocol.pubsub.spec import PubSub, PubSubEncoderMixin, PickleEncoderMixin +from dimos.protocol.pubsub.shm.ipc_factory import CPU_IPC_Factory +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.protocol.pubsub.sharedmemory") + + +# -------------------------------------------------------------------------------------- +# Configuration (kept local to PubSub now that Service is gone) +# -------------------------------------------------------------------------------------- + + +@dataclass +class SharedMemoryConfig: + prefer: str = "auto" # "auto" | "cpu" (DIMOS_IPC_BACKEND overrides), TODO: "cuda" + default_capacity: int = 3686400 # payload bytes (excludes 4-byte header) + close_channels_on_stop: bool = True + + +# -------------------------------------------------------------------------------------- +# Core PubSub with integrated SHM/IPC transport (previously the Service logic) +# -------------------------------------------------------------------------------------- + + +class SharedMemoryPubSubBase(PubSub[str, Any]): + """ + Pub/Sub over SharedMemory/CUDA-IPC, modeled after LCMPubSubBase but self-contained. + Wire format per topic/frame: [len:uint32_le] + payload bytes (padded to fixed capacity). + Features ported from Service: + - start()/stop() lifecycle + - one frame channel per topic + - per-topic fanout thread (reads from channel, invokes subscribers) + - CPU/CUDA backend selection (auto + env override) + - reconfigure(topic, capacity=...) + - drop initial empty frame; synchronous local delivery; echo suppression + """ + + # Per-topic state + # TODO: implement "is_cuda" below capacity, above cp + class _TopicState: + __slots__ = ( + "channel", + "subs", + "stop", + "thread", + "last_seq", + "shape", + "dtype", + "capacity", + "cp", + "last_local_payload", + "last_local_seq", + ) + + def __init__(self, channel, capacity: int, cp_mod): + self.channel = channel + self.capacity = int(capacity) + self.shape = (self.capacity + 4,) # +4 for uint32 length header + self.dtype = np.uint8 + self.subs: list[Callable[[bytes, str], None]] = [] + self.stop = threading.Event() + self.thread: Optional[threading.Thread] = None + self.last_seq = 0 # start at 0 to avoid b"" on first poll + # TODO: implement an initializer variable for is_cuda once CUDA IPC is in + self.cp = cp_mod + self.last_local_payload: Optional[bytes] = None + self.last_local_seq: Optional[int] = None + + # ----- init / lifecycle ------------------------------------------------- + + def __init__( + self, + *, + prefer: str = "auto", + default_capacity: int = 3686400, + close_channels_on_stop: bool = True, + **_: Any, + ) -> None: + super().__init__() + self.config = SharedMemoryConfig( + prefer=prefer, + default_capacity=default_capacity, + close_channels_on_stop=close_channels_on_stop, + ) + self._topics: Dict[str, SharedMemoryPubSubBase._TopicState] = {} + self._lock = threading.Lock() + + def start(self) -> None: + pref = (self.config.prefer or "auto").lower() + backend = os.getenv("DIMOS_IPC_BACKEND", pref).lower() + logger.info(f"SharedMemory PubSub starting (backend={backend})") + # No global thread needed; per-topic fanout starts on first subscribe. + + def stop(self) -> None: + with self._lock: + for topic, st in list(self._topics.items()): + # stop fanout + try: + if st.thread: + st.stop.set() + st.thread.join(timeout=0.5) + st.thread = None + except Exception: + pass + # close/unlink channels if configured + if self.config.close_channels_on_stop: + try: + st.channel.close() + except Exception: + pass + self._topics.clear() + logger.info("SharedMemory PubSub stopped.") + + # ----- PubSub API (bytes on the wire) ---------------------------------- + + def publish(self, topic: str, message: bytes) -> None: + if not isinstance(message, (bytes, bytearray, memoryview)): + raise TypeError(f"publish expects bytes-like, got {type(message)!r}") + + st = self._ensure_topic(topic) + payload = memoryview(message) + L = len(payload) + if L > st.capacity: + raise ValueError(f"Payload too large: {L} > capacity {st.capacity}") + + # Build host frame [len:4] + payload + host = np.zeros(st.shape, dtype=st.dtype) + host[:4] = np.frombuffer(struct.pack(" Callable[[], None]: + """Subscribe a callback(message: bytes, topic). Returns unsubscribe.""" + st = self._ensure_topic(topic) + st.subs.append(callback) + if st.thread is None: + st.thread = threading.Thread(target=self._fanout_loop, args=(topic, st), daemon=True) + st.thread.start() + + def _unsub(): + try: + st.subs.remove(callback) + except ValueError: + pass + if not st.subs and st.thread: + st.stop.set() + st.thread.join(timeout=0.5) + st.thread = None + st.stop.clear() + + return _unsub + + # Optional utility like in LCMPubSubBase + def wait_for_message(self, topic: str, timeout: float = 1.0) -> Any: + """Wait once; if an encoder mixin is present, returned value is decoded.""" + received: Any = None + evt = threading.Event() + + def _handler(msg: bytes, _topic: str): + nonlocal received + try: + if hasattr(self, "decode"): # provided by encoder mixin + received = self.decode(msg, topic) # type: ignore[misc] + else: + received = msg + finally: + evt.set() + + unsub = self.subscribe(topic, _handler) + try: + evt.wait(timeout) + return received + finally: + try: + unsub() + except Exception: + pass + + # ----- Capacity mgmt ---------------------------------------------------- + + def reconfigure(self, topic: str, *, capacity: int) -> dict: + """Change payload capacity (bytes) for a topic; returns new descriptor.""" + st = self._ensure_topic(topic) + new_cap = int(capacity) + new_shape = (new_cap + 4,) + desc = st.channel.reconfigure(new_shape, np.uint8) + st.capacity = new_cap + st.shape = new_shape + st.dtype = np.uint8 + st.last_seq = 0 + return desc + + # ----- Internals -------------------------------------------------------- + + def _ensure_topic(self, topic: str) -> _TopicState: + with self._lock: + st = self._topics.get(topic) + if st is not None: + return st + + prefer = (os.getenv("DIMOS_IPC_BACKEND") or self.config.prefer or "auto").lower() + shape = (int(self.config.default_capacity) + 4,) + dtype = np.uint8 + + cp_mod = None + # TODO: implement the CUDA version of this + ch = CPU_IPC_Factory.create(shape, dtype=dtype) + logger.info("SharedMemory using CPU backend") + + # TODO: CUDA needs to be specified in this once we enable it + st = SharedMemoryPubSubBase._TopicState( + ch, int(self.config.default_capacity), cp_mod + ) + self._topics[topic] = st + return st + + def _fanout_loop(self, topic: str, st: _TopicState) -> None: + while not st.stop.is_set(): + seq, ts_ns, view = st.channel.read(last_seq=st.last_seq, require_new=True) + if view is None: + time.sleep(0.001) + continue + st.last_seq = seq + + host = np.array(view, copy=True) + + try: + L = struct.unpack(" st.capacity: + continue + payload = host[4 : 4 + L].tobytes() + + # 🔴 Suppress echo of our own synchronous publish + if getattr(st, "last_local_seq", None) == seq: + st.last_local_seq = None + continue + + except Exception: + continue + + for cb in list(st.subs): + try: + cb(payload, topic) + except Exception: + pass + + +# -------------------------------------------------------------------------------------- +# Encoders + concrete PubSub classes +# -------------------------------------------------------------------------------------- + +class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes]): + """Identity encoder for raw bytes.""" + + def encode(self, msg: bytes, _: str) -> bytes: + if isinstance(msg, (bytes, bytearray, memoryview)): + return bytes(msg) + raise TypeError(f"SharedMemory expects bytes-like, got {type(msg)!r}") + + def decode(self, msg: bytes, _: str) -> bytes: + return msg + + +class SharedMemory( + SharedMemoryBytesEncoderMixin, + SharedMemoryPubSubBase, +): + """SharedMemory pubsub that transports raw bytes.""" + + ... + + +class PickleSharedMemory( + PickleEncoderMixin[str, Any], + SharedMemoryPubSubBase, +): + """SharedMemory pubsub that transports arbitrary Python objects via pickle.""" + + ... + diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index caaf43b965..f1da3460c3 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -84,6 +84,27 @@ def lcm_context(): print("LCM not available") +try: + from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory + + @contextmanager + def shared_memory_cpu_context(): + shared_mem_pubsub = PickleSharedMemory(prefer="cpu") + shared_mem_pubsub.start() + yield shared_mem_pubsub + shared_mem_pubsub.stop() + testdata.append( + ( + shared_memory_cpu_context, + "/shared_mem_topic_cpu", + [b"shared_mem_value1", b"shared_mem_value2", b"shared_mem_value3"], + ) + ) + +except (ConnectionError, ImportError): + print("Shared Memory not available") + + @pytest.mark.parametrize("pubsub_context, topic, values", testdata) def test_store(pubsub_context, topic, values): with pubsub_context() as x: diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index ca1ffd8aa8..0ed63bf354 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -389,7 +389,7 @@ def _deploy_connection(self): self.connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) self.connection.odom.transport = core.LCMTransport("/odom", PoseStamped) - self.connection.video.transport = core.LCMTransport("/go2/color_image", Image) + self.connection.video.transport = core.pSHMTransport("/go2/color_image") self.connection.movecmd.transport = core.LCMTransport("/cmd_vel", Twist) self.connection.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) self.connection.camera_pose.transport = core.LCMTransport("/go2/camera_pose", PoseStamped) @@ -477,7 +477,7 @@ def _deploy_perception(self): output_dir=self.spatial_memory_dir, ) - self.spatial_memory_module.video.transport = core.LCMTransport("/go2/color_image", Image) + self.spatial_memory_module.video.transport = core.pSHMTransport("/go2/color_image") self.spatial_memory_module.odom.transport = core.LCMTransport( "/go2/camera_pose", PoseStamped ) @@ -509,8 +509,8 @@ def _deploy_camera(self): self.depth_module = self.dimos.deploy(DepthModule, gt_depth_scale=gt_depth_scale) # Set up transports - self.depth_module.color_image.transport = core.LCMTransport("/go2/color_image", Image) - self.depth_module.depth_image.transport = core.LCMTransport("/go2/depth_image", Image) + self.depth_module.color_image.transport = core.pSHMTransport("/go2/color_image") + self.depth_module.depth_image.transport = core.pSHMTransport("/go2/depth_image") self.depth_module.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) logger.info("Camera module deployed and connected") diff --git a/onnx/metric3d_vit_small.onnx b/onnx/metric3d_vit_small.onnx new file mode 100644 index 0000000000..bfddd41628 --- /dev/null +++ b/onnx/metric3d_vit_small.onnx @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:14805174265dd721ac3b396bd5ee7190c708cec41150ed298267f6c3126bc060 +size 151333865 From 6e13028b4902f8443c80c5bc0b0ff4bdf58131c8 Mon Sep 17 00:00:00 2001 From: mdaiter <1377571+mdaiter@users.noreply.github.com> Date: Sat, 30 Aug 2025 12:05:49 +0000 Subject: [PATCH 02/11] CI code cleanup --- dimos/core/__init__.py | 8 +++++++- dimos/core/transport.py | 3 ++- dimos/msgs/sensor_msgs/Image.py | 2 +- dimos/protocol/pubsub/shm/ipc_factory.py | 3 ++- dimos/protocol/pubsub/shmpubsub.py | 8 +++----- dimos/protocol/pubsub/test_spec.py | 1 + 6 files changed, 16 insertions(+), 9 deletions(-) diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 67a495bb28..64f4f53c46 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -10,7 +10,13 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleBase from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport -from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport, SHMTransport, pSHMTransport +from dimos.core.transport import ( + LCMTransport, + ZenohTransport, + pLCMTransport, + SHMTransport, + pSHMTransport, +) from dimos.protocol.rpc.lcmrpc import LCMRPC from dimos.protocol.rpc.spec import RPCSpec from dimos.protocol.tf import LCMTF, TF, PubSubTF, TFConfig, TFSpec diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 0729cfb50e..77f471bafe 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -106,9 +106,10 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> self._started = True return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) + class pSHMTransport(PubSubTransport[T]): _started: bool = False - + def __init__(self, topic: str, **kwargs): super().__init__(topic) self.shm = PickleSharedMemory(**kwargs) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 5f48780ad4..4ad6afee06 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -353,7 +353,7 @@ def as_memoryview(self) -> memoryview: return memoryview(self.data.tobytes()) else: raise TypeError(f"Unsupported data type {type(self.data)}") - + @classmethod def from_memoryview(cls, mem: memoryview, width: int, height: int, format: "ImageFormat"): """Reconstruct an Image from a CPU memoryview (SharedMemory buffer).""" diff --git a/dimos/protocol/pubsub/shm/ipc_factory.py b/dimos/protocol/pubsub/shm/ipc_factory.py index 348522e2b0..ddae57b185 100644 --- a/dimos/protocol/pubsub/shm/ipc_factory.py +++ b/dimos/protocol/pubsub/shm/ipc_factory.py @@ -46,6 +46,7 @@ def _sanitize_shm_name(name: str) -> str: # Python's SharedMemory expects names like 'psm_abc', without leading '/' return name.lstrip("/") if isinstance(name, str) else name + # --------------------------- # 1) Abstract interface # --------------------------- @@ -258,6 +259,7 @@ def attach(desc: dict) -> CpuShmChannel: assert desc.get("kind") == "cpu", "Descriptor kind mismatch" return CpuShmChannel.attach(desc) + # --------------------------- # 4) Runtime selector # --------------------------- @@ -269,4 +271,3 @@ def make_frame_channel( """Choose CUDA IPC if available (or requested), otherwise CPU SHM.""" # TODO: Implement the CUDA version of creating this factory return CPU_IPC_Factory.create(shape, dtype=dtype) - diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index 6b7f96e944..0bfb1f5b18 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -251,9 +251,7 @@ def _ensure_topic(self, topic: str) -> _TopicState: logger.info("SharedMemory using CPU backend") # TODO: CUDA needs to be specified in this once we enable it - st = SharedMemoryPubSubBase._TopicState( - ch, int(self.config.default_capacity), cp_mod - ) + st = SharedMemoryPubSubBase._TopicState(ch, int(self.config.default_capacity), cp_mod) self._topics[topic] = st return st @@ -289,9 +287,10 @@ def _fanout_loop(self, topic: str, st: _TopicState) -> None: # -------------------------------------------------------------------------------------- -# Encoders + concrete PubSub classes +# Encoders + concrete PubSub classes # -------------------------------------------------------------------------------------- + class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes]): """Identity encoder for raw bytes.""" @@ -320,4 +319,3 @@ class PickleSharedMemory( """SharedMemory pubsub that transports arbitrary Python objects via pickle.""" ... - diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index f1da3460c3..787acf889b 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -93,6 +93,7 @@ def shared_memory_cpu_context(): shared_mem_pubsub.start() yield shared_mem_pubsub shared_mem_pubsub.stop() + testdata.append( ( shared_memory_cpu_context, From 2cec4e34f7608d5e07ca768296e3eccdb6db6b64 Mon Sep 17 00:00:00 2001 From: Matthew Daiter Date: Sat, 30 Aug 2025 13:13:01 +0000 Subject: [PATCH 03/11] Touchups --- dimos/msgs/sensor_msgs/Image.py | 16 --------------- dimos/protocol/pubsub/test_spec.py | 33 +++++++++++++----------------- 2 files changed, 14 insertions(+), 35 deletions(-) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 4ad6afee06..fb57cfcd3e 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -343,22 +343,6 @@ def lcm_decode(cls, data: bytes, **kwargs) -> "Image": **kwargs, ) - def as_memoryview(self) -> memoryview: - """ - Return a memoryview of the image data for CPU shared memory transport. - """ - if isinstance(self.data, (bytes, bytearray)): - return memoryview(self.data) - elif isinstance(self.data, np.ndarray): - return memoryview(self.data.tobytes()) - else: - raise TypeError(f"Unsupported data type {type(self.data)}") - - @classmethod - def from_memoryview(cls, mem: memoryview, width: int, height: int, format: "ImageFormat"): - """Reconstruct an Image from a CPU memoryview (SharedMemory buffer).""" - return cls(bytes(mem), width=width, height=height, format=format) - def _get_row_step(self) -> int: """Calculate row step (bytes per row).""" bytes_per_pixel = self._get_bytes_per_pixel() diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index 787acf889b..bbfdb383af 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -84,27 +84,22 @@ def lcm_context(): print("LCM not available") -try: - from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory - - @contextmanager - def shared_memory_cpu_context(): - shared_mem_pubsub = PickleSharedMemory(prefer="cpu") - shared_mem_pubsub.start() - yield shared_mem_pubsub - shared_mem_pubsub.stop() +from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory - testdata.append( - ( - shared_memory_cpu_context, - "/shared_mem_topic_cpu", - [b"shared_mem_value1", b"shared_mem_value2", b"shared_mem_value3"], - ) +@contextmanager +def shared_memory_cpu_context(): + shared_mem_pubsub = PickleSharedMemory(prefer="cpu") + shared_mem_pubsub.start() + yield shared_mem_pubsub + shared_mem_pubsub.stop() + +testdata.append( + ( + shared_memory_cpu_context, + "/shared_mem_topic_cpu", + [b"shared_mem_value1", b"shared_mem_value2", b"shared_mem_value3"], ) - -except (ConnectionError, ImportError): - print("Shared Memory not available") - +) @pytest.mark.parametrize("pubsub_context, topic, values", testdata) def test_store(pubsub_context, topic, values): From 03b2b5eb586dd4683c115fbfba4041b4b7787e42 Mon Sep 17 00:00:00 2001 From: mdaiter <1377571+mdaiter@users.noreply.github.com> Date: Sat, 30 Aug 2025 13:13:22 +0000 Subject: [PATCH 04/11] CI code cleanup --- dimos/protocol/pubsub/test_spec.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index bbfdb383af..0f9486ec09 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -86,6 +86,7 @@ def lcm_context(): from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory + @contextmanager def shared_memory_cpu_context(): shared_mem_pubsub = PickleSharedMemory(prefer="cpu") @@ -93,6 +94,7 @@ def shared_memory_cpu_context(): yield shared_mem_pubsub shared_mem_pubsub.stop() + testdata.append( ( shared_memory_cpu_context, @@ -101,6 +103,7 @@ def shared_memory_cpu_context(): ) ) + @pytest.mark.parametrize("pubsub_context, topic, values", testdata) def test_store(pubsub_context, topic, values): with pubsub_context() as x: From e9007ccf871e37c438fbee9bf9db8d1120c7ef0d Mon Sep 17 00:00:00 2001 From: Matthew Daiter Date: Sat, 30 Aug 2025 13:53:28 +0000 Subject: [PATCH 05/11] add supression count to avoid double send --- dimos/protocol/pubsub/shmpubsub.py | 48 +++++++++++++++++------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index 0bfb1f5b18..ec0650a120 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -23,6 +23,7 @@ import struct import threading import time +from collections import defaultdict from dataclasses import dataclass from typing import Any, Callable, Dict, Optional @@ -79,7 +80,7 @@ class _TopicState: "capacity", "cp", "last_local_payload", - "last_local_seq", + "suppress_counts", ) def __init__(self, channel, capacity: int, cp_mod): @@ -94,7 +95,7 @@ def __init__(self, channel, capacity: int, cp_mod): # TODO: implement an initializer variable for is_cuda once CUDA IPC is in self.cp = cp_mod self.last_local_payload: Optional[bytes] = None - self.last_local_seq: Optional[int] = None + self.suppress_counts = defaultdict(int) # ----- init / lifecycle ------------------------------------------------- @@ -142,35 +143,37 @@ def stop(self) -> None: logger.info("SharedMemory PubSub stopped.") # ----- PubSub API (bytes on the wire) ---------------------------------- - + def publish(self, topic: str, message: bytes) -> None: if not isinstance(message, (bytes, bytearray, memoryview)): raise TypeError(f"publish expects bytes-like, got {type(message)!r}") st = self._ensure_topic(topic) - payload = memoryview(message) - L = len(payload) + + # Normalize once + payload_bytes = bytes(message) + L = len(payload_bytes) if L > st.capacity: raise ValueError(f"Payload too large: {L} > capacity {st.capacity}") - # Build host frame [len:4] + payload - host = np.zeros(st.shape, dtype=st.dtype) - host[:4] = np.frombuffer(struct.pack(" Callable[[], None]: """Subscribe a callback(message: bytes, topic). Returns unsubscribe.""" @@ -269,12 +272,17 @@ def _fanout_loop(self, topic: str, st: _TopicState) -> None: L = struct.unpack(" st.capacity: continue + payload = host[4 : 4 + L].tobytes() - # 🔴 Suppress echo of our own synchronous publish - if getattr(st, "last_local_seq", None) == seq: - st.last_local_seq = None - continue + # Drop exactly the number of local echoes we created + cnt = st.suppress_counts.get(payload, 0) + if cnt > 0: + if cnt == 1: + del st.suppress_counts[payload] + else: + st.suppress_counts[payload] = cnt - 1 + continue # suppressed except Exception: continue From 84757ab1e7d4f55f7773219ea964fd5a1d67bd4a Mon Sep 17 00:00:00 2001 From: mdaiter <1377571+mdaiter@users.noreply.github.com> Date: Sat, 30 Aug 2025 13:53:54 +0000 Subject: [PATCH 06/11] CI code cleanup --- dimos/protocol/pubsub/shmpubsub.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index ec0650a120..1c4b07b5ce 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -143,7 +143,7 @@ def stop(self) -> None: logger.info("SharedMemory PubSub stopped.") # ----- PubSub API (bytes on the wire) ---------------------------------- - + def publish(self, topic: str, message: bytes) -> None: if not isinstance(message, (bytes, bytearray, memoryview)): raise TypeError(f"publish expects bytes-like, got {type(message)!r}") @@ -174,7 +174,6 @@ def publish(self, topic: str, message: bytes) -> None: st.channel.publish(host) - def subscribe(self, topic: str, callback: Callable[[bytes, str], Any]) -> Callable[[], None]: """Subscribe a callback(message: bytes, topic). Returns unsubscribe.""" st = self._ensure_topic(topic) From 7c6175a859fece845cc98547312436596caffca2 Mon Sep 17 00:00:00 2001 From: Matthew Daiter Date: Mon, 1 Sep 2025 12:55:15 +0000 Subject: [PATCH 07/11] IPC final touches for CPU --- dimos/protocol/pubsub/shm/ipc_factory.py | 48 +++++++++++++++++------- dimos/protocol/pubsub/shmpubsub.py | 32 ++++++++-------- 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/dimos/protocol/pubsub/shm/ipc_factory.py b/dimos/protocol/pubsub/shm/ipc_factory.py index ddae57b185..e5877eeacb 100644 --- a/dimos/protocol/pubsub/shm/ipc_factory.py +++ b/dimos/protocol/pubsub/shm/ipc_factory.py @@ -116,26 +116,48 @@ def _safe_unlink(name): # 2) CPU shared-memory backend # --------------------------- - class CpuShmChannel(FrameChannel): - def __init__(self, shape, dtype=np.uint8): + def __init__(self, shape, dtype=np.uint8, *, data_name=None, ctrl_name=None): self._shape = tuple(shape) self._dtype = np.dtype(dtype) self._nbytes = int(self._dtype.itemsize * np.prod(self._shape)) - # Create two buffers back-to-back + tiny control block - self._shm_data = SharedMemory(create=True, size=2 * self._nbytes) - self._shm_ctrl = SharedMemory(create=True, size=24) + def _create_or_open(name, size): + try: + shm = SharedMemory(create=True, size=size, name=name) + owner = True + except FileExistsError: + shm = SharedMemory(name=name) # attach existing + owner = False + return shm, owner + + if data_name is None or ctrl_name is None: + # fallback: random names (old behavior) + self._shm_data = SharedMemory(create=True, size=2 * self._nbytes) + self._shm_ctrl = SharedMemory(create=True, size=24) + self._is_owner = True + else: + self._shm_data, own_d = _create_or_open(data_name, 2 * self._nbytes) + self._shm_ctrl, own_c = _create_or_open(ctrl_name, 24) + self._is_owner = own_d and own_c + self._ctrl = np.ndarray((3,), dtype=np.int64, buffer=self._shm_ctrl.buf) - self._ctrl[:] = 0 + if self._is_owner: + self._ctrl[:] = 0 # initialize only once - # Owner-only finalizers (in case close() isn’t called) - self._finalizer_data = ( - weakref.finalize(self, _safe_unlink, self._shm_data.name) if _UNLINK_ON_GC else None - ) - self._finalizer_ctrl = ( - weakref.finalize(self, _safe_unlink, self._shm_ctrl.name) if _UNLINK_ON_GC else None - ) + # only owners set unlink finalizers (beware cross-process timing) + self._finalizer_data = weakref.finalize(self, _safe_unlink, self._shm_data.name) if (_UNLINK_ON_GC and self._is_owner) else None + self._finalizer_ctrl = weakref.finalize(self, _safe_unlink, self._shm_ctrl.name) if (_UNLINK_ON_GC and self._is_owner) else None + + def descriptor(self): + return { + "kind": "cpu", + "shape": self._shape, + "dtype": self._dtype.str, + "nbytes": self._nbytes, + "data_name": self._shm_data.name, + "ctrl_name": self._shm_ctrl.name, + } @property def device(self): diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index 1c4b07b5ce..68204601b7 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -19,6 +19,7 @@ from __future__ import annotations +import hashlib import os import struct import threading @@ -30,7 +31,7 @@ import numpy as np from dimos.protocol.pubsub.spec import PubSub, PubSubEncoderMixin, PickleEncoderMixin -from dimos.protocol.pubsub.shm.ipc_factory import CPU_IPC_Factory +from dimos.protocol.pubsub.shm.ipc_factory import CpuShmChannel, CPU_IPC_Factory from dimos.utils.logging_config import setup_logger logger = setup_logger("dimos.protocol.pubsub.sharedmemory") @@ -154,6 +155,7 @@ def publish(self, topic: str, message: bytes) -> None: payload_bytes = bytes(message) L = len(payload_bytes) if L > st.capacity: + logger.error(f"Payload too large: {L} > capacity {st.capacity}") raise ValueError(f"Payload too large: {L} > capacity {st.capacity}") # Mark this payload to suppress its single echo (handles back-to-back publishes) @@ -164,6 +166,7 @@ def publish(self, topic: str, message: bytes) -> None: try: cb(payload_bytes, topic) except Exception: + logger.warn(f"Payload couldn't be pushed to topic: {topic}") pass # Build host frame [len:4] + payload and publish @@ -232,7 +235,7 @@ def reconfigure(self, topic: str, *, capacity: int) -> dict: st.capacity = new_cap st.shape = new_shape st.dtype = np.uint8 - st.last_seq = 0 + st.last_seq = -1 return desc # ----- Internals -------------------------------------------------------- @@ -240,20 +243,17 @@ def reconfigure(self, topic: str, *, capacity: int) -> dict: def _ensure_topic(self, topic: str) -> _TopicState: with self._lock: st = self._topics.get(topic) - if st is not None: - return st - - prefer = (os.getenv("DIMOS_IPC_BACKEND") or self.config.prefer or "auto").lower() - shape = (int(self.config.default_capacity) + 4,) - dtype = np.uint8 - - cp_mod = None - # TODO: implement the CUDA version of this - ch = CPU_IPC_Factory.create(shape, dtype=dtype) - logger.info("SharedMemory using CPU backend") - - # TODO: CUDA needs to be specified in this once we enable it - st = SharedMemoryPubSubBase._TopicState(ch, int(self.config.default_capacity), cp_mod) + if st is not None: return st + cap = int(self.config.default_capacity) + + def _names_for_topic(topic: str, capacity: int) -> tuple[str, str]: + # Python’s SharedMemory requires names without a leading '/' + h = hashlib.blake2b(f"{topic}:{capacity}".encode(), digest_size=12).hexdigest() + return f"psm_{h}_data", f"psm_{h}_ctrl" + + data_name, ctrl_name = _names_for_topic(topic, cap) + ch = CpuShmChannel((cap + 4,), np.uint8, data_name=data_name, ctrl_name=ctrl_name) + st = SharedMemoryPubSubBase._TopicState(ch, cap, None) self._topics[topic] = st return st From 46e1bfa7034dcdfd34e8ad98e0ea37be54da0937 Mon Sep 17 00:00:00 2001 From: mdaiter <1377571+mdaiter@users.noreply.github.com> Date: Mon, 1 Sep 2025 12:55:35 +0000 Subject: [PATCH 08/11] CI code cleanup --- dimos/protocol/pubsub/shm/ipc_factory.py | 15 ++++++++++++--- dimos/protocol/pubsub/shmpubsub.py | 5 +++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/dimos/protocol/pubsub/shm/ipc_factory.py b/dimos/protocol/pubsub/shm/ipc_factory.py index e5877eeacb..3d6dbc17e3 100644 --- a/dimos/protocol/pubsub/shm/ipc_factory.py +++ b/dimos/protocol/pubsub/shm/ipc_factory.py @@ -116,6 +116,7 @@ def _safe_unlink(name): # 2) CPU shared-memory backend # --------------------------- + class CpuShmChannel(FrameChannel): def __init__(self, shape, dtype=np.uint8, *, data_name=None, ctrl_name=None): self._shape = tuple(shape) @@ -127,7 +128,7 @@ def _create_or_open(name, size): shm = SharedMemory(create=True, size=size, name=name) owner = True except FileExistsError: - shm = SharedMemory(name=name) # attach existing + shm = SharedMemory(name=name) # attach existing owner = False return shm, owner @@ -146,8 +147,16 @@ def _create_or_open(name, size): self._ctrl[:] = 0 # initialize only once # only owners set unlink finalizers (beware cross-process timing) - self._finalizer_data = weakref.finalize(self, _safe_unlink, self._shm_data.name) if (_UNLINK_ON_GC and self._is_owner) else None - self._finalizer_ctrl = weakref.finalize(self, _safe_unlink, self._shm_ctrl.name) if (_UNLINK_ON_GC and self._is_owner) else None + self._finalizer_data = ( + weakref.finalize(self, _safe_unlink, self._shm_data.name) + if (_UNLINK_ON_GC and self._is_owner) + else None + ) + self._finalizer_ctrl = ( + weakref.finalize(self, _safe_unlink, self._shm_ctrl.name) + if (_UNLINK_ON_GC and self._is_owner) + else None + ) def descriptor(self): return { diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index 68204601b7..6539cfefdb 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -243,9 +243,10 @@ def reconfigure(self, topic: str, *, capacity: int) -> dict: def _ensure_topic(self, topic: str) -> _TopicState: with self._lock: st = self._topics.get(topic) - if st is not None: return st + if st is not None: + return st cap = int(self.config.default_capacity) - + def _names_for_topic(topic: str, capacity: int) -> tuple[str, str]: # Python’s SharedMemory requires names without a leading '/' h = hashlib.blake2b(f"{topic}:{capacity}".encode(), digest_size=12).hexdigest() From c2fbe9f6f65506ced6c672dfa8c2ed7f9a5a85b9 Mon Sep 17 00:00:00 2001 From: Matthew Daiter Date: Mon, 8 Sep 2025 22:43:24 +0000 Subject: [PATCH 09/11] Add default capacities in unitree_go2.py --- dimos/robot/unitree_webrtc/unitree_go2.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 0ed63bf354..d89be95765 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -78,6 +78,17 @@ warnings.filterwarnings("ignore", message="coroutine.*was never awaited") warnings.filterwarnings("ignore", message="H264Decoder.*failed to decode") +""" +Constants for shared memory +Usually, auto-detection for size would be preferred. Sadly, though, channels are made +and frozen *before* the first frame is received. +Therefore, a maximum capacity for color image and depth image transfer should be defined +ahead of time. +""" +# Default color image size: 1920x1080 frame x 3 (RGB) x uint8 +DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3 +# Default depth image size: 1280x720 frame * 4 (float32 size) +DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4 class FakeRTC: """Fake WebRTC connection for testing with recorded data.""" @@ -389,7 +400,7 @@ def _deploy_connection(self): self.connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) self.connection.odom.transport = core.LCMTransport("/odom", PoseStamped) - self.connection.video.transport = core.pSHMTransport("/go2/color_image") + self.connection.video.transport = core.pSHMTransport("/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE) self.connection.movecmd.transport = core.LCMTransport("/cmd_vel", Twist) self.connection.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) self.connection.camera_pose.transport = core.LCMTransport("/go2/camera_pose", PoseStamped) @@ -477,7 +488,8 @@ def _deploy_perception(self): output_dir=self.spatial_memory_dir, ) - self.spatial_memory_module.video.transport = core.pSHMTransport("/go2/color_image") + color_image_default_capacity = 1920 * 1080 * 4 + self.spatial_memory_module.video.transport = core.pSHMTransport("/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE) self.spatial_memory_module.odom.transport = core.LCMTransport( "/go2/camera_pose", PoseStamped ) @@ -509,8 +521,8 @@ def _deploy_camera(self): self.depth_module = self.dimos.deploy(DepthModule, gt_depth_scale=gt_depth_scale) # Set up transports - self.depth_module.color_image.transport = core.pSHMTransport("/go2/color_image") - self.depth_module.depth_image.transport = core.pSHMTransport("/go2/depth_image") + self.depth_module.color_image.transport = core.pSHMTransport("/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE) + self.depth_module.depth_image.transport = core.pSHMTransport("/go2/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE) self.depth_module.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) logger.info("Camera module deployed and connected") From f785141ce54c8b1709c53125694a6dc7cfd3ae75 Mon Sep 17 00:00:00 2001 From: mdaiter <1377571+mdaiter@users.noreply.github.com> Date: Mon, 8 Sep 2025 22:43:57 +0000 Subject: [PATCH 10/11] CI code cleanup --- dimos/robot/unitree_webrtc/unitree_go2.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index d89be95765..7cd046b5f3 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -90,6 +90,7 @@ # Default depth image size: 1280x720 frame * 4 (float32 size) DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4 + class FakeRTC: """Fake WebRTC connection for testing with recorded data.""" @@ -400,7 +401,9 @@ def _deploy_connection(self): self.connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) self.connection.odom.transport = core.LCMTransport("/odom", PoseStamped) - self.connection.video.transport = core.pSHMTransport("/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE) + self.connection.video.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) self.connection.movecmd.transport = core.LCMTransport("/cmd_vel", Twist) self.connection.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) self.connection.camera_pose.transport = core.LCMTransport("/go2/camera_pose", PoseStamped) @@ -489,7 +492,9 @@ def _deploy_perception(self): ) color_image_default_capacity = 1920 * 1080 * 4 - self.spatial_memory_module.video.transport = core.pSHMTransport("/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE) + self.spatial_memory_module.video.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) self.spatial_memory_module.odom.transport = core.LCMTransport( "/go2/camera_pose", PoseStamped ) @@ -521,8 +526,12 @@ def _deploy_camera(self): self.depth_module = self.dimos.deploy(DepthModule, gt_depth_scale=gt_depth_scale) # Set up transports - self.depth_module.color_image.transport = core.pSHMTransport("/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE) - self.depth_module.depth_image.transport = core.pSHMTransport("/go2/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE) + self.depth_module.color_image.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) + self.depth_module.depth_image.transport = core.pSHMTransport( + "/go2/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE + ) self.depth_module.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) logger.info("Camera module deployed and connected") From 924d5186431424a09bca85727910e8bf49d06df4 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Mon, 8 Sep 2025 05:32:50 +0300 Subject: [PATCH 11/11] use SHM in Foxglove --- dimos/robot/foxglove_bridge.py | 11 +++++++++-- dimos/robot/unitree_webrtc/unitree_go2.py | 2 +- pyproject.toml | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/dimos/robot/foxglove_bridge.py b/dimos/robot/foxglove_bridge.py index fdd65281c8..30fa248784 100644 --- a/dimos/robot/foxglove_bridge.py +++ b/dimos/robot/foxglove_bridge.py @@ -25,8 +25,9 @@ class FoxgloveBridge(Module): _thread: threading.Thread _loop: asyncio.AbstractEventLoop - def __init__(self, *args, **kwargs): + def __init__(self, *args, shm_channels=None, **kwargs): super().__init__(*args, **kwargs) + self.shm_channels = shm_channels or [] self.start() @rpc @@ -35,7 +36,13 @@ def run_bridge(): self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) try: - bridge = LCMFoxgloveBridge(host="0.0.0.0", port=8765, debug=False, num_threads=4) + bridge = LCMFoxgloveBridge( + host="0.0.0.0", + port=8765, + debug=False, + num_threads=4, + shm_channels=self.shm_channels, + ) self._loop.run_until_complete(bridge.run()) except Exception as e: print(f"Foxglove bridge error: {e}") diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 0ed63bf354..e4c16e198f 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -464,7 +464,7 @@ def _deploy_visualization(self): self.websocket_vis.path.connect(self.global_planner.path) self.websocket_vis.global_costmap.connect(self.mapper.global_costmap) - self.foxglove_bridge = FoxgloveBridge() + self.foxglove_bridge = FoxgloveBridge(shm_channels=["/go2/color_image#sensor_msgs.Image"]) def _deploy_perception(self): """Deploy and configure perception modules.""" diff --git a/pyproject.toml b/pyproject.toml index 5af8fa590a..f826636c68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,7 +102,7 @@ dependencies = [ "dask[complete]==2025.5.1", # LCM / DimOS utilities - "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@de4038871a4f166c3007ef6b6bc3ff83642219b2" + "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@03e320b325edf3ead9b74746baea318d431030bc" ] [project.scripts]