Skip to content
Merged
2 changes: 1 addition & 1 deletion dimos/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def close_all() -> None:
from dimos.protocol.pubsub import shmpubsub

for obj in gc.get_objects():
if isinstance(obj, shmpubsub.SharedMemory | shmpubsub.PickleSharedMemory):
if isinstance(obj, shmpubsub.SharedMemoryPubSubBase):
try:
obj.stop()
except Exception:
Expand Down
4 changes: 2 additions & 2 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory
from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
from dimos.protocol.pubsub.rospubsub import DimosROS, ROSTopic
from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory, SharedMemory
from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, PickleSharedMemory

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -167,7 +167,7 @@ class SHMTransport(PubSubTransport[T]):

def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(topic)
self.shm = SharedMemory(**kwargs)
self.shm = BytesSharedMemory(**kwargs)

def __reduce__(self): # type: ignore[no-untyped-def]
return (SHMTransport, (self.topic,))
Expand Down
4 changes: 2 additions & 2 deletions dimos/protocol/pubsub/benchmark/test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import pytest

from dimos.protocol.pubsub.benchmark.testdata import testdata
from dimos.protocol.pubsub.benchmark.testdata import testcases
from dimos.protocol.pubsub.benchmark.type import (
BenchmarkResult,
BenchmarkResults,
Expand Down Expand Up @@ -86,7 +86,7 @@ def benchmark_results() -> Generator[BenchmarkResults, None, None]:

@pytest.mark.tool
@pytest.mark.parametrize("msg_size", MSG_SIZES, ids=[size_id(s) for s in MSG_SIZES])
@pytest.mark.parametrize("pubsub_context, msggen", testdata, ids=[pubsub_id(t) for t in testdata])
@pytest.mark.parametrize("pubsub_context, msggen", testcases, ids=[pubsub_id(t) for t in testcases])
def test_throughput(
pubsub_context: PubSubContext[Any, Any],
msggen: MsgGen[Any, Any],
Expand Down
146 changes: 85 additions & 61 deletions dimos/protocol/pubsub/benchmark/testdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,35 @@
from contextlib import contextmanager
from typing import Any

import numpy as np

from dimos.msgs.sensor_msgs.Image import Image, ImageFormat
from dimos.protocol.pubsub.benchmark.type import Case
from dimos.protocol.pubsub.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic
from dimos.protocol.pubsub.memory import Memory
from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory
from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, LCMSharedMemory, PickleSharedMemory


def make_data(size: int) -> bytes:
def make_data_bytes(size: int) -> bytes:
"""Generate random bytes of given size."""
return bytes(i % 256 for i in range(size))


testdata: list[Case[Any, Any]] = []
def make_data_image(size: int) -> Image:
"""Generate an RGB Image with approximately `size` bytes of data."""
raw_data = np.frombuffer(make_data_bytes(size), dtype=np.uint8).reshape(-1)
# Pad to make it divisible by 3 for RGB
padded_size = ((len(raw_data) + 2) // 3) * 3
padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
pixels = len(padded_data) // 3
# Find reasonable dimensions
height = max(1, int(pixels**0.5))
width = pixels // height
data = padded_data[: height * width * 3].reshape(height, width, 3)
return Image(data=data, format=ImageFormat.RGB)


testcases: list[Case[Any, Any]] = []


@contextmanager
Expand All @@ -40,24 +56,11 @@ def lcm_pubsub_channel() -> Generator[LCM, None, None]:


def lcm_msggen(size: int) -> tuple[LCMTopic, Image]:
import numpy as np

# Create image data as numpy array with shape (height, width, channels)
raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
# Pad to make it divisible by 3 for RGB
padded_size = ((len(raw_data) + 2) // 3) * 3
padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
pixels = len(padded_data) // 3
# Find reasonable dimensions
height = max(1, int(pixels**0.5))
width = pixels // height
data = padded_data[: height * width * 3].reshape(height, width, 3)
topic = LCMTopic(topic="benchmark/lcm", lcm_type=Image)
msg = Image(data=data, format=ImageFormat.RGB)
return (topic, msg)
return (topic, make_data_image(size))


testdata.append(
testcases.append(
Case(
pubsub_context=lcm_pubsub_channel,
msg_gen=lcm_msggen,
Expand All @@ -66,26 +69,26 @@ def lcm_msggen(size: int) -> tuple[LCMTopic, Image]:


@contextmanager
def udp_raw_pubsub_channel() -> Generator[LCMPubSubBase, None, None]:
def udp_bytes_pubsub_channel() -> Generator[LCMPubSubBase, None, None]:
"""LCM with raw bytes - no encoding overhead."""
lcm_pubsub = LCMPubSubBase(autoconf=True)
lcm_pubsub.start()
yield lcm_pubsub
lcm_pubsub.stop()


def udp_raw_msggen(size: int) -> tuple[LCMTopic, bytes]:
def udp_bytes_msggen(size: int) -> tuple[LCMTopic, bytes]:
"""Generate raw bytes for LCM transport benchmark."""
topic = LCMTopic(topic="benchmark/lcm_raw")
return (topic, make_data(size))
return (topic, make_data_bytes(size))


# testdata.append(
# Case(
# pubsub_context=udp_raw_pubsub_channel,
# msg_gen=udp_raw_msggen,
# )
# )
testcases.append(
Case(
pubsub_context=udp_bytes_pubsub_channel,
msg_gen=udp_bytes_msggen,
)
)


@contextmanager
Expand All @@ -95,28 +98,19 @@ def memory_pubsub_channel() -> Generator[Memory, None, None]:


def memory_msggen(size: int) -> tuple[str, Any]:
import numpy as np

raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
padded_size = ((len(raw_data) + 2) // 3) * 3
padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
pixels = len(padded_data) // 3
height = max(1, int(pixels**0.5))
width = pixels // height
data = padded_data[: height * width * 3].reshape(height, width, 3)
return ("benchmark/memory", Image(data=data, format=ImageFormat.RGB))
return ("benchmark/memory", make_data_image(size))


# testdata.append(
# Case(
# pubsub_context=memory_pubsub_channel,
# msg_gen=memory_msggen,
# )
# )
testcases.append(
Case(
pubsub_context=memory_pubsub_channel,
msg_gen=memory_msggen,
)
)


@contextmanager
def shm_pubsub_channel() -> Generator[PickleSharedMemory, None, None]:
def shm_pickle_pubsub_channel() -> Generator[PickleSharedMemory, None, None]:
# 12MB capacity to handle benchmark sizes up to 10MB
shm_pubsub = PickleSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024)
shm_pubsub.start()
Expand All @@ -126,26 +120,56 @@ def shm_pubsub_channel() -> Generator[PickleSharedMemory, None, None]:

def shm_msggen(size: int) -> tuple[str, Any]:
"""Generate message for SharedMemory pubsub benchmark."""
import numpy as np

raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
padded_size = ((len(raw_data) + 2) // 3) * 3
padded_data = np.pad(raw_data, (0, padded_size - len(raw_data)))
pixels = len(padded_data) // 3
height = max(1, int(pixels**0.5))
width = pixels // height
data = padded_data[: height * width * 3].reshape(height, width, 3)
return ("benchmark/shm", Image(data=data, format=ImageFormat.RGB))
return ("benchmark/shm", make_data_image(size))


testdata.append(
testcases.append(
Case(
pubsub_context=shm_pubsub_channel,
pubsub_context=shm_pickle_pubsub_channel,
msg_gen=shm_msggen,
)
)


@contextmanager
def shm_bytes_pubsub_channel() -> Generator[BytesSharedMemory, None, None]:
"""SharedMemory with raw bytes - no pickle overhead."""
shm_pubsub = BytesSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024)
shm_pubsub.start()
yield shm_pubsub
shm_pubsub.stop()


def shm_bytes_msggen(size: int) -> tuple[str, bytes]:
"""Generate raw bytes for SharedMemory transport benchmark."""
return ("benchmark/shm_bytes", make_data_bytes(size))


testcases.append(
Case(
pubsub_context=shm_bytes_pubsub_channel,
msg_gen=shm_bytes_msggen,
)
)


@contextmanager
def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]:
"""SharedMemory with LCM binary encoding - no pickle overhead."""
shm_pubsub = LCMSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024)
shm_pubsub.start()
yield shm_pubsub
shm_pubsub.stop()


testcases.append(
Case(
pubsub_context=shm_lcm_pubsub_channel,
msg_gen=lcm_msggen, # Reuse the LCM message generator
)
)


try:
from dimos.protocol.pubsub.redispubsub import Redis

Expand All @@ -160,10 +184,10 @@ def redis_msggen(size: int) -> tuple[str, Any]:
# Redis uses JSON serialization, so use a simple dict with base64-encoded data
import base64

data = base64.b64encode(make_data(size)).decode("ascii")
data = base64.b64encode(make_data_bytes(size)).decode("ascii")
return ("benchmark/redis", {"data": data, "size": size})

testdata.append(
testcases.append(
Case(
pubsub_context=redis_pubsub_channel,
msg_gen=redis_msggen,
Expand Down Expand Up @@ -211,7 +235,7 @@ def ros_msggen(size: int) -> tuple[RawROSTopic, ROSImage]:
import numpy as np

# Create image data
data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1)
data = np.frombuffer(make_data_bytes(size), dtype=np.uint8).reshape(-1)
padded_size = ((len(data) + 2) // 3) * 3
data = np.pad(data, (0, padded_size - len(data)))
pixels = len(data) // 3
Expand All @@ -230,14 +254,14 @@ def ros_msggen(size: int) -> tuple[RawROSTopic, ROSImage]:
topic = RawROSTopic(topic="/benchmark/ros", ros_type=ROSImage)
return (topic, msg)

testdata.append(
testcases.append(
Case(
pubsub_context=ros_best_effort_pubsub_channel,
msg_gen=ros_msggen,
)
)

testdata.append(
testcases.append(
Case(
pubsub_context=ros_reliable_pubsub_channel,
msg_gen=ros_msggen,
Expand Down
17 changes: 13 additions & 4 deletions dimos/protocol/pubsub/shm/ipc_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ def shape(self) -> tuple: ... # type: ignore[type-arg]
def dtype(self) -> np.dtype: ... # type: ignore[type-arg]

@abstractmethod
def publish(self, frame) -> None: # type: ignore[no-untyped-def]
"""Write into inactive buffer, then flip visible index (write control last)."""
def publish(self, frame, length: int | None = None) -> None: # type: ignore[no-untyped-def]
"""Write into inactive buffer, then flip visible index (write control last).

Args:
frame: The numpy array to publish
length: Optional length to copy (for variable-size messages). If None, copies full frame.
"""
...

@abstractmethod
Expand Down Expand Up @@ -185,7 +190,7 @@ def shape(self): # type: ignore[no-untyped-def]
def dtype(self): # type: ignore[no-untyped-def]
return self._dtype

def publish(self, frame) -> None: # type: ignore[no-untyped-def]
def publish(self, frame, length: int | None = None) -> None: # type: ignore[no-untyped-def]
assert isinstance(frame, np.ndarray)
assert frame.shape == self._shape and frame.dtype == self._dtype
active = int(self._ctrl[2])
Expand All @@ -196,7 +201,11 @@ def publish(self, frame) -> None: # type: ignore[no-untyped-def]
buffer=self._shm_data.buf,
offset=inactive * self._nbytes,
)
np.copyto(view, frame, casting="no")
# Only copy actual payload length if specified, otherwise copy full frame
if length is not None and length < len(frame):
np.copyto(view[:length], frame[:length], casting="no")
else:
np.copyto(view, frame, casting="no")
ts = np.int64(time.time_ns())
# Publish order: ts -> idx -> seq
self._ctrl[1] = ts
Expand Down
Loading
Loading