From e302059b96cdd5d3f44cad24bd12591f36814177 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Tue, 21 Oct 2025 06:28:57 +0300 Subject: [PATCH 1/7] add JPEG LCM --- dimos/core/transport.py | 14 +- dimos/msgs/sensor_msgs/Image.py | 80 ++++++++ dimos/protocol/pubsub/lcmpubsub.py | 18 ++ dimos/utils/demo_image_encoding.py | 126 +++++++++++++ dimos/utils/fast_image_generator.py | 278 ++++++++++++++++++++++++++++ pyproject.toml | 5 +- 6 files changed, 518 insertions(+), 3 deletions(-) create mode 100644 dimos/utils/demo_image_encoding.py create mode 100644 dimos/utils/fast_image_generator.py diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 77f471bafe..343da3b3bc 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -38,7 +38,7 @@ import dimos.core.colors as colors from dimos.core.stream import In, RemoteIn, Transport -from dimos.protocol.pubsub.lcmpubsub import LCM, PickleLCM +from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory @@ -88,7 +88,8 @@ class LCMTransport(PubSubTransport[T]): def __init__(self, topic: str, type: type, **kwargs): super().__init__(LCMTopic(topic, type)) - self.lcm = LCM(**kwargs) + if not hasattr(self, "lcm"): + self.lcm = LCM(**kwargs) def __reduce__(self): return (LCMTransport, (self.topic.topic, self.topic.lcm_type)) @@ -107,6 +108,15 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) +class JpegLcmTransport(LCMTransport): + def __init__(self, topic: str, type: type, **kwargs): + self.lcm = JpegLCM(**kwargs) + super().__init__(topic, type) + + def __reduce__(self): + return (JpegLcmTransport, (self.topic.topic, self.topic.lcm_type)) + + class pSHMTransport(PubSubTransport[T]): _started: bool = False diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 36f6f1d545..ea559ca7e5 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -26,6 +26,7 @@ from dimos_lcm.std_msgs.Header import Header from reactivex import operators as ops from reactivex.observable import Observable +from turbojpeg import TurboJPEG from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ( HAS_CUDA, @@ -425,6 +426,85 @@ def lcm_decode(cls, data: bytes, **kwargs) -> "Image": ) ) + def lcm_jpeg_encode(self, quality: int = 75, frame_id: Optional[str] = None) -> bytes: + """Convert to LCM Image message with JPEG-compressed data. + + Args: + quality: JPEG compression quality (0-100, default 75) + frame_id: Optional frame ID override + + Returns: + LCM-encoded bytes with JPEG-compressed image data + """ + jpeg = TurboJPEG() + msg = LCMImage() + + # Header + msg.header = Header() + msg.header.seq = 0 + msg.header.frame_id = frame_id or self.frame_id + + # Set timestamp + if self.ts is not None: + msg.header.stamp.sec = int(self.ts) + msg.header.stamp.nsec = int((self.ts - int(self.ts)) * 1e9) + else: + now = time.time() + msg.header.stamp.sec = int(now) + msg.header.stamp.nsec = int((now - int(now)) * 1e9) + + # Get image in BGR format for JPEG encoding + bgr_image = self.to_bgr().to_opencv() + + # Encode as JPEG + jpeg_data = jpeg.encode(bgr_image, quality=quality) + + # Store JPEG data and metadata + msg.height = self.height + msg.width = self.width + msg.encoding = "jpeg" + msg.is_bigendian = False + msg.step = 0 # Not applicable for compressed format + + msg.data_length = len(jpeg_data) + msg.data = jpeg_data + + return msg.lcm_encode() + + @classmethod + def lcm_jpeg_decode(cls, data: bytes, **kwargs) -> "Image": + """Decode an LCM Image message with JPEG-compressed data. + + Args: + data: LCM-encoded bytes containing JPEG-compressed image + + Returns: + Image instance + """ + jpeg = TurboJPEG() + msg = LCMImage.lcm_decode(data) + + if msg.encoding != "jpeg": + raise ValueError(f"Expected JPEG encoding, got {msg.encoding}") + + # Decode JPEG data + bgr_array = jpeg.decode(msg.data) + + return cls( + NumpyImage( + bgr_array, + ImageFormat.BGR, + msg.header.frame_id if hasattr(msg, "header") else "", + ( + msg.header.stamp.sec + msg.header.stamp.nsec / 1e9 + if hasattr(msg, "header") + and hasattr(msg.header, "stamp") + and msg.header.stamp.sec > 0 + else time.time() + ), + ) + ) + # PnP wrappers def solve_pnp(self, *args, **kwargs): return self._impl.solve_pnp(*args, **kwargs) # type: ignore diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py index 5fda6dbb83..c8165a22b6 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/lcmpubsub.py @@ -109,6 +109,18 @@ def decode(self, msg: bytes, topic: Topic) -> LCMMsg: return topic.lcm_type.lcm_decode(msg) +class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any]): + def encode(self, msg: LCMMsg, _: Topic) -> bytes: + return msg.lcm_jpeg_encode() + + def decode(self, msg: bytes, topic: Topic) -> LCMMsg: + if topic.lcm_type is None: + raise ValueError( + f"Cannot decode message for topic '{topic.topic}': no lcm_type specified" + ) + return topic.lcm_type.lcm_jpeg_decode(msg) + + class LCM( LCMEncoderMixin, LCMPubSubBase, @@ -119,3 +131,9 @@ class PickleLCM( PickleEncoderMixin, LCMPubSubBase, ): ... + + +class JpegLCM( + JpegEncoderMixin, + LCMPubSubBase, +): ... diff --git a/dimos/utils/demo_image_encoding.py b/dimos/utils/demo_image_encoding.py new file mode 100644 index 0000000000..84c08e31e4 --- /dev/null +++ b/dimos/utils/demo_image_encoding.py @@ -0,0 +1,126 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +# Usage + +Run it with uncompressed LCM: + + python dimos/utils/demo_image_encoding.py + +Run it with JPEG LCM: + + python dimos/utils/demo_image_encoding.py --use-jpeg +""" + +import argparse +import threading +import time + +from reactivex.disposable import Disposable +from dimos.core.dimos import Dimos +from dimos.core.module import Module +from dimos.core.stream import In, Out +from dimos.core.transport import JpegLcmTransport, LCMTransport +from dimos.msgs.sensor_msgs import Image +from dimos.utils.fast_image_generator import random_image +from dimos.robot.foxglove_bridge import FoxgloveBridge + + +class EmitterModule(Module): + image: Out[Image] = None + + _thread: threading.Thread | None = None + _stop_event: threading.Event | None = None + + def start(self): + super().start() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._publish_image, daemon=True) + self._thread.start() + + def stop(self): + if self._thread: + self._stop_event.set() + self._thread.wait() + super().stop() + + def _publish_image(self): + open_file = open("/tmp/emitter-times", "w") + while not self._stop_event.is_set(): + start = time.time() + data = random_image(1280, 720) + total = time.time() - start + print("took", total) + open_file.write(str(time.time()) + "\n") + self.image.publish(Image(data=data)) + open_file.close() + + +class ReceiverModule(Module): + image: In[Image] = None + + _open_file = None + + def start(self): + super().start() + self._disposables.add(Disposable(self.image.subscribe(self._on_image))) + self._open_file = open("/tmp/receiver-times", "w") + + def stop(self): + self._open_file.close() + super().stop() + + def _on_image(self, image: Image): + self._open_file.write(str(time.time()) + "\n") + print("image") + + +def main(): + parser = argparse.ArgumentParser(description="Demo image encoding with transport options") + parser.add_argument( + "--use-jpeg", + action="store_true", + help="Use JPEG LCM transport instead of regular LCM transport", + ) + args = parser.parse_args() + + dimos = Dimos(n=2) + dimos.start() + emitter = dimos.deploy(EmitterModule) + receiver = dimos.deploy(ReceiverModule) + + if args.use_jpeg: + emitter.image.transport = JpegLcmTransport("/go2/color_image", Image) + else: + emitter.image.transport = LCMTransport("/go2/color_image", Image) + receiver.image.connect(emitter.image) + + foxglove_bridge = FoxgloveBridge() + foxglove_bridge.start() + + dimos.start_all_modules() + + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + pass + finally: + foxglove_bridge.stop() + dimos.close() + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/fast_image_generator.py b/dimos/utils/fast_image_generator.py new file mode 100644 index 0000000000..1698fc8e7c --- /dev/null +++ b/dimos/utils/fast_image_generator.py @@ -0,0 +1,278 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Fast stateful image generator with visual features for encoding tests.""" + +import numpy as np +from typing import Tuple, List +import math + + +class FastImageGenerator: + """ + Stateful image generator that creates images with visual features + suitable for testing image/video encoding at 30+ FPS. + + Features generated: + - Moving geometric shapes (tests motion vectors) + - Color gradients (tests gradient compression) + - Sharp edges and corners (tests edge preservation) + - Textured regions (tests detail retention) + - Smooth regions (tests flat area compression) + - High contrast boundaries (tests blocking artifacts) + """ + + def __init__(self, width: int = 1280, height: int = 720): + """Initialize the generator with pre-computed elements.""" + self.width = width + self.height = height + self.frame_count = 0 + + # Pre-allocate the main canvas + self.canvas = np.zeros((height, width, 3), dtype=np.float32) + + # Pre-compute coordinate grids for fast gradient generation + self.x_grid, self.y_grid = np.meshgrid( + np.linspace(0, 1, width, dtype=np.float32), np.linspace(0, 1, height, dtype=np.float32) + ) + + # Pre-compute base gradient patterns + self._init_gradients() + + # Initialize moving objects with their properties + self._init_moving_objects() + + # Pre-compute static texture pattern + self._init_texture() + + # Pre-allocate shape masks for reuse + self._init_shape_masks() + + def _init_gradients(self): + """Pre-compute gradient patterns.""" + # Diagonal gradient + self.diag_gradient = (self.x_grid + self.y_grid) * 0.5 + + # Radial gradient from center + cx, cy = 0.5, 0.5 + self.radial_gradient = np.sqrt((self.x_grid - cx) ** 2 + (self.y_grid - cy) ** 2) + self.radial_gradient = np.clip(1.0 - self.radial_gradient * 1.5, 0, 1) + + # Horizontal and vertical gradients + self.h_gradient = self.x_grid + self.v_gradient = self.y_grid + + def _init_moving_objects(self): + """Initialize properties of moving objects.""" + self.objects = [ + { + "type": "circle", + "x": 0.2, + "y": 0.3, + "vx": 0.002, + "vy": 0.003, + "radius": 60, + "color": np.array([255, 100, 100], dtype=np.float32), + }, + { + "type": "rect", + "x": 0.7, + "y": 0.6, + "vx": -0.003, + "vy": 0.002, + "width": 100, + "height": 80, + "color": np.array([100, 255, 100], dtype=np.float32), + }, + { + "type": "circle", + "x": 0.5, + "y": 0.5, + "vx": 0.004, + "vy": -0.002, + "radius": 40, + "color": np.array([100, 100, 255], dtype=np.float32), + }, + ] + + def _init_texture(self): + """Pre-compute a texture pattern.""" + # Create a simple checkerboard pattern at lower resolution + checker_size = 20 + checker_h = self.height // checker_size + checker_w = self.width // checker_size + + # Create small checkerboard + checker = np.indices((checker_h, checker_w)).sum(axis=0) % 2 + + # Upscale using repeat (fast) + self.texture = np.repeat(np.repeat(checker, checker_size, axis=0), checker_size, axis=1) + self.texture = self.texture[: self.height, : self.width].astype(np.float32) * 30 + + def _init_shape_masks(self): + """Pre-allocate reusable masks for shapes.""" + # Pre-allocate a mask array + self.temp_mask = np.zeros((self.height, self.width), dtype=np.float32) + + # Pre-compute indices for the entire image + self.y_indices, self.x_indices = np.indices((self.height, self.width)) + + def _draw_circle_fast(self, cx: int, cy: int, radius: int, color: np.ndarray): + """Draw a circle using vectorized operations - optimized version without anti-aliasing.""" + # Compute bounding box to minimize calculations + y1 = max(0, cy - radius - 1) + y2 = min(self.height, cy + radius + 2) + x1 = max(0, cx - radius - 1) + x2 = min(self.width, cx + radius + 2) + + # Work only on the bounding box region + if y1 < y2 and x1 < x2: + y_local, x_local = np.ogrid[y1:y2, x1:x2] + dist_sq = (x_local - cx) ** 2 + (y_local - cy) ** 2 + mask = dist_sq <= radius**2 + self.canvas[y1:y2, x1:x2][mask] = color + + def _draw_rect_fast(self, x: int, y: int, w: int, h: int, color: np.ndarray): + """Draw a rectangle using slicing.""" + # Clip to canvas boundaries + x1 = max(0, x) + y1 = max(0, y) + x2 = min(self.width, x + w) + y2 = min(self.height, y + h) + + if x1 < x2 and y1 < y2: + self.canvas[y1:y2, x1:x2] = color + + def _update_objects(self): + """Update positions of moving objects.""" + for obj in self.objects: + # Update position + obj["x"] += obj["vx"] + obj["y"] += obj["vy"] + + # Bounce off edges + if obj["type"] == "circle": + r = obj["radius"] / self.width + if obj["x"] - r <= 0 or obj["x"] + r >= 1: + obj["vx"] *= -1 + obj["x"] = np.clip(obj["x"], r, 1 - r) + + r = obj["radius"] / self.height + if obj["y"] - r <= 0 or obj["y"] + r >= 1: + obj["vy"] *= -1 + obj["y"] = np.clip(obj["y"], r, 1 - r) + + elif obj["type"] == "rect": + w = obj["width"] / self.width + h = obj["height"] / self.height + if obj["x"] <= 0 or obj["x"] + w >= 1: + obj["vx"] *= -1 + obj["x"] = np.clip(obj["x"], 0, 1 - w) + + if obj["y"] <= 0 or obj["y"] + h >= 1: + obj["vy"] *= -1 + obj["y"] = np.clip(obj["y"], 0, 1 - h) + + def generate_frame(self) -> np.ndarray: + """ + Generate a single frame with visual features - optimized for 30+ FPS. + + Returns: + numpy array of shape (height, width, 3) with uint8 values + """ + # Simple time-based animation parameter + t = self.frame_count * 0.02 + + # Fast gradient background - use only one gradient per frame + if self.frame_count % 2 == 0: + base_gradient = self.h_gradient + else: + base_gradient = self.v_gradient + + # Simple color mapping + self.canvas[:, :, 0] = base_gradient * 150 + 50 + self.canvas[:, :, 1] = base_gradient * 120 + 70 + self.canvas[:, :, 2] = (1 - base_gradient) * 140 + 60 + + # Add texture in corner - simplified without per-channel scaling + tex_size = self.height // 3 + self.canvas[:tex_size, :tex_size] += self.texture[:tex_size, :tex_size, np.newaxis] + + # Add test pattern bars - vectorized + bar_width = 50 + bar_start = self.width // 3 + for i in range(3): # Reduced from 5 to 3 bars + x1 = bar_start + i * bar_width * 2 + x2 = min(x1 + bar_width, self.width) + if x1 < self.width: + color_val = 180 + i * 30 + self.canvas[self.height // 2 :, x1:x2] = color_val + + # Update and draw only 2 moving objects (reduced from 3) + self._update_objects() + + # Draw only first 2 objects for speed + for obj in self.objects[:2]: + if obj["type"] == "circle": + cx = int(obj["x"] * self.width) + cy = int(obj["y"] * self.height) + self._draw_circle_fast(cx, cy, obj["radius"], obj["color"]) + elif obj["type"] == "rect": + x = int(obj["x"] * self.width) + y = int(obj["y"] * self.height) + self._draw_rect_fast(x, y, obj["width"], obj["height"], obj["color"]) + + # Simple horizontal lines pattern (faster than sine wave) + line_y = int(self.height * 0.8) + line_spacing = 10 + for i in range(0, 5): + y = line_y + i * line_spacing + if y < self.height: + self.canvas[y : y + 2, :] = [255, 200, 100] + + # Increment frame counter + self.frame_count += 1 + + # Direct conversion to uint8 (already in valid range) + return self.canvas.astype(np.uint8) + + def reset(self): + """Reset the generator to initial state.""" + self.frame_count = 0 + self._init_moving_objects() + + +# Convenience function for backward compatibility +_generator = None + + +def random_image(width: int, height: int) -> np.ndarray: + """ + Generate an image with visual features suitable for encoding tests. + Maintains state for efficient stream generation. + + Args: + width: Image width in pixels + height: Image height in pixels + + Returns: + numpy array of shape (height, width, 3) with uint8 values + """ + global _generator + + # Initialize or reinitialize if dimensions changed + if _generator is None or _generator.width != width or _generator.height != height: + _generator = FastImageGenerator(width, height) + + return _generator.generate_frame() diff --git a/pyproject.toml b/pyproject.toml index 2d3804c1fc..87061c3ff9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,9 @@ dependencies = [ "pyaudio", "requests", "wasmtime", + + # Image + "PyTurboJPEG==1.8.2", # Audio "openai-whisper", @@ -104,7 +107,7 @@ dependencies = [ "dask[complete]==2025.5.1", # LCM / DimOS utilities - "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@03e320b325edf3ead9b74746baea318d431030bc", + "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@bceb618ed431562a93f3d806c79656f85695a15d", # CLI "pydantic-settings>=2.11.0,<3", From 03611796119d954d35a9e3031812f1b2c4b54483 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 22 Oct 2025 00:34:52 +0300 Subject: [PATCH 2/7] fix join --- dimos/utils/demo_image_encoding.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/utils/demo_image_encoding.py b/dimos/utils/demo_image_encoding.py index 84c08e31e4..ef458a31de 100644 --- a/dimos/utils/demo_image_encoding.py +++ b/dimos/utils/demo_image_encoding.py @@ -53,7 +53,7 @@ def start(self): def stop(self): if self._thread: self._stop_event.set() - self._thread.wait() + self._thread.join(timeout=2) super().stop() def _publish_image(self): From 2f5023ac05081e7439fd71b0bdc31c6b66d204d0 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 26 Oct 2025 03:18:42 +0200 Subject: [PATCH 3/7] add blueprint --- dimos/robot/all_blueprints.py | 1 + dimos/robot/unitree_webrtc/unitree_go2_blueprints.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 096af58b94..3e688ecc90 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -20,6 +20,7 @@ "unitree-go2": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard", "unitree-go2-basic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:basic", "unitree-go2-shm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_shm", + "unitree-go2-jpeglcm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_jpeglcm", "unitree-go2-agentic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", "demo-remapping": "dimos.robot.unitree_webrtc.demo_remapping:remapping", diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 47e4ce6c8c..5b54fec5b3 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -16,7 +16,7 @@ from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE from dimos.core.blueprints import autoconnect -from dimos.core.transport import LCMTransport, pSHMTransport +from dimos.core.transport import JpegLcmTransport, LCMTransport, pSHMTransport from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.sensor_msgs import Image from dimos_lcm.sensor_msgs import CameraInfo @@ -101,6 +101,12 @@ ), ) +standard_with_jpeglcm = standard.with_transports( + { + ("color_image", Image): JpegLcmTransport("/go2/color_image", Image), + } +) + agentic = autoconnect( standard, llm_agent(), From bc651f54b52fbd0e4b2322eca86b9ba0f67ccbee Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 26 Oct 2025 03:39:34 +0200 Subject: [PATCH 4/7] fix script --- dimos/utils/demo_image_encoding.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/utils/demo_image_encoding.py b/dimos/utils/demo_image_encoding.py index ef458a31de..7e9b317bb6 100644 --- a/dimos/utils/demo_image_encoding.py +++ b/dimos/utils/demo_image_encoding.py @@ -29,7 +29,7 @@ import time from reactivex.disposable import Disposable -from dimos.core.dimos import Dimos +from dimos.core.module_coordinator import ModuleCoordinator from dimos.core.module import Module from dimos.core.stream import In, Out from dimos.core.transport import JpegLcmTransport, LCMTransport @@ -96,7 +96,7 @@ def main(): ) args = parser.parse_args() - dimos = Dimos(n=2) + dimos = ModuleCoordinator(n=2) dimos.start() emitter = dimos.deploy(EmitterModule) receiver = dimos.deploy(ReceiverModule) From fb9c9028d6c1fb9a82cacc40a18a3d651a7971cb Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 26 Oct 2025 08:01:50 +0200 Subject: [PATCH 5/7] JpegSharedMemoryEncoderMixin --- dimos/core/transport.py | 26 +++++++++++++++++++ dimos/protocol/pubsub/jpeg_shm.py | 20 ++++++++++++++ dimos/protocol/pubsub/lcmpubsub.py | 21 +++++++++++++++ dimos/robot/all_blueprints.py | 1 + .../unitree_webrtc/unitree_go2_blueprints.py | 19 ++++++++++++-- 5 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 dimos/protocol/pubsub/jpeg_shm.py diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 343da3b3bc..0aab2ba1ed 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -41,6 +41,7 @@ from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory +from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory T = TypeVar("T") @@ -165,6 +166,31 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) +class JpegShmTransport(PubSubTransport[T]): + _started: bool = False + + def __init__(self, topic: str, quality: int = 75, **kwargs): + super().__init__(topic) + self.shm = JpegSharedMemory(quality=quality, **kwargs) + self.quality = quality + + def __reduce__(self): + return (JpegShmTransport, (self.topic, self.quality)) + + def broadcast(self, _, msg): + if not self._started: + self.shm.start() + self._started = True + + self.shm.publish(self.topic, msg) + + def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None: + if not self._started: + self.shm.start() + self._started = True + return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) + + class DaskTransport(Transport[T]): subscribers: List[Callable[[T], None]] _started: bool = False diff --git a/dimos/protocol/pubsub/jpeg_shm.py b/dimos/protocol/pubsub/jpeg_shm.py new file mode 100644 index 0000000000..68a97ec6b6 --- /dev/null +++ b/dimos/protocol/pubsub/jpeg_shm.py @@ -0,0 +1,20 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dimos.protocol.pubsub.lcmpubsub import JpegSharedMemoryEncoderMixin +from dimos.protocol.pubsub.shmpubsub import SharedMemoryPubSubBase + + +class JpegSharedMemory(JpegSharedMemoryEncoderMixin, SharedMemoryPubSubBase): + pass diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py index c8165a22b6..971d7789a4 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/lcmpubsub.py @@ -24,10 +24,13 @@ import lcm +from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ImageFormat from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf, check_system from dimos.utils.deprecation import deprecated from dimos.utils.logging_config import setup_logger +from turbojpeg import TurboJPEG logger = setup_logger(__name__) @@ -121,6 +124,24 @@ def decode(self, msg: bytes, topic: Topic) -> LCMMsg: return topic.lcm_type.lcm_jpeg_decode(msg) +class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image]): + def __init__(self, quality: int = 75, **kwargs): + super().__init__(**kwargs) + self.jpeg = TurboJPEG() + self.quality = quality + + def encode(self, msg: Any, _topic: str) -> bytes: + if not isinstance(msg, Image): + raise ValueError("Can only encode images.") + + bgr_image = msg.to_bgr().to_opencv() + return self.jpeg.encode(bgr_image, quality=self.quality) + + def decode(self, msg: bytes, _topic: str) -> Image: + bgr_array = self.jpeg.decode(msg) + return Image(data=bgr_array, format=ImageFormat.BGR) + + class LCM( LCMEncoderMixin, LCMPubSubBase, diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 3e688ecc90..351b59cf86 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -20,6 +20,7 @@ "unitree-go2": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard", "unitree-go2-basic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:basic", "unitree-go2-shm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_shm", + "unitree-go2-jpegshm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_jpegshm", "unitree-go2-jpeglcm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_jpeglcm", "unitree-go2-agentic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 5b54fec5b3..77ea1d38b2 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -16,7 +16,7 @@ from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE from dimos.core.blueprints import autoconnect -from dimos.core.transport import JpegLcmTransport, LCMTransport, pSHMTransport +from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.sensor_msgs import Image from dimos_lcm.sensor_msgs import CameraInfo @@ -101,12 +101,27 @@ ), ) -standard_with_jpeglcm = standard.with_transports( +standard_with_jpeglcm = standard.transports( { ("color_image", Image): JpegLcmTransport("/go2/color_image", Image), } ) +standard_with_jpegshm = autoconnect( + standard.transports( + { + ("color_image", Image): JpegShmTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ), + } + ), + foxglove_bridge( + shm_channels=[ + "/go2/color_image#sensor_msgs.Image", + ] + ), +) + agentic = autoconnect( standard, llm_agent(), From e0a74ee2c2fcde8d04e962f490653640bbecfb62 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 26 Oct 2025 08:22:53 +0200 Subject: [PATCH 6/7] jpeg_shm_channels --- dimos/robot/foxglove_bridge.py | 4 +++- dimos/robot/unitree_webrtc/unitree_go2_blueprints.py | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dimos/robot/foxglove_bridge.py b/dimos/robot/foxglove_bridge.py index 0e9b967757..4db0038a9a 100644 --- a/dimos/robot/foxglove_bridge.py +++ b/dimos/robot/foxglove_bridge.py @@ -26,9 +26,10 @@ class FoxgloveBridge(Module): _thread: threading.Thread _loop: asyncio.AbstractEventLoop - def __init__(self, *args, shm_channels=None, **kwargs): + def __init__(self, *args, shm_channels=None, jpeg_shm_channels=None, **kwargs): super().__init__(*args, **kwargs) self.shm_channels = shm_channels or [] + self.jpeg_shm_channels = jpeg_shm_channels or [] @rpc def start(self): @@ -50,6 +51,7 @@ def run_bridge(): debug=False, num_threads=4, shm_channels=self.shm_channels, + jpeg_shm_channels=self.jpeg_shm_channels, ) self._loop.run_until_complete(bridge.run()) except Exception as e: diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 77ea1d38b2..5d3c48cb27 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -110,13 +110,11 @@ standard_with_jpegshm = autoconnect( standard.transports( { - ("color_image", Image): JpegShmTransport( - "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE - ), + ("color_image", Image): JpegShmTransport("/go2/color_image", quality=75), } ), foxglove_bridge( - shm_channels=[ + jpeg_shm_channels=[ "/go2/color_image#sensor_msgs.Image", ] ), From f4fb8c67999b1139ee0b34cd63713ed269a95fcc Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Mon, 27 Oct 2025 02:01:32 +0200 Subject: [PATCH 7/7] update dimos-lcm --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 87061c3ff9..2244be853b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,7 +107,7 @@ dependencies = [ "dask[complete]==2025.5.1", # LCM / DimOS utilities - "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@bceb618ed431562a93f3d806c79656f85695a15d", + "dimos-lcm @ git+https://github.com/dimensionalOS/dimos-lcm.git@3aeb724863144a8ba6cf72c9f42761d1007deda4", # CLI "pydantic-settings>=2.11.0,<3",