diff --git a/dimos/protocol/pubsub/lcmpubsub.py b/dimos/protocol/pubsub/lcmpubsub.py index c9b3869d04..5d9459908c 100644 --- a/dimos/protocol/pubsub/lcmpubsub.py +++ b/dimos/protocol/pubsub/lcmpubsub.py @@ -18,7 +18,11 @@ from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin -from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf +from dimos.protocol.service.lcmservice import ( + LCMConfig, + LCMService, + autoconf, +) from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 75a6a4d362..cf0a0647d8 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -16,11 +16,8 @@ from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from functools import cache import os import platform -import subprocess -import sys import threading import traceback from typing import Protocol, runtime_checkable @@ -28,208 +25,46 @@ import lcm from dimos.protocol.service.spec import Service +from dimos.protocol.service.system_configurator import ( + BufferConfiguratorLinux, + BufferConfiguratorMacOS, + MaxFileConfiguratorMacOS, + MulticastConfiguratorLinux, + MulticastConfiguratorMacOS, + SystemConfigurator, + configure_system, +) from dimos.utils.logging_config import setup_logger logger = setup_logger() +_DEFAULT_LCM_HOST = "239.255.76.67" +_DEFAULT_LCM_PORT = "7667" +# LCM_DEFAULT_URL is used by LCM (we didn't pick that env var name) +_DEFAULT_LCM_URL = os.getenv( + "LCM_DEFAULT_URL", f"udpm://{_DEFAULT_LCM_HOST}:{_DEFAULT_LCM_PORT}?ttl=0" +) -@cache -def check_root() -> bool: - """Return True if the current process is running as root (UID 0).""" - try: - return os.geteuid() == 0 - except AttributeError: - # Platforms without geteuid (e.g. Windows) – assume non-root. - return False - - -def check_multicast() -> list[str]: - """Check if multicast configuration is needed and return required commands.""" - commands_needed = [] - - sudo = "" if check_root() else "sudo " +def autoconf(check_only: bool = False) -> None: + # check multicast and buffer sizes system = platform.system() - + checks: list[SystemConfigurator] = [] if system == "Linux": - # Linux commands - loopback_interface = "lo" - # Check if loopback interface has multicast enabled - try: - result = subprocess.run( - ["ip", "link", "show", loopback_interface], capture_output=True, text=True - ) - if "MULTICAST" not in result.stdout: - commands_needed.append(f"{sudo}ip l set {loopback_interface} multicast on") - except Exception: - commands_needed.append(f"{sudo}ip l set {loopback_interface} multicast on") - - # Check if multicast route exists - try: - result = subprocess.run( - ["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True - ) - if not result.stdout.strip(): - commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}") - except Exception: - commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}") - - elif system == "Darwin": # macOS - loopback_interface = "lo0" - # Check if multicast route exists - try: - result = subprocess.run(["netstat", "-nr"], capture_output=True, text=True) - route_exists = "224.0.0.0/4" in result.stdout or "224.0.0/4" in result.stdout - if not route_exists: - commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}") - except Exception: - commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}") - + checks = [ + MulticastConfiguratorLinux(loopback_interface="lo"), + BufferConfiguratorLinux(), + ] + elif system == "Darwin": + checks = [ + MulticastConfiguratorMacOS(loopback_interface="lo0"), + BufferConfiguratorMacOS(), + MaxFileConfiguratorMacOS(), + ] else: - # For other systems, skip multicast configuration - logger.warning(f"Multicast configuration not supported on {system}") - - return commands_needed - - -def _set_net_value(commands_needed: list[str], sudo: str, name: str, value: int) -> int | None: - try: - result = subprocess.run(["sysctl", name], capture_output=True, text=True) - if result.returncode == 0: - current = int(result.stdout.replace(":", "=").split("=")[1].strip()) - else: - current = None - if not current or current < value: - commands_needed.append(f"{sudo}sysctl -w {name}={value}") - return current - except: - commands_needed.append(f"{sudo}sysctl -w {name}={value}") - return None - - -TARGET_RMEM_SIZE = 67108864 -TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS = 8388608 -TARGET_MAX_DGRAM_SIZE_MACOS = 65535 - - -def check_buffers() -> tuple[list[str], int | None]: - """Check if buffer configuration is needed and return required commands and current size. - - Returns: - Tuple of (commands_needed, current_max_buffer_size) - """ - commands_needed: list[str] = [] - current_max = None - - sudo = "" if check_root() else "sudo " - system = platform.system() - - if system == "Linux": - # Linux buffer configuration - current_max = _set_net_value(commands_needed, sudo, "net.core.rmem_max", TARGET_RMEM_SIZE) - _set_net_value(commands_needed, sudo, "net.core.rmem_default", TARGET_RMEM_SIZE) - elif system == "Darwin": # macOS - # macOS buffer configuration - check and set UDP buffer related sysctls - current_max = _set_net_value( - commands_needed, sudo, "kern.ipc.maxsockbuf", TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS - ) - _set_net_value(commands_needed, sudo, "net.inet.udp.recvspace", TARGET_RMEM_SIZE) - _set_net_value(commands_needed, sudo, "net.inet.udp.maxdgram", TARGET_MAX_DGRAM_SIZE_MACOS) - else: - # For other systems, skip buffer configuration - logger.warning(f"Buffer configuration not supported on {system}") - - return commands_needed, current_max - - -def check_system() -> None: - """Check if system configuration is needed and exit only for critical issues. - - Multicast configuration is critical for LCM to work. - Buffer sizes are performance optimizations - warn but don't fail in containers. - """ - if os.environ.get("CI"): - logger.debug("CI environment detected: Skipping system configuration checks.") - return - - multicast_commands = check_multicast() - buffer_commands, current_buffer_size = check_buffers() - - # Multicast configuration - if multicast_commands: - logger.error( - "Critical: Multicast configuration required. Please run the following commands:" - ) - for cmd in multicast_commands: - logger.error(f" {cmd}") - logger.error("\nThen restart your application.") - sys.exit(1) - - # Buffer configuration is critical for throughput and packet loss - elif buffer_commands: - if current_buffer_size: - logger.warning( - f"UDP buffer size limited to {current_buffer_size} bytes ({current_buffer_size // 1024}KB). Large LCM packets may fail." - ) - else: - logger.warning("UDP buffer sizes are limited. Large LCM packets may fail.") - logger.warning("For better performance, consider running:") - for cmd in buffer_commands: - logger.warning(f" {cmd}") - logger.warning("Note: This may not be possible in Docker containers.") - - -def autoconf() -> None: - """Auto-configure system by running checks and executing required commands if needed.""" - if os.environ.get("CI"): - logger.info("CI environment detected: Skipping automatic system configuration.") - return - - platform.system() - - commands_needed = [] - - # Check multicast configuration - commands_needed.extend(check_multicast()) - - # Check buffer configuration - buffer_commands, _ = check_buffers() - commands_needed.extend(buffer_commands) - - if not commands_needed: + logger.error(f"System configuration not supported on {system}") return - - logger.info("System configuration required. Executing commands...") - - for cmd in commands_needed: - logger.info(f" Running: {cmd}") - try: - # Split command into parts for subprocess - cmd_parts = cmd.split() - subprocess.run(cmd_parts, capture_output=True, text=True, check=True) - logger.info(" ✓ Success") - except subprocess.CalledProcessError as e: - # Check if this is a multicast/route command or a sysctl command - if "route" in cmd or "multicast" in cmd: - # Multicast/route failures should still fail - logger.error(f" ✗ Failed to configure multicast: {e}") - logger.error(f" stdout: {e.stdout}") - logger.error(f" stderr: {e.stderr}") - raise - elif "sysctl" in cmd: - # Sysctl failures are just warnings (likely docker/container) - logger.warning( - f" ✗ Not able to auto-configure UDP buffer sizes (likely docker image): {e}" - ) - except Exception as e: - logger.error(f" ✗ Error: {e}") - if "route" in cmd or "multicast" in cmd: - raise - - logger.info("System configuration completed.") - - -_DEFAULT_LCM_URL_MACOS = "udpm://239.255.76.67:7667?ttl=0" + configure_system(checks, check_only=check_only) @dataclass @@ -240,9 +75,8 @@ class LCMConfig: lcm: lcm.LCM | None = None def __post_init__(self) -> None: - if self.url is None and platform.system() == "Darwin": - # On macOS, use multicast with TTL=0 to keep traffic local - self.url = _DEFAULT_LCM_URL_MACOS + if self.url is None: + self.url = _DEFAULT_LCM_URL @runtime_checkable @@ -326,13 +160,10 @@ def start(self) -> None: else: self.l = lcm.LCM(self.config.url) if self.config.url else lcm.LCM() - if self.config.autoconf: - autoconf() - else: - try: - check_system() - except Exception as e: - print(f"Error checking system configuration: {e}") + try: + autoconf(check_only=not self.config.autoconf) + except Exception as e: + print(f"Error checking system configuration: {e}") self._stop_event.clear() self._thread = threading.Thread(target=self._lcm_loop) diff --git a/dimos/protocol/service/system_configurator.py b/dimos/protocol/service/system_configurator.py new file mode 100644 index 0000000000..44b8c45276 --- /dev/null +++ b/dimos/protocol/service/system_configurator.py @@ -0,0 +1,436 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from abc import ABC, abstractmethod +from functools import cache +import os +import re +import resource +import subprocess +from typing import Any + +# ----------------------------- sudo helpers ----------------------------- + + +@cache +def _is_root_user() -> bool: + try: + return os.geteuid() == 0 + except AttributeError: + return False + + +def sudo_run(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess[str]: + if _is_root_user(): + return subprocess.run(list(args), **kwargs) + return subprocess.run(["sudo", *args], **kwargs) + + +def _read_sysctl_int(name: str) -> int | None: + try: + result = subprocess.run(["sysctl", name], capture_output=True, text=True) + if result.returncode != 0: + print( + f"[sysctl] ERROR: `sysctl {name}` rc={result.returncode} stderr={result.stderr!r}" + ) + return None + + text = result.stdout.strip().replace(":", "=") + if "=" not in text: + print(f"[sysctl] ERROR: unexpected output for {name}: {text!r}") + return None + + return int(text.split("=", 1)[1].strip()) + except Exception as error: + print(f"[sysctl] ERROR: reading {name}: {error}") + return None + + +def _write_sysctl_int(name: str, value: int) -> None: + sudo_run("sysctl", "-w", f"{name}={value}", check=True, text=True, capture_output=False) + + +# -------------------------- base class for system config checks/requirements -------------------------- + + +class SystemConfigurator(ABC): + critical: bool = False + + @abstractmethod + def check(self) -> bool: + """Return True if configured. Log errors and return False on uncertainty.""" + raise NotImplementedError + + @abstractmethod + def explanation(self) -> str | None: + """ + Return a human-readable summary of what would be done (sudo commands) if not configured. + Return None when no changes are needed. + """ + raise NotImplementedError + + @abstractmethod + def fix(self) -> None: + """Apply fixes (may attempt sudo, catch, and apply fallback measures if needed).""" + raise NotImplementedError + + +# ----------------------------- generic enforcement of system configs ----------------------------- + + +def configure_system(checks: list[SystemConfigurator], check_only: bool = False) -> None: + if os.environ.get("CI"): + print("CI environment detected: skipping system configuration.") + return + + # run checks + failing = [check for check in checks if not check.check()] + if not failing: + return + + # ask for permission to modify system + explanations: list[str] = [msg for check in failing if (msg := check.explanation()) is not None] + + if explanations: + print("System configuration changes are recommended/required:\n") + print("\n\n".join(explanations)) + print() + + if check_only: + return + + try: + answer = input("Apply these changes now? [y/N]: ").strip().lower() + except (EOFError, KeyboardInterrupt): + answer = "" + + if answer not in ("y", "yes"): + if any(check.critical for check in failing): + raise SystemExit(1) + return + + for check in failing: + try: + check.fix() + except subprocess.CalledProcessError as error: + if check.critical: + print(f"Critical fix failed rc={error.returncode}") + print(f"stdout: {error.stdout}") + print(f"stderr: {error.stderr}") + raise + print(f"Optional improvement failed: rc={error.returncode}") + print(f"stdout: {error.stdout}") + print(f"stderr: {error.stderr}") + + print("System configuration completed.") + + +# ------------------------------ specific checks: multicast ------------------------------ + + +class MulticastConfiguratorLinux(SystemConfigurator): + critical = True + MULTICAST_PREFIX = "224.0.0.0/4" + + def __init__(self, loopback_interface: str = "lo"): + self.loopback_interface = loopback_interface + + self.loopback_ok: bool | None = None + self.route_ok: bool | None = None + + self.enable_multicast_cmd = [ + "ip", + "link", + "set", + self.loopback_interface, + "multicast", + "on", + ] + self.add_route_cmd = [ + "ip", + "route", + "add", + self.MULTICAST_PREFIX, + "dev", + self.loopback_interface, + ] + + def check(self) -> bool: + # Verify `ip` exists (iproute2) + try: + subprocess.run(["ip", "-V"], capture_output=True, text=True, check=False) + except FileNotFoundError as error: + print( + f"ERROR: `ip` not found (iproute2 missing, did you install system requirements?): {error}" + ) + self.loopback_ok = self.route_ok = False + return False + except Exception as error: + print(f"ERROR: failed probing `ip`: {error}") + self.loopback_ok = self.route_ok = False + return False + + # check MULTICAST on loopback + try: + result = subprocess.run( + ["ip", "-o", "link", "show", self.loopback_interface], + capture_output=True, + text=True, + ) + if result.returncode != 0: + print( + f"ERROR: `ip link show {self.loopback_interface}` rc={result.returncode} " + f"stderr={result.stderr!r}" + ) + self.loopback_ok = False + else: + match = re.search(r"<([^>]*)>", result.stdout) + flags = { + flag.strip().upper() + for flag in (match.group(1).split(",") if match else []) + if flag.strip() + } + self.loopback_ok = "MULTICAST" in flags + except Exception as error: + print(f"ERROR: failed checking loopback multicast: {error}") + self.loopback_ok = False + + # Check if multicast route exists + try: + result = subprocess.run( + ["ip", "-o", "route", "show", self.MULTICAST_PREFIX], + capture_output=True, + text=True, + ) + if result.returncode != 0: + print( + f"ERROR: `ip route show {self.MULTICAST_PREFIX}` rc={result.returncode} " + f"stderr={result.stderr!r}" + ) + self.route_ok = False + else: + self.route_ok = bool(result.stdout.strip()) + except Exception as error: + print(f"ERROR: failed checking multicast route: {error}") + self.route_ok = False + + return bool(self.loopback_ok and self.route_ok) + + def explanation(self) -> str | None: + output = "" + if not self.loopback_ok: + output += f"- Multicast: sudo {' '.join(self.enable_multicast_cmd)}\n" + if not self.route_ok: + output += f"- Multicast: sudo {' '.join(self.add_route_cmd)}\n" + return output + + def fix(self) -> None: + if not self.loopback_ok: + sudo_run(*self.enable_multicast_cmd, check=True, text=True, capture_output=True) + if not self.route_ok: + sudo_run(*self.add_route_cmd, check=True, text=True, capture_output=True) + + +class MulticastConfiguratorMacOS(SystemConfigurator): + critical = True + + def __init__(self, loopback_interface: str = "lo0"): + self.loopback_interface = loopback_interface + self.add_route_cmd = [ + "route", + "add", + "-net", + "224.0.0.0/4", + "-interface", + self.loopback_interface, + ] + + def check(self) -> bool: + # `netstat -nr` shows the routing table. We search for a 224/4 route entry. + try: + result = subprocess.run(["netstat", "-nr"], capture_output=True, text=True) + if result.returncode != 0: + print(f"ERROR: `netstat -nr` rc={result.returncode} stderr={result.stderr!r}") + return False + + route_ok = ("224.0.0.0/4" in result.stdout) or ("224.0.0/4" in result.stdout) + return bool(route_ok) + except Exception as error: + print(f"ERROR: failed checking multicast route via netstat: {error}") + return False + + def explanation(self) -> str | None: + return f"Multicast: - sudo {' '.join(self.add_route_cmd)}" + + def fix(self) -> None: + sudo_run(*self.add_route_cmd, check=True, text=True, capture_output=True) + + +# ------------------------------ specific checks: buffers ------------------------------ + +IDEAL_RMEM_SIZE = 67_108_864 # 64MB + + +class BufferConfiguratorLinux(SystemConfigurator): + critical = False + + TARGET_RMEM_SIZE = IDEAL_RMEM_SIZE + + def __init__(self) -> None: + self.needs: list[tuple[str, int]] = [] # (key, target_value) + + def check(self) -> bool: + self.needs.clear() + for key, target in [ + ("net.core.rmem_max", self.TARGET_RMEM_SIZE), + ("net.core.rmem_default", self.TARGET_RMEM_SIZE), + ]: + current = _read_sysctl_int(key) + if current is None or current < target: + self.needs.append((key, target)) + return not self.needs + + def explanation(self) -> str | None: + lines = [] + for key, target in self.needs: + lines.append(f"- socket buffer optimization: sudo sysctl -w {key}={target}") + return "\n".join(lines) + + def fix(self) -> None: + for key, target in self.needs: + _write_sysctl_int(key, target) + + +class BufferConfiguratorMacOS(SystemConfigurator): + critical = False + MAX_POSSIBLE_RECVSPACE = 2_097_152 + MAX_POSSIBLE_BUFFER_SIZE = 8_388_608 + MAX_POSSIBLE_DGRAM_SIZE = 65_535 + # these values are based on macos 26 + + TARGET_BUFFER_SIZE = MAX_POSSIBLE_BUFFER_SIZE + TARGET_RECVSPACE = MAX_POSSIBLE_RECVSPACE # we want this to be IDEAL_RMEM_SIZE but MacOS 26 (and probably in general) doesn't support it + TARGET_DGRAM_SIZE = MAX_POSSIBLE_DGRAM_SIZE + + def __init__(self) -> None: + self.needs: list[tuple[str, int]] = [] + + def check(self) -> bool: + self.needs.clear() + for key, target in [ + ("kern.ipc.maxsockbuf", self.TARGET_BUFFER_SIZE), + ("net.inet.udp.recvspace", self.TARGET_RECVSPACE), + ("net.inet.udp.maxdgram", self.TARGET_DGRAM_SIZE), + ]: + current = _read_sysctl_int(key) + if current is None or current < target: + self.needs.append((key, target)) + return not self.needs + + def explanation(self) -> str | None: + lines = [] + for key, target in self.needs: + lines.append(f"- sudo sysctl -w {key}={target}") + return "\n".join(lines) + + def fix(self) -> None: + for key, target in self.needs: + _write_sysctl_int(key, target) + + +# ------------------------------ specific checks: ulimit ------------------------------ + + +class MaxFileConfiguratorMacOS(SystemConfigurator): + """Ensure the open file descriptor limit (ulimit -n) is at least TARGET_FILE_COUNT_LIMIT.""" + + critical = False + TARGET_FILE_COUNT_LIMIT = 65536 + + def __init__(self, target: int = TARGET_FILE_COUNT_LIMIT): + self.target = target + self.current_soft: int = 0 + self.current_hard: int = 0 + self.can_fix_without_sudo: bool = False + + def check(self) -> bool: + try: + self.current_soft, self.current_hard = resource.getrlimit(resource.RLIMIT_NOFILE) + except Exception as error: + print(f"[ulimit] ERROR: failed to get RLIMIT_NOFILE: {error}") + return False + + if self.current_soft >= self.target: + return True + + # Check if we can raise to target without sudo (hard limit is high enough) + self.can_fix_without_sudo = self.current_hard >= self.target + return False + + def explanation(self) -> str | None: + lines = [] + if self.can_fix_without_sudo: + lines.append(f"- Raise soft file count limit to {self.target} (no sudo required)") + else: + lines.append(f"- Raise soft file count limit to {min(self.target, self.current_hard)}") + lines.append( + f"- Raise hard limit via: sudo launchctl limit maxfiles {self.target} {self.target}" + ) + return "\n".join(lines) + + def fix(self) -> None: + if self.current_soft >= self.target: + return + + if self.can_fix_without_sudo: + # Hard limit is sufficient, just raise the soft limit + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (self.target, self.current_hard)) + except Exception as error: + print(f"[ulimit] ERROR: failed to set soft limit: {error}") + raise + else: + # Need to raise both soft and hard limits via launchctl + try: + sudo_run( + "launchctl", + "limit", + "maxfiles", + str(self.target), + str(self.target), + check=True, + text=True, + capture_output=True, + ) + except subprocess.CalledProcessError as error: + print(f"[ulimit] WARNING: launchctl failed: {error.stderr}") + # Fallback: raise soft limit as high as the current hard limit allows + if self.current_hard > self.current_soft: + try: + resource.setrlimit( + resource.RLIMIT_NOFILE, (self.current_hard, self.current_hard) + ) + except Exception as fallback_error: + print(f"[ulimit] ERROR: fallback also failed: {fallback_error}") + raise + + # After launchctl, try to apply the new limit to the current process + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (self.target, self.target)) + except Exception as error: + print( + f"[ulimit] WARNING: could not apply to current process (restart may be required): {error}" + ) diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index ce686d9e1d..a85462cf31 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -13,553 +13,306 @@ # limitations under the License. import os -import subprocess -from unittest.mock import patch +import pickle +import threading +import time +from unittest.mock import MagicMock, patch import pytest from dimos.protocol.service.lcmservice import ( - TARGET_MAX_DGRAM_SIZE_MACOS, - TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS, - TARGET_RMEM_SIZE, + _DEFAULT_LCM_URL, + LCMConfig, + LCMService, + Topic, autoconf, - check_buffers, - check_multicast, - check_root, +) +from dimos.protocol.service.system_configurator import ( + BufferConfiguratorLinux, + BufferConfiguratorMacOS, + MaxFileConfiguratorMacOS, + MulticastConfiguratorLinux, + MulticastConfiguratorMacOS, ) +# ----------------------------- autoconf tests ----------------------------- -def get_sudo_prefix() -> str: - """Return 'sudo ' if not running as root, empty string if running as root.""" - return "" if check_root() else "sudo " - - -def test_check_multicast_all_configured() -> None: - """Test check_multicast when system is properly configured.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock successful checks with realistic output format - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", - "returncode": 0, - }, - )(), - type( - "MockResult", (), {"stdout": "224.0.0.0/4 dev lo scope link", "returncode": 0} - )(), - ] - - result = check_multicast() - assert result == [] - - -def test_check_multicast_missing_multicast_flag() -> None: - """Test check_multicast when loopback interface lacks multicast.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock interface without MULTICAST flag (realistic current system state) - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", - "returncode": 0, - }, - )(), - type( - "MockResult", (), {"stdout": "224.0.0.0/4 dev lo scope link", "returncode": 0} - )(), - ] - - result = check_multicast() - sudo = get_sudo_prefix() - assert result == [f"{sudo}ip l set lo multicast on"] - - -def test_check_multicast_missing_route() -> None: - """Test check_multicast when multicast route is missing.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock missing route - interface has multicast but no route - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", - "returncode": 0, - }, - )(), - type( - "MockResult", (), {"stdout": "", "returncode": 0} - )(), # Empty output - no route - ] - - result = check_multicast() - sudo = get_sudo_prefix() - assert result == [f"{sudo}ip route add 224.0.0.0/4 dev lo"] - - -def test_check_multicast_all_missing() -> None: - """Test check_multicast when both multicast flag and route are missing (current system state).""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock both missing - matches actual current system state - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": "1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000\n link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00", - "returncode": 0, - }, - )(), - type( - "MockResult", (), {"stdout": "", "returncode": 0} - )(), # Empty output - no route - ] - - result = check_multicast() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}ip l set lo multicast on", - f"{sudo}ip route add 224.0.0.0/4 dev lo", - ] - assert result == expected - - -def test_check_multicast_subprocess_exception() -> None: - """Test check_multicast when subprocess calls fail.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock subprocess exceptions - mock_run.side_effect = Exception("Command failed") - - result = check_multicast() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}ip l set lo multicast on", - f"{sudo}ip route add 224.0.0.0/4 dev lo", - ] - assert result == expected - - -def test_check_multicast_macos() -> None: - """Test check_multicast on macOS when configuration is needed.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Darwin"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock netstat -nr to not contain the multicast route - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": "default 192.168.1.1 UGScg en0", - "returncode": 0, - }, - )(), - ] - - result = check_multicast() - sudo = get_sudo_prefix() - expected = [f"{sudo}ip route add 224.0.0.0/4 dev lo0"] - assert result == expected - - -def test_check_buffers_all_configured() -> None: - """Test check_buffers when system is properly configured.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock sufficient buffer sizes - mock_run.side_effect = [ - type( - "MockResult", (), {"stdout": "net.core.rmem_max = 67108864", "returncode": 0} - )(), - type( - "MockResult", - (), - {"stdout": "net.core.rmem_default = 67108864", "returncode": 0}, - )(), - ] - - commands, buffer_size = check_buffers() - assert commands == [] - assert buffer_size >= TARGET_RMEM_SIZE - - -def test_check_buffers_low_max_buffer() -> None: - """Test check_buffers when rmem_max is too low.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock low rmem_max - mock_run.side_effect = [ - type( - "MockResult", (), {"stdout": "net.core.rmem_max = 1048576", "returncode": 0} - )(), - type( - "MockResult", - (), - {"stdout": f"net.core.rmem_default = {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - ] - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - assert commands == [f"{sudo}sysctl -w net.core.rmem_max={TARGET_RMEM_SIZE}"] - assert buffer_size == 1048576 - - -def test_check_buffers_low_default_buffer() -> None: - """Test check_buffers when rmem_default is too low.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock low rmem_default - mock_run.side_effect = [ - type( - "MockResult", - (), - {"stdout": f"net.core.rmem_max = {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - type( - "MockResult", (), {"stdout": "net.core.rmem_default = 1048576", "returncode": 0} - )(), - ] - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - assert commands == [f"{sudo}sysctl -w net.core.rmem_default={TARGET_RMEM_SIZE}"] - assert buffer_size == TARGET_RMEM_SIZE - - -def test_check_buffers_both_low() -> None: - """Test check_buffers when both buffer sizes are too low.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock both low - mock_run.side_effect = [ - type( - "MockResult", (), {"stdout": "net.core.rmem_max = 1048576", "returncode": 0} - )(), - type( - "MockResult", (), {"stdout": "net.core.rmem_default = 1048576", "returncode": 0} - )(), - ] - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}sysctl -w net.core.rmem_max={TARGET_RMEM_SIZE}", - f"{sudo}sysctl -w net.core.rmem_default={TARGET_RMEM_SIZE}", - ] - assert commands == expected - assert buffer_size == 1048576 - - -def test_check_buffers_subprocess_exception() -> None: - """Test check_buffers when subprocess calls fail.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock subprocess exceptions - mock_run.side_effect = Exception("Command failed") - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}sysctl -w net.core.rmem_max={TARGET_RMEM_SIZE}", - f"{sudo}sysctl -w net.core.rmem_default={TARGET_RMEM_SIZE}", - ] - assert commands == expected - assert buffer_size is None - - -def test_check_buffers_parsing_error() -> None: - """Test check_buffers when output parsing fails.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock malformed output - mock_run.side_effect = [ - type("MockResult", (), {"stdout": "invalid output", "returncode": 0})(), - type("MockResult", (), {"stdout": "also invalid", "returncode": 0})(), - ] - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}sysctl -w net.core.rmem_max={TARGET_RMEM_SIZE}", - f"{sudo}sysctl -w net.core.rmem_default={TARGET_RMEM_SIZE}", - ] - assert commands == expected - assert buffer_size is None - - -def test_check_buffers_dev_container() -> None: - """Test check_buffers in dev container where sysctl fails.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock dev container behavior - sysctl returns non-zero - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": "sysctl: cannot stat /proc/sys/net/core/rmem_max: No such file or directory", - "returncode": 255, - }, - )(), - type( - "MockResult", - (), - { - "stdout": "sysctl: cannot stat /proc/sys/net/core/rmem_default: No such file or directory", - "returncode": 255, - }, - )(), - ] - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}sysctl -w net.core.rmem_max={TARGET_RMEM_SIZE}", - f"{sudo}sysctl -w net.core.rmem_default={TARGET_RMEM_SIZE}", - ] - assert commands == expected - assert buffer_size is None - - -def test_check_buffers_macos_all_configured() -> None: - """Test check_buffers on macOS when system is properly configured.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Darwin"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock sufficient buffer sizes for macOS - mock_run.side_effect = [ - type( - "MockResult", - (), - { - "stdout": f"kern.ipc.maxsockbuf: {TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS}", - "returncode": 0, - }, - )(), - type( - "MockResult", - (), - {"stdout": f"net.inet.udp.recvspace: {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - type( - "MockResult", - (), - { - "stdout": f"net.inet.udp.maxdgram: {TARGET_MAX_DGRAM_SIZE_MACOS}", - "returncode": 0, - }, - )(), - ] - - commands, buffer_size = check_buffers() - assert commands == [] - assert buffer_size == TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS - - -def test_check_buffers_macos_needs_config() -> None: - """Test check_buffers on macOS when configuration is needed.""" - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Darwin"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - mock_max_sock_buf_size = 4194304 - # Mock low buffer sizes for macOS - mock_run.side_effect = [ - type( - "MockResult", - (), - {"stdout": f"kern.ipc.maxsockbuf: {mock_max_sock_buf_size}", "returncode": 0}, - )(), - type( - "MockResult", (), {"stdout": "net.inet.udp.recvspace: 1048576", "returncode": 0} - )(), - type( - "MockResult", (), {"stdout": "net.inet.udp.maxdgram: 32768", "returncode": 0} - )(), - ] - - commands, buffer_size = check_buffers() - sudo = get_sudo_prefix() - expected = [ - f"{sudo}sysctl -w kern.ipc.maxsockbuf={TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS}", - f"{sudo}sysctl -w net.inet.udp.recvspace={TARGET_RMEM_SIZE}", - f"{sudo}sysctl -w net.inet.udp.maxdgram={TARGET_MAX_DGRAM_SIZE_MACOS}", - ] - assert commands == expected - assert buffer_size == mock_max_sock_buf_size - - -def test_autoconf_no_config_needed() -> None: - """Test autoconf when no configuration is needed.""" - # Clear CI environment variable for this test - with patch.dict(os.environ, {"CI": ""}, clear=False): - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock all checks passing - mock_run.side_effect = [ - # check_multicast calls - type( - "MockResult", - (), - { - "stdout": "1: lo: mtu 65536", - "returncode": 0, - }, - )(), - type( - "MockResult", - (), - {"stdout": "224.0.0.0/4 dev lo scope link", "returncode": 0}, - )(), - # check_buffers calls - type( - "MockResult", - (), - {"stdout": f"net.core.rmem_max = {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - type( - "MockResult", - (), - {"stdout": f"net.core.rmem_default = {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - ] +class TestConfigureSystemForLcm: + def test_creates_linux_checks_on_linux(self) -> None: + with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): + with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: + autoconf() + mock_configure.assert_called_once() + checks = mock_configure.call_args[0][0] + assert len(checks) == 2 + assert isinstance(checks[0], MulticastConfiguratorLinux) + assert isinstance(checks[1], BufferConfiguratorLinux) + assert checks[0].loopback_interface == "lo" + + def test_creates_macos_checks_on_darwin(self) -> None: + with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Darwin"): + with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: + autoconf() + mock_configure.assert_called_once() + checks = mock_configure.call_args[0][0] + assert len(checks) == 3 + assert isinstance(checks[0], MulticastConfiguratorMacOS) + assert isinstance(checks[1], BufferConfiguratorMacOS) + assert isinstance(checks[2], MaxFileConfiguratorMacOS) + assert checks[0].loopback_interface == "lo0" + + def test_passes_check_only_flag(self) -> None: + with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): + with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: + autoconf(check_only=True) + mock_configure.assert_called_once() + assert mock_configure.call_args[1]["check_only"] is True + + def test_logs_error_on_unsupported_system(self) -> None: + with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Windows"): + with patch("dimos.protocol.service.lcmservice.configure_system") as mock_configure: with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: autoconf() - # Should not log anything when no config is needed - mock_logger.info.assert_not_called() - mock_logger.error.assert_not_called() - mock_logger.warning.assert_not_called() + mock_configure.assert_not_called() + mock_logger.error.assert_called_once() + assert "Windows" in mock_logger.error.call_args[0][0] -def test_autoconf_with_config_needed_success() -> None: - """Test autoconf when configuration is needed and commands succeed.""" - # Clear CI environment variable for this test - with patch.dict(os.environ, {"CI": ""}, clear=False): - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock checks failing, then mock the execution succeeding - mock_run.side_effect = [ - # check_multicast calls - type( - "MockResult", - (), - {"stdout": "1: lo: mtu 65536", "returncode": 0}, - )(), - type("MockResult", (), {"stdout": "", "returncode": 0})(), - # check_buffers calls - type( - "MockResult", (), {"stdout": "net.core.rmem_max = 1048576", "returncode": 0} - )(), - type( - "MockResult", - (), - {"stdout": "net.core.rmem_default = 1048576", "returncode": 0}, - )(), - # Command execution calls - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # ip l set lo multicast on - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # route add... - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sysctl rmem_max - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sysctl rmem_default - ] - - from unittest.mock import call +# ----------------------------- LCMConfig tests ----------------------------- - with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: - autoconf() - sudo = get_sudo_prefix() - # Verify the expected log calls - expected_info_calls = [ - call("System configuration required. Executing commands..."), - call(f" Running: {sudo}ip l set lo multicast on"), - call(" ✓ Success"), - call(f" Running: {sudo}ip route add 224.0.0.0/4 dev lo"), - call(" ✓ Success"), - call(f" Running: {sudo}sysctl -w net.core.rmem_max={TARGET_RMEM_SIZE}"), - call(" ✓ Success"), - call( - f" Running: {sudo}sysctl -w net.core.rmem_default={TARGET_RMEM_SIZE}" - ), - call(" ✓ Success"), - call("System configuration completed."), - ] - - mock_logger.info.assert_has_calls(expected_info_calls) - - -def test_autoconf_with_command_failures() -> None: - """Test autoconf when some commands fail.""" - # Clear CI environment variable for this test - with patch.dict(os.environ, {"CI": ""}, clear=False): - with patch("dimos.protocol.service.lcmservice.platform.system", return_value="Linux"): - with patch("dimos.protocol.service.lcmservice.subprocess.run") as mock_run: - # Mock checks failing, then mock some commands failing - mock_run.side_effect = [ - # check_multicast calls - type( - "MockResult", - (), - {"stdout": "1: lo: mtu 65536", "returncode": 0}, - )(), - type("MockResult", (), {"stdout": "", "returncode": 0})(), - # check_buffers calls (no buffer issues for simpler test) - type( - "MockResult", - (), - {"stdout": f"net.core.rmem_max = {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - type( - "MockResult", - (), - {"stdout": f"net.core.rmem_default = {TARGET_RMEM_SIZE}", "returncode": 0}, - )(), - # Command execution calls - first succeeds, second fails - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # ip l set lo multicast on - subprocess.CalledProcessError( - 1, - [ - *get_sudo_prefix().split(), - "ip", - "route", - "add", - "224.0.0.0/4", - "dev", - "lo", - ], - "Permission denied", - "Operation not permitted", - ), - ] +class TestLCMConfig: + def test_default_values(self) -> None: + config = LCMConfig() + assert config.ttl == 0 + assert config.url == _DEFAULT_LCM_URL + assert config.autoconf is True + assert config.lcm is None - with patch("dimos.protocol.service.lcmservice.logger") as mock_logger: - # The function should raise on multicast/route failures - with pytest.raises(subprocess.CalledProcessError): - autoconf() - - # Verify it logged the failure before raising - info_calls = [call[0][0] for call in mock_logger.info.call_args_list] - error_calls = [call[0][0] for call in mock_logger.error.call_args_list] - - assert "System configuration required. Executing commands..." in info_calls - assert " ✓ Success" in info_calls # First command succeeded - assert any( - "✗ Failed to configure multicast" in call for call in error_calls - ) # Second command failed + def test_custom_url(self) -> None: + custom_url = "udpm://192.168.1.1:7777?ttl=1" + config = LCMConfig(url=custom_url) + assert config.url == custom_url + + def test_post_init_sets_default_url_when_none(self) -> None: + config = LCMConfig(url=None) + assert config.url == _DEFAULT_LCM_URL + + def test_autoconf_can_be_disabled(self) -> None: + config = LCMConfig(autoconf=False) + assert config.autoconf is False + + +# ----------------------------- Topic tests ----------------------------- + + +class TestTopic: + def test_str_without_lcm_type(self) -> None: + topic = Topic(topic="my_topic") + assert str(topic) == "my_topic" + + def test_str_with_lcm_type(self) -> None: + mock_type = MagicMock() + mock_type.msg_name = "TestMessage" + topic = Topic(topic="my_topic", lcm_type=mock_type) + assert str(topic) == "my_topic#TestMessage" + + +# ----------------------------- LCMService tests ----------------------------- + + +class TestLCMService: + def test_init_with_default_config(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + service = LCMService() + assert service.config.url == _DEFAULT_LCM_URL + assert service.l == mock_lcm_instance + mock_lcm_class.assert_called_once_with(_DEFAULT_LCM_URL) + + def test_init_with_custom_url(self) -> None: + custom_url = "udpm://192.168.1.1:7777?ttl=1" + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + # Pass url as kwarg, not config= + LCMService(url=custom_url) + mock_lcm_class.assert_called_once_with(custom_url) + + def test_init_with_existing_lcm_instance(self) -> None: + mock_lcm_instance = MagicMock() + + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + # Pass lcm as kwarg + service = LCMService(lcm=mock_lcm_instance) + mock_lcm_class.assert_not_called() + assert service.l == mock_lcm_instance + + def test_start_and_stop(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + with patch("dimos.protocol.service.lcmservice.autoconf"): + service = LCMService(autoconf=False) + service.start() + + # Verify thread is running + assert service._thread is not None + assert service._thread.is_alive() + + service.stop() + + # Give the thread a moment to stop + time.sleep(0.1) + assert not service._thread.is_alive() + + def test_start_calls_configure_system(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + with patch("dimos.protocol.service.lcmservice.autoconf") as mock_configure: + service = LCMService(autoconf=True) + service.start() + + # With autoconf=True, check_only should be False + mock_configure.assert_called_once_with(check_only=False) + + service.stop() + + def test_start_with_autoconf_disabled(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + with patch("dimos.protocol.service.lcmservice.autoconf") as mock_configure: + service = LCMService(autoconf=False) + service.start() + + # With autoconf=False, check_only should be True + mock_configure.assert_called_once_with(check_only=True) + + service.stop() + + def test_getstate_excludes_unpicklable_attrs(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + service = LCMService() + state = service.__getstate__() + + assert "l" not in state + assert "_stop_event" not in state + assert "_thread" not in state + assert "_l_lock" not in state + assert "_call_thread_pool" not in state + assert "_call_thread_pool_lock" not in state + + def test_setstate_reinitializes_runtime_attrs(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + service = LCMService() + state = service.__getstate__() + + # Simulate unpickling + new_service = object.__new__(LCMService) + new_service.__setstate__(state) + + assert new_service.l is None + assert isinstance(new_service._stop_event, threading.Event) + assert new_service._thread is None + # threading.Lock is a factory function, not a type + # Just check that the lock exists and has acquire/release methods + assert hasattr(new_service._l_lock, "acquire") + assert hasattr(new_service._l_lock, "release") + + def test_start_reinitializes_lcm_after_unpickling(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + with patch("dimos.protocol.service.lcmservice.autoconf"): + service = LCMService() + state = service.__getstate__() + + # Simulate unpickling + new_service = object.__new__(LCMService) + new_service.__setstate__(state) + + # Start should reinitialize LCM + new_service.start() + + # LCM should be created again + assert mock_lcm_class.call_count == 2 + + new_service.stop() + + def test_stop_cleans_up_lcm_instance(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + with patch("dimos.protocol.service.lcmservice.autoconf"): + service = LCMService() + service.start() + service.stop() + + # LCM instance should be cleaned up when we created it + assert service.l is None + + def test_stop_preserves_external_lcm_instance(self) -> None: + mock_lcm_instance = MagicMock() + + with patch("dimos.protocol.service.lcmservice.autoconf"): + # Pass lcm as kwarg + service = LCMService(lcm=mock_lcm_instance) + service.start() + service.stop() + + # External LCM instance should not be cleaned up + assert service.l == mock_lcm_instance + + def test_get_call_thread_pool_creates_pool(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + service = LCMService() + assert service._call_thread_pool is None + + pool = service._get_call_thread_pool() + assert pool is not None + assert service._call_thread_pool == pool + + # Should return same pool on subsequent calls + pool2 = service._get_call_thread_pool() + assert pool2 is pool + + # Clean up + pool.shutdown(wait=False) + + def test_stop_shuts_down_thread_pool(self) -> None: + with patch("dimos.protocol.service.lcmservice.lcm.LCM") as mock_lcm_class: + mock_lcm_instance = MagicMock() + mock_lcm_class.return_value = mock_lcm_instance + + with patch("dimos.protocol.service.lcmservice.autoconf"): + service = LCMService() + service.start() + + # Create thread pool + pool = service._get_call_thread_pool() + assert pool is not None + + service.stop() + + # Pool should be cleaned up + assert service._call_thread_pool is None diff --git a/dimos/protocol/service/test_system_configurator.py b/dimos/protocol/service/test_system_configurator.py new file mode 100644 index 0000000000..22bb662044 --- /dev/null +++ b/dimos/protocol/service/test_system_configurator.py @@ -0,0 +1,483 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import resource +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from dimos.protocol.service.system_configurator import ( + IDEAL_RMEM_SIZE, + BufferConfiguratorLinux, + BufferConfiguratorMacOS, + MaxFileConfiguratorMacOS, + MulticastConfiguratorLinux, + MulticastConfiguratorMacOS, + SystemConfigurator, + _is_root_user, + _read_sysctl_int, + _write_sysctl_int, + configure_system, + sudo_run, +) + +# ----------------------------- Helper function tests ----------------------------- + + +class TestIsRootUser: + def test_is_root_when_euid_is_zero(self) -> None: + # Clear the cache before testing + _is_root_user.cache_clear() + with patch("os.geteuid", return_value=0): + assert _is_root_user() is True + + def test_is_not_root_when_euid_is_nonzero(self) -> None: + _is_root_user.cache_clear() + with patch("os.geteuid", return_value=1000): + assert _is_root_user() is False + + def test_returns_false_when_geteuid_not_available(self) -> None: + _is_root_user.cache_clear() + with patch("os.geteuid", side_effect=AttributeError): + assert _is_root_user() is False + + +class TestSudoRun: + def test_runs_without_sudo_when_root(self) -> None: + _is_root_user.cache_clear() + with patch("os.geteuid", return_value=0): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + sudo_run("echo", "hello", check=True) + mock_run.assert_called_once_with(["echo", "hello"], check=True) + + def test_runs_with_sudo_when_not_root(self) -> None: + _is_root_user.cache_clear() + with patch("os.geteuid", return_value=1000): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + sudo_run("echo", "hello", check=True) + mock_run.assert_called_once_with(["sudo", "echo", "hello"], check=True) + + +class TestReadSysctlInt: + def test_reads_value_with_equals_sign(self) -> None: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="net.core.rmem_max = 67108864") + result = _read_sysctl_int("net.core.rmem_max") + assert result == 67108864 + + def test_reads_value_with_colon(self) -> None: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="kern.ipc.maxsockbuf: 8388608") + result = _read_sysctl_int("kern.ipc.maxsockbuf") + assert result == 8388608 + + def test_returns_none_on_nonzero_returncode(self) -> None: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=1, stderr="error") + result = _read_sysctl_int("net.core.rmem_max") + assert result is None + + def test_returns_none_on_malformed_output(self) -> None: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout="invalid output") + result = _read_sysctl_int("net.core.rmem_max") + assert result is None + + def test_returns_none_on_exception(self) -> None: + with patch("subprocess.run", side_effect=Exception("Command failed")): + result = _read_sysctl_int("net.core.rmem_max") + assert result is None + + +class TestWriteSysctlInt: + def test_calls_sudo_run_with_correct_args(self) -> None: + _is_root_user.cache_clear() + with patch("os.geteuid", return_value=1000): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + _write_sysctl_int("net.core.rmem_max", 67108864) + mock_run.assert_called_once_with( + ["sudo", "sysctl", "-w", "net.core.rmem_max=67108864"], + check=True, + text=True, + capture_output=False, + ) + + +# ----------------------------- configure_system tests ----------------------------- + + +class MockConfigurator(SystemConfigurator): + """A mock configurator for testing configure_system.""" + + def __init__(self, passes: bool = True, is_critical: bool = False) -> None: + self._passes = passes + self.critical = is_critical + self.fix_called = False + + def check(self) -> bool: + return self._passes + + def explanation(self) -> str | None: + if self._passes: + return None + return "Mock explanation" + + def fix(self) -> None: + self.fix_called = True + + +class TestConfigureSystem: + def test_skips_in_ci_environment(self) -> None: + with patch.dict(os.environ, {"CI": "true"}): + mock_check = MockConfigurator(passes=False) + configure_system([mock_check]) + assert not mock_check.fix_called + + def test_does_nothing_when_all_checks_pass(self) -> None: + with patch.dict(os.environ, {"CI": ""}, clear=False): + mock_check = MockConfigurator(passes=True) + configure_system([mock_check]) + assert not mock_check.fix_called + + def test_check_only_mode_does_not_fix(self) -> None: + with patch.dict(os.environ, {"CI": ""}, clear=False): + mock_check = MockConfigurator(passes=False) + configure_system([mock_check], check_only=True) + assert not mock_check.fix_called + + def test_prompts_user_and_fixes_on_yes(self) -> None: + with patch.dict(os.environ, {"CI": ""}, clear=False): + mock_check = MockConfigurator(passes=False) + with patch("builtins.input", return_value="y"): + configure_system([mock_check]) + assert mock_check.fix_called + + def test_does_not_fix_on_no(self) -> None: + with patch.dict(os.environ, {"CI": ""}, clear=False): + mock_check = MockConfigurator(passes=False) + with patch("builtins.input", return_value="n"): + configure_system([mock_check]) + assert not mock_check.fix_called + + def test_exits_on_no_with_critical_check(self) -> None: + with patch.dict(os.environ, {"CI": ""}, clear=False): + mock_check = MockConfigurator(passes=False, is_critical=True) + with patch("builtins.input", return_value="n"): + with pytest.raises(SystemExit) as exc_info: + configure_system([mock_check]) + assert exc_info.value.code == 1 + + def test_handles_eof_error_on_input(self) -> None: + with patch.dict(os.environ, {"CI": ""}, clear=False): + mock_check = MockConfigurator(passes=False) + with patch("builtins.input", side_effect=EOFError): + configure_system([mock_check]) + assert not mock_check.fix_called + + +# ----------------------------- MulticastConfiguratorLinux tests ----------------------------- + + +class TestMulticastConfiguratorLinux: + def test_check_returns_true_when_fully_configured(self) -> None: + configurator = MulticastConfiguratorLinux() + with patch("subprocess.run") as mock_run: + mock_run.side_effect = [ + MagicMock(returncode=0), # ip -V + MagicMock( + returncode=0, + stdout="1: lo: mtu 65536", + ), + MagicMock(returncode=0, stdout="224.0.0.0/4 dev lo scope link"), + ] + assert configurator.check() is True + assert configurator.loopback_ok is True + assert configurator.route_ok is True + + def test_check_returns_false_when_multicast_flag_missing(self) -> None: + configurator = MulticastConfiguratorLinux() + with patch("subprocess.run") as mock_run: + mock_run.side_effect = [ + MagicMock(returncode=0), # ip -V + MagicMock(returncode=0, stdout="1: lo: mtu 65536"), + MagicMock(returncode=0, stdout="224.0.0.0/4 dev lo scope link"), + ] + assert configurator.check() is False + assert configurator.loopback_ok is False + assert configurator.route_ok is True + + def test_check_returns_false_when_route_missing(self) -> None: + configurator = MulticastConfiguratorLinux() + with patch("subprocess.run") as mock_run: + mock_run.side_effect = [ + MagicMock(returncode=0), # ip -V + MagicMock( + returncode=0, + stdout="1: lo: mtu 65536", + ), + MagicMock(returncode=0, stdout=""), # Empty - no route + ] + assert configurator.check() is False + assert configurator.loopback_ok is True + assert configurator.route_ok is False + + def test_check_returns_false_when_ip_not_found(self) -> None: + configurator = MulticastConfiguratorLinux() + with patch("subprocess.run", side_effect=FileNotFoundError): + assert configurator.check() is False + assert configurator.loopback_ok is False + assert configurator.route_ok is False + + def test_explanation_includes_needed_commands(self) -> None: + configurator = MulticastConfiguratorLinux() + configurator.loopback_ok = False + configurator.route_ok = False + explanation = configurator.explanation() + assert "ip link set lo multicast on" in explanation + assert "ip route add 224.0.0.0/4 dev lo" in explanation + + def test_fix_runs_needed_commands(self) -> None: + _is_root_user.cache_clear() + configurator = MulticastConfiguratorLinux() + configurator.loopback_ok = False + configurator.route_ok = False + with patch("os.geteuid", return_value=0): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + configurator.fix() + assert mock_run.call_count == 2 + + +# ----------------------------- MulticastConfiguratorMacOS tests ----------------------------- + + +class TestMulticastConfiguratorMacOS: + def test_check_returns_true_when_route_exists(self) -> None: + configurator = MulticastConfiguratorMacOS() + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock( + returncode=0, + stdout="224.0.0.0/4 link#1 UCS lo0", + ) + assert configurator.check() is True + + def test_check_returns_false_when_route_missing(self) -> None: + configurator = MulticastConfiguratorMacOS() + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock( + returncode=0, stdout="default 192.168.1.1 UGScg en0" + ) + assert configurator.check() is False + + def test_check_returns_false_on_netstat_error(self) -> None: + configurator = MulticastConfiguratorMacOS() + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=1, stderr="error") + assert configurator.check() is False + + def test_explanation_includes_route_command(self) -> None: + configurator = MulticastConfiguratorMacOS() + explanation = configurator.explanation() + assert "route add -net 224.0.0.0/4 -interface lo0" in explanation + + def test_fix_runs_route_command(self) -> None: + _is_root_user.cache_clear() + configurator = MulticastConfiguratorMacOS() + with patch("os.geteuid", return_value=0): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + configurator.fix() + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + assert "route" in args + assert "224.0.0.0/4" in args + + +# ----------------------------- BufferConfiguratorLinux tests ----------------------------- + + +class TestBufferConfiguratorLinux: + def test_check_returns_true_when_buffers_sufficient(self) -> None: + configurator = BufferConfiguratorLinux() + with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + mock_read.return_value = IDEAL_RMEM_SIZE + assert configurator.check() is True + assert configurator.needs == [] + + def test_check_returns_false_when_rmem_max_low(self) -> None: + configurator = BufferConfiguratorLinux() + with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + mock_read.side_effect = [1048576, IDEAL_RMEM_SIZE] # rmem_max low + assert configurator.check() is False + assert len(configurator.needs) == 1 + assert configurator.needs[0][0] == "net.core.rmem_max" + + def test_check_returns_false_when_both_low(self) -> None: + configurator = BufferConfiguratorLinux() + with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + mock_read.return_value = 1048576 # Both low + assert configurator.check() is False + assert len(configurator.needs) == 2 + + def test_explanation_lists_needed_changes(self) -> None: + configurator = BufferConfiguratorLinux() + configurator.needs = [("net.core.rmem_max", IDEAL_RMEM_SIZE)] + explanation = configurator.explanation() + assert "net.core.rmem_max" in explanation + assert str(IDEAL_RMEM_SIZE) in explanation + + def test_fix_writes_needed_values(self) -> None: + configurator = BufferConfiguratorLinux() + configurator.needs = [("net.core.rmem_max", IDEAL_RMEM_SIZE)] + with patch("dimos.protocol.service.system_configurator._write_sysctl_int") as mock_write: + configurator.fix() + mock_write.assert_called_once_with("net.core.rmem_max", IDEAL_RMEM_SIZE) + + +# ----------------------------- BufferConfiguratorMacOS tests ----------------------------- + + +class TestBufferConfiguratorMacOS: + def test_check_returns_true_when_buffers_sufficient(self) -> None: + configurator = BufferConfiguratorMacOS() + with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + mock_read.side_effect = [ + BufferConfiguratorMacOS.TARGET_BUFFER_SIZE, + BufferConfiguratorMacOS.TARGET_RECVSPACE, + BufferConfiguratorMacOS.TARGET_DGRAM_SIZE, + ] + assert configurator.check() is True + assert configurator.needs == [] + + def test_check_returns_false_when_values_low(self) -> None: + configurator = BufferConfiguratorMacOS() + with patch("dimos.protocol.service.system_configurator._read_sysctl_int") as mock_read: + mock_read.return_value = 1024 # All low + assert configurator.check() is False + assert len(configurator.needs) == 3 + + def test_explanation_lists_needed_changes(self) -> None: + configurator = BufferConfiguratorMacOS() + configurator.needs = [ + ("kern.ipc.maxsockbuf", BufferConfiguratorMacOS.TARGET_BUFFER_SIZE), + ] + explanation = configurator.explanation() + assert "kern.ipc.maxsockbuf" in explanation + + def test_fix_writes_needed_values(self) -> None: + configurator = BufferConfiguratorMacOS() + configurator.needs = [ + ("kern.ipc.maxsockbuf", BufferConfiguratorMacOS.TARGET_BUFFER_SIZE), + ] + with patch("dimos.protocol.service.system_configurator._write_sysctl_int") as mock_write: + configurator.fix() + mock_write.assert_called_once_with( + "kern.ipc.maxsockbuf", BufferConfiguratorMacOS.TARGET_BUFFER_SIZE + ) + + +# ----------------------------- MaxFileConfiguratorMacOS tests ----------------------------- + + +class TestMaxFileConfiguratorMacOS: + def test_check_returns_true_when_soft_limit_sufficient(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + with patch("resource.getrlimit") as mock_getrlimit: + mock_getrlimit.return_value = (65536, 1048576) + assert configurator.check() is True + assert configurator.current_soft == 65536 + assert configurator.current_hard == 1048576 + + def test_check_returns_false_when_soft_limit_low(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + with patch("resource.getrlimit") as mock_getrlimit: + mock_getrlimit.return_value = (256, 1048576) + assert configurator.check() is False + assert configurator.can_fix_without_sudo is True + + def test_check_returns_false_when_both_limits_low(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + with patch("resource.getrlimit") as mock_getrlimit: + mock_getrlimit.return_value = (256, 10240) + assert configurator.check() is False + assert configurator.can_fix_without_sudo is False + + def test_check_returns_false_on_exception(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + with patch("resource.getrlimit", side_effect=Exception("error")): + assert configurator.check() is False + + def test_explanation_when_sudo_not_needed(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + configurator.current_soft = 256 + configurator.current_hard = 1048576 + configurator.can_fix_without_sudo = True + explanation = configurator.explanation() + assert "65536" in explanation + assert "no sudo" in explanation.lower() or "Raise soft" in explanation + + def test_explanation_when_sudo_needed(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + configurator.current_soft = 256 + configurator.current_hard = 10240 + configurator.can_fix_without_sudo = False + explanation = configurator.explanation() + assert "launchctl" in explanation + + def test_fix_raises_soft_limit_without_sudo(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + configurator.current_soft = 256 + configurator.current_hard = 1048576 + configurator.can_fix_without_sudo = True + with patch("resource.setrlimit") as mock_setrlimit: + configurator.fix() + mock_setrlimit.assert_called_once_with(resource.RLIMIT_NOFILE, (65536, 1048576)) + + def test_fix_does_nothing_when_already_sufficient(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + configurator.current_soft = 65536 + configurator.current_hard = 1048576 + with patch("resource.setrlimit") as mock_setrlimit: + configurator.fix() + mock_setrlimit.assert_not_called() + + def test_fix_uses_launchctl_when_hard_limit_low(self) -> None: + _is_root_user.cache_clear() + configurator = MaxFileConfiguratorMacOS(target=65536) + configurator.current_soft = 256 + configurator.current_hard = 10240 + configurator.can_fix_without_sudo = False + with patch("os.geteuid", return_value=0): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + with patch("resource.setrlimit"): + configurator.fix() + # Check launchctl was called + args = mock_run.call_args[0][0] + assert "launchctl" in args + assert "maxfiles" in args + + def test_fix_raises_on_setrlimit_error(self) -> None: + configurator = MaxFileConfiguratorMacOS(target=65536) + configurator.current_soft = 256 + configurator.current_hard = 1048576 + configurator.can_fix_without_sudo = True + with patch("resource.setrlimit", side_effect=ValueError("test error")): + with pytest.raises(ValueError): + configurator.fix()