Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
486 changes: 486 additions & 0 deletions assets/foxglove_dashboards/unitree.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion dimos/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import dimos.core.colors as colors
from dimos.core.core import rpc
from dimos.core.module import Module, ModuleBase, ModuleConfig
from dimos.core.module import Module, ModuleBase, ModuleConfig, ModuleConfigT
from dimos.core.rpc_client import RPCClient
from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport
from dimos.core.transport import (
Expand Down Expand Up @@ -37,6 +37,7 @@
"Module",
"ModuleBase",
"ModuleConfig",
"ModuleConfigT",
"Out",
"PubSubTF",
"RPCSpec",
Expand Down
19 changes: 16 additions & 3 deletions dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from dask.distributed import Actor, get_worker
from reactivex.disposable import CompositeDisposable
from typing_extensions import TypeVar

from dimos.core import colors
from dimos.core.core import T, rpc
Expand Down Expand Up @@ -73,9 +74,14 @@ def get_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread | None]:
class ModuleConfig:
rpc_transport: type[RPCSpec] = LCMRPC
tf_transport: type[TFSpec] = LCMTF
frame_id_prefix: str | None = None
frame_id: str | None = None


class ModuleBase(Configurable[ModuleConfig], SkillContainer, Resource):
ModuleConfigT = TypeVar("ModuleConfigT", bound=ModuleConfig, default=ModuleConfig)


class ModuleBase(Configurable[ModuleConfigT], SkillContainer, Resource):
_rpc: RPCSpec | None = None
_tf: TFSpec | None = None
_loop: asyncio.AbstractEventLoop | None = None
Expand All @@ -85,7 +91,7 @@ class ModuleBase(Configurable[ModuleConfig], SkillContainer, Resource):

rpc_calls: list[str] = []

default_config = ModuleConfig
default_config: type[ModuleConfigT] = ModuleConfig # type: ignore[assignment]

def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(*args, **kwargs)
Expand All @@ -102,6 +108,13 @@ def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
except ValueError:
...

@property
def frame_id(self) -> str:
base = self.config.frame_id or self.__class__.__name__
if self.config.frame_id_prefix:
return f"{self.config.frame_id_prefix}/{base}"
return base

@rpc
def start(self) -> None:
pass
Expand Down Expand Up @@ -276,7 +289,7 @@ def get_rpc_calls(self, *methods: str) -> RpcCall | tuple[RpcCall, ...]: # type
return result[0] if len(result) == 1 else result


class DaskModule(ModuleBase):
class DaskModule(ModuleBase[ModuleConfigT]):
ref: Actor
worker: int

Expand Down
3 changes: 2 additions & 1 deletion dimos/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from reactivex.disposable import Disposable

import dimos.core.colors as colors
from dimos.core.resource import Resource
from dimos.utils.logging_config import setup_logger
import dimos.utils.reactive as reactive
from dimos.utils.reactive import backpressure
Expand Down Expand Up @@ -79,7 +80,7 @@ class State(enum.Enum):
FLOWING = "flowing" # runtime: data observed


class Transport(ObservableMixin[T]):
class Transport(Resource, ObservableMixin[T]):
# used by local Output
def broadcast(self, selfstream: Out[T], value: T) -> None: ...

Expand Down
29 changes: 29 additions & 0 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) ->
self._started = True
return self.lcm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value]

def start(self) -> None: ...

def stop(self) -> None:
self.lcm.stop()


class LCMTransport(PubSubTransport[T]):
_started: bool = False
Expand All @@ -82,6 +87,11 @@ def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no
if not hasattr(self, "lcm"):
self.lcm = LCM(**kwargs)

def start(self) -> None: ...

def stop(self) -> None:
self.lcm.stop()

def __reduce__(self): # type: ignore[no-untyped-def]
return (LCMTransport, (self.topic.topic, self.topic.lcm_type))

Expand All @@ -107,6 +117,11 @@ def __init__(self, topic: str, type: type, **kwargs) -> None: # type: ignore[no
def __reduce__(self): # type: ignore[no-untyped-def]
return (JpegLcmTransport, (self.topic.topic, self.topic.lcm_type))

def start(self) -> None: ...

def stop(self) -> None:
self.lcm.stop()


class pSHMTransport(PubSubTransport[T]):
_started: bool = False
Expand All @@ -131,6 +146,11 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] = None) ->
self._started = True
return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[return-value]

def start(self) -> None: ...

def stop(self) -> None:
self.shm.stop()


class SHMTransport(PubSubTransport[T]):
_started: bool = False
Expand All @@ -155,6 +175,11 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] | None = No
self._started = True
return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[arg-type, return-value]

def start(self) -> None: ...

def stop(self) -> None:
self.shm.stop()


class JpegShmTransport(PubSubTransport[T]):
_started: bool = False
Expand All @@ -180,5 +205,9 @@ def subscribe(self, callback: Callable[[T], None], selfstream: In[T] | None = No
self._started = True
return self.shm.subscribe(self.topic, lambda msg, topic: callback(msg)) # type: ignore[arg-type, return-value]

def start(self) -> None: ...

def stop(self) -> None: ...


class ZenohTransport(PubSubTransport[T]): ...
6 changes: 2 additions & 4 deletions dimos/hardware/camera/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,16 @@ class CameraModuleConfig(ModuleConfig):
frequency: float = 5.0


class CameraModule(Module, spec.Camera):
class CameraModule(Module[CameraModuleConfig], spec.Camera):
color_image: Out[Image]
camera_info: Out[CameraInfo]

hardware: Callable[[], CameraHardware] | CameraHardware = None # type: ignore[assignment, type-arg]
_skill_stream: Observable[Image] | None = None

config: CameraModuleConfig
default_config = CameraModuleConfig

def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(*args, **kwargs)

@property
def hardware_camera_info(self) -> CameraInfo:
return self.hardware.camera_info # type: ignore[union-attr]
Expand Down
2 changes: 0 additions & 2 deletions dimos/hardware/camera/zed/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ def __init__( # type: ignore[no-untyped-def]
enable_imu_fusion: bool = True,
set_floor_as_origin: bool = True,
publish_rate: float = 30.0,
frame_id: str = "zed_camera",
recording_path: str | None = None,
**kwargs,
) -> None:
Expand All @@ -574,7 +573,6 @@ def __init__( # type: ignore[no-untyped-def]
self.enable_imu_fusion = enable_imu_fusion
self.set_floor_as_origin = set_floor_as_origin
self.publish_rate = publish_rate
self.frame_id = frame_id
self.recording_path = recording_path

# Convert string parameters to ZED enums
Expand Down
17 changes: 12 additions & 5 deletions dimos/hardware/fake_zed_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
FakeZEDModule - Replays recorded ZED data for testing without hardware.
"""

from dataclasses import dataclass
import functools
import logging

from dimos_lcm.sensor_msgs import CameraInfo # type: ignore[import-untyped]
import numpy as np

from dimos.core import Module, Out, rpc
from dimos.core import Module, ModuleConfig, Out, rpc
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.msgs.sensor_msgs import Image, ImageFormat
from dimos.msgs.std_msgs import Header
Expand All @@ -34,7 +35,12 @@
logger = setup_logger(level=logging.INFO)


class FakeZEDModule(Module):
@dataclass
class FakeZEDModuleConfig(ModuleConfig):
frame_id: str = "zed_camera"


class FakeZEDModule(Module[FakeZEDModuleConfig]):
"""
Fake ZED module that replays recorded data instead of real camera.
"""
Expand All @@ -45,18 +51,19 @@ class FakeZEDModule(Module):
camera_info: Out[CameraInfo]
pose: Out[PoseStamped]

def __init__(self, recording_path: str, frame_id: str = "zed_camera", **kwargs) -> None: # type: ignore[no-untyped-def]
default_config = FakeZEDModuleConfig
config: FakeZEDModuleConfig

def __init__(self, recording_path: str, **kwargs: object) -> None:
"""
Initialize FakeZEDModule with recording path.

Args:
recording_path: Path to recorded data directory
frame_id: TF frame ID for messages
"""
super().__init__(**kwargs)

self.recording_path = recording_path
self.frame_id = frame_id
self._running = False

# Initialize TF publisher
Expand Down
16 changes: 11 additions & 5 deletions dimos/hardware/gstreamer_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
import logging
import sys
import threading
import time

import numpy as np

from dimos.core import Module, Out, rpc
from dimos.core import Module, ModuleConfig, Out, rpc
from dimos.msgs.sensor_msgs import Image, ImageFormat
from dimos.utils.logging_config import setup_logger

Expand All @@ -40,16 +41,23 @@
Gst.init(None)


@dataclass
class Config(ModuleConfig):
frame_id: str = "camera"


class GstreamerCameraModule(Module):
"""Module that captures frames from a remote camera using GStreamer TCP with absolute timestamps."""

default_config = Config
config: Config

video: Out[Image]

def __init__( # type: ignore[no-untyped-def]
self,
host: str = "localhost",
port: int = 5000,
frame_id: str = "camera",
timestamp_offset: float = 0.0,
reconnect_interval: float = 5.0,
*args,
Expand All @@ -66,7 +74,6 @@ def __init__( # type: ignore[no-untyped-def]
"""
self.host = host
self.port = port
self.frame_id = frame_id
self.timestamp_offset = timestamp_offset
self.reconnect_interval = reconnect_interval

Expand All @@ -79,8 +86,7 @@ def __init__( # type: ignore[no-untyped-def]
self.frame_count = 0
self.last_log_time = time.time()
self.reconnect_timer_id = None

Module.__init__(self, *args, **kwargs)
super().__init__(**kwargs)

@rpc
def start(self) -> None:
Expand Down
Loading
Loading