diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index f629122cee..3660a957dc 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -45,6 +45,7 @@ class GlobalConfig(BaseSettings): robot_rotation_diameter: float = 0.6 planner_strategy: NavigationStrategy = "simple" planner_robot_speed: float | None = None + dtop: bool = False model_config = SettingsConfigDict( env_file=".env", diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index d816a60cb4..86afb9ebc4 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -12,17 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from concurrent.futures import ThreadPoolExecutor -import time +import threading from typing import TYPE_CHECKING, Any from dimos.core.global_config import GlobalConfig, global_config -from dimos.core.module import Module, ModuleT from dimos.core.resource import Resource from dimos.core.worker_manager import WorkerManager from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: + from dimos.core.module import Module, ModuleT + from dimos.core.resource_monitor.monitor import StatsMonitor from dimos.core.rpc_client import ModuleProxy logger = setup_logger() @@ -33,7 +36,8 @@ class ModuleCoordinator(Resource): # type: ignore[misc] _global_config: GlobalConfig _n: int | None = None _memory_limit: str = "auto" - _deployed_modules: dict[type[Module], "ModuleProxy"] + _deployed_modules: dict[type[Module], ModuleProxy] + _stats_monitor: StatsMonitor | None = None def __init__( self, @@ -50,7 +54,17 @@ def start(self) -> None: self._client = WorkerManager(n_workers=n) self._client.start() + if self._global_config.dtop: + from dimos.core.resource_monitor.monitor import StatsMonitor + + self._stats_monitor = StatsMonitor(self._client) + self._stats_monitor.start() + def stop(self) -> None: + if self._stats_monitor is not None: + self._stats_monitor.stop() + self._stats_monitor = None + for module_class, module in reversed(self._deployed_modules.items()): logger.info("Stopping module...", module=module_class.__name__) try: @@ -61,7 +75,7 @@ def stop(self) -> None: self._client.close_all() # type: ignore[union-attr] - def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> "ModuleProxy": # type: ignore[no-untyped-def] + def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> ModuleProxy: # type: ignore[no-untyped-def] if not self._client: raise ValueError("Trying to dimos.deploy before the client has started") @@ -71,7 +85,7 @@ def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> "ModuleProxy": def deploy_parallel( self, module_specs: list[tuple[type[ModuleT], tuple[Any, ...], dict[str, Any]]] - ) -> list["ModuleProxy"]: + ) -> list[ModuleProxy]: if not self._client: raise ValueError("Not started") @@ -94,13 +108,13 @@ def start_all_modules(self) -> None: if hasattr(module, "on_system_modules"): module.on_system_modules(module_list) - def get_instance(self, module: type[ModuleT]) -> "ModuleProxy": + def get_instance(self, module: type[ModuleT]) -> ModuleProxy: return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return] def loop(self) -> None: + stop = threading.Event() try: - while True: - time.sleep(0.1) + stop.wait() except KeyboardInterrupt: return finally: diff --git a/dimos/core/resource_monitor/__init__.py b/dimos/core/resource_monitor/__init__.py new file mode 100644 index 0000000000..217941a2ec --- /dev/null +++ b/dimos/core/resource_monitor/__init__.py @@ -0,0 +1,17 @@ +from dimos.core.resource_monitor.logger import ( + LCMResourceLogger, + ResourceLogger, + StructlogResourceLogger, +) +from dimos.core.resource_monitor.monitor import StatsMonitor +from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats, collect_process_stats + +__all__ = [ + "LCMResourceLogger", + "ProcessStats", + "ResourceLogger", + "StatsMonitor", + "StructlogResourceLogger", + "WorkerStats", + "collect_process_stats", +] diff --git a/dimos/core/resource_monitor/logger.py b/dimos/core/resource_monitor/logger.py new file mode 100644 index 0000000000..88f79b6db8 --- /dev/null +++ b/dimos/core/resource_monitor/logger.py @@ -0,0 +1,75 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import asdict +from typing import TYPE_CHECKING, Any, Protocol + +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats + +logger = setup_logger() + + +class ResourceLogger(Protocol): + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: ... + + +class StructlogResourceLogger: + """Default implementation — logs resource stats via structlog info.""" + + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: + logger.info( + "coordinator", + pid=coordinator.pid, + cpu_pct=coordinator.cpu_percent, + pss_mb=round(coordinator.pss / 1048576, 1), + threads=coordinator.num_threads, + ) + for w in workers: + logger.info( + "worker", + worker_id=w.worker_id, + pid=w.pid, + alive=w.alive, + cpu_pct=w.cpu_percent, + pss_mb=round(w.pss / 1048576, 1), + threads=w.num_threads, + children=w.num_children, + fds=w.num_fds, + io_r_mb=round(w.io_read_bytes / 1048576, 1), + io_w_mb=round(w.io_write_bytes / 1048576, 1), + modules=w.modules, + ) + + +class LCMResourceLogger: + """Publishes resource stats as dicts over a pickle LCM channel.""" + + def __init__(self, topic: str = "/dimos/resource_stats") -> None: + from dimos.core.transport import pLCMTransport + + self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic) + + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: + self._transport.broadcast( + None, + { + "coordinator": asdict(coordinator), + "workers": [asdict(w) for w in workers], + }, + ) diff --git a/dimos/core/resource_monitor/monitor.py b/dimos/core/resource_monitor/monitor.py new file mode 100644 index 0000000000..49079ed98e --- /dev/null +++ b/dimos/core/resource_monitor/monitor.py @@ -0,0 +1,126 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import asdict +import os +import threading +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +from dimos.core.resource import Resource +from dimos.core.resource_monitor.stats import ( + WorkerStats, + collect_process_stats, +) +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from collections.abc import Sequence + + from dimos.core.resource_monitor.logger import ResourceLogger + +logger = setup_logger() + + +@runtime_checkable +class WorkerInfo(Protocol): + @property + def pid(self) -> int | None: ... + + @property + def worker_id(self) -> int: ... + + @property + def module_names(self) -> list[str]: ... + + +@runtime_checkable +class WorkerSource(Protocol): + @property + def workers(self) -> Sequence[WorkerInfo]: ... + + +class StatsMonitor(Resource): + """Self-contained resource monitor that runs in a daemon thread. + + Collects stats for the coordinator process and all workers, then + forwards them to a ``ResourceLogger``. + """ + + def __init__( + self, + worker_source: WorkerSource, + resource_logger: ResourceLogger | None = None, + interval: float = 1.0, + coordinator_pid: int | None = None, + ) -> None: + self._worker_source = worker_source + self._interval = interval + self._coordinator_pid = coordinator_pid or os.getpid() + self._stop = threading.Event() + self._thread: threading.Thread | None = None + + if resource_logger is not None: + self._logger = resource_logger + else: + from dimos.core.resource_monitor.logger import LCMResourceLogger + + self._logger = LCMResourceLogger() + + def start(self) -> None: + """Start the monitoring daemon thread.""" + # Prime cpu_percent so the first real reading isn't 0.0. + collect_process_stats(self._coordinator_pid) + + self._stop.clear() + self._thread = threading.Thread(target=self._loop, daemon=True) + self._thread.start() + + def stop(self) -> None: + """Signal the monitor thread to stop and wait for it.""" + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=5.0) + self._thread = None + + def _loop(self) -> None: + while not self._stop.wait(self._interval): + try: + self._collect_and_log() + except Exception: + logger.error("StatsMonitor collection failed", exc_info=True) + + def _collect_and_log(self) -> None: + coordinator = collect_process_stats(self._coordinator_pid) + + worker_stats: list[WorkerStats] = [] + for w in self._worker_source.workers: + pid = w.pid + if pid is not None: + ps = collect_process_stats(pid) + worker_stats.append( + WorkerStats(**asdict(ps), worker_id=w.worker_id, modules=w.module_names) + ) + else: + worker_stats.append( + WorkerStats( + pid=0, + alive=False, + worker_id=w.worker_id, + modules=w.module_names, + ) + ) + + self._logger.log_stats(coordinator, worker_stats) diff --git a/dimos/core/resource_monitor/stats.py b/dimos/core/resource_monitor/stats.py new file mode 100644 index 0000000000..c020c853e0 --- /dev/null +++ b/dimos/core/resource_monitor/stats.py @@ -0,0 +1,139 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TypedDict + +import psutil + +from dimos.utils.decorators import ttl_cache + +# Cache Process objects so cpu_percent(interval=None) has a previous sample. +_proc_cache: dict[int, psutil.Process] = {} + + +@dataclass(frozen=True) +class ProcessStats: + """Resource stats for a single OS process.""" + + pid: int + alive: bool + cpu_percent: float = 0.0 + cpu_time_user: float = 0.0 + cpu_time_system: float = 0.0 + cpu_time_iowait: float = 0.0 + pss: int = 0 + num_threads: int = 0 + num_children: int = 0 + num_fds: int = 0 + io_read_bytes: int = 0 + io_write_bytes: int = 0 + + +def _get_process(pid: int) -> psutil.Process: + """Return a cached Process object, creating a new one if missing or dead.""" + proc = _proc_cache.get(pid) + if proc is None or not proc.is_running(): + proc = psutil.Process(pid) + _proc_cache[pid] = proc + return proc + + +class CpuStats(TypedDict): + cpu_percent: float + cpu_time_user: float + cpu_time_system: float + cpu_time_iowait: float + + +def _collect_cpu(proc: psutil.Process) -> CpuStats: + """Collect CPU metrics. Call inside oneshot().""" + cpu_pct = proc.cpu_percent(interval=None) + ct = proc.cpu_times() + return CpuStats( + cpu_percent=cpu_pct, + cpu_time_user=ct.user, + cpu_time_system=ct.system, + cpu_time_iowait=getattr(ct, "iowait", 0.0), + ) + + +@ttl_cache(4.0) +def _collect_pss(pid: int) -> int: + """Collect PSS memory in bytes. TTL-cached to avoid expensive smaps reads.""" + try: + proc = _get_process(pid) + mem_full = proc.memory_full_info() + return getattr(mem_full, "pss", 0) + except (psutil.AccessDenied, psutil.NoSuchProcess, AttributeError): + return 0 + + +class IoStats(TypedDict): + io_read_bytes: int + io_write_bytes: int + + +def _collect_io(proc: psutil.Process) -> IoStats: + """Collect IO counters in bytes. Call inside oneshot().""" + try: + io = proc.io_counters() + return IoStats(io_read_bytes=io.read_bytes, io_write_bytes=io.write_bytes) + except (psutil.AccessDenied, AttributeError): + return IoStats(io_read_bytes=0, io_write_bytes=0) + + +class ProcStats(TypedDict): + num_threads: int + num_children: int + num_fds: int + + +def _collect_proc(proc: psutil.Process) -> ProcStats: + """Collect thread/children/fd counts. Call inside oneshot().""" + try: + fds = proc.num_fds() + except (psutil.AccessDenied, AttributeError): + fds = 0 + return ProcStats( + num_threads=proc.num_threads(), + num_children=len(proc.children(recursive=True)), + num_fds=fds, + ) + + +def collect_process_stats(pid: int) -> ProcessStats: + """Collect resource stats for a single process by PID.""" + try: + proc = _get_process(pid) + with proc.oneshot(): + cpu = _collect_cpu(proc) + io = _collect_io(proc) + proc_stats = _collect_proc(proc) + pss = _collect_pss(pid) + return ProcessStats(pid=pid, alive=True, pss=pss, **cpu, **io, **proc_stats) + except (psutil.NoSuchProcess, psutil.AccessDenied): + _proc_cache.pop(pid, None) + _collect_pss.cache.pop((pid,), None) + return ProcessStats(pid=pid, alive=False) + + +@dataclass(frozen=True) +class WorkerStats(ProcessStats): + """Process stats extended with worker-specific metadata.""" + + worker_id: int = -1 + modules: list[str] = field(default_factory=list) diff --git a/dimos/core/test_worker.py b/dimos/core/test_worker.py index 8c75f41222..a5217f2dd6 100644 --- a/dimos/core/test_worker.py +++ b/dimos/core/test_worker.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + import pytest from dimos.core.core import rpc @@ -20,6 +22,9 @@ from dimos.core.worker_manager import WorkerManager from dimos.msgs.geometry_msgs import Vector3 +if TYPE_CHECKING: + from dimos.core.resource_monitor.stats import WorkerStats + class SimpleModule(Module): output: Out[Vector3] @@ -165,6 +170,52 @@ def test_worker_manager_parallel_deployment(create_worker_manager): module3.stop() +@pytest.mark.slow +def test_collect_stats(create_worker_manager): + from dimos.core.resource_monitor.monitor import StatsMonitor + + manager = create_worker_manager(n_workers=2) + module1 = manager.deploy(SimpleModule) + module2 = manager.deploy(AnotherModule) + module1.start() + module2.start() + + # Use a capturing logger to collect stats via StatsMonitor + captured: list[list[WorkerStats]] = [] + + class CapturingLogger: + def log_stats(self, coordinator, workers): + captured.append(workers) + + monitor = StatsMonitor(manager, resource_logger=CapturingLogger(), interval=0.5) + monitor.start() + import time + + time.sleep(1.5) + monitor.stop() + + assert len(captured) >= 1 + stats = captured[-1] + assert len(stats) == 2 + + for s in stats: + assert s.alive is True + assert s.pid > 0 + assert s.pss >= 0 + assert s.num_threads >= 1 + assert s.num_fds >= 0 + assert s.io_read_bytes >= 0 + assert s.io_write_bytes >= 0 + + # At least one worker should report module names + all_modules = [name for s in stats for name in s.modules] + assert "SimpleModule" in all_modules + assert "AnotherModule" in all_modules + + module1.stop() + module2.stop() + + @pytest.mark.slow def test_worker_pool_modules_share_workers(create_worker_manager): manager = create_worker_manager(n_workers=1) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index bb2ede4e03..b0dd802841 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from __future__ import annotations import multiprocessing as mp @@ -155,6 +154,22 @@ def __init__(self) -> None: def module_count(self) -> int: return len(self._modules) + self._reserved + @property + def pid(self) -> int | None: + """PID of the worker process, or ``None`` if not alive.""" + if self._process is not None and self._process.is_alive(): + p: int | None = self._process.pid + return p + return None + + @property + def worker_id(self) -> int: + return self._worker_id + + @property + def module_names(self) -> list[str]: + return [actor._cls.__name__ for actor in self._modules.values()] + def reserve_slot(self) -> None: """Reserve a slot so _select_worker() sees the pending load.""" self._reserved += 1 diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index d80b431c50..4dbb51eb54 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -12,14 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from concurrent.futures import ThreadPoolExecutor -from typing import Any +from typing import TYPE_CHECKING, Any -from dimos.core.module import ModuleT from dimos.core.rpc_client import RPCClient from dimos.core.worker import Worker from dimos.utils.logging_config import setup_logger +if TYPE_CHECKING: + from dimos.core.module import ModuleT + logger = setup_logger() @@ -86,6 +90,10 @@ def _deploy( return results + @property + def workers(self) -> list[Worker]: + return list(self._workers) + def close_all(self) -> None: if self._closed: return diff --git a/dimos/mapping/test_voxels.py b/dimos/mapping/test_voxels.py index 3fdc2dd102..95e70e1d6d 100644 --- a/dimos/mapping/test_voxels.py +++ b/dimos/mapping/test_voxels.py @@ -77,44 +77,26 @@ def two_perspectives_loop(moment: MomentFactory) -> None: time.sleep(1) +@pytest.mark.tool def test_carving( mapper: VoxelGridMapper, moment1: Go2MapperMoment, moment2: Go2MapperMoment ) -> None: lidar_frame1 = moment1.lidar.value assert lidar_frame1 is not None - lidar_frame1_transport: LCMTransport[PointCloud2] = LCMTransport("/prev_lidar", PointCloud2) - lidar_frame1_transport.publish(lidar_frame1) - lidar_frame1_transport.stop() lidar_frame2 = moment2.lidar.value assert lidar_frame2 is not None - # Debug: check XY overlap - pts1 = np.asarray(lidar_frame1.pointcloud.points) - pts2 = np.asarray(lidar_frame2.pointcloud.points) - - voxel_size = mapper.config.voxel_size - xy1 = set(map(tuple, (pts1[:, :2] / voxel_size).astype(int))) - xy2 = set(map(tuple, (pts2[:, :2] / voxel_size).astype(int))) - - overlap = xy1 & xy2 - print(f"\nFrame1 XY columns: {len(xy1)}") - print(f"Frame2 XY columns: {len(xy2)}") - print(f"Overlapping XY columns: {len(overlap)}") - # Carving mapper (default, carve_columns=True) mapper.add_frame(lidar_frame1) mapper.add_frame(lidar_frame2) - - moment2.global_map.set(mapper.get_global_pointcloud2()) - moment2.publish() - count_carving = mapper.size() - # Additive mapper (carve_columns=False) - additive_mapper = VoxelGridMapper(carve_columns=False) - additive_mapper.add_frame(lidar_frame1) - additive_mapper.add_frame(lidar_frame2) - count_additive = additive_mapper.size() + + voxel_size = mapper.config.voxel_size + pts1 = np.asarray(lidar_frame1.pointcloud.points) + pts2 = np.asarray(lidar_frame2.pointcloud.points) + combined_vox = np.floor(np.vstack([pts1, pts2]) / voxel_size).astype(np.int64) + count_additive = np.unique(combined_vox, axis=0).shape[0] print("\n=== Carving comparison ===") print(f"Additive (no carving): {count_additive}") @@ -126,13 +108,6 @@ def test_carving( f"Carving should remove some voxels. Additive: {count_additive}, Carving: {count_carving}" ) - additive_global_map: LCMTransport[PointCloud2] = LCMTransport( - "additive_global_map", PointCloud2 - ) - additive_global_map.publish(additive_mapper.get_global_pointcloud2()) - additive_global_map.stop() - additive_mapper.stop() - def test_injest_a_few(mapper: VoxelGridMapper) -> None: data_dir = get_data("unitree_go2_office_walk2") diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 22a9e234d9..47a1e777e8 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -168,6 +168,15 @@ def humancli(ctx: typer.Context) -> None: humancli_main() +@main.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def top(ctx: typer.Context) -> None: + """Live resource monitor TUI.""" + from dimos.utils.cli.dtop import main as dtop_main + + sys.argv = ["dtop", *ctx.args] + dtop_main() + + topic_app = typer.Typer(help="Topic commands for pub/sub") main.add_typer(topic_app, name="topic") diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py index c38e0eefa5..22743ac135 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py @@ -26,6 +26,6 @@ cost_mapper(), replanning_a_star_planner(), wavefront_frontier_explorer(), -).global_config(n_workers=6, robot_model="unitree_go2") +).global_config(n_workers=7, robot_model="unitree_go2") __all__ = ["unitree_go2"] diff --git a/dimos/robot/unitree_webrtc/__init__.py b/dimos/robot/unitree_webrtc/__init__.py deleted file mode 100644 index 451aa53128..0000000000 --- a/dimos/robot/unitree_webrtc/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Compatibility package for legacy dimos.robot.unitree_webrtc imports.""" - -from importlib import import_module -import sys - -_ALIAS_MODULES = { - "demo_error_on_name_conflicts": "dimos.robot.unitree.demo_error_on_name_conflicts", - "keyboard_teleop": "dimos.robot.unitree.keyboard_teleop", - "mujoco_connection": "dimos.robot.unitree.mujoco_connection", - "type": "dimos.robot.unitree.type", - "unitree_g1_skill_container": "dimos.robot.unitree.g1.skill_container", - "unitree_skill_container": "dimos.robot.unitree.unitree_skill_container", -} - -for alias, target in _ALIAS_MODULES.items(): - sys.modules[f"{__name__}.{alias}"] = import_module(target) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py new file mode 100644 index 0000000000..16eae36604 --- /dev/null +++ b/dimos/utils/cli/dtop.py @@ -0,0 +1,413 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""dtop — Live TUI for per-worker resource stats over LCM. + +Usage: + uv run python -m dimos.utils.cli.dtop [--topic /dimos/resource_stats] +""" + +from __future__ import annotations + +import threading +import time +from typing import TYPE_CHECKING, Any + +from rich.console import Group, RenderableType +from rich.panel import Panel +from rich.rule import Rule +from rich.text import Text +from textual.app import App, ComposeResult +from textual.color import Color +from textual.containers import VerticalScroll +from textual.widgets import Static + +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic +from dimos.utils.cli import theme + +if TYPE_CHECKING: + from collections.abc import Callable + +# --------------------------------------------------------------------------- +# Color helpers +# --------------------------------------------------------------------------- + + +def _heat(ratio: float) -> str: + """Map 0..1 ratio to a cyan → yellow → red gradient.""" + cyan = Color.parse(theme.CYAN) + yellow = Color.parse(theme.YELLOW) + red = Color.parse(theme.RED) + if ratio <= 0.5: + return cyan.blend(yellow, ratio * 2).hex + return yellow.blend(red, (ratio - 0.5) * 2).hex + + +def _bar(value: float, max_val: float, width: int = 12) -> Text: + """Render a tiny colored bar.""" + ratio = min(value / max_val, 1.0) if max_val > 0 else 0.0 + filled = int(ratio * width) + return Text("█" * filled + "░" * (width - filled), style=_heat(ratio)) + + +def _rel_style(value: float, lo: float, hi: float) -> str: + """Color a value by where it sits in the observed [lo, hi] range.""" + if hi <= lo: + return _heat(0.0) + return _heat(min((value - lo) / (hi - lo), 1.0)) + + +# --------------------------------------------------------------------------- +# Metric formatters (plain strings — color applied separately via _rel_style) +# --------------------------------------------------------------------------- + + +def _fmt_pct(v: float) -> str: + return f"{v:.0f}%" + + +def _fmt_mem(v: float) -> str: + mb = v / 1048576 + if mb >= 1024: + return f"{mb / 1024:.1f} GB" + return f"{mb:.1f} MB" + + +def _fmt_int(v: float) -> str: + return str(int(v)) + + +def _fmt_secs(v: float) -> str: + if v >= 3600: + return f"{v / 3600:.1f}h" + if v >= 60: + return f"{v / 60:.1f}m" + return f"{v:.1f}s" + + +def _fmt_io(v: float) -> str: + return f"{v / 1048576:.0f} MB" + + +# --------------------------------------------------------------------------- +# Metric definitions — add a tuple here to add a new field +# (label, dict_key, format_fn) +# --------------------------------------------------------------------------- + +_LINE1: list[tuple[str, str, Callable[[float], str]]] = [ + ("CPU", "cpu_percent", _fmt_pct), + ("PSS", "pss", _fmt_mem), + ("Thr", "num_threads", _fmt_int), + ("Child", "num_children", _fmt_int), + ("FDs", "num_fds", _fmt_int), +] + +_LINE2: list[tuple[str, str, Callable[[float], str]]] = [ + ("UserT", "cpu_time_user", _fmt_secs), + ("SysT", "cpu_time_system", _fmt_secs), + ("ioT", "cpu_time_iowait", _fmt_secs), +] + +# IO r/w is a compound field handled specially in _make_lines +_IO_KEYS = ("io_read_bytes", "io_write_bytes") + +_ALL_KEYS = {key for _, key, _ in _LINE1 + _LINE2} | set(_IO_KEYS) + + +def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, float]]: + """(min, max) per metric across all processes (for relative coloring).""" + ranges: dict[str, tuple[float, float]] = {} + for key in _ALL_KEYS: + vals = [d.get(key, 0) for d in data_dicts] + ranges[key] = (min(vals), max(vals)) + return ranges + + +# --------------------------------------------------------------------------- +# App +# --------------------------------------------------------------------------- + + +class ResourceSpyApp(App[None]): + CSS_PATH = "dimos.tcss" + + TITLE = "" + SHOW_TREE = False + + CSS = f""" + Screen {{ + layout: vertical; + background: {theme.BACKGROUND}; + }} + VerticalScroll {{ + height: 1fr; + scrollbar-size: 0 0; + }} + VerticalScroll.waiting {{ + align: center middle; + }} + .waiting #panels {{ + width: auto; + }} + #panels {{ + background: transparent; + }} + """ + + BINDINGS = [("q", "quit"), ("ctrl+c", "quit")] + + def __init__(self, topic_name: str = "/dimos/resource_stats") -> None: + super().__init__() + self._topic_name = topic_name + self._lcm = PickleLCM(autoconf=True) + self._lock = threading.Lock() + self._latest: dict[str, Any] | None = None + self._last_msg_time: float = 0.0 + + def compose(self) -> ComposeResult: + with VerticalScroll(): + yield Static(id="panels") + + def on_mount(self) -> None: + self._lcm.subscribe(Topic(self._topic_name), self._on_msg) + self._lcm.start() + self.set_interval(1.0, self._refresh) + + async def on_unmount(self) -> None: + self._lcm.stop() + + def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: + with self._lock: + self._latest = msg + self._last_msg_time = time.monotonic() + + def _refresh(self) -> None: + with self._lock: + data = self._latest + last_msg = self._last_msg_time + + scroll = self.query_one(VerticalScroll) + if data is None: + scroll.add_class("waiting") + waiting = Panel( + Text( + "use `dimos --dtop ...` to emit stats", + style=theme.FOREGROUND, + justify="center", + ), + border_style=theme.CYAN, + expand=False, + ) + self.query_one("#panels", Static).update(waiting) + return + scroll.remove_class("waiting") + + stale = (time.monotonic() - last_msg) > 2.0 + dim = "#606060" + border_style = dim if stale else "#777777" + + # Collect (role, role_style, data_dict, modules) entries + entries: list[tuple[str, str, dict[str, Any], str]] = [] + + coord = data.get("coordinator", {}) + entries.append(("coordinator", theme.BRIGHT_CYAN, coord, "")) + + for w in data.get("workers", []): + alive = w.get("alive", False) + wid = w.get("worker_id", "?") + role_style = theme.BRIGHT_GREEN if alive else theme.BRIGHT_RED + modules = ", ".join(w.get("modules", [])) or "" + entries.append((f"worker {wid}", role_style, w, modules)) + + # Per-metric max for relative coloring + ranges = _compute_ranges([d for _, _, d, _ in entries]) + + # Build inner content: sections separated by Rules + parts: list[RenderableType] = [] + for i, (role, rs, d, mods) in enumerate(entries): + if i > 0: + title = Text( + f" {role}: {mods} " if mods else f" {role} ", + style=dim if stale else rs, + ) + parts.append(Rule(title=title, style=border_style)) + parts.extend(self._make_lines(d, stale, ranges)) + + # First entry title goes on the Panel itself + first_role, first_rs, _, first_mods = entries[0] + panel_title = Text( + f" {first_role}: {first_mods} " if first_mods else f" {first_role} ", + style=dim if stale else first_rs, + ) + + panel = Panel( + Group(*parts), + title=panel_title, + border_style=border_style, + ) + self.query_one("#panels", Static).update(panel) + + @staticmethod + def _make_lines( + d: dict[str, Any], + stale: bool, + ranges: dict[str, tuple[float, float]], + ) -> list[Text]: + dim = "#606060" + label1_style = dim if stale else theme.WHITE + label2_style = label1_style + + # Line 1 + line1 = Text() + for label, key, fmt in _LINE1: + val = d.get(key, 0) + lo, hi = ranges[key] + # CPU% uses absolute 0-100 scale; everything else is relative + if key == "cpu_percent": + val_style = dim if stale else _heat(min(val / 100.0, 1.0)) + else: + val_style = dim if stale else _rel_style(val, lo, hi) + line1.append(f"{label} ", style=label1_style) + line1.append(fmt(val), style=val_style) + # CPU bar right after CPU% + if key == "cpu_percent": + line1.append(" ") + if stale: + line1.append("░" * 12, style=dim) + else: + line1.append_text(_bar(val, 100)) + line1.append(" ") + + # Line 2 + line2 = Text() + for _i, (label, key, fmt) in enumerate(_LINE2): + val = d.get(key, 0) + lo, hi = ranges[key] + val_style = dim if stale else _rel_style(val, lo, hi) + line2.append(f"{label} ", style=label2_style) + line2.append(fmt(val), style=val_style) + line2.append(" ") + + # IO r/w — compound field + io_r = d.get(_IO_KEYS[0], 0) + io_w = d.get(_IO_KEYS[1], 0) + lo_r, hi_r = ranges[_IO_KEYS[0]] + lo_w, hi_w = ranges[_IO_KEYS[1]] + line2.append("IO r/w ", style=label2_style) + line2.append(_fmt_io(io_r), style=dim if stale else _rel_style(io_r, lo_r, hi_r)) + line2.append("/", style=label2_style) + line2.append(_fmt_io(io_w), style=dim if stale else _rel_style(io_w, lo_w, hi_w)) + + return [line1, line2] + + +# --------------------------------------------------------------------------- +# Preview +# --------------------------------------------------------------------------- + +_PREVIEW_DATA: dict[str, Any] = { + "coordinator": { + "cpu_percent": 12.3, + "pss": 47_400_000, + "num_threads": 4, + "num_children": 0, + "num_fds": 32, + "cpu_time_user": 1.2, + "cpu_time_system": 0.3, + "cpu_time_iowait": 0.0, + "io_read_bytes": 12_582_912, + "io_write_bytes": 4_194_304, + "pid": 1234, + }, + "workers": [ + { + "worker_id": 0, + "alive": True, + "modules": ["nav", "lidar"], + "cpu_percent": 34.0, + "pss": 125_829_120, + "num_threads": 8, + "num_children": 2, + "num_fds": 64, + "cpu_time_user": 5.1, + "cpu_time_system": 1.0, + "cpu_time_iowait": 0.2, + "io_read_bytes": 47_185_920, + "io_write_bytes": 12_582_912, + "pid": 1235, + }, + { + "worker_id": 1, + "alive": False, + "modules": ["vision"], + "cpu_percent": 87.0, + "pss": 536_870_912, + "num_threads": 16, + "num_children": 1, + "num_fds": 128, + "cpu_time_user": 42.5, + "cpu_time_system": 8.3, + "cpu_time_iowait": 1.1, + "io_read_bytes": 1_073_741_824, + "io_write_bytes": 536_870_912, + "pid": 1236, + }, + ], +} + + +def _preview() -> None: + """Print a static preview with fake data (no LCM needed).""" + from rich.console import Console + + data = _PREVIEW_DATA + border_style = "#555555" + + entries: list[tuple[str, str, dict[str, Any], str]] = [] + entries.append(("coordinator", theme.BRIGHT_CYAN, data["coordinator"], "")) + for w in data["workers"]: + rs = theme.BRIGHT_GREEN if w.get("alive") else theme.BRIGHT_RED + mods = ", ".join(w.get("modules", [])) + entries.append((f"worker {w['worker_id']}", rs, w, mods)) + + ranges = _compute_ranges([d for _, _, d, _ in entries]) + + parts: list[RenderableType] = [] + for i, (role, rs, d, mods) in enumerate(entries): + if i > 0: + title = Text(f" {role}: {mods} " if mods else f" {role} ", style=rs) + parts.append(Rule(title=title, style=border_style)) + parts.extend(ResourceSpyApp._make_lines(d, stale=False, ranges=ranges)) + + first_role, first_rs, _, first_mods = entries[0] + panel_title = Text(f" {first_role} ", style=first_rs) + Console().print(Panel(Group(*parts), title=panel_title, border_style=border_style)) + + +def main() -> None: + import sys + + if "--preview" in sys.argv: + _preview() + return + + topic = "/dimos/resource_stats" + if len(sys.argv) > 1 and sys.argv[1] == "--topic" and len(sys.argv) > 2: + topic = sys.argv[2] + + ResourceSpyApp(topic_name=topic).run() + + +if __name__ == "__main__": + main() diff --git a/dimos/utils/decorators/__init__.py b/dimos/utils/decorators/__init__.py index 79623922a0..d0f91a4939 100644 --- a/dimos/utils/decorators/__init__.py +++ b/dimos/utils/decorators/__init__.py @@ -1,7 +1,7 @@ """Decorators and accumulators for rate limiting and other utilities.""" from .accumulators import Accumulator, LatestAccumulator, RollingAverageAccumulator -from .decorators import CachedMethod, limit, retry, simple_mcache +from .decorators import CachedMethod, limit, retry, simple_mcache, ttl_cache __all__ = [ "Accumulator", @@ -11,4 +11,5 @@ "limit", "retry", "simple_mcache", + "ttl_cache", ] diff --git a/dimos/utils/decorators/decorators.py b/dimos/utils/decorators/decorators.py index ab3ed21e3f..c5626fcaf7 100644 --- a/dimos/utils/decorators/decorators.py +++ b/dimos/utils/decorators/decorators.py @@ -13,15 +13,17 @@ # limitations under the License. from collections.abc import Callable -from functools import wraps +from functools import update_wrapper, wraps import threading import time -from typing import Any, Protocol, TypeVar, cast +from typing import Any, Generic, ParamSpec, Protocol, TypeVar, cast from .accumulators import Accumulator, LatestAccumulator _CacheResult_co = TypeVar("_CacheResult_co", covariant=True) _CacheReturn = TypeVar("_CacheReturn") +_P = ParamSpec("_P") +_F = TypeVar("_F", bound=Callable[..., Any]) class CachedMethod(Protocol[_CacheResult_co]): @@ -166,6 +168,48 @@ def invalidate_cache(instance: Any) -> None: return cast("CachedMethod[_CacheReturn]", getter) +class _TtlCacheWrapper(Generic[_P, _CacheReturn]): + """Wrapper returned by :func:`ttl_cache`.""" + + cache: dict[tuple[Any, ...], tuple[float, _CacheReturn]] + + def __init__(self, func: Callable[_P, _CacheReturn], seconds: float) -> None: + self._func = func + self._seconds = seconds + self.cache = {} + update_wrapper(self, func) + + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _CacheReturn: + self.pop_expired() + if args in self.cache: + _, val = self.cache[args] + return val + result = self._func(*args, **kwargs) + self.cache[args] = (time.monotonic(), result) + return result + + def pop_expired(self) -> None: + """Remove all expired entries from the cache.""" + now = time.monotonic() + expired = [k for k, (ts, _) in self.cache.items() if now - ts >= self._seconds] + for k in expired: + del self.cache[k] + + +def ttl_cache( + seconds: float, +) -> Callable[[Callable[_P, _CacheReturn]], _TtlCacheWrapper[_P, _CacheReturn]]: + """Cache function results by positional args with a time-to-live. + + Expired entries are swept on each access. + """ + + def decorator(func: Callable[_P, _CacheReturn]) -> _TtlCacheWrapper[_P, _CacheReturn]: + return _TtlCacheWrapper(func, seconds) + + return decorator + + def retry(max_retries: int = 3, on_exception: type[Exception] = Exception, delay: float = 0.0): # type: ignore[no-untyped-def] """ Decorator that retries a function call if it raises an exception. diff --git a/dimos/utils/decorators/test_decorators.py b/dimos/utils/decorators/test_decorators.py index a40a806a80..98545a2e37 100644 --- a/dimos/utils/decorators/test_decorators.py +++ b/dimos/utils/decorators/test_decorators.py @@ -16,7 +16,7 @@ import pytest -from dimos.utils.decorators import RollingAverageAccumulator, limit, retry, simple_mcache +from dimos.utils.decorators import RollingAverageAccumulator, limit, retry, simple_mcache, ttl_cache def test_limit() -> None: @@ -316,3 +316,77 @@ def expensive(self) -> int: obj1.expensive.invalidate_cache(obj1) assert obj1.expensive() == 3 assert obj2.expensive() == 2 # still cached + + +def test_ttl_cache_returns_cached_value() -> None: + """Test that ttl_cache returns cached results within TTL.""" + call_count = 0 + + @ttl_cache(1.0) + def expensive(x: int) -> int: + nonlocal call_count + call_count += 1 + return x * 2 + + assert expensive(5) == 10 + assert call_count == 1 + + # Second call should be cached + assert expensive(5) == 10 + assert call_count == 1 + + # Different args should compute + assert expensive(3) == 6 + assert call_count == 2 + + +def test_ttl_cache_expires() -> None: + """Test that ttl_cache recomputes after TTL expires.""" + call_count = 0 + + @ttl_cache(0.05) + def expensive(x: int) -> int: + nonlocal call_count + call_count += 1 + return x * 2 + + assert expensive(5) == 10 + assert call_count == 1 + + time.sleep(0.1) + + # Should recompute after TTL + assert expensive(5) == 10 + assert call_count == 2 + + +def test_ttl_cache_sweep_on_access() -> None: + """Test that expired entries are swept on the next access.""" + + @ttl_cache(0.05) + def expensive(x: int) -> int: + return x * 2 + + expensive(1) + expensive(2) + assert len(expensive.cache) == 2 + time.sleep(0.1) + + # Next call sweeps expired entries + expensive(3) + assert (1,) not in expensive.cache + assert (2,) not in expensive.cache + assert (3,) in expensive.cache + + +def test_ttl_cache_manual_cache_cleanup() -> None: + """Test that evict() removes a specific cache entry.""" + + @ttl_cache(10.0) + def expensive(x: int) -> int: + return x * 2 + + expensive(1) + assert (1,) in expensive.cache + del expensive.cache[(1,)] + assert (1,) not in expensive.cache diff --git a/pyproject.toml b/pyproject.toml index fde9b90c29..248678ce1f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ dependencies = [ "rerun-sdk>=0.20.0", "toolz>=1.1.0", "protobuf>=6.33.5,<7", + "psutil>=7.0.0", ] @@ -76,6 +77,7 @@ humancli = "dimos.utils.cli.human.humanclianim:main" dimos = "dimos.robot.cli.dimos:main" rerun-bridge = "dimos.visualization.rerun.bridge:app" doclinks = "dimos.utils.docs.doclinks:main" +dtop = "dimos.utils.cli.dtop:main" [project.optional-dependencies] misc = [ diff --git a/uv.lock b/uv.lock index bd25dd1d10..2f53ef0e6f 100644 --- a/uv.lock +++ b/uv.lock @@ -1695,6 +1695,7 @@ dependencies = [ { name = "plotext" }, { name = "plum-dispatch" }, { name = "protobuf" }, + { name = "psutil" }, { name = "pydantic" }, { name = "pydantic-settings" }, { name = "python-dotenv" }, @@ -2048,6 +2049,7 @@ requires-dist = [ { name = "portal", marker = "extra == 'misc'" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = "==4.2.0" }, { name = "protobuf", specifier = ">=6.33.5,<7" }, + { name = "psutil", specifier = ">=7.0.0" }, { name = "psycopg2-binary", marker = "extra == 'psql'", specifier = ">=2.9.11" }, { name = "py-spy", marker = "extra == 'dev'" }, { name = "pydantic" },