diff --git a/.github/workflows/_docker-build-template.yml b/.github/workflows/_docker-build-template.yml index 93b0e05a26..0b5f095fca 100644 --- a/.github/workflows/_docker-build-template.yml +++ b/.github/workflows/_docker-build-template.yml @@ -19,14 +19,10 @@ jobs: packages: write steps: - - name: exit early - if: ${{ !inputs.should-run }} - run: | - exit 0 - name: free up disk space # takes a bit of time, so disabled by default # explicitly enable this for large builds - if: ${{ inputs.freespace }} + if: ${{ inputs.should-run && inputs.freespace }} run: | echo -e "pre cleanup space:\n $(df -h)" sudo rm -rf /opt/ghc @@ -36,8 +32,10 @@ jobs: echo -e "post cleanup space:\n $(df -h)" - uses: actions/checkout@v4 + if: ${{ inputs.should-run }} - uses: docker/login-action@v3 + if: ${{ inputs.should-run }} with: registry: ghcr.io username: ${{ github.actor }} @@ -45,15 +43,18 @@ jobs: # required for github cache of docker layers - uses: crazy-max/ghaction-github-runtime@v3 + if: ${{ inputs.should-run }} # required for github cache of docker layers - uses: docker/setup-buildx-action@v3 + if: ${{ inputs.should-run }} with: driver: docker-container install: true use: true - uses: docker/build-push-action@v6 + if: ${{ inputs.should-run }} with: push: true context: ${{ inputs.context }} diff --git a/.github/workflows/cleanup-runner.yml b/.github/workflows/cleanup-runner.yml new file mode 100644 index 0000000000..f51c69d638 --- /dev/null +++ b/.github/workflows/cleanup-runner.yml @@ -0,0 +1,47 @@ +name: cleanup-runner + +on: + workflow_run: + workflows: ["docker"] + types: [completed] + workflow_dispatch: + +jobs: + cleanup: + runs-on: [self-hosted, Linux] + steps: + - name: Check disk usage + id: disk-check + run: | + USAGE=$(df / | awk 'NR==2 {print $5}' | sed 's/%//') + echo "Disk usage: ${USAGE}%" + echo "usage=${USAGE}" >> $GITHUB_OUTPUT + + - name: Clean Docker images + if: steps.disk-check.outputs.usage > 50 + run: | + echo "=== Docker usage before cleanup ===" + docker system df + + echo -e "\n=== Removing dangling images ===" + docker images -f "dangling=true" -q | xargs -r docker rmi || true + + echo -e "\n=== Docker usage after cleanup ===" + docker system df + + echo -e "\n=== Disk usage after cleanup ===" + df -h / + + - name: Aggressive cleanup if disk critically full + if: steps.disk-check.outputs.usage > 90 + run: | + echo "=== CRITICAL: Disk usage above 90% - Aggressive cleanup ===" + + echo -e "\n=== Removing images older than 3 days ===" + docker image prune -a --filter "until=72h" -f + + echo -e "\n=== Final docker usage ===" + docker system df + + echo -e "\n=== Final disk usage ===" + df -h / \ No newline at end of file diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index f7109ec9c2..1a1d55735e 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -38,8 +38,7 @@ jobs: - .github/workflows/_docker-build-template.yml - .github/workflows/docker.yml - docker/python/** - - requirements*.txt - - requirements.txt + - pyproject.toml dev: - docker/dev/** @@ -77,6 +76,7 @@ jobs: if: needs.check-changes.outputs.ros == 'true' uses: ./.github/workflows/_docker-build-template.yml with: + should-run: true from-image: ubuntu:22.04 to-image: ghcr.io/dimensionalos/ros:${{ needs.check-changes.outputs.branch-tag }} dockerfile: ros @@ -180,3 +180,73 @@ jobs: }} cmd: "pytest -m heavy" dev-image: dev:${{ needs.dev.result == 'success' && needs.check-changes.outputs.branch-tag || 'dev' }} + + run-lcm-tests: + needs: [check-changes, dev] + if: always() + uses: ./.github/workflows/tests.yml + with: + should-run: ${{ + needs.check-changes.result == 'success' && + ((needs.dev.result == 'success') || + (needs.dev.result == 'skipped' && + needs.check-changes.outputs.tests == 'true')) + }} + cmd: "pytest -m lcm" + dev-image: dev:${{ needs.dev.result == 'success' && needs.check-changes.outputs.branch-tag || 'dev' }} + + # Run module tests directly to avoid pytest forking issues + run-module-tests: + needs: [check-changes, dev] + if: always() + uses: ./.github/workflows/tests.yml + with: + should-run: ${{ + needs.check-changes.result == 'success' && + ((needs.dev.result == 'success') || + (needs.dev.result == 'skipped' && + needs.check-changes.outputs.tests == 'true')) + }} + cmd: "export PYTHONPATH=$(pwd):$PYTHONPATH && python dimos/perception/test_spatial_memory_module.py" + dev-image: dev:${{ needs.dev.result == 'success' && needs.check-changes.outputs.branch-tag || 'dev' }} + # TODO: Remove when merge to main as workflow_run needed + cleanup-runner: + needs: [run-tests, run-heavy-tests, run-ros-tests] + if: always() + runs-on: [self-hosted, Linux] + steps: + - name: Check disk usage + id: disk-check + run: | + USAGE=$(df / | awk 'NR==2 {print $5}' | sed 's/%//') + echo "Disk usage: ${USAGE}%" + echo "usage=${USAGE}" >> $GITHUB_OUTPUT + + - name: Clean Docker images + if: steps.disk-check.outputs.usage > 50 + run: | + echo "=== Docker usage before cleanup ===" + docker system df + + echo -e "\n=== Removing dangling images ===" + docker images -f "dangling=true" -q | xargs -r docker rmi || true + + echo -e "\n=== Docker usage after cleanup ===" + docker system df + + echo -e "\n=== Disk usage after cleanup ===" + df -h / + + - name: Aggressive cleanup if disk critically full + if: steps.disk-check.outputs.usage > 90 + run: | + echo "=== CRITICAL: Disk usage above 90% - Aggressive cleanup ===" + + echo -e "\n=== Removing images older than 3 days ===" + docker image prune -a --filter "until=72h" -f + + echo -e "\n=== Final docker usage ===" + docker system df + + echo -e "\n=== Final disk usage ===" + df -h / diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b0d17d374b..df1a38d65e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -47,10 +47,18 @@ jobs: sudo chown -R $USER:$USER ${{ github.workspace }} || true - uses: actions/checkout@v4 + with: + lfs: true + + - name: Configure Git LFS + run: | + git config --global --add safe.directory '*' + git lfs install + git lfs fetch + git lfs checkout - name: Run tests run: | - git config --global --add safe.directory '*' /entrypoint.sh bash -c "${{ inputs.cmd }}" - name: check disk space diff --git a/dimos/perception/spatial_perception.py b/dimos/perception/spatial_perception.py index 3572a50dfb..099c0d18c5 100644 --- a/dimos/perception/spatial_perception.py +++ b/dimos/perception/spatial_perception.py @@ -38,6 +38,8 @@ logger = setup_logger("dimos.perception.spatial_memory") +print("") + class SpatialMemory(Module): """ @@ -170,7 +172,7 @@ def __init__( # Track latest data for processing self._latest_video_frame: Optional[np.ndarray] = None self._latest_odom: Optional[Odometry] = None - self._process_interval = 0.1 # Process at 10Hz + self._process_interval = 1 logger.info(f"SpatialMemory initialized with model {embedding_model}") diff --git a/dimos/perception/test_spatial_memory_module.py b/dimos/perception/test_spatial_memory_module.py index 40b86d0ef1..33c630e9b7 100644 --- a/dimos/perception/test_spatial_memory_module.py +++ b/dimos/perception/test_spatial_memory_module.py @@ -34,6 +34,7 @@ from dimos.utils.data import get_data from dimos.utils.testing import TimedSensorReplay from dimos.utils.logging_config import setup_logger +from unittest.mock import patch, MagicMock import warnings logger = setup_logger("test_spatial_memory_module") @@ -61,7 +62,8 @@ def start(self): self._subscription = ( video_replay.stream() .pipe( - ops.sample(0.1) # Limit to 10 FPS for testing + ops.sample(2), # Sample every 2 seconds for resource-constrained systems + ops.take(5), # Only take 5 frames total ) .subscribe(self.video_out.publish) ) @@ -94,7 +96,14 @@ def start(self): odom_replay = TimedSensorReplay(self.odom_path, autocast=Odometry.from_msg) # Subscribe to the replay stream and publish to LCM - self._subscription = odom_replay.stream().subscribe(self.odom_out.publish) + self._subscription = ( + odom_replay.stream() + .pipe( + ops.sample(0.5), # Sample every 500ms + ops.take(10), # Only take 10 odometry updates total + ) + .subscribe(self.odom_out.publish) + ) logger.info("OdometryReplayModule started") @@ -107,7 +116,7 @@ def stop(self): logger.info("OdometryReplayModule stopped") -@pytest.mark.heavy +@pytest.mark.skip(reason="Run directly with python") class TestSpatialMemoryModule: @pytest.fixture(scope="function") def temp_dir(self): @@ -167,7 +176,7 @@ async def test_spatial_memory_module_with_replay(self, temp_dir): # Wait for some frames to be processed logger.info("Waiting for frames to be processed...") - await asyncio.sleep(5) # Process for 5 seconds + await asyncio.sleep(3) # Stop the replay modules to prevent infinite streaming video_module.stop() @@ -210,4 +219,8 @@ async def test_spatial_memory_module_with_replay(self, temp_dir): if __name__ == "__main__": - pytest.main(["-v", "-s", __file__]) + # pytest.main(["-v", "-s", __file__]) + test = TestSpatialMemoryModule() + asyncio.run( + test.test_spatial_memory_module_with_replay(tempfile.mkdtemp(prefix="spatial_memory_test_")) + ) diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 5f8c747864..b9ea1e2333 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -14,18 +14,21 @@ from __future__ import annotations +import os import subprocess import sys import threading import traceback -import os -from functools import cache from dataclasses import dataclass -from typing import Any, Callable, Optional, Protocol, runtime_checkable +from functools import cache +from typing import Optional, Protocol, runtime_checkable import lcm from dimos.protocol.service.spec import Service +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.protocol.service.lcmservice") @cache @@ -65,71 +68,114 @@ def check_multicast() -> list[str]: return commands_needed -def check_buffers() -> list[str]: - """Check if buffer configuration is needed and return required commands.""" +def check_buffers() -> tuple[list[str], Optional[int]]: + """Check if buffer configuration is needed and return required commands and current size. + + Returns: + Tuple of (commands_needed, current_max_buffer_size) + """ commands_needed = [] + current_max = None sudo = "" if check_root() else "sudo " # Check current buffer settings try: result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True) - current_max = int(result.stdout.split("=")[1].strip()) - if current_max < 2097152: + current_max = int(result.stdout.split("=")[1].strip()) if result.returncode == 0 else None + if not current_max or current_max < 2097152: commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") - except Exception: + except: commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") try: result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True) - current_default = int(result.stdout.split("=")[1].strip()) - if current_default < 2097152: + current_default = ( + int(result.stdout.split("=")[1].strip()) if result.returncode == 0 else None + ) + if not current_default or current_default < 2097152: commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") - except Exception: + except: commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") - return commands_needed + return commands_needed, current_max def check_system() -> None: - """Check if system configuration is needed and exit with required commands if not prepared.""" - commands_needed = [] - commands_needed.extend(check_multicast()) - commands_needed.extend(check_buffers()) - - if commands_needed: - print("System configuration required. Please run the following commands:") - for cmd in commands_needed: - print(f" {cmd}") - print("\nThen restart your application.") + """Check if system configuration is needed and exit only for critical issues. + + Multicast configuration is critical for LCM to work. + Buffer sizes are performance optimizations - warn but don't fail in containers. + """ + multicast_commands = check_multicast() + buffer_commands, current_buffer_size = check_buffers() + + # Check multicast first - this is critical + if multicast_commands: + logger.error( + "Critical: Multicast configuration required. Please run the following commands:" + ) + for cmd in multicast_commands: + logger.error(f" {cmd}") + logger.error("\nThen restart your application.") sys.exit(1) + # Buffer configuration is just for performance + elif buffer_commands: + if current_buffer_size: + logger.warning( + f"UDP buffer size limited to {current_buffer_size} bytes ({current_buffer_size // 1024}KB). Large LCM packets may fail." + ) + else: + logger.warning("UDP buffer sizes are limited. Large LCM packets may fail.") + logger.warning("For better performance, consider running:") + for cmd in buffer_commands: + logger.warning(f" {cmd}") + logger.warning("Note: This may not be possible in Docker containers.") + def autoconf() -> None: """Auto-configure system by running checks and executing required commands if needed.""" commands_needed = [] + + # Check multicast configuration commands_needed.extend(check_multicast()) - commands_needed.extend(check_buffers()) + + # Check buffer configuration + buffer_commands, _ = check_buffers() + commands_needed.extend(buffer_commands) if not commands_needed: return - print("System configuration required. Executing commands...") + logger.info("System configuration required. Executing commands...") + for cmd in commands_needed: - print(f" Running: {cmd}") + logger.info(f" Running: {cmd}") try: # Split command into parts for subprocess cmd_parts = cmd.split() - result = subprocess.run(cmd_parts, capture_output=True, text=True, check=True) - print(" ✓ Success") + subprocess.run(cmd_parts, capture_output=True, text=True, check=True) + logger.info(" ✓ Success") except subprocess.CalledProcessError as e: - print(f" ✗ Failed: {e}") - print(f" stdout: {e.stdout}") - print(f" stderr: {e.stderr}") + # Check if this is a multicast/route command or a sysctl command + if "route" in cmd or "multicast" in cmd: + # Multicast/route failures should still fail + logger.error(f" ✗ Failed to configure multicast: {e}") + logger.error(f" stdout: {e.stdout}") + logger.error(f" stderr: {e.stderr}") + raise + elif "sysctl" in cmd: + # Sysctl failures are just warnings (likely docker/container) + logger.warning( + f" ✗ Not able to auto-configure UDP buffer sizes (likely docker image): {e}" + ) except Exception as e: - print(f" ✗ Error: {e}") + logger.error(f" ✗ Error: {e}") + if "route" in cmd or "multicast" in cmd: + raise - print("System configuration completed.") + logger.info("System configuration completed.") @dataclass diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index c5b86cac35..7e2f5e8147 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -145,8 +145,9 @@ def test_check_buffers_all_configured(): )(), ] - result = check_buffers() - assert result == [] + commands, buffer_size = check_buffers() + assert commands == [] + assert buffer_size == 2097152 def test_check_buffers_low_max_buffer(): @@ -160,9 +161,10 @@ def test_check_buffers_low_max_buffer(): )(), ] - result = check_buffers() + commands, buffer_size = check_buffers() sudo = get_sudo_prefix() - assert result == [f"{sudo}sysctl -w net.core.rmem_max=2097152"] + assert commands == [f"{sudo}sysctl -w net.core.rmem_max=2097152"] + assert buffer_size == 1048576 def test_check_buffers_low_default_buffer(): @@ -176,9 +178,10 @@ def test_check_buffers_low_default_buffer(): )(), ] - result = check_buffers() + commands, buffer_size = check_buffers() sudo = get_sudo_prefix() - assert result == [f"{sudo}sysctl -w net.core.rmem_default=2097152"] + assert commands == [f"{sudo}sysctl -w net.core.rmem_default=2097152"] + assert buffer_size == 2097152 def test_check_buffers_both_low(): @@ -192,13 +195,14 @@ def test_check_buffers_both_low(): )(), ] - result = check_buffers() + commands, buffer_size = check_buffers() sudo = get_sudo_prefix() expected = [ f"{sudo}sysctl -w net.core.rmem_max=2097152", f"{sudo}sysctl -w net.core.rmem_default=2097152", ] - assert result == expected + assert commands == expected + assert buffer_size == 1048576 def test_check_buffers_subprocess_exception(): @@ -207,13 +211,14 @@ def test_check_buffers_subprocess_exception(): # Mock subprocess exceptions mock_run.side_effect = Exception("Command failed") - result = check_buffers() + commands, buffer_size = check_buffers() sudo = get_sudo_prefix() expected = [ f"{sudo}sysctl -w net.core.rmem_max=2097152", f"{sudo}sysctl -w net.core.rmem_default=2097152", ] - assert result == expected + assert commands == expected + assert buffer_size is None def test_check_buffers_parsing_error(): @@ -225,13 +230,47 @@ def test_check_buffers_parsing_error(): type("MockResult", (), {"stdout": "also invalid", "returncode": 0})(), ] - result = check_buffers() + commands, buffer_size = check_buffers() sudo = get_sudo_prefix() expected = [ f"{sudo}sysctl -w net.core.rmem_max=2097152", f"{sudo}sysctl -w net.core.rmem_default=2097152", ] - assert result == expected + assert commands == expected + assert buffer_size is None + + +def test_check_buffers_dev_container(): + """Test check_buffers in dev container where sysctl fails.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock dev container behavior - sysctl returns non-zero + mock_run.side_effect = [ + type( + "MockResult", + (), + { + "stdout": "sysctl: cannot stat /proc/sys/net/core/rmem_max: No such file or directory", + "returncode": 255, + }, + )(), + type( + "MockResult", + (), + { + "stdout": "sysctl: cannot stat /proc/sys/net/core/rmem_default: No such file or directory", + "returncode": 255, + }, + )(), + ] + + commands, buffer_size = check_buffers() + sudo = get_sudo_prefix() + expected = [ + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", + ] + assert commands == expected + assert buffer_size is None def test_autoconf_no_config_needed(): @@ -256,10 +295,12 @@ def test_autoconf_no_config_needed(): )(), ] - with patch("builtins.print") as mock_print: + with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: autoconf() - # Should not print anything when no config is needed - mock_print.assert_not_called() + # Should not log anything when no config is needed + mock_logger.info.assert_not_called() + mock_logger.error.assert_not_called() + mock_logger.warning.assert_not_called() def test_autoconf_with_config_needed_success(): @@ -288,26 +329,27 @@ def test_autoconf_with_config_needed_success(): type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_default ] - with patch("builtins.print") as mock_print: + from unittest.mock import call + + with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: autoconf() sudo = get_sudo_prefix() - # Verify the expected print calls - expected_calls = [ - ("System configuration required. Executing commands...",), - (f" Running: {sudo}ifconfig lo multicast",), - (" ✓ Success",), - (f" Running: {sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), - (" ✓ Success",), - (f" Running: {sudo}sysctl -w net.core.rmem_max=2097152",), - (" ✓ Success",), - (f" Running: {sudo}sysctl -w net.core.rmem_default=2097152",), - (" ✓ Success",), - ("System configuration completed.",), + # Verify the expected log calls + expected_info_calls = [ + call("System configuration required. Executing commands..."), + call(f" Running: {sudo}ifconfig lo multicast"), + call(" ✓ Success"), + call(f" Running: {sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"), + call(" ✓ Success"), + call(f" Running: {sudo}sysctl -w net.core.rmem_max=2097152"), + call(" ✓ Success"), + call(f" Running: {sudo}sysctl -w net.core.rmem_default=2097152"), + call(" ✓ Success"), + call("System configuration completed."), ] - from unittest.mock import call - mock_print.assert_has_calls([call(*args) for args in expected_calls]) + mock_logger.info.assert_has_calls(expected_info_calls) def test_autoconf_with_command_failures(): @@ -340,12 +382,17 @@ def test_autoconf_with_command_failures(): ), ] - with patch("builtins.print") as mock_print: - autoconf() + with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: + # The function should raise on multicast/route failures + with pytest.raises(subprocess.CalledProcessError): + autoconf() + + # Verify it logged the failure before raising + info_calls = [call[0][0] for call in mock_logger.info.call_args_list] + error_calls = [call[0][0] for call in mock_logger.error.call_args_list] - # Verify it handles the failure gracefully - print_calls = [call[0][0] for call in mock_print.call_args_list] - assert "System configuration required. Executing commands..." in print_calls - assert " ✓ Success" in print_calls # First command succeeded - assert any("✗ Failed" in call for call in print_calls) # Second command failed - assert "System configuration completed." in print_calls + assert "System configuration required. Executing commands..." in info_calls + assert " ✓ Success" in info_calls # First command succeeded + assert any( + "✗ Failed to configure multicast" in call for call in error_calls + ) # Second command failed diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py index 40c0cdca33..e6fd203658 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2.py @@ -19,7 +19,7 @@ import threading import time import warnings -from typing import Callable +from typing import Callable, Optional from reactivex import Observable from reactivex import operators as ops @@ -27,9 +27,9 @@ import dimos.core.colors as colors from dimos import core from dimos.core import In, Module, Out, rpc -from dimos.msgs.foxglove_msgs import Arrow -from dimos.msgs.geometry_msgs import Pose, PoseStamped, Twist, Vector3 +from dimos.msgs.geometry_msgs import Pose, PoseStamped, Vector3 from dimos.msgs.sensor_msgs import Image +from dimos.perception.spatial_perception import SpatialMemory from dimos.protocol import pubsub from dimos.robot.foxglove_bridge import FoxgloveBridge from dimos.robot.frontier_exploration.wavefront_frontier_goal_selector import ( @@ -99,7 +99,7 @@ def move(self, vector: Vector): print("move supressed", vector) -class ConnectionModule(FakeRTC, Module): +class ConnectionModule(UnitreeWebRTCConnection, Module): movecmd: In[Vector3] = None odom: Out[Vector3] = None lidar: Out[LidarMessage] = None @@ -162,7 +162,12 @@ def plancmd(): class UnitreeGo2Light: - def __init__(self, ip: str): + def __init__( + self, + ip: str, + output_dir: str = os.path.join(os.getcwd(), "assets", "output"), + ): + self.output_dir = output_dir self.ip = ip self.dimos = None self.connection = None @@ -173,6 +178,28 @@ def __init__(self, ip: str): self.foxglove_bridge = None self.ctrl = None + # Spatial Memory Initialization ====================================== + # Create output directory + os.makedirs(self.output_dir, exist_ok=True) + logger.info(f"Robot outputs will be saved to: {self.output_dir}") + + # Initialize memory directories + self.memory_dir = os.path.join(self.output_dir, "memory") + os.makedirs(self.memory_dir, exist_ok=True) + + # Initialize spatial memory properties + self.spatial_memory_dir = os.path.join(self.memory_dir, "spatial_memory") + self.spatial_memory_collection = "spatial_memory" + self.db_path = os.path.join(self.spatial_memory_dir, "chromadb_data") + self.visual_memory_path = os.path.join(self.spatial_memory_dir, "visual_memory.pkl") + + # Create spatial memory directory + os.makedirs(self.spatial_memory_dir, exist_ok=True) + os.makedirs(self.db_path, exist_ok=True) + + self.spatial_memory_module = None + # ============================================================== + async def start(self): self.dimos = core.start(4) @@ -226,6 +253,25 @@ async def start(self): set_local_nav=self.local_planner.navigate_path_local, ) + # Spatial Memory Module ====================================== + self.spatial_memory_module = self.dimos.deploy( + SpatialMemory, + collection_name=self.spatial_memory_collection, + db_path=self.db_path, + visual_memory_path=self.visual_memory_path, + output_dir=self.spatial_memory_dir, + ) + + # Connect video and odometry streams to spatial memory + self.spatial_memory_module.video.connect(self.connection.video) + self.spatial_memory_module.odom.connect(self.connection.odom) + + # Start the spatial memory module + self.spatial_memory_module.start() + + logger.info("Spatial memory module deployed and connected") + # ============================================================== + # Configure AstarPlanner OUTPUT path: Out[Path] to /global_path LCM topic self.global_planner.path.transport = core.pLCMTransport("/global_path") # ====================================== @@ -338,6 +384,15 @@ def costmap(self): raise RuntimeError("Mapper not initialized. Call start() first.") return self.mapper.costmap + @property + def spatial_memory(self) -> Optional[SpatialMemory]: + """Get the robot's spatial memory module. + + Returns: + SpatialMemory module instance or None if perception is disabled + """ + return self.spatial_memory_module + def get_video_stream(self, fps: int = 30) -> Observable: """Get the video stream with rate limiting and processing. diff --git a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_heavy.py b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_heavy.py index 87517a6e52..44d7976324 100644 --- a/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_heavy.py +++ b/dimos/robot/unitree_webrtc/multiprocess/unitree_go2_heavy.py @@ -14,22 +14,20 @@ """Heavy version of Unitree Go2 with GPU-required modules.""" -import os import asyncio -from typing import Optional, List +from typing import List, Optional + import numpy as np -from reactivex import Observable, operators as ops +from reactivex import Observable from reactivex.disposable import CompositeDisposable from reactivex.scheduler import ThreadPoolScheduler -from dimos.robot.unitree_webrtc.multiprocess.unitree_go2 import UnitreeGo2Light -from dimos.perception.spatial_perception import SpatialMemory -from dimos.perception.person_tracker import PersonTrackingStream from dimos.perception.object_tracker import ObjectTrackingStream -from dimos.skills.skills import SkillLibrary, AbstractRobotSkill +from dimos.perception.person_tracker import PersonTrackingStream +from dimos.robot.unitree_webrtc.multiprocess.unitree_go2 import UnitreeGo2Light from dimos.robot.unitree_webrtc.unitree_skills import MyUnitreeSkills +from dimos.skills.skills import AbstractRobotSkill, SkillLibrary from dimos.types.robot_capabilities import RobotCapability -from dimos.types.vector import Vector from dimos.utils.logging_config import setup_logger from dimos.utils.threadpool import get_scheduler @@ -50,7 +48,6 @@ class UnitreeGo2Heavy(UnitreeGo2Light): def __init__( self, ip: str, - output_dir: str = os.path.join(os.getcwd(), "assets", "output"), skill_library: Optional[SkillLibrary] = None, robot_capabilities: Optional[List[RobotCapability]] = None, spatial_memory_collection: str = "spatial_memory", @@ -72,7 +69,6 @@ def __init__( """ super().__init__(ip) - self.output_dir = output_dir self.enable_perception = enable_perception self.disposables = CompositeDisposable() self.pool_scheduler = pool_scheduler if pool_scheduler else get_scheduler() @@ -84,24 +80,6 @@ def __init__( RobotCapability.AUDIO, ] - # Create output directory - os.makedirs(self.output_dir, exist_ok=True) - logger.info(f"Robot outputs will be saved to: {self.output_dir}") - - # Initialize memory directories - self.memory_dir = os.path.join(self.output_dir, "memory") - os.makedirs(self.memory_dir, exist_ok=True) - - # Initialize spatial memory properties - self.spatial_memory_dir = os.path.join(self.memory_dir, "spatial_memory") - self.spatial_memory_collection = spatial_memory_collection - self.db_path = os.path.join(self.spatial_memory_dir, "chromadb_data") - self.visual_memory_path = os.path.join(self.spatial_memory_dir, "visual_memory.pkl") - - # Create spatial memory directory - os.makedirs(self.spatial_memory_dir, exist_ok=True) - os.makedirs(self.db_path, exist_ok=True) - # Camera configuration for Unitree Go2 self.camera_intrinsics = [819.553492, 820.646595, 625.284099, 336.808987] self.camera_pitch = np.deg2rad(0) # negative for downward pitch @@ -113,7 +91,6 @@ def __init__( self.skill_library = skill_library # Initialize spatial memory module (will be deployed after connection is established) - self.spatial_memory_module = None self._video_stream = None self.new_memory = new_memory @@ -133,26 +110,7 @@ async def start(self): # Now we have connection publishing to LCM, initialize video stream self._video_stream = self.get_video_stream(fps=10) # Lower FPS for processing - # Deploy Spatial Memory Module if perception is enabled if self.enable_perception: - self.spatial_memory_module = self.dimos.deploy( - SpatialMemory, - collection_name=self.spatial_memory_collection, - db_path=self.db_path, - visual_memory_path=self.visual_memory_path, - new_memory=self.new_memory, - output_dir=self.spatial_memory_dir, - ) - - # Connect video and odometry streams to spatial memory - self.spatial_memory_module.video.connect(self.connection.video) - self.spatial_memory_module.odom.connect(self.connection.odom) - - # Start the spatial memory module - self.spatial_memory_module.start() - - logger.info("Spatial memory module deployed and connected") - # Initialize person and object tracking self.person_tracker = PersonTrackingStream( camera_intrinsics=self.camera_intrinsics, @@ -185,15 +143,6 @@ async def start(self): logger.info("UnitreeGo2Heavy initialized with all modules") - @property - def spatial_memory(self) -> Optional[SpatialMemory]: - """Get the robot's spatial memory module. - - Returns: - SpatialMemory module instance or None if perception is disabled - """ - return self.spatial_memory_module - @property def video_stream(self) -> Optional[Observable]: """Get the robot's video stream. diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index a77bb03d20..1d2ad16115 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -17,18 +17,15 @@ import pytest from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic -from dimos.protocol.service.lcmservice import autoconf from dimos.utils.cli.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy -autoconf() - @pytest.mark.lcm def test_spy_basic(): lcm = PickleLCM(autoconf=True) lcm.start() - lcmspy = LCMSpy() + lcmspy = LCMSpy(autoconf=True) lcmspy.start() video_topic = Topic(topic="/video") @@ -152,7 +149,7 @@ def test_graph_topic_basic(): @pytest.mark.lcm def test_graph_lcmspy_basic(): """Test GraphLCMSpy basic functionality""" - spy = GraphLCMSpy(graph_log_window=0.1) + spy = GraphLCMSpy(autoconf=True, graph_log_window=0.1) spy.start() time.sleep(0.2) # Wait for thread to start @@ -172,7 +169,7 @@ def test_graph_lcmspy_basic(): @pytest.mark.lcm def test_lcmspy_global_totals(): """Test that LCMSpy tracks global totals as a Topic itself""" - spy = LCMSpy() + spy = LCMSpy(autoconf=True) spy.start() # Send messages to different topics @@ -202,7 +199,7 @@ def test_lcmspy_global_totals(): @pytest.mark.lcm def test_graph_lcmspy_global_totals(): """Test that GraphLCMSpy tracks global totals with history""" - spy = GraphLCMSpy(graph_log_window=0.1) + spy = GraphLCMSpy(autoconf=True, graph_log_window=0.1) spy.start() time.sleep(0.2) diff --git a/docker/dev/Dockerfile b/docker/dev/Dockerfile index 514f0b01c6..33a0612eb6 100644 --- a/docker/dev/Dockerfile +++ b/docker/dev/Dockerfile @@ -24,10 +24,17 @@ RUN apt-get update && apt-get install -y \ # Configure git to trust any directory (resolves dubious ownership issues in containers) RUN git config --global --add safe.directory '*' -COPY . /app/ WORKDIR /app + +COPY pyproject.toml /app/ +RUN mkdir -p /app/dimos +RUN touch /app/dimos/__init__.py + +# Install dependencies RUN --mount=type=cache,target=/root/.cache/pip pip install .[dev] +COPY . /app/ + # Copy files and add version to motd COPY /assets/dimensionalascii.txt /etc/motd COPY /docker/dev/bash.sh /root/.bash.sh diff --git a/docker/python/Dockerfile b/docker/python/Dockerfile index f8d06496b4..9b89c5ab62 100644 --- a/docker/python/Dockerfile +++ b/docker/python/Dockerfile @@ -34,10 +34,13 @@ RUN apt-get install -y \ # Fix distutils-installed packages that block pip upgrades RUN apt-get purge -y python3-blinker python3-sympy python3-oauthlib || true -RUN mkdir -p /app/dimos +WORKDIR /app -COPY . /app/ +COPY pyproject.toml /app/ +RUN mkdir -p /app/dimos +RUN touch /app/dimos/__init__.py -WORKDIR /app +# Install dependencies +RUN --mount=type=cache,target=/root/.cache/pip bash -c "pip install --upgrade 'pip>=24' 'setuptools>=70' 'wheel' 'packaging>=24' && pip install '.[cpu]'" -RUN --mount=type=cache,target=/root/.cache/pip bash -c "pip install --upgrade 'pip>=24' 'setuptools>=70' 'wheel' 'packaging>=24' && pip install '.[cpu]'" \ No newline at end of file +COPY . /app/ \ No newline at end of file