Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dimos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,15 @@
from pathlib import Path

DIMOS_PROJECT_ROOT = Path(__file__).parent.parent

"""
Constants for shared memory
Usually, auto-detection for size would be preferred. Sadly, though, channels are made
and frozen *before* the first frame is received.
Therefore, a maximum capacity for color image and depth image transfer should be defined
ahead of time.
"""
# Default color image size: 1920x1080 frame x 3 (RGB) x uint8
DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3
# Default depth image size: 1280x720 frame * 4 (float32 size)
DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4
5 changes: 5 additions & 0 deletions dimos/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dimos.core.core import rpc
from dimos.core.module import Module, ModuleBase, ModuleConfig
from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport
from dimos.utils.actor_registry import ActorRegistry
from dimos.core.transport import (
LCMTransport,
SHMTransport,
Expand Down Expand Up @@ -120,6 +121,9 @@ def deploy(
worker = actor.set_ref(actor).result()
print((f"deployed: {colors.green(actor)} @ {colors.blue('worker ' + str(worker))}"))

# Register actor deployment in shared memory
ActorRegistry.update(str(actor), str(worker))

return RPCClient(actor, actor_class)

def check_worker_memory():
Expand Down Expand Up @@ -176,6 +180,7 @@ def check_worker_memory():
)

def close_all():
ActorRegistry.clear()
dask_client.shutdown()
local_cluster.close()

Expand Down
5 changes: 5 additions & 0 deletions dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
get_origin,
get_type_hints,
)
from reactivex.disposable import CompositeDisposable

from dask.distributed import Actor, get_worker

Expand Down Expand Up @@ -74,12 +75,14 @@ class ModuleBase(Configurable[ModuleConfig], SkillContainer):
_tf: Optional[TFSpec] = None
_loop: Optional[asyncio.AbstractEventLoop] = None
_loop_thread: Optional[threading.Thread]
_disposables: CompositeDisposable

default_config = ModuleConfig

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._loop, self._loop_thread = get_loop()
self._disposables = CompositeDisposable()
# we can completely override comms protocols if we want
try:
# here we attempt to figure out if we are running on a dask worker
Expand All @@ -98,6 +101,8 @@ def _close_module(self):
self._loop = None
self._loop_thread.join(timeout=2)
self._loop_thread = None
if hasattr(self, "_disposables"):
self._disposables.dispose()

def _close_rpc(self):
# Using hasattr is needed because SkillCoordinator skips ModuleBase.__init__ and self.rpc is never set.
Expand Down
1 change: 0 additions & 1 deletion dimos/navigation/bt_navigator/navigator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.msgs.nav_msgs import OccupancyGrid
from dimos_lcm.std_msgs import String
from dimos.navigation.local_planner.local_planner import BaseLocalPlanner
from dimos.navigation.bt_navigator.goal_validator import find_safe_goal
from dimos.navigation.bt_navigator.recovery_server import RecoveryServer
from dimos.protocol.tf import TF
Expand Down
8 changes: 6 additions & 2 deletions dimos/navigation/local_planner/local_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from typing import Optional

from dimos.core import Module, In, Out, rpc
from dimos.msgs.geometry_msgs import Twist, Vector3, PoseStamped
from dimos.msgs.geometry_msgs import Twist, PoseStamped
from dimos.msgs.nav_msgs import OccupancyGrid, Path
from dimos.utils.logging_config import setup_logger
from dimos.utils.transform_utils import get_distance, quaternion_to_euler, normalize_angle
Expand Down Expand Up @@ -198,6 +198,10 @@ def stop(self):
self.planning_thread = None
stop_cmd = Twist()
self.cmd_vel.publish(stop_cmd)
self._close_module()

logger.info("Local planner stopped")

@rpc
def stop_planner_module(self):
self.stop()
self._close_module()
20 changes: 14 additions & 6 deletions dimos/perception/spatial_perception.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@

import numpy as np
import cv2
from reactivex import Observable, disposable, just, interval
from reactivex import Observable, disposable, interval
from reactivex import operators as ops
from datetime import datetime
from reactivex.disposable import Disposable

from dimos.core import In, Module, Out, rpc
from dimos.core import In, Module, rpc
from dimos.msgs.sensor_msgs import Image
from dimos.msgs.geometry_msgs import Vector3, Quaternion, Pose, PoseStamped
from dimos.msgs.geometry_msgs import Vector3, Pose, PoseStamped
from dimos.utils.logging_config import setup_logger
from dimos.agents.memory.spatial_vector_db import SpatialVectorDB
from dimos.agents.memory.image_embedding import ImageEmbeddingProvider
Expand Down Expand Up @@ -190,14 +191,21 @@ def set_video(image_msg: Image):
def set_odom(odom_msg: PoseStamped):
self._latest_odom = odom_msg

self.video.subscribe(set_video)
self.odom.subscribe(set_odom)
unsub = self.video.subscribe(set_video)
self._disposables.add(Disposable(unsub))
unsub = self.odom.subscribe(set_odom)
self._disposables.add(Disposable(unsub))

# Start periodic processing using interval
interval(self._process_interval).subscribe(lambda _: self._process_frame())
unsub = interval(self._process_interval).subscribe(lambda _: self._process_frame())
self._disposables.add(Disposable(unsub))

logger.info("SpatialMemory module started and subscribed to LCM streams")

@rpc
def stop(self):
self._close_module()

def _process_frame(self):
"""Process the latest frame with pose data if available."""
if self._latest_video_frame is None or self._latest_odom is None:
Expand Down
18 changes: 14 additions & 4 deletions dimos/robot/unitree_webrtc/unitree_g1.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
from typing import Optional

from dimos import core
from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE
from dimos.core import Module, In, Out, rpc
from dimos.msgs.geometry_msgs import PoseStamped, Twist, TwistStamped
from dimos.msgs.nav_msgs.Odometry import Odometry
from dimos.msgs.sensor_msgs import Image, CameraInfo, PointCloud2
from dimos.msgs.sensor_msgs import CameraInfo, PointCloud2
from dimos.msgs.tf2_msgs.TFMessage import TFMessage
from dimos.protocol import pubsub
from dimos.protocol.pubsub.lcmpubsub import LCM
Expand Down Expand Up @@ -241,8 +242,12 @@ def _deploy_camera(self):
)

# Configure ZED LCM transports (same for both real and fake)
self.zed_camera.color_image.transport = core.LCMTransport("/zed/color_image", Image)
self.zed_camera.depth_image.transport = core.LCMTransport("/zed/depth_image", Image)
self.zed_camera.color_image.transport = core.pSHMTransport(
"/zed/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
)
self.zed_camera.depth_image.transport = core.pSHMTransport(
"/zed/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE
)
self.zed_camera.camera_info.transport = core.LCMTransport("/zed/camera_info", CameraInfo)
self.zed_camera.pose.transport = core.LCMTransport("/zed/pose", PoseStamped)

Expand All @@ -257,7 +262,12 @@ def _deploy_visualization(self):
# Note: robot_pose connection removed since odom was removed from G1ConnectionModule

# Deploy Foxglove bridge
self.foxglove_bridge = FoxgloveBridge()
self.foxglove_bridge = FoxgloveBridge(
shm_channels=[
"/zed/color_image#sensor_msgs.Image",
"/zed/depth_image#sensor_msgs.Image",
]
)

def _deploy_joystick(self):
"""Deploy joystick control module."""
Expand Down
61 changes: 43 additions & 18 deletions dimos/robot/unitree_webrtc/unitree_go2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from reactivex import Observable

from dimos import core
from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE
from dimos.core import In, Module, Out, rpc
from dimos.mapping.types import LatLon
from dimos.msgs.std_msgs import Header
Expand All @@ -45,6 +46,7 @@
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
from dimos.protocol.tf import TF
from dimos.robot.foxglove_bridge import FoxgloveBridge
from dimos.utils.monitoring import UtilizationModule
from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule
from dimos.navigation.global_planner import AstarPlanner
from dimos.navigation.local_planner.holonomic_local_planner import HolonomicLocalPlanner
Expand Down Expand Up @@ -81,18 +83,6 @@
warnings.filterwarnings("ignore", message="coroutine.*was never awaited")
warnings.filterwarnings("ignore", message="H264Decoder.*failed to decode")

"""
Constants for shared memory
Usually, auto-detection for size would be preferred. Sadly, though, channels are made
and frozen *before* the first frame is received.
Therefore, a maximum capacity for color image and depth image transfer should be defined
ahead of time.
"""
# Default color image size: 1920x1080 frame x 3 (RGB) x uint8
DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3
# Default depth image size: 1280x720 frame * 4 (float32 size)
DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4


class FakeRTC:
"""Fake WebRTC connection for testing with recorded data."""
Expand Down Expand Up @@ -362,6 +352,7 @@ def __init__(
self.spatial_memory_module = None
self.depth_module = None
self.object_tracker = None
self.utilization_module = None

self._setup_directories()

Expand Down Expand Up @@ -410,6 +401,22 @@ def start(self):
logger.info("UnitreeGo2 initialized and started")
logger.info(f"WebSocket visualization available at http://localhost:{self.websocket_port}")

def stop(self):
# self.connection.stop()
# self.mapper.stop()
# self.global_planner.stop()
# self.local_planner.stop()
# self.navigator.stop()
# self.frontier_explorer.stop()
# self.websocket_vis.stop()
# self.foxglove_bridge.stop()
self.spatial_memory_module.stop()
# self.depth_module.stop()
# self.object_tracker.stop()
self.utilization_module.stop()
self.dimos.close_all()
self.lcm.stop()

def _deploy_connection(self):
"""Deploy and configure the connection module."""
self.connection = self.dimos.deploy(
Expand All @@ -419,7 +426,9 @@ def _deploy_connection(self):
self.connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage)
self.connection.odom.transport = core.LCMTransport("/odom", PoseStamped)
self.connection.gps_location.transport = core.pLCMTransport("/gps_location")
self.connection.video.transport = core.LCMTransport("/go2/color_image", Image)
self.connection.video.transport = core.pSHMTransport(
"/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
)
self.connection.movecmd.transport = core.LCMTransport("/cmd_vel", Twist)
self.connection.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo)
self.connection.camera_pose.transport = core.LCMTransport("/go2/camera_pose", PoseStamped)
Expand Down Expand Up @@ -496,7 +505,12 @@ def _deploy_visualization(self):
self.websocket_vis.path.connect(self.global_planner.path)
self.websocket_vis.global_costmap.connect(self.mapper.global_costmap)

self.foxglove_bridge = FoxgloveBridge()
self.foxglove_bridge = FoxgloveBridge(
shm_channels=[
"/go2/color_image#sensor_msgs.Image",
"/go2/depth_image#sensor_msgs.Image",
]
)

# TODO: This should be moved.
def _set_goal(goal: LatLon):
Expand All @@ -515,7 +529,9 @@ def _deploy_perception(self):
output_dir=self.spatial_memory_dir,
)

self.spatial_memory_module.video.transport = core.LCMTransport("/go2/color_image", Image)
self.spatial_memory_module.video.transport = core.pSHMTransport(
"/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
)
self.spatial_memory_module.odom.transport = core.LCMTransport(
"/go2/camera_pose", PoseStamped
)
Expand All @@ -528,6 +544,8 @@ def _deploy_perception(self):
frame_id="camera_link",
)

self.utilization_module = self.dimos.deploy(UtilizationModule)

# Set up transports
self.object_tracker.detection2darray.transport = core.LCMTransport(
"/go2/detection2d", Detection2DArray
Expand All @@ -547,8 +565,12 @@ def _deploy_camera(self):
self.depth_module = self.dimos.deploy(DepthModule, gt_depth_scale=gt_depth_scale)

# Set up transports
self.depth_module.color_image.transport = core.LCMTransport("/go2/color_image", Image)
self.depth_module.depth_image.transport = core.LCMTransport("/go2/depth_image", Image)
self.depth_module.color_image.transport = core.pSHMTransport(
"/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
)
self.depth_module.depth_image.transport = core.pSHMTransport(
"/go2/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE
)
self.depth_module.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo)

logger.info("Camera module deployed and connected")
Expand All @@ -573,6 +595,7 @@ def _start_modules(self):
self.spatial_memory_module.start()
self.depth_module.start()
self.object_tracker.start()
self.utilization_module.start()

# Initialize skills after connection is established
if self.skill_library is not None:
Expand Down Expand Up @@ -750,9 +773,11 @@ def main():

try:
while True:
time.sleep(1)
time.sleep(0.1)
except KeyboardInterrupt:
logger.info("Shutting down...")
finally:
robot.stop()


if __name__ == "__main__":
Expand Down
Loading
Loading