From 340bf2407d9e15ce58c6f9c785ad91588bf4db00 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:02:18 -0700 Subject: [PATCH 1/8] lcmspy cli --- bin/lcmspy | 7 ++ dimos/utils/cli/lcmspy.py | 196 +++++++++++++++++++++++++++++ dimos/utils/cli/lcmspy_cli.py | 130 ++++++++++++++++++++ dimos/utils/cli/test_lcmspy.py | 217 +++++++++++++++++++++++++++++++++ pyproject.toml | 5 +- 5 files changed, 553 insertions(+), 2 deletions(-) create mode 100755 bin/lcmspy create mode 100755 dimos/utils/cli/lcmspy.py create mode 100644 dimos/utils/cli/lcmspy_cli.py create mode 100644 dimos/utils/cli/test_lcmspy.py diff --git a/bin/lcmspy b/bin/lcmspy new file mode 100755 index 0000000000..0efa14fce3 --- /dev/null +++ b/bin/lcmspy @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +# current script dir + ..dimos + + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +python $script_dir/../dimos/utils/cli/lcmspy_cli.py "$@" diff --git a/dimos/utils/cli/lcmspy.py b/dimos/utils/cli/lcmspy.py new file mode 100755 index 0000000000..d0345891f0 --- /dev/null +++ b/dimos/utils/cli/lcmspy.py @@ -0,0 +1,196 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time +from collections import deque +from dataclasses import dataclass +from enum import Enum + +import lcm + +from dimos.protocol.service.lcmservice import LCMConfig, LCMService + + +class BandwidthUnit(Enum): + BPS = "B/s" + KBPS = "kB/s" + MBPS = "MB/s" + GBPS = "GB/s" + + +class Topic: + history_window: float = 60.0 + + def __init__(self, name: str, history_window: float = 60.0): + self.name = name + # Store (timestamp, data_size) tuples for statistics + self.message_history = deque() + self.history_window = history_window + + def msg(self, data: bytes): + # print(f"> msg {self.__str__()} {len(data)} bytes") + datalen = len(data) + self.message_history.append((time.time(), datalen)) + self._cleanup_old_messages() + + def _cleanup_old_messages(self, max_age: float = None): + """Remove messages older than max_age seconds""" + current_time = time.time() + while self.message_history and current_time - self.message_history[0][0] > ( + max_age or self.history_window + ): + self.message_history.popleft() + + def _get_messages_in_window(self, time_window: float): + """Get messages within the specified time window""" + current_time = time.time() + cutoff_time = current_time - time_window + return [(ts, size) for ts, size in self.message_history if ts >= cutoff_time] + + # avg msg freq in the last n seconds + def freq(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + return len(messages) / time_window + + # avg bandwidth in kB/s in the last n seconds + def kbps(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + total_bytes = sum(size for _, size in messages) + total_kbytes = total_bytes / 1000 # Convert bytes to kB + return total_kbytes / time_window + + def kbps_hr(self, time_window: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + """Return human-readable bandwidth with appropriate units""" + kbps_val = self.kbps(time_window) + + if kbps_val >= 1024: + return round(kbps_val / 1024, round_to), BandwidthUnit.MBPS + elif kbps_val >= 1: + return round(kbps_val, round_to), BandwidthUnit.KBPS + else: + # Convert to B/s for small values + bps = kbps_val * 1000 + return round(bps, round_to), BandwidthUnit.BPS + + # avg msg size in the last n seconds + def size(self, time_window: float) -> float: + messages = self._get_messages_in_window(time_window) + if not messages: + return 0.0 + total_size = sum(size for _, size in messages) + return total_size / len(messages) + + def __str__(self): + return f"topic({self.name})" + + +@dataclass +class LCMSpyConfig(LCMConfig): + topic_history_window: float = 60.0 + + +class LCMSpy(LCMService, Topic): + default_config = LCMSpyConfig + topic = dict[str, Topic] + graph_log_window: float = 1.0 + topic_class: type[Topic] = Topic + + def __init__(self, **kwargs): + super().__init__(**kwargs) + Topic.__init__(self, name="total", history_window=self.config.topic_history_window) + self.topic = {} + self.l = lcm.LCM(self.config.url) if self.config.url else lcm.LCM() + + def start(self): + super().start() + self.l.subscribe("/.*", self.msg) + + def stop(self): + """Stop the LCM spy and clean up resources""" + super().stop() + + def msg(self, topic, data): + Topic.msg(self, data) + + if topic not in self.topic: + print(self.config) + self.topic[topic] = self.topic_class( + topic, history_window=self.config.topic_history_window + ) + self.topic[topic].msg(data) + + +class GraphTopic(Topic): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.freq_history = deque(maxlen=20) + self.bandwidth_history = deque(maxlen=20) + + def update_graphs(self, step_window: float = 1.0): + """Update historical data for graphing""" + freq = self.freq(step_window) + kbps = self.kbps(step_window) + self.freq_history.append(freq) + self.bandwidth_history.append(kbps) + + +@dataclass +class GraphLCMSpyConfig(LCMSpyConfig): + graph_log_window: float = 1.0 + + +class GraphLCMSpy(LCMSpy, GraphTopic): + default_config = GraphLCMSpyConfig + + graph_log_thread: threading.Thread | None = None + graph_log_stop_event: threading.Event = threading.Event() + topic_class: type[Topic] = GraphTopic + + def __init__(self, **kwargs): + super().__init__(**kwargs) + GraphTopic.__init__(self, name="total", history_window=self.config.topic_history_window) + + def start(self): + super().start() + self.graph_log_thread = threading.Thread(target=self.graph_log, daemon=True) + self.graph_log_thread.start() + + def graph_log(self): + while not self.graph_log_stop_event.is_set(): + self.update_graphs(self.config.graph_log_window) # Update global history + for topic in self.topic.values(): + topic.update_graphs(self.config.graph_log_window) + time.sleep(self.config.graph_log_window) + + def stop(self): + """Stop the graph logging and LCM spy""" + self.graph_log_stop_event.set() + if self.graph_log_thread and self.graph_log_thread.is_alive(): + self.graph_log_thread.join(timeout=1.0) + super().stop() + + +if __name__ == "__main__": + lcm_spy = LCMSpy() + lcm_spy.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("LCM Spy stopped.") diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py new file mode 100644 index 0000000000..3e92e6fa13 --- /dev/null +++ b/dimos/utils/cli/lcmspy_cli.py @@ -0,0 +1,130 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import math +import random +import threading +from typing import List + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.color import Color +from textual.containers import Container +from textual.reactive import reactive +from textual.renderables.sparkline import Sparkline as SparklineRenderable +from textual.widgets import DataTable, Footer, Header, Label, Sparkline + +from dimos.utils.cli.lcmspy import GraphLCMSpy +from dimos.utils.cli.lcmspy import GraphTopic as SpyTopic + + +def gradient(max_value: float, value: float) -> str: + ratio = min(value / max_value, 1.0) + green = Color(0, 255, 0) + red = Color(255, 0, 0) + color = green.blend(red, ratio) + + return color.hex + + +def topic_text(topic_name: str) -> Text: + if "#" in topic_name: + parts = topic_name.split("#", 1) + return Text(parts[0], style="white") + Text("#" + parts[1], style="blue") + + if topic_name[:4] == "/rpc": + return Text(topic_name[:4], style="red") + Text(topic_name[4:], style="white") + + return Text(topic_name, style="white") + + +class LCMSpyApp(App): + """A real-time CLI dashboard for LCM traffic statistics using Textual.""" + + CSS = """ + Screen { + layout: vertical; + } + DataTable { + height: 2fr; + width: 1fr; + border: none; + background: black; + } + """ + + refresh_interval: float = 0.5 # seconds + show_command_palette = reactive(True) + + BINDINGS = [ + ("q", "quit"), + ("ctrl+c", "quit"), + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.spy = GraphLCMSpy(autoconf=True, graph_log_window=0.5) + self.table: DataTable | None = None + + def compose(self) -> ComposeResult: + # yield Header() + + self.table = DataTable(zebra_stripes=False, cursor_type=None) + self.table.add_column("Topic", width=30) + self.table.add_column("Freq (Hz)") + self.table.add_column("Bandwidth") + yield self.table + yield Footer() + + def on_mount(self): + self.theme = "flexoki" + self.spy.start() + self.set_interval(self.refresh_interval, self.refresh_table) + + async def on_unmount(self): + self.spy.stop() + + def refresh_table(self): + topics: List[SpyTopic] = list(self.spy.topic.values()) + topics.sort(key=lambda t: t.kbps(5.0), reverse=True) + self.table.clear(columns=False) + + for t in topics: + freq = t.freq(5.0) + kbps = t.kbps(5.0) + bw_val, bw_unit = t.kbps_hr(5.0) + + self.table.add_row( + topic_text(t.name), + Text(f"{freq:.1f}", style=gradient(10, freq)), + Text(f"{bw_val} {bw_unit.value}", style=gradient(1024 * 3, kbps)), + ) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "web": + # get script file + import os + + from textual_serve.server import Server + + server = Server(f"python {os.path.abspath(__file__)}") + server.serve() + else: + LCMSpyApp().run() diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py new file mode 100644 index 0000000000..c491638e34 --- /dev/null +++ b/dimos/utils/cli/test_lcmspy.py @@ -0,0 +1,217 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import pytest + +from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.utils.lcmspy import LCMSpy, GraphLCMSpy, GraphTopic + + +def test_spy_basic(): + lcm = PickleLCM(autoconf=True) + lcm.start() + + lcmspy = LCMSpy() + lcmspy.start() + + video_topic = Topic(topic="/video") + odom_topic = Topic(topic="/odom") + + for i in range(5): + lcm.publish(video_topic, f"video frame {i}") + time.sleep(0.1) + if i % 2 == 0: + lcm.publish(odom_topic, f"odometry data {i / 2}") + + # Wait a bit for messages to be processed + time.sleep(0.5) + + # Test statistics for video topic + video_topic_spy = lcmspy.topic["/video"] + assert video_topic_spy is not None + + # Test frequency (should be around 10 Hz for 5 messages in ~0.5 seconds) + freq = video_topic_spy.freq(1.0) + assert freq > 0 + print(f"Video topic frequency: {freq:.2f} Hz") + + # Test bandwidth + kbps = video_topic_spy.kbps(1.0) + assert kbps > 0 + print(f"Video topic bandwidth: {kbps:.2f} kbps") + + # Test average message size + avg_size = video_topic_spy.size(1.0) + assert avg_size > 0 + print(f"Video topic average message size: {avg_size:.2f} bytes") + + # Test statistics for odom topic + odom_topic_spy = lcmspy.topic["/odom"] + assert odom_topic_spy is not None + + freq = odom_topic_spy.freq(1.0) + assert freq > 0 + print(f"Odom topic frequency: {freq:.2f} Hz") + + kbps = odom_topic_spy.kbps(1.0) + assert kbps > 0 + print(f"Odom topic bandwidth: {kbps:.2f} kbps") + + avg_size = odom_topic_spy.size(1.0) + assert avg_size > 0 + print(f"Odom topic average message size: {avg_size:.2f} bytes") + + print(f"Video topic: {video_topic_spy}") + print(f"Odom topic: {odom_topic_spy}") + + +def test_topic_statistics_direct(): + """Test Topic statistics directly without LCM""" + from dimos.utils.lcmspy import Topic as TopicSpy + + topic = TopicSpy("/test") + + # Add some test messages + test_data = [b"small", b"medium sized message", b"very long message for testing purposes"] + + for i, data in enumerate(test_data): + topic.msg(data) + time.sleep(0.1) # Simulate time passing + + # Test statistics over 1 second window + freq = topic.freq(1.0) + kbps = topic.kbps(1.0) + avg_size = topic.size(1.0) + + assert freq > 0 + assert kbps > 0 + assert avg_size > 0 + + print(f"Direct test - Frequency: {freq:.2f} Hz") + print(f"Direct test - Bandwidth: {kbps:.2f} kbps") + print(f"Direct test - Avg size: {avg_size:.2f} bytes") + + +def test_topic_cleanup(): + """Test that old messages are properly cleaned up""" + from dimos.utils.lcmspy import Topic as TopicSpy + + topic = TopicSpy("/test") + + # Add a message + topic.msg(b"test message") + initial_count = len(topic.message_history) + assert initial_count == 1 + + # Simulate time passing by manually adding old timestamps + old_time = time.time() - 70 # 70 seconds ago + topic.message_history.appendleft((old_time, 10)) + + # Trigger cleanup + topic._cleanup_old_messages(max_age=60.0) + + # Should only have the recent message + assert len(topic.message_history) == 1 + assert topic.message_history[0][0] > time.time() - 10 # Recent message + + +def test_graph_topic_basic(): + """Test GraphTopic basic functionality""" + topic = GraphTopic("/test_graph") + + # Add some messages and update graphs + topic.msg(b"test message") + topic.update_graphs(1.0) + + # Should have history data + assert len(topic.freq_history) == 1 + assert len(topic.bandwidth_history) == 1 + assert topic.freq_history[0] > 0 + assert topic.bandwidth_history[0] > 0 + + +def test_graph_lcmspy_basic(): + """Test GraphLCMSpy basic functionality""" + spy = GraphLCMSpy(graph_log_window=0.1) + spy.start() + time.sleep(0.2) # Wait for thread to start + + # Simulate a message + spy.msg("/test", b"test data") + time.sleep(0.2) # Wait for graph update + + # Should create GraphTopic with history + topic = spy.topic["/test"] + assert isinstance(topic, GraphTopic) + assert len(topic.freq_history) > 0 + assert len(topic.bandwidth_history) > 0 + + spy.stop() + + +def test_lcmspy_global_totals(): + """Test that LCMSpy tracks global totals as a Topic itself""" + spy = LCMSpy() + spy.start() + + # Send messages to different topics + spy.msg("/video", b"video frame data") + spy.msg("/odom", b"odometry data") + spy.msg("/imu", b"imu data") + + # The spy itself should have accumulated all messages + assert len(spy.message_history) == 3 + + # Check global statistics + global_freq = spy.freq(1.0) + global_kbps = spy.kbps(1.0) + global_size = spy.size(1.0) + + assert global_freq > 0 + assert global_kbps > 0 + assert global_size > 0 + + print(f"Global frequency: {global_freq:.2f} Hz") + print(f"Global bandwidth: {spy.kbps_hr(1.0)}") + print(f"Global avg message size: {global_size:.0f} bytes") + + spy.stop() + + +def test_graph_lcmspy_global_totals(): + """Test that GraphLCMSpy tracks global totals with history""" + spy = GraphLCMSpy(graph_log_window=0.1) + spy.start() + time.sleep(0.2) + + # Send messages + spy.msg("/video", b"video frame data") + spy.msg("/odom", b"odometry data") + time.sleep(0.2) # Wait for graph update + + # Update global graphs + spy.update_graphs(1.0) + + # Should have global history + assert len(spy.freq_history) == 1 + assert len(spy.bandwidth_history) == 1 + assert spy.freq_history[0] > 0 + assert spy.bandwidth_history[0] > 0 + + print(f"Global frequency history: {spy.freq_history[0]:.2f} Hz") + print(f"Global bandwidth history: {spy.bandwidth_history[0]:.2f} kB/s") + + spy.stop() diff --git a/pyproject.toml b/pyproject.toml index 412b39f0a0..af355ecd3a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,8 +145,9 @@ dev = [ "ruff==0.11.10", "mypy==1.15.0", "pre_commit==4.2.0", - "pytest", - "pytest-asyncio==0.26.0" + "pytest==8.3.5", + "pytest-asyncio==0.26.0", + "textual==3.7.1" ] [tool.ruff] From 634d4c4474f303db7b11cf47a765312cf430caec Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:37:44 -0700 Subject: [PATCH 2/8] total traffic measure --- dimos/utils/cli/lcmspy.py | 44 ++++++++++++++++++++++++----------- dimos/utils/cli/lcmspy_cli.py | 8 ++++--- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/dimos/utils/cli/lcmspy.py b/dimos/utils/cli/lcmspy.py index d0345891f0..aa5b48b297 100755 --- a/dimos/utils/cli/lcmspy.py +++ b/dimos/utils/cli/lcmspy.py @@ -24,10 +24,22 @@ class BandwidthUnit(Enum): - BPS = "B/s" - KBPS = "kB/s" - MBPS = "MB/s" - GBPS = "GB/s" + BP = "B" + KBP = "kB" + MBP = "MB" + GBP = "GB" + + +def human_readable_bytes(bytes_value: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: + """Convert bytes to human-readable format with appropriate units""" + if bytes_value >= 1024**3: # GB + return round(bytes_value / (1024**3), round_to), BandwidthUnit.GBP + elif bytes_value >= 1024**2: # MB + return round(bytes_value / (1024**2), round_to), BandwidthUnit.MBP + elif bytes_value >= 1024: # KB + return round(bytes_value / 1024, round_to), BandwidthUnit.KBP + else: + return round(bytes_value, round_to), BandwidthUnit.BP class Topic: @@ -38,11 +50,14 @@ def __init__(self, name: str, history_window: float = 60.0): # Store (timestamp, data_size) tuples for statistics self.message_history = deque() self.history_window = history_window + # Total traffic accumulator (doesn't get cleaned up) + self.total_traffic_bytes = 0 def msg(self, data: bytes): # print(f"> msg {self.__str__()} {len(data)} bytes") datalen = len(data) self.message_history.append((time.time(), datalen)) + self.total_traffic_bytes += datalen self._cleanup_old_messages() def _cleanup_old_messages(self, max_age: float = None): @@ -78,15 +93,9 @@ def kbps(self, time_window: float) -> float: def kbps_hr(self, time_window: float, round_to: int = 2) -> tuple[float, BandwidthUnit]: """Return human-readable bandwidth with appropriate units""" kbps_val = self.kbps(time_window) - - if kbps_val >= 1024: - return round(kbps_val / 1024, round_to), BandwidthUnit.MBPS - elif kbps_val >= 1: - return round(kbps_val, round_to), BandwidthUnit.KBPS - else: - # Convert to B/s for small values - bps = kbps_val * 1000 - return round(bps, round_to), BandwidthUnit.BPS + # Convert kB/s to B/s for human_readable_bytes + bps = kbps_val * 1000 + return human_readable_bytes(bps, round_to) # avg msg size in the last n seconds def size(self, time_window: float) -> float: @@ -96,6 +105,15 @@ def size(self, time_window: float) -> float: total_size = sum(size for _, size in messages) return total_size / len(messages) + def total_traffic(self) -> int: + """Return total traffic passed in bytes since the beginning""" + return self.total_traffic_bytes + + def total_traffic_hr(self) -> tuple[float, BandwidthUnit]: + """Return human-readable total traffic with appropriate units""" + total_bytes = self.total_traffic() + return human_readable_bytes(total_bytes) + def __str__(self): return f"topic({self.name})" diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py index 3e92e6fa13..188dd793de 100644 --- a/dimos/utils/cli/lcmspy_cli.py +++ b/dimos/utils/cli/lcmspy_cli.py @@ -87,6 +87,7 @@ def compose(self) -> ComposeResult: self.table.add_column("Topic", width=30) self.table.add_column("Freq (Hz)") self.table.add_column("Bandwidth") + self.table.add_column("Total Traffic") yield self.table yield Footer() @@ -100,18 +101,20 @@ async def on_unmount(self): def refresh_table(self): topics: List[SpyTopic] = list(self.spy.topic.values()) - topics.sort(key=lambda t: t.kbps(5.0), reverse=True) + topics.sort(key=lambda t: t.total_traffic(), reverse=True) self.table.clear(columns=False) for t in topics: freq = t.freq(5.0) kbps = t.kbps(5.0) bw_val, bw_unit = t.kbps_hr(5.0) + total_val, total_unit = t.total_traffic_hr() self.table.add_row( topic_text(t.name), Text(f"{freq:.1f}", style=gradient(10, freq)), - Text(f"{bw_val} {bw_unit.value}", style=gradient(1024 * 3, kbps)), + Text(f"{bw_val} {bw_unit.value}/s", style=gradient(1024 * 3, kbps)), + Text(f"{total_val} {total_unit.value}"), ) @@ -119,7 +122,6 @@ def refresh_table(self): import sys if len(sys.argv) > 1 and sys.argv[1] == "web": - # get script file import os from textual_serve.server import Server From 8e032b31fe4e5ebe751b77993ca5e275543d1788 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 16:39:20 -0700 Subject: [PATCH 3/8] lcmspy test import fix --- dimos/utils/cli/test_lcmspy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index c491638e34..01a7ab30c6 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -17,7 +17,7 @@ import pytest from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic -from dimos.utils.lcmspy import LCMSpy, GraphLCMSpy, GraphTopic +from dimos.utils.cli.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy def test_spy_basic(): @@ -80,7 +80,7 @@ def test_spy_basic(): def test_topic_statistics_direct(): """Test Topic statistics directly without LCM""" - from dimos.utils.lcmspy import Topic as TopicSpy + from dimos.utils.cli.lcmspy import Topic as TopicSpy topic = TopicSpy("/test") @@ -107,7 +107,7 @@ def test_topic_statistics_direct(): def test_topic_cleanup(): """Test that old messages are properly cleaned up""" - from dimos.utils.lcmspy import Topic as TopicSpy + from dimos.utils.cli.lcmspy import Topic as TopicSpy topic = TopicSpy("/test") From 44967d04fd52517f71215097c3ad039e612b1815 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 18:41:56 -0700 Subject: [PATCH 4/8] trying to fix tests in CI --- dimos/utils/cli/test_lcmspy.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index 01a7ab30c6..46edbe0b36 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -17,8 +17,11 @@ import pytest from dimos.protocol.pubsub.lcmpubsub import PickleLCM, Topic +from dimos.protocol.service.lcmservice import autoconf from dimos.utils.cli.lcmspy import GraphLCMSpy, GraphTopic, LCMSpy +autoconf() + def test_spy_basic(): lcm = PickleLCM(autoconf=True) From 22fc0035a74c86f824137d3ce66dcb0a38fba818 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:06:24 -0700 Subject: [PATCH 5/8] lcmservice autoconf fix --- dimos/protocol/service/lcmservice.py | 32 +++++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index 516354642b..5f8c747864 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -18,6 +18,8 @@ import sys import threading import traceback +import os +from functools import cache from dataclasses import dataclass from typing import Any, Callable, Optional, Protocol, runtime_checkable @@ -26,17 +28,29 @@ from dimos.protocol.service.spec import Service +@cache +def check_root() -> bool: + """Return True if the current process is running as root (UID 0).""" + try: + return os.geteuid() == 0 # type: ignore[attr-defined] + except AttributeError: + # Platforms without geteuid (e.g. Windows) – assume non-root. + return False + + def check_multicast() -> list[str]: """Check if multicast configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check if loopback interface has multicast enabled try: result = subprocess.run(["ip", "link", "show", "lo"], capture_output=True, text=True) if "MULTICAST" not in result.stdout: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") except Exception: - commands_needed.append("sudo ifconfig lo multicast") + commands_needed.append(f"{sudo}ifconfig lo multicast") # Check if multicast route exists try: @@ -44,9 +58,9 @@ def check_multicast() -> list[str]: ["ip", "route", "show", "224.0.0.0/4"], capture_output=True, text=True ) if not result.stdout.strip(): - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") except Exception: - commands_needed.append("sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") + commands_needed.append(f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo") return commands_needed @@ -55,22 +69,24 @@ def check_buffers() -> list[str]: """Check if buffer configuration is needed and return required commands.""" commands_needed = [] + sudo = "" if check_root() else "sudo " + # Check current buffer settings try: result = subprocess.run(["sysctl", "net.core.rmem_max"], capture_output=True, text=True) current_max = int(result.stdout.split("=")[1].strip()) if current_max < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_max=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_max=2097152") try: result = subprocess.run(["sysctl", "net.core.rmem_default"], capture_output=True, text=True) current_default = int(result.stdout.split("=")[1].strip()) if current_default < 2097152: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") except Exception: - commands_needed.append("sudo sysctl -w net.core.rmem_default=2097152") + commands_needed.append(f"{sudo}sysctl -w net.core.rmem_default=2097152") return commands_needed From e08469971039e779c03a8f0149eef5ad4de56200 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:18:43 -0700 Subject: [PATCH 6/8] lcmservice test fixes --- dimos/protocol/service/test_lcmservice.py | 79 ++++++++++++----------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/dimos/protocol/service/test_lcmservice.py b/dimos/protocol/service/test_lcmservice.py index 53d8c7fd12..c5b86cac35 100644 --- a/dimos/protocol/service/test_lcmservice.py +++ b/dimos/protocol/service/test_lcmservice.py @@ -23,9 +23,15 @@ autoconf, check_buffers, check_multicast, + check_root, ) +def get_sudo_prefix() -> str: + """Return 'sudo ' if not running as root, empty string if running as root.""" + return "" if check_root() else "sudo " + + def test_check_multicast_all_configured(): """Test check_multicast when system is properly configured.""" with patch("dimos.protocol.pubsub.lcmpubsub.subprocess.run") as mock_run: @@ -63,7 +69,8 @@ def test_check_multicast_missing_multicast_flag(): ] result = check_multicast() - assert result == ["sudo ifconfig lo multicast"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}ifconfig lo multicast"] def test_check_multicast_missing_route(): @@ -83,7 +90,8 @@ def test_check_multicast_missing_route(): ] result = check_multicast() - assert result == ["sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo"] def test_check_multicast_all_missing(): @@ -103,9 +111,10 @@ def test_check_multicast_all_missing(): ] result = check_multicast() + sudo = get_sudo_prefix() expected = [ - "sudo ifconfig lo multicast", - "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + f"{sudo}ifconfig lo multicast", + f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", ] assert result == expected @@ -117,9 +126,10 @@ def test_check_multicast_subprocess_exception(): mock_run.side_effect = Exception("Command failed") result = check_multicast() + sudo = get_sudo_prefix() expected = [ - "sudo ifconfig lo multicast", - "sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", + f"{sudo}ifconfig lo multicast", + f"{sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo", ] assert result == expected @@ -151,7 +161,8 @@ def test_check_buffers_low_max_buffer(): ] result = check_buffers() - assert result == ["sudo sysctl -w net.core.rmem_max=2097152"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}sysctl -w net.core.rmem_max=2097152"] def test_check_buffers_low_default_buffer(): @@ -166,7 +177,8 @@ def test_check_buffers_low_default_buffer(): ] result = check_buffers() - assert result == ["sudo sysctl -w net.core.rmem_default=2097152"] + sudo = get_sudo_prefix() + assert result == [f"{sudo}sysctl -w net.core.rmem_default=2097152"] def test_check_buffers_both_low(): @@ -181,9 +193,10 @@ def test_check_buffers_both_low(): ] result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -195,9 +208,10 @@ def test_check_buffers_subprocess_exception(): mock_run.side_effect = Exception("Command failed") result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -212,9 +226,10 @@ def test_check_buffers_parsing_error(): ] result = check_buffers() + sudo = get_sudo_prefix() expected = [ - "sudo sysctl -w net.core.rmem_max=2097152", - "sudo sysctl -w net.core.rmem_default=2097152", + f"{sudo}sysctl -w net.core.rmem_max=2097152", + f"{sudo}sysctl -w net.core.rmem_default=2097152", ] assert result == expected @@ -267,29 +282,26 @@ def test_autoconf_with_config_needed_success(): # Command execution calls type( "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo ifconfig lo multicast - type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sudo route add... - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo sysctl rmem_max - type( - "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo sysctl rmem_default + )(), # ifconfig lo multicast + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # route add... + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_max + type("MockResult", (), {"stdout": "success", "returncode": 0})(), # sysctl rmem_default ] with patch("builtins.print") as mock_print: autoconf() + sudo = get_sudo_prefix() # Verify the expected print calls expected_calls = [ ("System configuration required. Executing commands...",), - (" Running: sudo ifconfig lo multicast",), + (f" Running: {sudo}ifconfig lo multicast",), (" ✓ Success",), - (" Running: sudo route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), + (f" Running: {sudo}route add -net 224.0.0.0 netmask 240.0.0.0 dev lo",), (" ✓ Success",), - (" Running: sudo sysctl -w net.core.rmem_max=2097152",), + (f" Running: {sudo}sysctl -w net.core.rmem_max=2097152",), (" ✓ Success",), - (" Running: sudo sysctl -w net.core.rmem_default=2097152",), + (f" Running: {sudo}sysctl -w net.core.rmem_default=2097152",), (" ✓ Success",), ("System configuration completed.",), ] @@ -318,20 +330,11 @@ def test_autoconf_with_command_failures(): # Command execution calls - first succeeds, second fails type( "MockResult", (), {"stdout": "success", "returncode": 0} - )(), # sudo ifconfig lo multicast + )(), # ifconfig lo multicast subprocess.CalledProcessError( 1, - [ - "sudo", - "route", - "add", - "-net", - "224.0.0.0", - "netmask", - "240.0.0.0", - "dev", - "lo", - ], + get_sudo_prefix().split() + + ["route", "add", "-net", "224.0.0.0", "netmask", "240.0.0.0", "dev", "lo"], "Permission denied", "Operation not permitted", ), From e4452346e1c8514232a8842bfe851eb7d0c18343 Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:24:47 -0700 Subject: [PATCH 7/8] lcmspy doesn't limit topic display width --- dimos/utils/cli/lcmspy_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/utils/cli/lcmspy_cli.py b/dimos/utils/cli/lcmspy_cli.py index 188dd793de..47bf2b8a56 100644 --- a/dimos/utils/cli/lcmspy_cli.py +++ b/dimos/utils/cli/lcmspy_cli.py @@ -84,7 +84,7 @@ def compose(self) -> ComposeResult: # yield Header() self.table = DataTable(zebra_stripes=False, cursor_type=None) - self.table.add_column("Topic", width=30) + self.table.add_column("Topic") self.table.add_column("Freq (Hz)") self.table.add_column("Bandwidth") self.table.add_column("Total Traffic") From 89a818917a02f05ca8ab9f54bf99415eb0d8f30d Mon Sep 17 00:00:00 2001 From: lesh Date: Fri, 11 Jul 2025 19:31:08 -0700 Subject: [PATCH 8/8] disabled LCM tests in CI --- dimos/utils/cli/test_lcmspy.py | 6 ++++++ pyproject.toml | 7 +++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/dimos/utils/cli/test_lcmspy.py b/dimos/utils/cli/test_lcmspy.py index 46edbe0b36..a77bb03d20 100644 --- a/dimos/utils/cli/test_lcmspy.py +++ b/dimos/utils/cli/test_lcmspy.py @@ -23,6 +23,7 @@ autoconf() +@pytest.mark.lcm def test_spy_basic(): lcm = PickleLCM(autoconf=True) lcm.start() @@ -81,6 +82,7 @@ def test_spy_basic(): print(f"Odom topic: {odom_topic_spy}") +@pytest.mark.lcm def test_topic_statistics_direct(): """Test Topic statistics directly without LCM""" from dimos.utils.cli.lcmspy import Topic as TopicSpy @@ -131,6 +133,7 @@ def test_topic_cleanup(): assert topic.message_history[0][0] > time.time() - 10 # Recent message +@pytest.mark.lcm def test_graph_topic_basic(): """Test GraphTopic basic functionality""" topic = GraphTopic("/test_graph") @@ -146,6 +149,7 @@ def test_graph_topic_basic(): assert topic.bandwidth_history[0] > 0 +@pytest.mark.lcm def test_graph_lcmspy_basic(): """Test GraphLCMSpy basic functionality""" spy = GraphLCMSpy(graph_log_window=0.1) @@ -165,6 +169,7 @@ def test_graph_lcmspy_basic(): spy.stop() +@pytest.mark.lcm def test_lcmspy_global_totals(): """Test that LCMSpy tracks global totals as a Topic itself""" spy = LCMSpy() @@ -194,6 +199,7 @@ def test_lcmspy_global_totals(): spy.stop() +@pytest.mark.lcm def test_graph_lcmspy_global_totals(): """Test that GraphLCMSpy tracks global totals with history""" spy = GraphLCMSpy(graph_log_window=0.1) diff --git a/pyproject.toml b/pyproject.toml index af355ecd3a..5cb8c5be9d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -188,9 +188,12 @@ markers = [ "exclude: arbitrary exclusion from CI and default test exec", "tool: dev tooling", "needsdata: needs test data to be downloaded", - "ros: depend on ros"] + "ros: depend on ros", + "lcm: tests that run actual LCM bus (can't execute in CI)" -addopts = "-v -ra --color=yes -m 'not vis and not benchmark and not exclude and not tool and not needsdata and not ros and not heavy'" +] + +addopts = "-v -ra --color=yes -m 'not vis and not benchmark and not exclude and not tool and not needsdata and not lcm and not ros and not heavy'"