diff --git a/bin/lfs_push b/bin/lfs_push index 7de1b5ad8e..68b1326e49 100755 --- a/bin/lfs_push +++ b/bin/lfs_push @@ -68,7 +68,7 @@ for dir_path in data/*; do compressed_dirs+=("$dir_name") # Add the compressed file to git LFS tracking - git add "$compressed_file" + git add -f "$compressed_file" echo -e " ${GREEN}✓${NC} git-add $compressed_file" diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 5bdb10d604..e5b70a2319 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -54,9 +54,9 @@ def __str__(self) -> str: class pLCMTransport(PubSubTransport[T]): _started: bool = False - def __init__(self, topic: str): + def __init__(self, topic: str, **kwargs): super().__init__(topic) - self.lcm = pickleLCM() + self.lcm = pickleLCM(**kwargs) def __reduce__(self): return (pLCMTransport, (self.topic,)) @@ -78,9 +78,9 @@ def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None: class LCMTransport(PubSubTransport[T]): _started: bool = False - def __init__(self, topic: str, type: type): + def __init__(self, topic: str, type: type, **kwargs): super().__init__(LCMTopic(topic, type)) - self.lcm = LCM() + self.lcm = LCM(**kwargs) def __reduce__(self): return (LCMTransport, (self.topic.topic, self.topic.lcm_type)) diff --git a/dimos/protocol/pubsub/__init__.py b/dimos/protocol/pubsub/__init__.py index 7381d8f2f5..89bd292fda 100644 --- a/dimos/protocol/pubsub/__init__.py +++ b/dimos/protocol/pubsub/__init__.py @@ -1,2 +1,3 @@ +import dimos.protocol.pubsub.lcmpubsub as lcm from dimos.protocol.pubsub.memory import Memory from dimos.protocol.pubsub.spec import PubSub diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py index cc87e03c64..551c936223 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/lcmpubsub.py @@ -14,7 +14,8 @@ from __future__ import annotations -import os +import subprocess +import sys import threading import traceback from dataclasses import dataclass @@ -26,13 +27,101 @@ from dimos.protocol.service.spec import Service +def check_multicast() -> list[str]: + """Check if multicast configuration is needed and return required commands.""" + commands_needed = [] + + # Check if loopback interface has multicast enabled + try: + result = subprocess.run(["ip", "link", "show", "lo"], capture_output=True, text=True) + if "MULTICAST" not in result.stdout: + commands_needed.append("sudo ifconfig lo multicast") + except Exception: + commands_needed.append("sudo ifconfig lo multicast") + + # Check if multicast route exists + try: + result = subprocess.run( + ["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True + ) + if not result.stdout.strip(): + commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + except Exception: + commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + + return commands_needed + + +def check_buffers() -> list[str]: + """Check if buffer configuration is needed and return required commands.""" + commands_needed = [] + + # Check current buffer settings + try: + result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True) + current_max = int(result.stdout.split("=")[1].strip()) + if current_max < 2097152: + commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + except Exception: + commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + + try: + result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True) + current_default = int(result.stdout.split("=")[1].strip()) + if current_default < 2097152: + commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + except Exception: + commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + + return commands_needed + + +def check_system() -> None: + """Check if system configuration is needed and exit with required commands if not prepared.""" + commands_needed = [] + commands_needed.extend(check_multicast()) + commands_needed.extend(check_buffers()) + + if commands_needed: + print("System configuration required. Please run the following commands:") + for cmd in commands_needed: + print(f" {cmd}") + print("\nThen restart your application.") + sys.exit(1) + + +def autoconf() -> None: + """Auto-configure system by running checks and executing required commands if needed.""" + commands_needed = [] + commands_needed.extend(check_multicast()) + commands_needed.extend(check_buffers()) + + if not commands_needed: + return + + print("System configuration required. Executing commands...") + for cmd in commands_needed: + print(f" Running: {cmd}") + try: + # Split command into parts for subprocess + cmd_parts = cmd.split() + result = subprocess.run(cmd_parts, capture_output=True, text=True, check=True) + print(" ✓ Success") + except subprocess.CalledProcessError as e: + print(f" ✗ Failed: {e}") + print(f" stdout: {e.stdout}") + print(f" stderr: {e.stderr}") + except Exception as e: + print(f" ✗ Error: {e}") + + print("System configuration completed.") + + @dataclass class LCMConfig: ttl: int = 0 url: str | None = None - # auto configure routing - auto_configure_multicast: bool = True - auto_configure_buffers: bool = False + autoconf: bool = False @runtime_checkable @@ -89,20 +178,13 @@ def unsubscribe(): return unsubscribe def start(self): - # TODO: proper error handling/log messages for these system calls - if self.config.auto_configure_multicast: - try: - os.system("sudo ifconfig lo multicast") - os.system("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") - except Exception as e: - print(f"Error configuring multicast: {e}") - - if self.config.auto_configure_buffers: + if self.config.autoconf: + autoconf() + else: try: - os.system("sudo sysctl -w net.core.rmem_max=2097152") - os.system("sudo sysctl -w net.core.rmem_default=2097152") + check_system() except Exception as e: - print(f"Error configuring buffers: {e}") + print(f"Error checking system configuration: {e}") self._stop_event.clear() self._thread = threading.Thread(target=self._loop) diff --git a/dimos/protocol/pubsub/redis.py b/dimos/protocol/pubsub/redispubsub.py similarity index 100% rename from dimos/protocol/pubsub/redis.py rename to dimos/protocol/pubsub/redispubsub.py diff --git a/dimos/protocol/pubsub/test_lcmpubsub.py b/dimos/protocol/pubsub/test_lcmpubsub.py index 3766e2f449..456c647cd4 100644 --- a/dimos/protocol/pubsub/test_lcmpubsub.py +++ b/dimos/protocol/pubsub/test_lcmpubsub.py @@ -12,12 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import subprocess import time +from unittest.mock import patch import pytest from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 -from dimos.protocol.pubsub.lcmpubsub import LCM, LCMbase, Topic, pickleLCM +from dimos.protocol.pubsub.lcmpubsub import ( + LCM, + LCMbase, + Topic, + autoconf, + check_buffers, + check_multicast, + pickleLCM, +) class MockLCMMessage: @@ -40,7 +50,7 @@ def __eq__(self, other): def test_lcmbase_pubsub(): - lcm = LCMbase() + lcm = LCMbase(autoconf=True) lcm.start() received_messages = [] @@ -70,7 +80,7 @@ def callback(msg, topic): def test_lcm_autodecoder_pubsub(): - lcm = LCM() + lcm = LCM(autoconf=True) lcm.start() received_messages = [] @@ -109,7 +119,7 @@ def callback(msg, topic): # passes some geometry types through LCM @pytest.mark.parametrize("test_message", test_msgs) def test_lcm_geometry_msgs_pubsub(test_message): - lcm = LCM() + lcm = LCM(autoconf=True) lcm.start() received_messages = [] @@ -143,7 +153,7 @@ def callback(msg, topic): # passes some geometry types through pickle LCM @pytest.mark.parametrize("test_message", test_msgs) def test_lcm_geometry_msgs_autopickle_pubsub(test_message): - lcm = pickleLCM() + lcm = pickleLCM(autoconf=True) lcm.start() received_messages = [] @@ -172,3 +182,341 @@ def callback(msg, topic): assert received_topic == topic print(test_message, topic) + + +class TestSystemChecks: + """Test suite for system configuration check functions.""" + + def test_check_multicast_all_configured(self): + """Test check_multicast when system is properly configured.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock successful checks with realistic output format + mock_run.side_effect = [ + type( + "MockResult", + (), + { + "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", + "returncode": 0, + }, + )(), + type( + "MockResult", (), {"stdout": "224.0.0.0/4 dev lo scope link", "returncode": 0} + )(), + ] + + result = check_multicast() + assert result == [] + + def test_check_multicast_missing_multicast_flag(self): + """Test check_multicast when loopback interface lacks multicast.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock interface without MULTICAST flag (realistic current system state) + mock_run.side_effect = [ + type( + "MockResult", + (), + { + "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", + "returncode": 0, + }, + )(), + type( + "MockResult", (), {"stdout": "224.0.0.0/4 dev lo scope link", "returncode": 0} + )(), + ] + + result = check_multicast() + assert result == ["sudo ifconfig lo multicast"] + + def test_check_multicast_missing_route(self): + """Test check_multicast when multicast route is missing.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock missing route - interface has multicast but no route + mock_run.side_effect = [ + type( + "MockResult", + (), + { + "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", + "returncode": 0, + }, + )(), + type( + "MockResult", (), {"stdout": "", "returncode": 0} + )(), # Empty output - no route + ] + + result = check_multicast() + assert result == ["sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] + + def test_check_multicast_all_missing(self): + """Test check_multicast when both multicast flag and route are missing (current system state).""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock both missing - matches actual current system state + mock_run.side_effect = [ + type( + "MockResult", + (), + { + "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", + "returncode": 0, + }, + )(), + type( + "MockResult", (), {"stdout": "", "returncode": 0} + )(), # Empty output - no route + ] + + result = check_multicast() + expected = [ + "sudo ifconfig lo multicast", + "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + ] + assert result == expected + + def test_check_multicast_subprocess_exception(self): + """Test check_multicast when subprocess calls fail.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock subprocess exceptions + mock_run.side_effect = Exception("Command failed") + + result = check_multicast() + expected = [ + "sudo ifconfig lo multicast", + "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + ] + assert result == expected + + def test_check_buffers_all_configured(self): + """Test check_buffers when system is properly configured.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock sufficient buffer sizes + mock_run.side_effect = [ + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 2097152", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 2097152", "returncode": 0} + )(), + ] + + result = check_buffers() + assert result == [] + + def test_check_buffers_low_max_buffer(self): + """Test check_buffers when rmem_max is too low.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock low rmem_max + mock_run.side_effect = [ + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 1048576", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 2097152", "returncode": 0} + )(), + ] + + result = check_buffers() + assert result == ["sudo sysctl -w net.core.rmem_max=2097152"] + + def test_check_buffers_low_default_buffer(self): + """Test check_buffers when rmem_default is too low.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock low rmem_default + mock_run.side_effect = [ + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 2097152", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 1048576", "returncode": 0} + )(), + ] + + result = check_buffers() + assert result == ["sudo sysctl -w net.core.rmem_default=2097152"] + + def test_check_buffers_both_low(self): + """Test check_buffers when both buffer sizes are too low.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock both low + mock_run.side_effect = [ + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 1048576", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 1048576", "returncode": 0} + )(), + ] + + result = check_buffers() + expected = [ + "sudo sysctl -w net.core.rmem_max=2097152", + "sudo sysctl -w net.core.rmem_default=2097152", + ] + assert result == expected + + def test_check_buffers_subprocess_exception(self): + """Test check_buffers when subprocess calls fail.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock subprocess exceptions + mock_run.side_effect = Exception("Command failed") + + result = check_buffers() + expected = [ + "sudo sysctl -w net.core.rmem_max=2097152", + "sudo sysctl -w net.core.rmem_default=2097152", + ] + assert result == expected + + def test_check_buffers_parsing_error(self): + """Test check_buffers when output parsing fails.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock malformed output + mock_run.side_effect = [ + type("MockResult", (), {"stdout": "invalid output", "returncode": 0})(), + type("MockResult", (), {"stdout": "also invalid", "returncode": 0})(), + ] + + result = check_buffers() + expected = [ + "sudo sysctl -w net.core.rmem_max=2097152", + "sudo sysctl -w net.core.rmem_default=2097152", + ] + assert result == expected + + def test_autoconf_no_config_needed(self): + """Test autoconf when no configuration is needed.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock all checks passing + mock_run.side_effect = [ + # check_multicast calls + type( + "MockResult", + (), + { + "stdout": "1: lo: mtu 65536", + "returncode": 0, + }, + )(), + type( + "MockResult", (), {"stdout": "224.0.0.0/4 dev lo scope link", "returncode": 0} + )(), + # check_buffers calls + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 2097152", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 2097152", "returncode": 0} + )(), + ] + + with patch("builtins.print") as mock_print: + autoconf() + # Should not print anything when no config is needed + mock_print.assert_not_called() + + def test_autoconf_with_config_needed_success(self): + """Test autoconf when configuration is needed and commands succeed.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock checks failing, then mock the execution succeeding + mock_run.side_effect = [ + # check_multicast calls + type( + "MockResult", + (), + {"stdout": "1: lo: mtu 65536", "returncode": 0}, + )(), + type("MockResult", (), {"stdout": "", "returncode": 0})(), + # check_buffers calls + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 1048576", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 1048576", "returncode": 0} + )(), + # Command execution calls + type( + "MockResult", (), {"stdout": "success", "returncode": 0} + )(), # sudo ifconfig lo multicast + type( + "MockResult", (), {"stdout": "success", "returncode": 0} + )(), # sudo route add... + type( + "MockResult", (), {"stdout": "success", "returncode": 0} + )(), # sudo sysctl rmem_max + type( + "MockResult", (), {"stdout": "success", "returncode": 0} + )(), # sudo sysctl rmem_default + ] + + with patch("builtins.print") as mock_print: + autoconf() + + # Verify the expected print calls + expected_calls = [ + ("System configuration required. Executing commands...",), + (" Running: sudo ifconfig lo multicast",), + (" ✓ Success",), + (" Running: sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), + (" ✓ Success",), + (" Running: sudo sysctl -w net.core.rmem_max=2097152",), + (" ✓ Success",), + (" Running: sudo sysctl -w net.core.rmem_default=2097152",), + (" ✓ Success",), + ("System configuration completed.",), + ] + from unittest.mock import call + + mock_print.assert_has_calls([call(*args) for args in expected_calls]) + + def test_autoconf_with_command_failures(self): + """Test autoconf when some commands fail.""" + with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: + # Mock checks failing, then mock some commands failing + mock_run.side_effect = [ + # check_multicast calls + type( + "MockResult", + (), + {"stdout": "1: lo: mtu 65536", "returncode": 0}, + )(), + type("MockResult", (), {"stdout": "", "returncode": 0})(), + # check_buffers calls (no buffer issues for simpler test) + type( + "MockResult", (), {"stdout": "net.core.rmem_max = 2097152", "returncode": 0} + )(), + type( + "MockResult", (), {"stdout": "net.core.rmem_default = 2097152", "returncode": 0} + )(), + # Command execution calls - first succeeds, second fails + type( + "MockResult", (), {"stdout": "success", "returncode": 0} + )(), # sudo ifconfig lo multicast + subprocess.CalledProcessError( + 1, + [ + "sudo", + "route", + "add", + "-net", + "224.0.0.0", + "netmask", + "240.0.0.0", + "dev", + "lo", + ], + "Permission denied", + "Operation not permitted", + ), + ] + + with patch("builtins.print") as mock_print: + autoconf() + + # Verify it handles the failure gracefully + print_calls = [call[0][0] for call in mock_print.call_args_list] + assert "System configuration required. Executing commands..." in print_calls + assert " ✓ Success" in print_calls # First command succeeded + assert any("✗ Failed" in call for call in print_calls) # Second command failed + assert "System configuration completed." in print_calls diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index 0abd72a7e8..caaf43b965 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -16,6 +16,7 @@ import asyncio import time +import traceback from contextlib import contextmanager from typing import Any, Callable, List, Tuple @@ -42,7 +43,7 @@ def memory_context(): ] try: - from dimos.protocol.pubsub.redis import Redis + from dimos.protocol.pubsub.redispubsub import Redis @contextmanager def redis_context(): @@ -65,7 +66,7 @@ def redis_context(): @contextmanager def lcm_context(): - lcm_pubsub = LCM(auto_configure_multicast=False) + lcm_pubsub = LCM(autoconf=True) lcm_pubsub.start() yield lcm_pubsub lcm_pubsub.stop() diff --git a/docker/dev/Dockerfile b/docker/dev/Dockerfile index 05725add6f..195d8ac1e3 100644 --- a/docker/dev/Dockerfile +++ b/docker/dev/Dockerfile @@ -17,6 +17,7 @@ RUN apt-get update && apt-get install -y \ wget \ net-tools \ sudo \ + iproute2 # for LCM networking system config \ pre-commit