Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/lfs_push
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ for dir_path in data/*; do
compressed_dirs+=("$dir_name")

# Add the compressed file to git LFS tracking
git add "$compressed_file"
git add -f "$compressed_file"

echo -e " ${GREEN}✓${NC} git-add $compressed_file"

Expand Down
8 changes: 4 additions & 4 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def __str__(self) -> str:
class pLCMTransport(PubSubTransport[T]):
_started: bool = False

def __init__(self, topic: str):
def __init__(self, topic: str, **kwargs):
super().__init__(topic)
self.lcm = pickleLCM()
self.lcm = pickleLCM(**kwargs)

def __reduce__(self):
return (pLCMTransport, (self.topic,))
Expand All @@ -78,9 +78,9 @@ def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None:
class LCMTransport(PubSubTransport[T]):
_started: bool = False

def __init__(self, topic: str, type: type):
def __init__(self, topic: str, type: type, **kwargs):
super().__init__(LCMTopic(topic, type))
self.lcm = LCM()
self.lcm = LCM(**kwargs)

def __reduce__(self):
return (LCMTransport, (self.topic.topic, self.topic.lcm_type))
Expand Down
1 change: 1 addition & 0 deletions dimos/protocol/pubsub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
import dimos.protocol.pubsub.lcmpubsub as lcm
from dimos.protocol.pubsub.memory import Memory
from dimos.protocol.pubsub.spec import PubSub
114 changes: 98 additions & 16 deletions dimos/protocol/pubsub/lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

from __future__ import annotations

import os
import subprocess
import sys
import threading
import traceback
from dataclasses import dataclass
Expand All @@ -26,13 +27,101 @@
from dimos.protocol.service.spec import Service


def check_multicast() -> list[str]:
"""Check if multicast configuration is needed and return required commands."""
commands_needed = []

# Check if loopback interface has multicast enabled
try:
result = subprocess.run(["ip", "link", "show", "lo"], capture_output=True, text=True)
if "MULTICAST" not in result.stdout:
commands_needed.append("sudo ifconfig lo multicast")
except Exception:
commands_needed.append("sudo ifconfig lo multicast")

# Check if multicast route exists
try:
result = subprocess.run(
["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True
)
if not result.stdout.strip():
commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo")
except Exception:
commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo")

return commands_needed


def check_buffers() -> list[str]:
"""Check if buffer configuration is needed and return required commands."""
commands_needed = []

# Check current buffer settings
try:
result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True)
current_max = int(result.stdout.split("=")[1].strip())
if current_max < 2097152:
commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152")
except Exception:
commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152")

try:
result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True)
current_default = int(result.stdout.split("=")[1].strip())
if current_default < 2097152:
commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152")
except Exception:
commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152")

return commands_needed


def check_system() -> None:
"""Check if system configuration is needed and exit with required commands if not prepared."""
commands_needed = []
commands_needed.extend(check_multicast())
commands_needed.extend(check_buffers())

if commands_needed:
print("System configuration required. Please run the following commands:")
for cmd in commands_needed:
print(f" {cmd}")
print("\nThen restart your application.")
sys.exit(1)


def autoconf() -> None:
"""Auto-configure system by running checks and executing required commands if needed."""
commands_needed = []
commands_needed.extend(check_multicast())
commands_needed.extend(check_buffers())

if not commands_needed:
return

print("System configuration required. Executing commands...")
for cmd in commands_needed:
print(f" Running: {cmd}")
try:
# Split command into parts for subprocess
cmd_parts = cmd.split()
result = subprocess.run(cmd_parts, capture_output=True, text=True, check=True)
print(" ✓ Success")
except subprocess.CalledProcessError as e:
print(f" ✗ Failed: {e}")
print(f" stdout: {e.stdout}")
print(f" stderr: {e.stderr}")
except Exception as e:
print(f" ✗ Error: {e}")

print("System configuration completed.")


@dataclass
class LCMConfig:
ttl: int = 0
url: str | None = None
# auto configure routing
auto_configure_multicast: bool = True
auto_configure_buffers: bool = False
autoconf: bool = False


@runtime_checkable
Expand Down Expand Up @@ -89,20 +178,13 @@ def unsubscribe():
return unsubscribe

def start(self):
# TODO: proper error handling/log messages for these system calls
if self.config.auto_configure_multicast:
try:
os.system("sudo ifconfig lo multicast")
os.system("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo")
except Exception as e:
print(f"Error configuring multicast: {e}")

if self.config.auto_configure_buffers:
if self.config.autoconf:
autoconf()
else:
try:
os.system("sudo sysctl -w net.core.rmem_max=2097152")
os.system("sudo sysctl -w net.core.rmem_default=2097152")
check_system()
except Exception as e:
print(f"Error configuring buffers: {e}")
print(f"Error checking system configuration: {e}")

self._stop_event.clear()
self._thread = threading.Thread(target=self._loop)
Expand Down
File renamed without changes.
Loading