Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@

import dimos.core.colors as colors
from dimos.core.stream import In, RemoteIn, Transport
from dimos.protocol.pubsub.lcmpubsub import LCM, PickleLCM
from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM
from dimos.protocol.pubsub.lcmpubsub import Topic as LCMTopic
from dimos.protocol.pubsub.shmpubsub import SharedMemory, PickleSharedMemory
from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory

T = TypeVar("T")

Expand Down Expand Up @@ -88,7 +89,8 @@ class LCMTransport(PubSubTransport[T]):

def __init__(self, topic: str, type: type, **kwargs):
super().__init__(LCMTopic(topic, type))
self.lcm = LCM(**kwargs)
if not hasattr(self, "lcm"):
self.lcm = LCM(**kwargs)

def __reduce__(self):
return (LCMTransport, (self.topic.topic, self.topic.lcm_type))
Expand All @@ -107,6 +109,15 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) ->
return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg))


class JpegLcmTransport(LCMTransport):
def __init__(self, topic: str, type: type, **kwargs):
self.lcm = JpegLCM(**kwargs)
super().__init__(topic, type)

def __reduce__(self):
return (JpegLcmTransport, (self.topic.topic, self.topic.lcm_type))


class pSHMTransport(PubSubTransport[T]):
_started: bool = False

Expand Down Expand Up @@ -155,6 +166,31 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) ->
return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg))


class JpegShmTransport(PubSubTransport[T]):
_started: bool = False

def __init__(self, topic: str, quality: int = 75, **kwargs):
super().__init__(topic)
self.shm = JpegSharedMemory(quality=quality, **kwargs)
self.quality = quality

def __reduce__(self):
return (JpegShmTransport, (self.topic, self.quality))

def broadcast(self, _, msg):
if not self._started:
self.shm.start()
self._started = True

self.shm.publish(self.topic, msg)

def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) -> None:
if not self._started:
self.shm.start()
self._started = True
return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg))


class DaskTransport(Transport[T]):
subscribers: List[Callable[[T], None]]
_started: bool = False
Expand Down
80 changes: 80 additions & 0 deletions dimos/msgs/sensor_msgs/Image.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dimos_lcm.std_msgs.Header import Header
from reactivex import operators as ops
from reactivex.observable import Observable
from turbojpeg import TurboJPEG

from dimos.msgs.sensor_msgs.image_impls.AbstractImage import (
HAS_CUDA,
Expand Down Expand Up @@ -425,6 +426,85 @@ def lcm_decode(cls, data: bytes, **kwargs) -> "Image":
)
)

def lcm_jpeg_encode(self, quality: int = 75, frame_id: Optional[str] = None) -> bytes:
"""Convert to LCM Image message with JPEG-compressed data.

Args:
quality: JPEG compression quality (0-100, default 75)
frame_id: Optional frame ID override

Returns:
LCM-encoded bytes with JPEG-compressed image data
"""
jpeg = TurboJPEG()
msg = LCMImage()

# Header
msg.header = Header()
msg.header.seq = 0
msg.header.frame_id = frame_id or self.frame_id

# Set timestamp
if self.ts is not None:
msg.header.stamp.sec = int(self.ts)
msg.header.stamp.nsec = int((self.ts - int(self.ts)) * 1e9)
else:
now = time.time()
msg.header.stamp.sec = int(now)
msg.header.stamp.nsec = int((now - int(now)) * 1e9)

# Get image in BGR format for JPEG encoding
bgr_image = self.to_bgr().to_opencv()

# Encode as JPEG
jpeg_data = jpeg.encode(bgr_image, quality=quality)

# Store JPEG data and metadata
msg.height = self.height
msg.width = self.width
msg.encoding = "jpeg"
msg.is_bigendian = False
msg.step = 0 # Not applicable for compressed format

msg.data_length = len(jpeg_data)
msg.data = jpeg_data

return msg.lcm_encode()

@classmethod
def lcm_jpeg_decode(cls, data: bytes, **kwargs) -> "Image":
"""Decode an LCM Image message with JPEG-compressed data.

Args:
data: LCM-encoded bytes containing JPEG-compressed image

Returns:
Image instance
"""
jpeg = TurboJPEG()
msg = LCMImage.lcm_decode(data)

if msg.encoding != "jpeg":
raise ValueError(f"Expected JPEG encoding, got {msg.encoding}")

# Decode JPEG data
bgr_array = jpeg.decode(msg.data)

return cls(
NumpyImage(
bgr_array,
ImageFormat.BGR,
msg.header.frame_id if hasattr(msg, "header") else "",
(
msg.header.stamp.sec + msg.header.stamp.nsec / 1e9
if hasattr(msg, "header")
and hasattr(msg.header, "stamp")
and msg.header.stamp.sec > 0
else time.time()
),
)
)

# PnP wrappers
def solve_pnp(self, *args, **kwargs):
return self._impl.solve_pnp(*args, **kwargs) # type: ignore
Expand Down
20 changes: 20 additions & 0 deletions dimos/protocol/pubsub/jpeg_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dimos.protocol.pubsub.lcmpubsub import JpegSharedMemoryEncoderMixin
from dimos.protocol.pubsub.shmpubsub import SharedMemoryPubSubBase


class JpegSharedMemory(JpegSharedMemoryEncoderMixin, SharedMemoryPubSubBase):
pass
39 changes: 39 additions & 0 deletions dimos/protocol/pubsub/lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@

import lcm

from dimos.msgs.sensor_msgs import Image
from dimos.msgs.sensor_msgs.image_impls.AbstractImage import ImageFormat
from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin
from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf, check_system
from dimos.utils.deprecation import deprecated
from dimos.utils.logging_config import setup_logger
from turbojpeg import TurboJPEG


logger = setup_logger(__name__)
Expand Down Expand Up @@ -109,6 +112,36 @@ def decode(self, msg: bytes, topic: Topic) -> LCMMsg:
return topic.lcm_type.lcm_decode(msg)


class JpegEncoderMixin(PubSubEncoderMixin[Topic, Any]):
def encode(self, msg: LCMMsg, _: Topic) -> bytes:
return msg.lcm_jpeg_encode()

def decode(self, msg: bytes, topic: Topic) -> LCMMsg:
if topic.lcm_type is None:
raise ValueError(
f"Cannot decode message for topic '{topic.topic}': no lcm_type specified"
)
return topic.lcm_type.lcm_jpeg_decode(msg)


class JpegSharedMemoryEncoderMixin(PubSubEncoderMixin[str, Image]):
def __init__(self, quality: int = 75, **kwargs):
super().__init__(**kwargs)
self.jpeg = TurboJPEG()
self.quality = quality

def encode(self, msg: Any, _topic: str) -> bytes:
if not isinstance(msg, Image):
raise ValueError("Can only encode images.")

bgr_image = msg.to_bgr().to_opencv()
return self.jpeg.encode(bgr_image, quality=self.quality)

def decode(self, msg: bytes, _topic: str) -> Image:
bgr_array = self.jpeg.decode(msg)
return Image(data=bgr_array, format=ImageFormat.BGR)


class LCM(
LCMEncoderMixin,
LCMPubSubBase,
Expand All @@ -119,3 +152,9 @@ class PickleLCM(
PickleEncoderMixin,
LCMPubSubBase,
): ...


class JpegLCM(
JpegEncoderMixin,
LCMPubSubBase,
): ...
2 changes: 2 additions & 0 deletions dimos/robot/all_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
"unitree-go2": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard",
"unitree-go2-basic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:basic",
"unitree-go2-shm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_shm",
"unitree-go2-jpegshm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_jpegshm",
"unitree-go2-jpeglcm": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:standard_with_jpeglcm",
"unitree-go2-agentic": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic",
"demo-osm": "dimos.mapping.osm.demo_osm:demo_osm",
"demo-remapping": "dimos.robot.unitree_webrtc.demo_remapping:remapping",
Expand Down
4 changes: 3 additions & 1 deletion dimos/robot/foxglove_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ class FoxgloveBridge(Module):
_thread: threading.Thread
_loop: asyncio.AbstractEventLoop

def __init__(self, *args, shm_channels=None, **kwargs):
def __init__(self, *args, shm_channels=None, jpeg_shm_channels=None, **kwargs):
super().__init__(*args, **kwargs)
self.shm_channels = shm_channels or []
self.jpeg_shm_channels = jpeg_shm_channels or []

@rpc
def start(self):
Expand All @@ -50,6 +51,7 @@ def run_bridge():
debug=False,
num_threads=4,
shm_channels=self.shm_channels,
jpeg_shm_channels=self.jpeg_shm_channels,
)
self._loop.run_until_complete(bridge.run())
except Exception as e:
Expand Down
21 changes: 20 additions & 1 deletion dimos/robot/unitree_webrtc/unitree_go2_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE
from dimos.core.blueprints import autoconnect
from dimos.core.transport import LCMTransport, pSHMTransport
from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.msgs.sensor_msgs import Image
from dimos_lcm.sensor_msgs import CameraInfo
Expand Down Expand Up @@ -101,6 +101,25 @@
),
)

standard_with_jpeglcm = standard.transports(
{
("color_image", Image): JpegLcmTransport("/go2/color_image", Image),
}
)

standard_with_jpegshm = autoconnect(
standard.transports(
{
("color_image", Image): JpegShmTransport("/go2/color_image", quality=75),
}
),
foxglove_bridge(
jpeg_shm_channels=[
"/go2/color_image#sensor_msgs.Image",
]
),
)

agentic = autoconnect(
standard,
llm_agent(),
Expand Down
Loading
Loading