diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 25d4f7a6e5..b56fe74f4f 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -177,7 +177,7 @@ def close_all() -> None: from dimos.protocol.pubsub import shmpubsub for obj in gc.get_objects(): - if isinstance(obj, shmpubsub.SharedMemory | shmpubsub.PickleSharedMemory): + if isinstance(obj, shmpubsub.SharedMemoryPubSubBase): try: obj.stop() except Exception: diff --git a/dimos/core/transport.py b/dimos/core/transport.py index fac12f27cc..4c1b19ee2e 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -30,7 +30,7 @@ from dimos.protocol.pubsub.jpeg_shm import JpegSharedMemory from dimos.protocol.pubsub.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic from dimos.protocol.pubsub.rospubsub import DimosROS, ROSTopic -from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory, SharedMemory +from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, PickleSharedMemory if TYPE_CHECKING: from collections.abc import Callable @@ -167,7 +167,7 @@ class SHMTransport(PubSubTransport[T]): def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(topic) - self.shm = SharedMemory(**kwargs) + self.shm = BytesSharedMemory(**kwargs) def __reduce__(self): # type: ignore[no-untyped-def] return (SHMTransport, (self.topic,)) diff --git a/dimos/protocol/pubsub/benchmark/test_benchmark.py b/dimos/protocol/pubsub/benchmark/test_benchmark.py index 61cbcd540e..865c4ee324 100644 --- a/dimos/protocol/pubsub/benchmark/test_benchmark.py +++ b/dimos/protocol/pubsub/benchmark/test_benchmark.py @@ -21,7 +21,7 @@ import pytest -from dimos.protocol.pubsub.benchmark.testdata import testdata +from dimos.protocol.pubsub.benchmark.testdata import testcases from dimos.protocol.pubsub.benchmark.type import ( BenchmarkResult, BenchmarkResults, @@ -86,7 +86,7 @@ def benchmark_results() -> Generator[BenchmarkResults, None, None]: @pytest.mark.tool @pytest.mark.parametrize("msg_size", MSG_SIZES, ids=[size_id(s) for s in MSG_SIZES]) -@pytest.mark.parametrize("pubsub_context, msggen", testdata, ids=[pubsub_id(t) for t in testdata]) +@pytest.mark.parametrize("pubsub_context, msggen", testcases, ids=[pubsub_id(t) for t in testcases]) def test_throughput( pubsub_context: PubSubContext[Any, Any], msggen: MsgGen[Any, Any], diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py index 00ada8a3a4..e9190ad70e 100644 --- a/dimos/protocol/pubsub/benchmark/testdata.py +++ b/dimos/protocol/pubsub/benchmark/testdata.py @@ -16,19 +16,35 @@ from contextlib import contextmanager from typing import Any +import numpy as np + from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.protocol.pubsub.benchmark.type import Case from dimos.protocol.pubsub.lcmpubsub import LCM, LCMPubSubBase, Topic as LCMTopic from dimos.protocol.pubsub.memory import Memory -from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory +from dimos.protocol.pubsub.shmpubsub import BytesSharedMemory, LCMSharedMemory, PickleSharedMemory -def make_data(size: int) -> bytes: +def make_data_bytes(size: int) -> bytes: """Generate random bytes of given size.""" return bytes(i % 256 for i in range(size)) -testdata: list[Case[Any, Any]] = [] +def make_data_image(size: int) -> Image: + """Generate an RGB Image with approximately `size` bytes of data.""" + raw_data = np.frombuffer(make_data_bytes(size), dtype=np.uint8).reshape(-1) + # Pad to make it divisible by 3 for RGB + padded_size = ((len(raw_data) + 2) // 3) * 3 + padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) + pixels = len(padded_data) // 3 + # Find reasonable dimensions + height = max(1, int(pixels**0.5)) + width = pixels // height + data = padded_data[: height * width * 3].reshape(height, width, 3) + return Image(data=data, format=ImageFormat.RGB) + + +testcases: list[Case[Any, Any]] = [] @contextmanager @@ -40,24 +56,11 @@ def lcm_pubsub_channel() -> Generator[LCM, None, None]: def lcm_msggen(size: int) -> tuple[LCMTopic, Image]: - import numpy as np - - # Create image data as numpy array with shape (height, width, channels) - raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) - # Pad to make it divisible by 3 for RGB - padded_size = ((len(raw_data) + 2) // 3) * 3 - padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) - pixels = len(padded_data) // 3 - # Find reasonable dimensions - height = max(1, int(pixels**0.5)) - width = pixels // height - data = padded_data[: height * width * 3].reshape(height, width, 3) topic = LCMTopic(topic="benchmark/lcm", lcm_type=Image) - msg = Image(data=data, format=ImageFormat.RGB) - return (topic, msg) + return (topic, make_data_image(size)) -testdata.append( +testcases.append( Case( pubsub_context=lcm_pubsub_channel, msg_gen=lcm_msggen, @@ -66,7 +69,7 @@ def lcm_msggen(size: int) -> tuple[LCMTopic, Image]: @contextmanager -def udp_raw_pubsub_channel() -> Generator[LCMPubSubBase, None, None]: +def udp_bytes_pubsub_channel() -> Generator[LCMPubSubBase, None, None]: """LCM with raw bytes - no encoding overhead.""" lcm_pubsub = LCMPubSubBase(autoconf=True) lcm_pubsub.start() @@ -74,18 +77,18 @@ def udp_raw_pubsub_channel() -> Generator[LCMPubSubBase, None, None]: lcm_pubsub.stop() -def udp_raw_msggen(size: int) -> tuple[LCMTopic, bytes]: +def udp_bytes_msggen(size: int) -> tuple[LCMTopic, bytes]: """Generate raw bytes for LCM transport benchmark.""" topic = LCMTopic(topic="benchmark/lcm_raw") - return (topic, make_data(size)) + return (topic, make_data_bytes(size)) -# testdata.append( -# Case( -# pubsub_context=udp_raw_pubsub_channel, -# msg_gen=udp_raw_msggen, -# ) -# ) +testcases.append( + Case( + pubsub_context=udp_bytes_pubsub_channel, + msg_gen=udp_bytes_msggen, + ) +) @contextmanager @@ -95,28 +98,19 @@ def memory_pubsub_channel() -> Generator[Memory, None, None]: def memory_msggen(size: int) -> tuple[str, Any]: - import numpy as np - - raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) - padded_size = ((len(raw_data) + 2) // 3) * 3 - padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) - pixels = len(padded_data) // 3 - height = max(1, int(pixels**0.5)) - width = pixels // height - data = padded_data[: height * width * 3].reshape(height, width, 3) - return ("benchmark/memory", Image(data=data, format=ImageFormat.RGB)) + return ("benchmark/memory", make_data_image(size)) -# testdata.append( -# Case( -# pubsub_context=memory_pubsub_channel, -# msg_gen=memory_msggen, -# ) -# ) +testcases.append( + Case( + pubsub_context=memory_pubsub_channel, + msg_gen=memory_msggen, + ) +) @contextmanager -def shm_pubsub_channel() -> Generator[PickleSharedMemory, None, None]: +def shm_pickle_pubsub_channel() -> Generator[PickleSharedMemory, None, None]: # 12MB capacity to handle benchmark sizes up to 10MB shm_pubsub = PickleSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024) shm_pubsub.start() @@ -126,26 +120,56 @@ def shm_pubsub_channel() -> Generator[PickleSharedMemory, None, None]: def shm_msggen(size: int) -> tuple[str, Any]: """Generate message for SharedMemory pubsub benchmark.""" - import numpy as np - - raw_data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) - padded_size = ((len(raw_data) + 2) // 3) * 3 - padded_data = np.pad(raw_data, (0, padded_size - len(raw_data))) - pixels = len(padded_data) // 3 - height = max(1, int(pixels**0.5)) - width = pixels // height - data = padded_data[: height * width * 3].reshape(height, width, 3) - return ("benchmark/shm", Image(data=data, format=ImageFormat.RGB)) + return ("benchmark/shm", make_data_image(size)) -testdata.append( +testcases.append( Case( - pubsub_context=shm_pubsub_channel, + pubsub_context=shm_pickle_pubsub_channel, msg_gen=shm_msggen, ) ) +@contextmanager +def shm_bytes_pubsub_channel() -> Generator[BytesSharedMemory, None, None]: + """SharedMemory with raw bytes - no pickle overhead.""" + shm_pubsub = BytesSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024) + shm_pubsub.start() + yield shm_pubsub + shm_pubsub.stop() + + +def shm_bytes_msggen(size: int) -> tuple[str, bytes]: + """Generate raw bytes for SharedMemory transport benchmark.""" + return ("benchmark/shm_bytes", make_data_bytes(size)) + + +testcases.append( + Case( + pubsub_context=shm_bytes_pubsub_channel, + msg_gen=shm_bytes_msggen, + ) +) + + +@contextmanager +def shm_lcm_pubsub_channel() -> Generator[LCMSharedMemory, None, None]: + """SharedMemory with LCM binary encoding - no pickle overhead.""" + shm_pubsub = LCMSharedMemory(prefer="cpu", default_capacity=12 * 1024 * 1024) + shm_pubsub.start() + yield shm_pubsub + shm_pubsub.stop() + + +testcases.append( + Case( + pubsub_context=shm_lcm_pubsub_channel, + msg_gen=lcm_msggen, # Reuse the LCM message generator + ) +) + + try: from dimos.protocol.pubsub.redispubsub import Redis @@ -160,10 +184,10 @@ def redis_msggen(size: int) -> tuple[str, Any]: # Redis uses JSON serialization, so use a simple dict with base64-encoded data import base64 - data = base64.b64encode(make_data(size)).decode("ascii") + data = base64.b64encode(make_data_bytes(size)).decode("ascii") return ("benchmark/redis", {"data": data, "size": size}) - testdata.append( + testcases.append( Case( pubsub_context=redis_pubsub_channel, msg_gen=redis_msggen, @@ -211,7 +235,7 @@ def ros_msggen(size: int) -> tuple[RawROSTopic, ROSImage]: import numpy as np # Create image data - data = np.frombuffer(make_data(size), dtype=np.uint8).reshape(-1) + data = np.frombuffer(make_data_bytes(size), dtype=np.uint8).reshape(-1) padded_size = ((len(data) + 2) // 3) * 3 data = np.pad(data, (0, padded_size - len(data))) pixels = len(data) // 3 @@ -230,14 +254,14 @@ def ros_msggen(size: int) -> tuple[RawROSTopic, ROSImage]: topic = RawROSTopic(topic="/benchmark/ros", ros_type=ROSImage) return (topic, msg) - testdata.append( + testcases.append( Case( pubsub_context=ros_best_effort_pubsub_channel, msg_gen=ros_msggen, ) ) - testdata.append( + testcases.append( Case( pubsub_context=ros_reliable_pubsub_channel, msg_gen=ros_msggen, diff --git a/dimos/protocol/pubsub/shm/ipc_factory.py b/dimos/protocol/pubsub/shm/ipc_factory.py index 5f69c3dbd1..5f0b20165e 100644 --- a/dimos/protocol/pubsub/shm/ipc_factory.py +++ b/dimos/protocol/pubsub/shm/ipc_factory.py @@ -69,8 +69,13 @@ def shape(self) -> tuple: ... # type: ignore[type-arg] def dtype(self) -> np.dtype: ... # type: ignore[type-arg] @abstractmethod - def publish(self, frame) -> None: # type: ignore[no-untyped-def] - """Write into inactive buffer, then flip visible index (write control last).""" + def publish(self, frame, length: int | None = None) -> None: # type: ignore[no-untyped-def] + """Write into inactive buffer, then flip visible index (write control last). + + Args: + frame: The numpy array to publish + length: Optional length to copy (for variable-size messages). If None, copies full frame. + """ ... @abstractmethod @@ -185,7 +190,7 @@ def shape(self): # type: ignore[no-untyped-def] def dtype(self): # type: ignore[no-untyped-def] return self._dtype - def publish(self, frame) -> None: # type: ignore[no-untyped-def] + def publish(self, frame, length: int | None = None) -> None: # type: ignore[no-untyped-def] assert isinstance(frame, np.ndarray) assert frame.shape == self._shape and frame.dtype == self._dtype active = int(self._ctrl[2]) @@ -196,7 +201,11 @@ def publish(self, frame) -> None: # type: ignore[no-untyped-def] buffer=self._shm_data.buf, offset=inactive * self._nbytes, ) - np.copyto(view, frame, casting="no") + # Only copy actual payload length if specified, otherwise copy full frame + if length is not None and length < len(frame): + np.copyto(view[:length], frame[:length], casting="no") + else: + np.copyto(view, frame, casting="no") ts = np.int64(time.time_ns()) # Publish order: ts -> idx -> seq self._ctrl[1] = ts diff --git a/dimos/protocol/pubsub/shmpubsub.py b/dimos/protocol/pubsub/shmpubsub.py index e1ae8600aa..89efb82ac3 100644 --- a/dimos/protocol/pubsub/shmpubsub.py +++ b/dimos/protocol/pubsub/shmpubsub.py @@ -30,7 +30,9 @@ import uuid import numpy as np +import numpy.typing as npt +from dimos.protocol.pubsub.lcmpubsub import LCMEncoderMixin, Topic from dimos.protocol.pubsub.shm.ipc_factory import CpuShmChannel from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin from dimos.utils.logging_config import setup_logger @@ -41,11 +43,6 @@ logger = setup_logger() -# -------------------------------------------------------------------------------------- -# Configuration (kept local to PubSub now that Service is gone) -# -------------------------------------------------------------------------------------- - - @dataclass class SharedMemoryConfig: prefer: str = "auto" # "auto" | "cpu" (DIMOS_IPC_BACKEND overrides), TODO: "cuda" @@ -53,11 +50,6 @@ class SharedMemoryConfig: close_channels_on_stop: bool = True -# -------------------------------------------------------------------------------------- -# Core PubSub with integrated SHM/IPC transport (previously the Service logic) -# -------------------------------------------------------------------------------------- - - class SharedMemoryPubSubBase(PubSub[str, Any]): """ Pub/Sub over SharedMemory/CUDA-IPC, modeled after LCMPubSubBase but self-contained. @@ -81,6 +73,8 @@ class _TopicState: "dtype", "last_local_payload", "last_seq", + "publish_buffer", + "publish_lock", "shape", "stop", "subs", @@ -101,6 +95,10 @@ def __init__(self, channel, capacity: int, cp_mod) -> None: # type: ignore[no-u self.cp = cp_mod self.last_local_payload: bytes | None = None self.suppress_counts: dict[bytes, int] = defaultdict(int) # UUID bytes as key + # Pre-allocated buffer to avoid allocation on every publish + self.publish_buffer: npt.NDArray[np.uint8] = np.zeros(self.shape, dtype=self.dtype) + # Lock for thread-safe publish buffer access + self.publish_lock = threading.Lock() # ----- init / lifecycle ------------------------------------------------- @@ -178,15 +176,19 @@ def publish(self, topic: str, message: bytes) -> None: # Build host frame [len:4] + [uuid:16] + payload and publish # We embed the message UUID in the frame for echo suppression - host = np.zeros(st.shape, dtype=st.dtype) - # Pack: length(4) + uuid(16) + payload - header = struct.pack(" Callable[[], None]: """Subscribe a callback(message: bytes, topic). Returns unsubscribe.""" @@ -216,11 +218,14 @@ def reconfigure(self, topic: str, *, capacity: int) -> dict: # type: ignore[typ st = self._ensure_topic(topic) new_cap = int(capacity) new_shape = (new_cap + 20,) # +20 for header: length(4) + uuid(16) - desc = st.channel.reconfigure(new_shape, np.uint8) - st.capacity = new_cap - st.shape = new_shape - st.dtype = np.uint8 - st.last_seq = -1 + # Lock to ensure no publish is using the buffer while we replace it + with st.publish_lock: + desc = st.channel.reconfigure(new_shape, np.uint8) + st.capacity = new_cap + st.shape = new_shape + st.dtype = np.uint8 + st.last_seq = -1 + st.publish_buffer = np.zeros(new_shape, dtype=np.uint8) return desc # type: ignore[no-any-return] # ----- Internals -------------------------------------------------------- @@ -290,36 +295,50 @@ def _fanout_loop(self, topic: str, st: _TopicState) -> None: pass -# -------------------------------------------------------------------------------------- -# Encoders + concrete PubSub classes -# -------------------------------------------------------------------------------------- +BytesSharedMemory = SharedMemoryPubSubBase -class SharedMemoryBytesEncoderMixin(PubSubEncoderMixin[str, bytes, bytes]): - """Identity encoder for raw bytes.""" +class PickleSharedMemory( + PickleEncoderMixin[str, Any], + SharedMemoryPubSubBase, +): + """SharedMemory pubsub that transports arbitrary Python objects via pickle.""" - def encode(self, msg: bytes, _: str) -> bytes: - if isinstance(msg, bytes | bytearray | memoryview): - return bytes(msg) - raise TypeError(f"SharedMemory expects bytes-like, got {type(msg)!r}") + ... - def decode(self, msg: bytes, _: str) -> bytes: - return msg +class LCMSharedMemoryPubSubBase(PubSub[Topic, Any]): + """SharedMemory pubsub that uses LCM Topic type, delegating to SharedMemoryPubSubBase.""" -class SharedMemory( - SharedMemoryBytesEncoderMixin, - SharedMemoryPubSubBase, -): - """SharedMemory pubsub that transports raw bytes.""" + def __init__(self, **kwargs: Any) -> None: + super().__init__() + self._shm = SharedMemoryPubSubBase(**kwargs) - ... + def start(self) -> None: + self._shm.start() + def stop(self) -> None: + self._shm.stop() -class PickleSharedMemory( - PickleEncoderMixin[str, Any], - SharedMemoryPubSubBase, + def publish(self, topic: Topic, message: bytes) -> None: + self._shm.publish(str(topic), message) + + def subscribe( + self, topic: Topic, callback: Callable[[bytes, Topic], Any] + ) -> Callable[[], None]: + def wrapper(msg: bytes, _: str) -> None: + callback(msg, topic) + + return self._shm.subscribe(str(topic), wrapper) + + def reconfigure(self, topic: Topic, *, capacity: int) -> dict: # type: ignore[type-arg] + return self._shm.reconfigure(str(topic), capacity=capacity) + + +class LCMSharedMemory( + LCMEncoderMixin, + LCMSharedMemoryPubSubBase, ): - """SharedMemory pubsub that transports arbitrary Python objects via pickle.""" + """SharedMemory pubsub that uses LCM binary encoding (no pickle overhead).""" ...