diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 0b7755e2e3..64f4f53c46 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -10,7 +10,13 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleBase from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport -from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport +from dimos.core.transport import ( + LCMTransport, + ZenohTransport, + pLCMTransport, + SHMTransport, + pSHMTransport, +) from dimos.protocol.rpc.lcmrpc import LCMRPC from dimos.protocol.rpc.spec import RPCSpec from dimos.protocol.tf import LCMTF, TF, PubSubTF, TFConfig, TFSpec diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 8874482e0a..77f471bafe 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -40,6 +40,7 @@ from dimos.core.stream import In, RemoteIn, Transport from dimos.protocol.pubsub.lcmpubsub import LCM, PickleLCM from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic +from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory T = TypeVar("T") @@ -106,6 +107,54 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) +class pSHMTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, **kwargs): + super().__init__(topic) + self.shm = PickleSharedMemory(**kwargs) + + def __reduce__(self): + return (pSHMTransport, (self.topic,)) + + def broadcast(self, _, msg): + if not self._started: + self.shm.start() + self._started = True + + self.shm.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: + if not self._started: + self.shm.start() + self._started = True + return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) + + +class SHMTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, **kwargs): + super().__init__(topic) + self.shm = SharedMemory(**kwargs) + + def __reduce__(self): + return (SHMTransport, (self.topic,)) + + def broadcast(self, _, msg): + if not self._started: + self.shm.start() + self._started = True + + self.shm.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: + if not self._started: + self.shm.start() + self._started = True + return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) + + class DaskTransport(Transport[T]): subscribers: List[Callable[[T], None]] _started: bool = False diff --git a/dimos/protocol/pubsub/shm/ipc_factory.py b/dimos/protocol/pubsub/shm/ipc_factory.py new file mode 100644 index 0000000000..3d6dbc17e3 --- /dev/null +++ b/dimos/protocol/pubsub/shm/ipc_factory.py @@ -0,0 +1,304 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# frame_ipc.py +# Python 3.9+ +import base64 +import time +from abc import ABC, abstractmethod +import os +from typing import Optional, Tuple + +import numpy as np +from multiprocessing.shared_memory import SharedMemory +from multiprocessing.managers import SharedMemoryManager + +_UNLINK_ON_GC = os.getenv("DIMOS_IPC_UNLINK_ON_GC", "0").lower() not in ("0", "false", "no") + + +def _open_shm_with_retry(name: str) -> SharedMemory: + tries = int(os.getenv("DIMOS_IPC_ATTACH_RETRIES", "40")) # ~40 tries + base_ms = float(os.getenv("DIMOS_IPC_ATTACH_BACKOFF_MS", "5")) # 5 ms + cap_ms = float(os.getenv("DIMOS_IPC_ATTACH_BACKOFF_CAP_MS", "200")) # 200 ms + last = None + for i in range(tries): + try: + return SharedMemory(name=name) + except FileNotFoundError as e: + last = e + # exponential backoff, capped + time.sleep(min((base_ms * (2**i)), cap_ms) / 1000.0) + raise FileNotFoundError(f"SHM not found after {tries} retries: {name}") from last + + +def _sanitize_shm_name(name: str) -> str: + # Python's SharedMemory expects names like 'psm_abc', without leading '/' + return name.lstrip("/") if isinstance(name, str) else name + + +# --------------------------- +# 1) Abstract interface +# --------------------------- + + +class FrameChannel(ABC): + """Single-slot 'freshest frame' IPC channel with a tiny control block. + - Double-buffered to avoid torn reads. + - Descriptor is JSON-safe; attach() reconstructs in another process. + """ + + @property + @abstractmethod + def device(self) -> str: # "cpu" or "cuda" + ... + + @property + @abstractmethod + def shape(self) -> tuple: ... + + @property + @abstractmethod + def dtype(self) -> np.dtype: ... + + @abstractmethod + def publish(self, frame) -> None: + """Write into inactive buffer, then flip visible index (write control last).""" + ... + + @abstractmethod + def read(self, last_seq: int = -1, require_new: bool = True): + """Return (seq:int, ts_ns:int, view-or-None).""" + ... + + @abstractmethod + def descriptor(self) -> dict: + """Tiny JSON-safe descriptor (names/handles/shape/dtype/device).""" + ... + + @classmethod + @abstractmethod + def attach(cls, desc: dict) -> "FrameChannel": + """Attach in another process.""" + ... + + @abstractmethod + def close(self) -> None: + """Detach resources (owner also unlinks manager if applicable).""" + ... + + +from multiprocessing.shared_memory import SharedMemory +import weakref, os + + +def _safe_unlink(name): + try: + shm = SharedMemory(name=name) + shm.unlink() + except FileNotFoundError: + pass + except Exception: + pass + + +# --------------------------- +# 2) CPU shared-memory backend +# --------------------------- + + +class CpuShmChannel(FrameChannel): + def __init__(self, shape, dtype=np.uint8, *, data_name=None, ctrl_name=None): + self._shape = tuple(shape) + self._dtype = np.dtype(dtype) + self._nbytes = int(self._dtype.itemsize * np.prod(self._shape)) + + def _create_or_open(name, size): + try: + shm = SharedMemory(create=True, size=size, name=name) + owner = True + except FileExistsError: + shm = SharedMemory(name=name) # attach existing + owner = False + return shm, owner + + if data_name is None or ctrl_name is None: + # fallback: random names (old behavior) + self._shm_data = SharedMemory(create=True, size=2 * self._nbytes) + self._shm_ctrl = SharedMemory(create=True, size=24) + self._is_owner = True + else: + self._shm_data, own_d = _create_or_open(data_name, 2 * self._nbytes) + self._shm_ctrl, own_c = _create_or_open(ctrl_name, 24) + self._is_owner = own_d and own_c + + self._ctrl = np.ndarray((3,), dtype=np.int64, buffer=self._shm_ctrl.buf) + if self._is_owner: + self._ctrl[:] = 0 # initialize only once + + # only owners set unlink finalizers (beware cross-process timing) + self._finalizer_data = ( + weakref.finalize(self, _safe_unlink, self._shm_data.name) + if (_UNLINK_ON_GC and self._is_owner) + else None + ) + self._finalizer_ctrl = ( + weakref.finalize(self, _safe_unlink, self._shm_ctrl.name) + if (_UNLINK_ON_GC and self._is_owner) + else None + ) + + def descriptor(self): + return { + "kind": "cpu", + "shape": self._shape, + "dtype": self._dtype.str, + "nbytes": self._nbytes, + "data_name": self._shm_data.name, + "ctrl_name": self._shm_ctrl.name, + } + + @property + def device(self): + return "cpu" + + @property + def shape(self): + return self._shape + + @property + def dtype(self): + return self._dtype + + def publish(self, frame): + assert isinstance(frame, np.ndarray) + assert frame.shape == self._shape and frame.dtype == self._dtype + active = int(self._ctrl[2]) + inactive = 1 - active + view = np.ndarray( + self._shape, + dtype=self._dtype, + buffer=self._shm_data.buf, + offset=inactive * self._nbytes, + ) + np.copyto(view, frame, casting="no") + ts = np.int64(time.time_ns()) + # Publish order: ts -> idx -> seq + self._ctrl[1] = ts + self._ctrl[2] = inactive + self._ctrl[0] += 1 + + def read(self, last_seq: int = -1, require_new=True): + for _ in range(3): + seq1 = int(self._ctrl[0]) + idx = int(self._ctrl[2]) + ts = int(self._ctrl[1]) + view = np.ndarray( + self._shape, dtype=self._dtype, buffer=self._shm_data.buf, offset=idx * self._nbytes + ) + if seq1 == int(self._ctrl[0]): + if require_new and seq1 == last_seq: + return seq1, ts, None + return seq1, ts, view + return last_seq, 0, None + + def descriptor(self): + return { + "kind": "cpu", + "shape": self._shape, + "dtype": self._dtype.str, + "nbytes": self._nbytes, + "data_name": self._shm_data.name, + "ctrl_name": self._shm_ctrl.name, + } + + @classmethod + def attach(cls, desc): + obj = object.__new__(cls) + obj._shape = tuple(desc["shape"]) + obj._dtype = np.dtype(desc["dtype"]) + obj._nbytes = int(desc["nbytes"]) + data_name = desc["data_name"] + ctrl_name = desc["ctrl_name"] + try: + obj._shm_data = _open_shm_with_retry(data_name) + obj._shm_ctrl = _open_shm_with_retry(ctrl_name) + except FileNotFoundError as e: + raise FileNotFoundError( + f"CPU IPC attach failed: control/data SHM not found " + f"(ctrl='{ctrl_name}', data='{data_name}'). " + f"Ensure the writer is running on the same host and the channel is alive." + ) from e + obj._ctrl = np.ndarray((3,), dtype=np.int64, buffer=obj._shm_ctrl.buf) + # attachments don’t own/unlink + obj._finalizer_data = obj._finalizer_ctrl = None + return obj + + def close(self): + if getattr(self, "_is_owner", False): + try: + self._shm_ctrl.close() + finally: + try: + _safe_unlink(self._shm_ctrl.name) + except: + pass + if hasattr(self, "_shm_data"): + try: + self._shm_data.close() + finally: + try: + _safe_unlink(self._shm_data.name) + except: + pass + return + # readers: just close handles + try: + self._shm_ctrl.close() + except: + pass + try: + self._shm_data.close() + except: + pass + + +# --------------------------- +# 3) Factories +# --------------------------- + + +class CPU_IPC_Factory: + """Creates/attaches CPU shared-memory channels.""" + + @staticmethod + def create(shape, dtype=np.uint8) -> CpuShmChannel: + return CpuShmChannel(shape, dtype=dtype) + + @staticmethod + def attach(desc: dict) -> CpuShmChannel: + assert desc.get("kind") == "cpu", "Descriptor kind mismatch" + return CpuShmChannel.attach(desc) + + +# --------------------------- +# 4) Runtime selector +# --------------------------- + + +def make_frame_channel( + shape, dtype=np.uint8, prefer: str = "auto", device: int = 0 +) -> FrameChannel: + """Choose CUDA IPC if available (or requested), otherwise CPU SHM.""" + # TODO: Implement the CUDA version of creating this factory + return CPU_IPC_Factory.create(shape, dtype=dtype) diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py new file mode 100644 index 0000000000..6539cfefdb --- /dev/null +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# --------------------------------------------------------------------------- +# SharedMemory Pub/Sub over unified IPC channels (CPU/CUDA) +# --------------------------------------------------------------------------- + +from __future__ import annotations + +import hashlib +import os +import struct +import threading +import time +from collections import defaultdict +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional + +import numpy as np + +from dimos.protocol.pubsub.spec import PubSub, PubSubEncoderMixin, PickleEncoderMixin +from dimos.protocol.pubsub.shm.ipc_factory import CpuShmChannel, CPU_IPC_Factory +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.protocol.pubsub.sharedmemory") + + +# -------------------------------------------------------------------------------------- +# Configuration (kept local to PubSub now that Service is gone) +# -------------------------------------------------------------------------------------- + + +@dataclass +class SharedMemoryConfig: + prefer: str = "auto" # "auto" | "cpu" (DIMOS_IPC_BACKEND overrides), TODO: "cuda" + default_capacity: int = 3686400 # payload bytes (excludes 4-byte header) + close_channels_on_stop: bool = True + + +# -------------------------------------------------------------------------------------- +# Core PubSub with integrated SHM/IPC transport (previously the Service logic) +# -------------------------------------------------------------------------------------- + + +class SharedMemoryPubSubBase(PubSub[str, Any]): + """ + Pub/Sub over SharedMemory/CUDA-IPC, modeled after LCMPubSubBase but self-contained. + Wire format per topic/frame: [len:uint32_le] + payload bytes (padded to fixed capacity). + Features ported from Service: + - start()/stop() lifecycle + - one frame channel per topic + - per-topic fanout thread (reads from channel, invokes subscribers) + - CPU/CUDA backend selection (auto + env override) + - reconfigure(topic, capacity=...) + - drop initial empty frame; synchronous local delivery; echo suppression + """ + + # Per-topic state + # TODO: implement "is_cuda" below capacity, above cp + class _TopicState: + __slots__ = ( + "channel", + "subs", + "stop", + "thread", + "last_seq", + "shape", + "dtype", + "capacity", + "cp", + "last_local_payload", + "suppress_counts", + ) + + def __init__(self, channel, capacity: int, cp_mod): + self.channel = channel + self.capacity = int(capacity) + self.shape = (self.capacity + 4,) # +4 for uint32 length header + self.dtype = np.uint8 + self.subs: list[Callable[[bytes, str], None]] = [] + self.stop = threading.Event() + self.thread: Optional[threading.Thread] = None + self.last_seq = 0 # start at 0 to avoid b"" on first poll + # TODO: implement an initializer variable for is_cuda once CUDA IPC is in + self.cp = cp_mod + self.last_local_payload: Optional[bytes] = None + self.suppress_counts = defaultdict(int) + + # ----- init / lifecycle ------------------------------------------------- + + def __init__( + self, + *, + prefer: str = "auto", + default_capacity: int = 3686400, + close_channels_on_stop: bool = True, + **_: Any, + ) -> None: + super().__init__() + self.config = SharedMemoryConfig( + prefer=prefer, + default_capacity=default_capacity, + close_channels_on_stop=close_channels_on_stop, + ) + self._topics: Dict[str, SharedMemoryPubSubBase._TopicState] = {} + self._lock = threading.Lock() + + def start(self) -> None: + pref = (self.config.prefer or "auto").lower() + backend = os.getenv("DIMOS_IPC_BACKEND", pref).lower() + logger.info(f"SharedMemory PubSub starting (backend={backend})") + # No global thread needed; per-topic fanout starts on first subscribe. + + def stop(self) -> None: + with self._lock: + for topic, st in list(self._topics.items()): + # stop fanout + try: + if st.thread: + st.stop.set() + st.thread.join(timeout=0.5) + st.thread = None + except Exception: + pass + # close/unlink channels if configured + if self.config.close_channels_on_stop: + try: + st.channel.close() + except Exception: + pass + self._topics.clear() + logger.info("SharedMemory PubSub stopped.") + + # ----- PubSub API (bytes on the wire) ---------------------------------- + + def publish(self, topic: str, message: bytes) -> None: + if not isinstance(message, (bytes, bytearray, memoryview)): + raise TypeError(f"publish expects bytes-like, got {type(message)!r}") + + st = self._ensure_topic(topic) + + # Normalize once + payload_bytes = bytes(message) + L = len(payload_bytes) + if L > st.capacity: + logger.error(f"Payload too large: {L} > capacity {st.capacity}") + raise ValueError(f"Payload too large: {L} > capacity {st.capacity}") + + # Mark this payload to suppress its single echo (handles back-to-back publishes) + st.suppress_counts[payload_bytes] += 1 + + # Synchronous local delivery first (zero extra copies) + for cb in list(st.subs): + try: + cb(payload_bytes, topic) + except Exception: + logger.warn(f"Payload couldn't be pushed to topic: {topic}") + pass + + # Build host frame [len:4] + payload and publish + host = np.zeros(st.shape, dtype=st.dtype) + host[:4] = np.frombuffer(struct.pack(" Callable[[], None]: + """Subscribe a callback(message: bytes, topic). Returns unsubscribe.""" + st = self._ensure_topic(topic) + st.subs.append(callback) + if st.thread is None: + st.thread = threading.Thread(target=self._fanout_loop, args=(topic, st), daemon=True) + st.thread.start() + + def _unsub(): + try: + st.subs.remove(callback) + except ValueError: + pass + if not st.subs and st.thread: + st.stop.set() + st.thread.join(timeout=0.5) + st.thread = None + st.stop.clear() + + return _unsub + + # Optional utility like in LCMPubSubBase + def wait_for_message(self, topic: str, timeout: float = 1.0) -> Any: + """Wait once; if an encoder mixin is present, returned value is decoded.""" + received: Any = None + evt = threading.Event() + + def _handler(msg: bytes, _topic: str): + nonlocal received + try: + if hasattr(self, "decode"): # provided by encoder mixin + received = self.decode(msg, topic) # type: ignore[misc] + else: + received = msg + finally: + evt.set() + + unsub = self.subscribe(topic, _handler) + try: + evt.wait(timeout) + return received + finally: + try: + unsub() + except Exception: + pass + + # ----- Capacity mgmt ---------------------------------------------------- + + def reconfigure(self, topic: str, *, capacity: int) -> dict: + """Change payload capacity (bytes) for a topic; returns new descriptor.""" + st = self._ensure_topic(topic) + new_cap = int(capacity) + new_shape = (new_cap + 4,) + desc = st.channel.reconfigure(new_shape, np.uint8) + st.capacity = new_cap + st.shape = new_shape + st.dtype = np.uint8 + st.last_seq = -1 + return desc + + # ----- Internals -------------------------------------------------------- + + def _ensure_topic(self, topic: str) -> _TopicState: + with self._lock: + st = self._topics.get(topic) + if st is not None: + return st + cap = int(self.config.default_capacity) + + def _names_for_topic(topic: str, capacity: int) -> tuple[str, str]: + # Python’s SharedMemory requires names without a leading '/' + h = hashlib.blake2b(f"{topic}:{capacity}".encode(), digest_size=12).hexdigest() + return f"psm_{h}_data", f"psm_{h}_ctrl" + + data_name, ctrl_name = _names_for_topic(topic, cap) + ch = CpuShmChannel((cap + 4,), np.uint8, data_name=data_name, ctrl_name=ctrl_name) + st = SharedMemoryPubSubBase._TopicState(ch, cap, None) + self._topics[topic] = st + return st + + def _fanout_loop(self, topic: str, st: _TopicState) -> None: + while not st.stop.is_set(): + seq, ts_ns, view = st.channel.read(last_seq=st.last_seq, require_new=True) + if view is None: + time.sleep(0.001) + continue + st.last_seq = seq + + host = np.array(view, copy=True) + + try: + L = struct.unpack(" st.capacity: + continue + + payload = host[4 : 4 + L].tobytes() + + # Drop exactly the number of local echoes we created + cnt = st.suppress_counts.get(payload, 0) + if cnt > 0: + if cnt == 1: + del st.suppress_counts[payload] + else: + st.suppress_counts[payload] = cnt - 1 + continue # suppressed + + except Exception: + continue + + for cb in list(st.subs): + try: + cb(payload, topic) + except Exception: + pass + + +# -------------------------------------------------------------------------------------- +# Encoders + concrete PubSub classes +# -------------------------------------------------------------------------------------- + + +class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes]): + """Identity encoder for raw bytes.""" + + def encode(self, msg: bytes, _: str) -> bytes: + if isinstance(msg, (bytes, bytearray, memoryview)): + return bytes(msg) + raise TypeError(f"SharedMemory expects bytes-like, got {type(msg)!r}") + + def decode(self, msg: bytes, _: str) -> bytes: + return msg + + +class SharedMemory( + SharedMemoryBytesEncoderMixin, + SharedMemoryPubSubBase, +): + """SharedMemory pubsub that transports raw bytes.""" + + ... + + +class PickleSharedMemory( + PickleEncoderMixin[str, Any], + SharedMemoryPubSubBase, +): + """SharedMemory pubsub that transports arbitrary Python objects via pickle.""" + + ... diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index caaf43b965..0f9486ec09 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -84,6 +84,26 @@ def lcm_context(): print("LCM not available") +from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory + + +@contextmanager +def shared_memory_cpu_context(): + shared_mem_pubsub = PickleSharedMemory(prefer="cpu") + shared_mem_pubsub.start() + yield shared_mem_pubsub + shared_mem_pubsub.stop() + + +testdata.append( + ( + shared_memory_cpu_context, + "/shared_mem_topic_cpu", + [b"shared_mem_value1", b"shared_mem_value2", b"shared_mem_value3"], + ) +) + + @pytest.mark.parametrize("pubsub_context, topic, values", testdata) def test_store(pubsub_context, topic, values): with pubsub_context() as x: diff --git a/dimos/robot/foxglove_bridge.py b/dimos/robot/foxglove_bridge.py index fdd65281c8..30fa248784 100644 --- a/dimos/robot/foxglove_bridge.py +++ b/dimos/robot/foxglove_bridge.py @@ -25,8 +25,9 @@ class FoxgloveBridge(Module): _thread: threading.Thread _loop: asyncio.AbstractEventLoop - def __init__(self, *args, **kwargs): + def __init__(self, *args, shm_channels=None, **kwargs): super().__init__(*args, **kwargs) + self.shm_channels = shm_channels or [] self.start() @rpc @@ -35,7 +36,13 @@ def run_bridge(): self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) try: - bridge = LCMFoxgloveBridge(host="0.0.0.0", port=8765, debug=False, num_threads=4) + bridge = LCMFoxgloveBridge( + host="0.0.0.0", + port=8765, + debug=False, + num_threads=4, + shm_channels=self.shm_channels, + ) self._loop.run_until_complete(bridge.run()) except Exception as e: print(f"Foxglove bridge error: {e}") diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index ca1ffd8aa8..378dc01b99 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -78,6 +78,18 @@ warnings.filterwarnings("ignore", message="coroutine.*was never awaited") warnings.filterwarnings("ignore", message="H264Decoder.*failed to decode") +""" +Constants for shared memory +Usually, auto-detection for size would be preferred. Sadly, though, channels are made +and frozen *before* the first frame is received. +Therefore, a maximum capacity for color image and depth image transfer should be defined +ahead of time. +""" +# Default color image size: 1920x1080 frame x 3 (RGB) x uint8 +DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3 +# Default depth image size: 1280x720 frame * 4 (float32 size) +DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4 + class FakeRTC: """Fake WebRTC connection for testing with recorded data.""" @@ -389,7 +401,9 @@ def _deploy_connection(self): self.connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) self.connection.odom.transport = core.LCMTransport("/odom", PoseStamped) - self.connection.video.transport = core.LCMTransport("/go2/color_image", Image) + self.connection.video.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) self.connection.movecmd.transport = core.LCMTransport("/cmd_vel", Twist) self.connection.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) self.connection.camera_pose.transport = core.LCMTransport("/go2/camera_pose", PoseStamped) @@ -464,7 +478,7 @@ def _deploy_visualization(self): self.websocket_vis.path.connect(self.global_planner.path) self.websocket_vis.global_costmap.connect(self.mapper.global_costmap) - self.foxglove_bridge = FoxgloveBridge() + self.foxglove_bridge = FoxgloveBridge(shm_channels=["/go2/color_image#sensor_msgs.Image"]) def _deploy_perception(self): """Deploy and configure perception modules.""" @@ -477,7 +491,10 @@ def _deploy_perception(self): output_dir=self.spatial_memory_dir, ) - self.spatial_memory_module.video.transport = core.LCMTransport("/go2/color_image", Image) + color_image_default_capacity = 1920 * 1080 * 4 + self.spatial_memory_module.video.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) self.spatial_memory_module.odom.transport = core.LCMTransport( "/go2/camera_pose", PoseStamped ) @@ -509,8 +526,12 @@ def _deploy_camera(self): self.depth_module = self.dimos.deploy(DepthModule, gt_depth_scale=gt_depth_scale) # Set up transports - self.depth_module.color_image.transport = core.LCMTransport("/go2/color_image", Image) - self.depth_module.depth_image.transport = core.LCMTransport("/go2/depth_image", Image) + self.depth_module.color_image.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) + self.depth_module.depth_image.transport = core.pSHMTransport( + "/go2/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE + ) self.depth_module.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) logger.info("Camera module deployed and connected") diff --git a/onnx/metric3d_vit_small.onnx b/onnx/metric3d_vit_small.onnx new file mode 100644 index 0000000000..bfddd41628 --- /dev/null +++ b/onnx/metric3d_vit_small.onnx @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:14805174265dd721ac3b396bd5ee7190c708cec41150ed298267f6c3126bc060 +size 151333865 diff --git a/pyproject.toml b/pyproject.toml index 5af8fa590a..f826636c68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,7 +102,7 @@ dependencies = [ "dask[complete]==2025.5.1", # LCM / DimOS utilities - "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@de4038871a4f166c3007ef6b6bc3ff83642219b2" + "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@03e320b325edf3ead9b74746baea318d431030bc" ] [project.scripts]