diff --git a/dimos/agents/__init__.py b/dimos/agents/__init__.py index 8e099a21b4..9e1dd2df77 100644 --- a/dimos/agents/__init__.py +++ b/dimos/agents/__init__.py @@ -11,3 +11,5 @@ from dimos.agents.spec import AgentSpec from dimos.protocol.skill.skill import skill from dimos.protocol.skill.type import Output, Reducer, Stream + +__all__ = ["Agent", "AgentSpec", "Output", "Reducer", "Stream", "deploy", "skill"] diff --git a/dimos/hardware/sensors/gstreamer_camera.py b/dimos/hardware/sensors/camera/gstreamer/gstreamer_camera.py similarity index 100% rename from dimos/hardware/sensors/gstreamer_camera.py rename to dimos/hardware/sensors/camera/gstreamer/gstreamer_camera.py diff --git a/dimos/hardware/sensors/gstreamer_camera_test_script.py b/dimos/hardware/sensors/camera/gstreamer/gstreamer_camera_test_script.py similarity index 97% rename from dimos/hardware/sensors/gstreamer_camera_test_script.py rename to dimos/hardware/sensors/camera/gstreamer/gstreamer_camera_test_script.py index 5b78377519..cc0e3424a5 100755 --- a/dimos/hardware/sensors/gstreamer_camera_test_script.py +++ b/dimos/hardware/sensors/camera/gstreamer/gstreamer_camera_test_script.py @@ -19,7 +19,7 @@ import time from dimos import core -from dimos.hardware.sensors.gstreamer_camera import GstreamerCameraModule +from dimos.hardware.sensors.camera.gstreamer.gstreamer_camera import GstreamerCameraModule from dimos.msgs.sensor_msgs import Image from dimos.protocol import pubsub diff --git a/dimos/hardware/sensors/gstreamer_sender.py b/dimos/hardware/sensors/camera/gstreamer/gstreamer_sender.py similarity index 100% rename from dimos/hardware/sensors/gstreamer_sender.py rename to dimos/hardware/sensors/camera/gstreamer/gstreamer_sender.py diff --git a/dimos/hardware/sensors/camera/gstreamer/readme.md b/dimos/hardware/sensors/camera/gstreamer/readme.md new file mode 100644 index 0000000000..29198aea24 --- /dev/null +++ b/dimos/hardware/sensors/camera/gstreamer/readme.md @@ -0,0 +1 @@ +This gstreamer stuff is obsoleted but could be adopted as an alternative hardware for camera module if needed diff --git a/dimos/hardware/sensors/camera/module.py b/dimos/hardware/sensors/camera/module.py index 4232fe1f49..de2c3b8c78 100644 --- a/dimos/hardware/sensors/camera/module.py +++ b/dimos/hardware/sensors/camera/module.py @@ -12,29 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections.abc import Callable +from collections.abc import Callable, Generator from dataclasses import dataclass, field -import queue import time +from typing import Any import reactivex as rx from reactivex import operators as ops -from reactivex.disposable import Disposable from reactivex.observable import Observable -from dimos import spec -from dimos.agents import Output, Reducer, Stream, skill # type: ignore[attr-defined] +from dimos.agents import Output, Reducer, Stream, skill from dimos.core import Module, ModuleConfig, Out, rpc from dimos.hardware.sensors.camera.spec import CameraHardware from dimos.hardware.sensors.camera.webcam import Webcam from dimos.msgs.geometry_msgs import Quaternion, Transform, Vector3 -from dimos.msgs.sensor_msgs import Image from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier -from dimos.spec import perception as spec # type: ignore[no-redef] +from dimos.spec import perception +from dimos.utils.reactive import iter_observable -def default_transform(): # type: ignore[no-untyped-def] +def default_transform() -> Transform: return Transform( translation=Vector3(0.0, 0.0, 0.0), rotation=Quaternion(0.0, 0.0, 0.0, 1.0), @@ -47,81 +45,52 @@ def default_transform(): # type: ignore[no-untyped-def] class CameraModuleConfig(ModuleConfig): frame_id: str = "camera_link" transform: Transform | None = field(default_factory=default_transform) - hardware: Callable[[], CameraHardware] | CameraHardware = Webcam # type: ignore[type-arg] - frequency: float = 5.0 + hardware: Callable[[], CameraHardware[Any]] | CameraHardware[Any] = Webcam + frequency: float = 0.0 # Hz, 0 means no limit -class CameraModule(Module[CameraModuleConfig], spec.Camera): +class CameraModule(Module[CameraModuleConfig], perception.Camera): color_image: Out[Image] camera_info: Out[CameraInfo] - hardware: CameraHardware = None # type: ignore[assignment, type-arg] - _module_subscription: Disposable | None = None - _camera_info_subscription: Disposable | None = None - _skill_stream: Observable[Image] | None = None + hardware: CameraHardware[Any] config: CameraModuleConfig default_config = CameraModuleConfig - def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - @property - def hardware_camera_info(self) -> CameraInfo: - return self.hardware.camera_info - @rpc - def start(self): # type: ignore[no-untyped-def] + def start(self) -> None: if callable(self.config.hardware): self.hardware = self.config.hardware() else: self.hardware = self.config.hardware - if self._module_subscription: - return "already started" - - stream = self.hardware.image_stream().pipe(sharpness_barrier(self.config.frequency)) # type: ignore[attr-defined] - self._disposables.add(stream.subscribe(self.color_image.publish)) - - # camera_info_stream = self.camera_info_stream(frequency=5.0) - - def publish_info(camera_info: CameraInfo) -> None: - self.camera_info.publish(camera_info) - - if self.config.transform is None: - return - - camera_link = self.config.transform - camera_link.ts = camera_info.ts - camera_optical = Transform( - translation=Vector3(0.0, 0.0, 0.0), - rotation=Quaternion(-0.5, 0.5, -0.5, 0.5), - frame_id="camera_link", - child_frame_id="camera_optical", - ts=camera_link.ts, - ) - - self.tf.publish(camera_link, camera_optical) + stream = self.hardware.image_stream() - self._camera_info_subscription = self.camera_info_stream().subscribe(publish_info) # type: ignore[assignment] - self._module_subscription = stream.subscribe(self.color_image.publish) # type: ignore[attr-defined] + if self.config.frequency > 0: + stream = stream.pipe(sharpness_barrier(self.config.frequency)) - @skill(stream=Stream.passive, output=Output.image, reducer=Reducer.latest) # type: ignore[arg-type] - def video_stream(self) -> Image: # type: ignore[misc] - """implicit video stream skill""" - _queue = queue.Queue(maxsize=1) # type: ignore[var-annotated] - self.hardware.image_stream().subscribe(_queue.put) + self._disposables.add( + stream.subscribe(self.color_image.publish), + ) - yield from iter(_queue.get, None) + self._disposables.add( + rx.interval(1.0).subscribe(lambda _: self.publish_metadata()), + ) - def publish_info(self, camera_info: CameraInfo) -> None: + def publish_metadata(self) -> None: + camera_info = self.hardware.camera_info.with_ts(time.time()) self.camera_info.publish(camera_info) - if self.config.transform is None: + if not self.config.transform: return camera_link = self.config.transform camera_link.ts = camera_info.ts + camera_optical = Transform( translation=Vector3(0.0, 0.0, 0.0), rotation=Quaternion(-0.5, 0.5, -0.5, 0.5), @@ -132,21 +101,13 @@ def publish_info(self, camera_info: CameraInfo) -> None: self.tf.publish(camera_link, camera_optical) - def camera_info_stream(self, frequency: float = 1.0) -> Observable[CameraInfo]: - def camera_info(_) -> CameraInfo: # type: ignore[no-untyped-def] - self.hardware.camera_info.ts = time.time() - return self.hardware.camera_info - - return rx.interval(1.0 / frequency).pipe(ops.map(camera_info)) - - def stop(self): # type: ignore[no-untyped-def] - if self._module_subscription: - self._module_subscription.dispose() - self._module_subscription = None - if self._camera_info_subscription: - self._camera_info_subscription.dispose() - self._camera_info_subscription = None - # Also stop the hardware if it has a stop method + # actually skills should support on_demand passive skills so we don't emit this periodically + # but just provide the latest frame on demand + @skill(stream=Stream.passive, output=Output.image, reducer=Reducer.latest) # type: ignore[arg-type] + def video_stream(self) -> Generator[Image, None, None]: + yield from iter_observable(self.hardware.image_stream().pipe(ops.sample(1.0))) + + def stop(self) -> None: if self.hardware and hasattr(self.hardware, "stop"): self.hardware.stop() super().stop() diff --git a/dimos/hardware/sensors/camera/test_webcam.py b/dimos/hardware/sensors/camera/test_webcam.py index e9f544e791..0d1a1d0040 100644 --- a/dimos/hardware/sensors/camera/test_webcam.py +++ b/dimos/hardware/sensors/camera/test_webcam.py @@ -24,10 +24,15 @@ from dimos.msgs.sensor_msgs import CameraInfo, Image -@pytest.mark.tool -def test_streaming_single() -> None: - dimos = core.start(1) +@pytest.fixture +def dimos(): + dimos_instance = core.start(1) + yield dimos_instance + dimos_instance.stop() + +@pytest.mark.tool +def test_streaming_single(dimos) -> None: camera = dimos.deploy( CameraModule, transform=Transform( @@ -37,15 +42,14 @@ def test_streaming_single() -> None: child_frame_id="camera_link", ), hardware=lambda: Webcam( - stereo_slice="left", camera_index=0, - frequency=15, + frequency=0.0, # full speed but set something to test sharpness barrier camera_info=zed.CameraInfo.SingleWebcam, ), ) - camera.image.transport = core.LCMTransport("/image1", Image) - camera.camera_info.transport = core.LCMTransport("/image1/camera_info", CameraInfo) + camera.color_image.transport = core.LCMTransport("/color_image", Image) + camera.camera_info.transport = core.LCMTransport("/camera_info", CameraInfo) camera.start() try: @@ -54,55 +58,3 @@ def test_streaming_single() -> None: except KeyboardInterrupt: camera.stop() dimos.stop() - - -@pytest.mark.tool -def test_streaming_double() -> None: - dimos = core.start(2) - - camera1 = dimos.deploy( - CameraModule, - transform=Transform( - translation=Vector3(0.05, 0.0, 0.0), - rotation=Quaternion(0.0, 0.0, 0.0, 1.0), - frame_id="sensor", - child_frame_id="camera_link", - ), - hardware=lambda: Webcam( - stereo_slice="left", - camera_index=0, - frequency=15, - camera_info=zed.CameraInfo.SingleWebcam, - ), - ) - - camera2 = dimos.deploy( - CameraModule, - transform=Transform( - translation=Vector3(0.05, 0.0, 0.0), - rotation=Quaternion(0.0, 0.0, 0.0, 1.0), - frame_id="sensor", - child_frame_id="camera_link", - ), - hardware=lambda: Webcam( - camera_index=4, - frequency=15, - stereo_slice="left", - camera_info=zed.CameraInfo.SingleWebcam, - ), - ) - - camera1.image.transport = core.LCMTransport("/image1", Image) - camera1.camera_info.transport = core.LCMTransport("/image1/camera_info", CameraInfo) - camera1.start() - camera2.image.transport = core.LCMTransport("/image2", Image) - camera2.camera_info.transport = core.LCMTransport("/image2/camera_info", CameraInfo) - camera2.start() - - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - camera1.stop() - camera2.stop() - dimos.stop() diff --git a/dimos/hardware/sensors/sensor.py b/dimos/hardware/sensors/sensor.py deleted file mode 100644 index dc86d93e56..0000000000 --- a/dimos/hardware/sensors/sensor.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from abc import ABC, abstractmethod - - -class AbstractSensor(ABC): - def __init__(self, sensor_type=None) -> None: # type: ignore[no-untyped-def] - self.sensor_type = sensor_type - - @abstractmethod - def get_sensor_type(self): # type: ignore[no-untyped-def] - """Return the type of sensor.""" - pass - - @abstractmethod - def calculate_intrinsics(self): # type: ignore[no-untyped-def] - """Calculate the sensor's intrinsics.""" - pass - - @abstractmethod - def get_intrinsics(self): # type: ignore[no-untyped-def] - """Return the sensor's intrinsics.""" - pass diff --git a/dimos/msgs/sensor_msgs/CameraInfo.py b/dimos/msgs/sensor_msgs/CameraInfo.py index c54b6565fa..b6f85dbaca 100644 --- a/dimos/msgs/sensor_msgs/CameraInfo.py +++ b/dimos/msgs/sensor_msgs/CameraInfo.py @@ -99,6 +99,29 @@ def __init__( self.roi_width = 0 self.roi_do_rectify = False + def with_ts(self, ts: float) -> CameraInfo: + """Return a copy of this CameraInfo with the given timestamp. + + Args: + ts: New timestamp + + Returns: + New CameraInfo instance with updated timestamp + """ + return CameraInfo( + height=self.height, + width=self.width, + distortion_model=self.distortion_model, + D=self.D.copy(), + K=self.K.copy(), + R=self.R.copy(), + P=self.P.copy(), + binning_x=self.binning_x, + binning_y=self.binning_y, + frame_id=self.frame_id, + ts=ts, + ) + @classmethod def from_yaml(cls, yaml_file: str) -> CameraInfo: """Create CameraInfo from YAML file. diff --git a/dimos/utils/reactive.py b/dimos/utils/reactive.py index c7dea45576..b5833a6348 100644 --- a/dimos/utils/reactive.py +++ b/dimos/utils/reactive.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections.abc import Callable +from collections.abc import Callable, Generator +from queue import Queue import threading from typing import Any, Generic, TypeVar @@ -228,3 +229,35 @@ def _quality_barrier(source: Observable[T]) -> Observable[T]: ) return _quality_barrier + + +def iter_observable(observable: Observable[T]) -> Generator[T, None, None]: + """Convert an Observable to a blocking iterator. + + Yields items as they arrive from the observable. Properly disposes + the subscription when the generator is closed. + """ + q: Queue[T | None] = Queue() + done = threading.Event() + + def on_next(value: T) -> None: + q.put(value) + + def on_complete() -> None: + done.set() + q.put(None) + + def on_error(e: Exception) -> None: + done.set() + q.put(None) + + sub = observable.subscribe(on_next=on_next, on_completed=on_complete, on_error=on_error) + + try: + while not done.is_set() or not q.empty(): + item = q.get() + if item is None and done.is_set(): + break + yield item # type: ignore[misc] + finally: + sub.dispose() diff --git a/dimos/utils/test_reactive.py b/dimos/utils/test_reactive.py index 17b69ba0aa..a0f3fe42ef 100644 --- a/dimos/utils/test_reactive.py +++ b/dimos/utils/test_reactive.py @@ -28,6 +28,7 @@ callback_to_observable, getter_ondemand, getter_streaming, + iter_observable, ) @@ -283,3 +284,12 @@ def stop_fn(cb) -> None: # Dispose subscription and check that stop was called subscription.dispose() assert stop_called, "Stop function should be called on dispose" + + +def test_iter_observable() -> None: + source = dispose_spy(rx.of(1, 2, 3, 4, 5)) + + result = list(iter_observable(source)) + + assert result == [1, 2, 3, 4, 5] + assert source.is_disposed(), "Observable should be disposed after iteration"