Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dimos/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@
from dimos.agents.spec import AgentSpec
from dimos.protocol.skill.skill import skill
from dimos.protocol.skill.type import Output, Reducer, Stream

__all__ = ["Agent", "AgentSpec", "Output", "Reducer", "Stream", "deploy", "skill"]
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import time

from dimos import core
from dimos.hardware.sensors.gstreamer_camera import GstreamerCameraModule
from dimos.hardware.sensors.camera.gstreamer.gstreamer_camera import GstreamerCameraModule
from dimos.msgs.sensor_msgs import Image
from dimos.protocol import pubsub

Expand Down
1 change: 1 addition & 0 deletions dimos/hardware/sensors/camera/gstreamer/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This gstreamer stuff is obsoleted but could be adopted as an alternative hardware for camera module if needed
103 changes: 32 additions & 71 deletions dimos/hardware/sensors/camera/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Callable
from collections.abc import Callable, Generator
from dataclasses import dataclass, field
import queue
import time
from typing import Any

import reactivex as rx
from reactivex import operators as ops
from reactivex.disposable import Disposable
from reactivex.observable import Observable

from dimos import spec
from dimos.agents import Output, Reducer, Stream, skill # type: ignore[attr-defined]
from dimos.agents import Output, Reducer, Stream, skill
from dimos.core import Module, ModuleConfig, Out, rpc
from dimos.hardware.sensors.camera.spec import CameraHardware
from dimos.hardware.sensors.camera.webcam import Webcam
from dimos.msgs.geometry_msgs import Quaternion, Transform, Vector3
from dimos.msgs.sensor_msgs import Image
from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo
from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier
from dimos.spec import perception as spec # type: ignore[no-redef]
from dimos.spec import perception
from dimos.utils.reactive import iter_observable


def default_transform(): # type: ignore[no-untyped-def]
def default_transform() -> Transform:
return Transform(
translation=Vector3(0.0, 0.0, 0.0),
rotation=Quaternion(0.0, 0.0, 0.0, 1.0),
Expand All @@ -47,81 +45,52 @@ def default_transform(): # type: ignore[no-untyped-def]
class CameraModuleConfig(ModuleConfig):
frame_id: str = "camera_link"
transform: Transform | None = field(default_factory=default_transform)
hardware: Callable[[], CameraHardware] | CameraHardware = Webcam # type: ignore[type-arg]
frequency: float = 5.0
hardware: Callable[[], CameraHardware[Any]] | CameraHardware[Any] = Webcam
frequency: float = 0.0 # Hz, 0 means no limit


class CameraModule(Module[CameraModuleConfig], spec.Camera):
class CameraModule(Module[CameraModuleConfig], perception.Camera):
color_image: Out[Image]
camera_info: Out[CameraInfo]

hardware: CameraHardware = None # type: ignore[assignment, type-arg]
_module_subscription: Disposable | None = None
_camera_info_subscription: Disposable | None = None
_skill_stream: Observable[Image] | None = None
hardware: CameraHardware[Any]

config: CameraModuleConfig
default_config = CameraModuleConfig

def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)

@property
def hardware_camera_info(self) -> CameraInfo:
return self.hardware.camera_info

@rpc
def start(self): # type: ignore[no-untyped-def]
def start(self) -> None:
if callable(self.config.hardware):
self.hardware = self.config.hardware()
else:
self.hardware = self.config.hardware

if self._module_subscription:
return "already started"

stream = self.hardware.image_stream().pipe(sharpness_barrier(self.config.frequency)) # type: ignore[attr-defined]
self._disposables.add(stream.subscribe(self.color_image.publish))

# camera_info_stream = self.camera_info_stream(frequency=5.0)

def publish_info(camera_info: CameraInfo) -> None:
self.camera_info.publish(camera_info)

if self.config.transform is None:
return

camera_link = self.config.transform
camera_link.ts = camera_info.ts
camera_optical = Transform(
translation=Vector3(0.0, 0.0, 0.0),
rotation=Quaternion(-0.5, 0.5, -0.5, 0.5),
frame_id="camera_link",
child_frame_id="camera_optical",
ts=camera_link.ts,
)

self.tf.publish(camera_link, camera_optical)
stream = self.hardware.image_stream()

self._camera_info_subscription = self.camera_info_stream().subscribe(publish_info) # type: ignore[assignment]
self._module_subscription = stream.subscribe(self.color_image.publish) # type: ignore[attr-defined]
if self.config.frequency > 0:
stream = stream.pipe(sharpness_barrier(self.config.frequency))

@skill(stream=Stream.passive, output=Output.image, reducer=Reducer.latest) # type: ignore[arg-type]
def video_stream(self) -> Image: # type: ignore[misc]
"""implicit video stream skill"""
_queue = queue.Queue(maxsize=1) # type: ignore[var-annotated]
self.hardware.image_stream().subscribe(_queue.put)
self._disposables.add(
stream.subscribe(self.color_image.publish),
)

yield from iter(_queue.get, None)
self._disposables.add(
rx.interval(1.0).subscribe(lambda _: self.publish_metadata()),
)

def publish_info(self, camera_info: CameraInfo) -> None:
def publish_metadata(self) -> None:
camera_info = self.hardware.camera_info.with_ts(time.time())
self.camera_info.publish(camera_info)

if self.config.transform is None:
if not self.config.transform:
return

camera_link = self.config.transform
camera_link.ts = camera_info.ts

camera_optical = Transform(
translation=Vector3(0.0, 0.0, 0.0),
rotation=Quaternion(-0.5, 0.5, -0.5, 0.5),
Expand All @@ -132,21 +101,13 @@ def publish_info(self, camera_info: CameraInfo) -> None:

self.tf.publish(camera_link, camera_optical)

def camera_info_stream(self, frequency: float = 1.0) -> Observable[CameraInfo]:
def camera_info(_) -> CameraInfo: # type: ignore[no-untyped-def]
self.hardware.camera_info.ts = time.time()
return self.hardware.camera_info

return rx.interval(1.0 / frequency).pipe(ops.map(camera_info))

def stop(self): # type: ignore[no-untyped-def]
if self._module_subscription:
self._module_subscription.dispose()
self._module_subscription = None
if self._camera_info_subscription:
self._camera_info_subscription.dispose()
self._camera_info_subscription = None
# Also stop the hardware if it has a stop method
# actually skills should support on_demand passive skills so we don't emit this periodically
# but just provide the latest frame on demand
@skill(stream=Stream.passive, output=Output.image, reducer=Reducer.latest) # type: ignore[arg-type]
def video_stream(self) -> Generator[Image, None, None]:
yield from iter_observable(self.hardware.image_stream().pipe(ops.sample(1.0)))

def stop(self) -> None:
if self.hardware and hasattr(self.hardware, "stop"):
self.hardware.stop()
super().stop()
Expand Down
70 changes: 11 additions & 59 deletions dimos/hardware/sensors/camera/test_webcam.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
from dimos.msgs.sensor_msgs import CameraInfo, Image


@pytest.mark.tool
def test_streaming_single() -> None:
dimos = core.start(1)
@pytest.fixture
def dimos():
dimos_instance = core.start(1)
yield dimos_instance
dimos_instance.stop()


@pytest.mark.tool
def test_streaming_single(dimos) -> None:
camera = dimos.deploy(
CameraModule,
transform=Transform(
Expand All @@ -37,15 +42,14 @@ def test_streaming_single() -> None:
child_frame_id="camera_link",
),
hardware=lambda: Webcam(
stereo_slice="left",
camera_index=0,
frequency=15,
frequency=0.0, # full speed but set something to test sharpness barrier
camera_info=zed.CameraInfo.SingleWebcam,
),
)

camera.image.transport = core.LCMTransport("/image1", Image)
camera.camera_info.transport = core.LCMTransport("/image1/camera_info", CameraInfo)
camera.color_image.transport = core.LCMTransport("/color_image", Image)
camera.camera_info.transport = core.LCMTransport("/camera_info", CameraInfo)
camera.start()

try:
Expand All @@ -54,55 +58,3 @@ def test_streaming_single() -> None:
except KeyboardInterrupt:
camera.stop()
dimos.stop()


@pytest.mark.tool
def test_streaming_double() -> None:
dimos = core.start(2)

camera1 = dimos.deploy(
CameraModule,
transform=Transform(
translation=Vector3(0.05, 0.0, 0.0),
rotation=Quaternion(0.0, 0.0, 0.0, 1.0),
frame_id="sensor",
child_frame_id="camera_link",
),
hardware=lambda: Webcam(
stereo_slice="left",
camera_index=0,
frequency=15,
camera_info=zed.CameraInfo.SingleWebcam,
),
)

camera2 = dimos.deploy(
CameraModule,
transform=Transform(
translation=Vector3(0.05, 0.0, 0.0),
rotation=Quaternion(0.0, 0.0, 0.0, 1.0),
frame_id="sensor",
child_frame_id="camera_link",
),
hardware=lambda: Webcam(
camera_index=4,
frequency=15,
stereo_slice="left",
camera_info=zed.CameraInfo.SingleWebcam,
),
)

camera1.image.transport = core.LCMTransport("/image1", Image)
camera1.camera_info.transport = core.LCMTransport("/image1/camera_info", CameraInfo)
camera1.start()
camera2.image.transport = core.LCMTransport("/image2", Image)
camera2.camera_info.transport = core.LCMTransport("/image2/camera_info", CameraInfo)
camera2.start()

try:
while True:
time.sleep(1)
except KeyboardInterrupt:
camera1.stop()
camera2.stop()
dimos.stop()
35 changes: 0 additions & 35 deletions dimos/hardware/sensors/sensor.py

This file was deleted.

23 changes: 23 additions & 0 deletions dimos/msgs/sensor_msgs/CameraInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,29 @@ def __init__(
self.roi_width = 0
self.roi_do_rectify = False

def with_ts(self, ts: float) -> CameraInfo:
"""Return a copy of this CameraInfo with the given timestamp.

Args:
ts: New timestamp

Returns:
New CameraInfo instance with updated timestamp
"""
return CameraInfo(
height=self.height,
width=self.width,
distortion_model=self.distortion_model,
D=self.D.copy(),
K=self.K.copy(),
R=self.R.copy(),
P=self.P.copy(),
binning_x=self.binning_x,
binning_y=self.binning_y,
frame_id=self.frame_id,
ts=ts,
)

@classmethod
def from_yaml(cls, yaml_file: str) -> CameraInfo:
"""Create CameraInfo from YAML file.
Expand Down
35 changes: 34 additions & 1 deletion dimos/utils/reactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Callable
from collections.abc import Callable, Generator
from queue import Queue
import threading
from typing import Any, Generic, TypeVar

Expand Down Expand Up @@ -228,3 +229,35 @@ def _quality_barrier(source: Observable[T]) -> Observable[T]:
)

return _quality_barrier


def iter_observable(observable: Observable[T]) -> Generator[T, None, None]:
"""Convert an Observable to a blocking iterator.

Yields items as they arrive from the observable. Properly disposes
the subscription when the generator is closed.
"""
q: Queue[T | None] = Queue()
done = threading.Event()

def on_next(value: T) -> None:
q.put(value)

def on_complete() -> None:
done.set()
q.put(None)

def on_error(e: Exception) -> None:
done.set()
q.put(None)

sub = observable.subscribe(on_next=on_next, on_completed=on_complete, on_error=on_error)

try:
while not done.is_set() or not q.empty():
item = q.get()
if item is None and done.is_set():
break
yield item # type: ignore[misc]
finally:
sub.dispose()
Loading