Skip to content
6 changes: 5 additions & 1 deletion dimos/protocol/pubsub/lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

from dimos.protocol.pubsub.spec import PickleEncoderMixin, PubSub, PubSubEncoderMixin
from dimos.protocol.service.lcmservice import LCMConfig, LCMService, autoconf
from dimos.protocol.service.lcmservice import (
LCMConfig,
LCMService,
autoconf,
)
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
Expand Down
241 changes: 36 additions & 205 deletions dimos/protocol/service/lcmservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,220 +16,55 @@

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import cache
import os
import platform
import subprocess
import sys
import threading
import traceback
from typing import Protocol, runtime_checkable

import lcm

from dimos.protocol.service.spec import Service
from dimos.protocol.service.system_configurator import (
BufferConfiguratorLinux,
BufferConfiguratorMacOS,
MaxFileConfiguratorMacOS,
MulticastConfiguratorLinux,
MulticastConfiguratorMacOS,
SystemConfigurator,
configure_system,
)
from dimos.utils.logging_config import setup_logger

logger = setup_logger()

_DEFAULT_LCM_HOST = "239.255.76.67"
_DEFAULT_LCM_PORT = "7667"
# LCM_DEFAULT_URL is used by LCM (we didn't pick that env var name)
_DEFAULT_LCM_URL = os.getenv(
"LCM_DEFAULT_URL", f"udpm://{_DEFAULT_LCM_HOST}:{_DEFAULT_LCM_PORT}?ttl=0"
)

@cache
def check_root() -> bool:
"""Return True if the current process is running as root (UID 0)."""
try:
return os.geteuid() == 0
except AttributeError:
# Platforms without geteuid (e.g. Windows) – assume non-root.
return False


def check_multicast() -> list[str]:
"""Check if multicast configuration is needed and return required commands."""
commands_needed = []

sudo = "" if check_root() else "sudo "

def autoconf(check_only: bool = False) -> None:
# check multicast and buffer sizes
system = platform.system()

checks: list[SystemConfigurator] = []
if system == "Linux":
# Linux commands
loopback_interface = "lo"
# Check if loopback interface has multicast enabled
try:
result = subprocess.run(
["ip", "link", "show", loopback_interface], capture_output=True, text=True
)
if "MULTICAST" not in result.stdout:
commands_needed.append(f"{sudo}ip l set {loopback_interface} multicast on")
except Exception:
commands_needed.append(f"{sudo}ip l set {loopback_interface} multicast on")

# Check if multicast route exists
try:
result = subprocess.run(
["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True
)
if not result.stdout.strip():
commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}")
except Exception:
commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}")

elif system == "Darwin": # macOS
loopback_interface = "lo0"
# Check if multicast route exists
try:
result = subprocess.run(["netstat", "-nr"], capture_output=True, text=True)
route_exists = "224.0.0.0/4" in result.stdout or "224.0.0/4" in result.stdout
if not route_exists:
commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}")
except Exception:
commands_needed.append(f"{sudo}ip route add 224.0.0.0/4 dev {loopback_interface}")

checks = [
MulticastConfiguratorLinux(loopback_interface="lo"),
BufferConfiguratorLinux(),
]
elif system == "Darwin":
checks = [
MulticastConfiguratorMacOS(loopback_interface="lo0"),
BufferConfiguratorMacOS(),
MaxFileConfiguratorMacOS(),
]
else:
# For other systems, skip multicast configuration
logger.warning(f"Multicast configuration not supported on {system}")

return commands_needed


def _set_net_value(commands_needed: list[str], sudo: str, name: str, value: int) -> int | None:
try:
result = subprocess.run(["sysctl", name], capture_output=True, text=True)
if result.returncode == 0:
current = int(result.stdout.replace(":", "=").split("=")[1].strip())
else:
current = None
if not current or current < value:
commands_needed.append(f"{sudo}sysctl -w {name}={value}")
return current
except:
commands_needed.append(f"{sudo}sysctl -w {name}={value}")
return None


TARGET_RMEM_SIZE = 67108864
TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS = 8388608
TARGET_MAX_DGRAM_SIZE_MACOS = 65535


def check_buffers() -> tuple[list[str], int | None]:
"""Check if buffer configuration is needed and return required commands and current size.

Returns:
Tuple of (commands_needed, current_max_buffer_size)
"""
commands_needed: list[str] = []
current_max = None

sudo = "" if check_root() else "sudo "
system = platform.system()

if system == "Linux":
# Linux buffer configuration
current_max = _set_net_value(commands_needed, sudo, "net.core.rmem_max", TARGET_RMEM_SIZE)
_set_net_value(commands_needed, sudo, "net.core.rmem_default", TARGET_RMEM_SIZE)
elif system == "Darwin": # macOS
# macOS buffer configuration - check and set UDP buffer related sysctls
current_max = _set_net_value(
commands_needed, sudo, "kern.ipc.maxsockbuf", TARGET_MAX_SOCKET_BUFFER_SIZE_MACOS
)
_set_net_value(commands_needed, sudo, "net.inet.udp.recvspace", TARGET_RMEM_SIZE)
_set_net_value(commands_needed, sudo, "net.inet.udp.maxdgram", TARGET_MAX_DGRAM_SIZE_MACOS)
else:
# For other systems, skip buffer configuration
logger.warning(f"Buffer configuration not supported on {system}")

return commands_needed, current_max


def check_system() -> None:
"""Check if system configuration is needed and exit only for critical issues.

Multicast configuration is critical for LCM to work.
Buffer sizes are performance optimizations - warn but don't fail in containers.
"""
if os.environ.get("CI"):
logger.debug("CI environment detected: Skipping system configuration checks.")
return

multicast_commands = check_multicast()
buffer_commands, current_buffer_size = check_buffers()

# Multicast configuration
if multicast_commands:
logger.error(
"Critical: Multicast configuration required. Please run the following commands:"
)
for cmd in multicast_commands:
logger.error(f" {cmd}")
logger.error("\nThen restart your application.")
sys.exit(1)

# Buffer configuration is critical for throughput and packet loss
elif buffer_commands:
if current_buffer_size:
logger.warning(
f"UDP buffer size limited to {current_buffer_size} bytes ({current_buffer_size // 1024}KB). Large LCM packets may fail."
)
else:
logger.warning("UDP buffer sizes are limited. Large LCM packets may fail.")
logger.warning("For better performance, consider running:")
for cmd in buffer_commands:
logger.warning(f" {cmd}")
logger.warning("Note: This may not be possible in Docker containers.")


def autoconf() -> None:
"""Auto-configure system by running checks and executing required commands if needed."""
if os.environ.get("CI"):
logger.info("CI environment detected: Skipping automatic system configuration.")
return

platform.system()

commands_needed = []

# Check multicast configuration
commands_needed.extend(check_multicast())

# Check buffer configuration
buffer_commands, _ = check_buffers()
commands_needed.extend(buffer_commands)

if not commands_needed:
logger.error(f"System configuration not supported on {system}")
return

logger.info("System configuration required. Executing commands...")

for cmd in commands_needed:
logger.info(f" Running: {cmd}")
try:
# Split command into parts for subprocess
cmd_parts = cmd.split()
subprocess.run(cmd_parts, capture_output=True, text=True, check=True)
logger.info(" ✓ Success")
except subprocess.CalledProcessError as e:
# Check if this is a multicast/route command or a sysctl command
if "route" in cmd or "multicast" in cmd:
# Multicast/route failures should still fail
logger.error(f" ✗ Failed to configure multicast: {e}")
logger.error(f" stdout: {e.stdout}")
logger.error(f" stderr: {e.stderr}")
raise
elif "sysctl" in cmd:
# Sysctl failures are just warnings (likely docker/container)
logger.warning(
f" ✗ Not able to auto-configure UDP buffer sizes (likely docker image): {e}"
)
except Exception as e:
logger.error(f" ✗ Error: {e}")
if "route" in cmd or "multicast" in cmd:
raise

logger.info("System configuration completed.")


_DEFAULT_LCM_URL_MACOS = "udpm://239.255.76.67:7667?ttl=0"
configure_system(checks, check_only=check_only)


@dataclass
Expand All @@ -240,9 +75,8 @@ class LCMConfig:
lcm: lcm.LCM | None = None

def __post_init__(self) -> None:
if self.url is None and platform.system() == "Darwin":
# On macOS, use multicast with TTL=0 to keep traffic local
self.url = _DEFAULT_LCM_URL_MACOS
if self.url is None:
self.url = _DEFAULT_LCM_URL
Copy link
Member Author

@jeff-hykin jeff-hykin Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this confirmed to be the default LCM for all OS's. I wanted to get it from lcm directly but it seems they don't expose it



@runtime_checkable
Expand Down Expand Up @@ -326,13 +160,10 @@ def start(self) -> None:
else:
self.l = lcm.LCM(self.config.url) if self.config.url else lcm.LCM()

if self.config.autoconf:
autoconf()
else:
try:
check_system()
except Exception as e:
print(f"Error checking system configuration: {e}")
try:
autoconf(check_only=not self.config.autoconf)
except Exception as e:
print(f"Error checking system configuration: {e}")

self._stop_event.clear()
self._thread = threading.Thread(target=self._lcm_loop)
Expand Down
Loading
Loading