From 601aeeb40b1209348219b1da785bc1bbafb39a85 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 28 Feb 2026 15:46:00 +0800 Subject: [PATCH 01/23] feat(core): add per-worker resource monitor with dtop TUI Adds psutil-based resource monitoring to ModuleCoordinator.loop(), collecting CPU, memory (PSS/USS/RSS/VMS), threads, children, FDs, and IO stats per worker process every 2s. Stats are published over LCM and viewable with the new `dtop` CLI tool. - WorkerStats dataclass and collect_stats() on Worker/WorkerManager - ResourceLogger protocol with LCM and structlog implementations - dtop: live Textual TUI subscribing to /dimos/resource_stats - psutil added as explicit dependency - Bump smart blueprint to 7 workers --- dimos/core/module_coordinator.py | 38 +++- dimos/core/resource_logger.py | 78 +++++++ dimos/core/test_worker.py | 29 +++ dimos/core/worker.py | 84 ++++++++ dimos/core/worker_manager.py | 14 +- .../go2/blueprints/smart/unitree_go2.py | 2 +- dimos/utils/cli/dtop.py | 193 ++++++++++++++++++ pyproject.toml | 2 + uv.lock | 2 + 9 files changed, 432 insertions(+), 10 deletions(-) create mode 100644 dimos/core/resource_logger.py create mode 100644 dimos/utils/cli/dtop.py diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index d816a60cb4..57ae7ae67c 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -12,17 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from concurrent.futures import ThreadPoolExecutor +import os import time from typing import TYPE_CHECKING, Any from dimos.core.global_config import GlobalConfig, global_config -from dimos.core.module import Module, ModuleT from dimos.core.resource import Resource +from dimos.core.resource_logger import LCMResourceLogger, ResourceLogger +from dimos.core.worker import WorkerStats, collect_process_stats from dimos.core.worker_manager import WorkerManager from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: + from dimos.core.module import Module, ModuleT from dimos.core.rpc_client import ModuleProxy logger = setup_logger() @@ -33,7 +38,7 @@ class ModuleCoordinator(Resource): # type: ignore[misc] _global_config: GlobalConfig _n: int | None = None _memory_limit: str = "auto" - _deployed_modules: dict[type[Module], "ModuleProxy"] + _deployed_modules: dict[type[Module], ModuleProxy] def __init__( self, @@ -61,7 +66,7 @@ def stop(self) -> None: self._client.close_all() # type: ignore[union-attr] - def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> "ModuleProxy": # type: ignore[no-untyped-def] + def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> ModuleProxy: # type: ignore[no-untyped-def] if not self._client: raise ValueError("Trying to dimos.deploy before the client has started") @@ -71,7 +76,7 @@ def deploy(self, module_class: type[ModuleT], *args, **kwargs) -> "ModuleProxy": def deploy_parallel( self, module_specs: list[tuple[type[ModuleT], tuple[Any, ...], dict[str, Any]]] - ) -> list["ModuleProxy"]: + ) -> list[ModuleProxy]: if not self._client: raise ValueError("Not started") @@ -94,14 +99,35 @@ def start_all_modules(self) -> None: if hasattr(module, "on_system_modules"): module.on_system_modules(module_list) - def get_instance(self, module: type[ModuleT]) -> "ModuleProxy": + def get_instance(self, module: type[ModuleT]) -> ModuleProxy: return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return] - def loop(self) -> None: + def loop( + self, + resource_logger: ResourceLogger | None = None, + monitor_interval: float = 2.0, + ) -> None: + _logger: ResourceLogger = resource_logger or LCMResourceLogger() + coordinator_pid = os.getpid() + # Prime cpu_percent so the first real reading isn't 0.0. + collect_process_stats(coordinator_pid) + + last_monitor = time.monotonic() try: while True: time.sleep(0.1) + now = time.monotonic() + if now - last_monitor >= monitor_interval: + last_monitor = now + self._log_resource_stats(_logger, coordinator_pid) except KeyboardInterrupt: return finally: self.stop() + + def _log_resource_stats(self, _logger: ResourceLogger, coordinator_pid: int) -> None: + coordinator = collect_process_stats(coordinator_pid) + workers: list[WorkerStats] = [] + if self._client is not None: + workers = self._client.collect_stats() + _logger.log_stats(coordinator, workers) diff --git a/dimos/core/resource_logger.py b/dimos/core/resource_logger.py new file mode 100644 index 0000000000..f16cc76f5c --- /dev/null +++ b/dimos/core/resource_logger.py @@ -0,0 +1,78 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import asdict +from typing import TYPE_CHECKING, Any, Protocol + +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.core.worker import WorkerStats + +logger = setup_logger() + + +class ResourceLogger(Protocol): + def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> None: ... + + +class StructlogResourceLogger: + """Default implementation — logs resource stats via structlog info.""" + + def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> None: + logger.info( + "coordinator", + pid=coordinator.pid, + cpu_pct=coordinator.cpu_percent, + pss_mb=round(coordinator.pss_mb, 1), + rss_mb=round(coordinator.rss_mb, 1), + threads=coordinator.num_threads, + ) + for w in workers: + logger.info( + "worker", + worker_id=w.worker_id, + pid=w.pid, + alive=w.alive, + cpu_pct=w.cpu_percent, + pss_mb=round(w.pss_mb, 1), + rss_mb=round(w.rss_mb, 1), + vms_mb=round(w.vms_mb, 1), + threads=w.num_threads, + children=w.num_children, + fds=w.num_fds, + io_r_mb=round(w.io_read_mb, 1), + io_w_mb=round(w.io_write_mb, 1), + modules=w.modules, + ) + + +class LCMResourceLogger: + """Publishes resource stats as dicts over a pickle LCM channel.""" + + def __init__(self, topic: str = "/dimos/resource_stats") -> None: + from dimos.core.transport import pLCMTransport + + self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic) + + def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> None: + self._transport.broadcast( + None, + { + "coordinator": asdict(coordinator), + "workers": [asdict(w) for w in workers], + }, + ) diff --git a/dimos/core/test_worker.py b/dimos/core/test_worker.py index 8c75f41222..a619d254e7 100644 --- a/dimos/core/test_worker.py +++ b/dimos/core/test_worker.py @@ -165,6 +165,35 @@ def test_worker_manager_parallel_deployment(create_worker_manager): module3.stop() +@pytest.mark.slow +def test_collect_stats(create_worker_manager): + manager = create_worker_manager(n_workers=2) + module1 = manager.deploy(SimpleModule) + module2 = manager.deploy(AnotherModule) + module1.start() + module2.start() + + stats = manager.collect_stats() + assert len(stats) == 2 + + for s in stats: + assert s.alive is True + assert s.pid > 0 + assert s.rss_mb > 0 + assert s.num_threads >= 1 + assert s.num_fds >= 0 + assert s.io_read_mb >= 0 + assert s.io_write_mb >= 0 + + # At least one worker should report module names + all_modules = [name for s in stats for name in s.modules] + assert "SimpleModule" in all_modules + assert "AnotherModule" in all_modules + + module1.stop() + module2.stop() + + @pytest.mark.slow def test_worker_pool_modules_share_workers(create_worker_manager): manager = create_worker_manager(n_workers=1) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index bb2ede4e03..284553123e 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -14,11 +14,14 @@ from __future__ import annotations +from dataclasses import dataclass, field import multiprocessing as mp import threading import traceback from typing import TYPE_CHECKING, Any +import psutil + from dimos.utils.logging_config import setup_logger from dimos.utils.sequential_ids import SequentialIds @@ -27,6 +30,79 @@ from dimos.core.module import ModuleT +_MB = 1024 * 1024 + + +@dataclass(frozen=True) +class WorkerStats: + pid: int + worker_id: int + alive: bool + cpu_percent: float = 0.0 + cpu_time_user: float = 0.0 + cpu_time_system: float = 0.0 + cpu_time_iowait: float = 0.0 + pss_mb: float = 0.0 + uss_mb: float = 0.0 + rss_mb: float = 0.0 + vms_mb: float = 0.0 + num_threads: int = 0 + num_children: int = 0 + num_fds: int = 0 + io_read_mb: float = 0.0 + io_write_mb: float = 0.0 + modules: list[str] = field(default_factory=list) + + +def collect_process_stats( + pid: int, worker_id: int = -1, modules: list[str] | None = None +) -> WorkerStats: + """Collect resource stats for a single process by PID.""" + try: + proc = psutil.Process(pid) + with proc.oneshot(): + cpu_pct = proc.cpu_percent(interval=None) + ct = proc.cpu_times() + mem_basic = proc.memory_info() + try: + mem_full = proc.memory_full_info() + pss = getattr(mem_full, "pss", 0) / _MB + uss = getattr(mem_full, "uss", 0) / _MB + except (psutil.AccessDenied, AttributeError): + pss = uss = 0.0 + try: + io = proc.io_counters() + io_r = io.read_bytes / _MB + io_w = io.write_bytes / _MB + except (psutil.AccessDenied, AttributeError): + io_r = io_w = 0.0 + try: + fds = proc.num_fds() + except (psutil.AccessDenied, AttributeError): + fds = 0 + return WorkerStats( + pid=pid, + worker_id=worker_id, + alive=True, + cpu_percent=cpu_pct, + cpu_time_user=ct.user, + cpu_time_system=ct.system, + cpu_time_iowait=getattr(ct, "iowait", 0.0), + pss_mb=pss, + uss_mb=uss, + rss_mb=mem_basic.rss / _MB, + vms_mb=mem_basic.vms / _MB, + num_threads=proc.num_threads(), + num_children=len(proc.children(recursive=True)), + num_fds=fds, + io_read_mb=io_r, + io_write_mb=io_w, + modules=modules or [], + ) + except (psutil.NoSuchProcess, psutil.AccessDenied): + return WorkerStats(pid=pid, worker_id=worker_id, alive=False, modules=modules or []) + + logger = setup_logger() @@ -155,6 +231,14 @@ def __init__(self) -> None: def module_count(self) -> int: return len(self._modules) + self._reserved + def collect_stats(self) -> WorkerStats: + """Collect resource stats for this worker's process.""" + modules = [actor._cls.__name__ for actor in self._modules.values()] + if self._process is not None and self._process.is_alive(): + pid: int = self._process.pid # type: ignore[assignment] + return collect_process_stats(pid, self._worker_id, modules) + return WorkerStats(pid=0, worker_id=self._worker_id, alive=False, modules=modules) + def reserve_slot(self) -> None: """Reserve a slot so _select_worker() sees the pending load.""" self._reserved += 1 diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index d80b431c50..f9dddd6e4e 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -12,14 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from concurrent.futures import ThreadPoolExecutor -from typing import Any +from typing import TYPE_CHECKING, Any -from dimos.core.module import ModuleT from dimos.core.rpc_client import RPCClient -from dimos.core.worker import Worker +from dimos.core.worker import Worker, WorkerStats from dimos.utils.logging_config import setup_logger +if TYPE_CHECKING: + from dimos.core.module import ModuleT + logger = setup_logger() @@ -86,6 +90,10 @@ def _deploy( return results + def collect_stats(self) -> list[WorkerStats]: + """Collect resource stats for all worker processes.""" + return [w.collect_stats() for w in self._workers] + def close_all(self) -> None: if self._closed: return diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py index c38e0eefa5..22743ac135 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py @@ -26,6 +26,6 @@ cost_mapper(), replanning_a_star_planner(), wavefront_frontier_explorer(), -).global_config(n_workers=6, robot_model="unitree_go2") +).global_config(n_workers=7, robot_model="unitree_go2") __all__ = ["unitree_go2"] diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py new file mode 100644 index 0000000000..b4f0a99f5c --- /dev/null +++ b/dimos/utils/cli/dtop.py @@ -0,0 +1,193 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Live TUI for per-worker resource stats over LCM. + +Usage: + uv run python -m dimos.utils.cli.resourcespy [--topic /dimos/resource_stats] +""" + +from __future__ import annotations + +import threading +from typing import Any + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.color import Color +from textual.widgets import DataTable + +from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic +from dimos.utils.cli import theme + + +def _bar(value: float, max_val: float, width: int = 12) -> Text: + """Render a tiny colored bar.""" + ratio = min(value / max_val, 1.0) if max_val > 0 else 0.0 + filled = int(ratio * width) + cyan = Color.parse(theme.CYAN) + yellow = Color.parse(theme.YELLOW) + color = cyan.blend(yellow, ratio).hex + return Text("█" * filled + "░" * (width - filled), style=color) + + +def _fmt_mb(val: float) -> Text: + if val >= 1024: + return Text(f"{val / 1024:.1f} GB", style=theme.BRIGHT_YELLOW) + return Text(f"{val:.1f} MB", style=theme.WHITE) + + +def _fmt_pct(val: float) -> Text: + cyan = Color.parse(theme.CYAN) + yellow = Color.parse(theme.YELLOW) + color = cyan.blend(yellow, min(val / 100.0, 1.0)).hex + return Text(f"{val:.0f}%", style=color) + + +def _fmt_time(seconds: float) -> Text: + if seconds >= 3600: + return Text(f"{seconds / 3600:.1f}h", style=theme.WHITE) + if seconds >= 60: + return Text(f"{seconds / 60:.1f}m", style=theme.WHITE) + return Text(f"{seconds:.1f}s", style=theme.WHITE) + + +class ResourceSpyApp(App): # type: ignore[type-arg] + CSS_PATH = "dimos.tcss" + + TITLE = "" + SHOW_TREE = False + + CSS = f""" + Screen {{ + layout: vertical; + background: {theme.BACKGROUND}; + }} + DataTable {{ + height: 1fr; + border: solid {theme.BORDER}; + background: {theme.BG}; + scrollbar-size: 0 0; + }} + DataTable > .datatable--header {{ + color: {theme.ACCENT}; + background: transparent; + }} + """ + + BINDINGS = [("q", "quit"), ("ctrl+c", "quit")] + + def __init__(self, topic_name: str = "/dimos/resource_stats") -> None: + super().__init__() + self._topic_name = topic_name + self._lcm = PickleLCM(autoconf=True) + self._lock = threading.Lock() + self._latest: dict[str, Any] | None = None + self._snap_count = 0 + + def compose(self) -> ComposeResult: + table: DataTable = DataTable(zebra_stripes=True, cursor_type=None) # type: ignore[type-arg, arg-type] + table.add_column("Role", width=14) + table.add_column("PID", width=8) + table.add_column("CPU %", width=8) + table.add_column("CPU bar", width=14) + table.add_column("User", width=8) + table.add_column("Sys", width=8) + table.add_column("IOw", width=8) + table.add_column("PSS", width=10) + table.add_column("USS", width=10) + table.add_column("RSS", width=10) + table.add_column("VMS", width=10) + table.add_column("Thr", width=5) + table.add_column("Ch", width=5) + table.add_column("FDs", width=5) + table.add_column("IO R", width=10) + table.add_column("IO W", width=10) + table.add_column("Modules", width=30) + yield table + + def on_mount(self) -> None: + self._lcm.subscribe(Topic(self._topic_name), self._on_msg) + self._lcm.start() + self.set_interval(1.0, self._refresh) + + async def on_unmount(self) -> None: + self._lcm.stop() + + def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: + with self._lock: + self._latest = msg + self._snap_count += 1 + + def _refresh(self) -> None: + with self._lock: + data = self._latest + + if data is None: + return + + table = self.query_one(DataTable) + table.clear(columns=False) + + coord = data.get("coordinator", {}) + self._add_row(table, "coordinator", theme.BRIGHT_CYAN, coord, "—") + + for w in data.get("workers", []): + alive = w.get("alive", False) + wid = w.get("worker_id", "?") + role_style = theme.BRIGHT_GREEN if alive else theme.BRIGHT_RED + modules = ", ".join(w.get("modules", [])) or "—" + self._add_row(table, f"worker {wid}", role_style, w, modules) + + @staticmethod + def _add_row( + table: DataTable, # type: ignore[type-arg] + role: str, + role_style: str, + d: dict[str, Any], + modules: str, + ) -> None: + table.add_row( + Text(role, style=role_style), + Text(str(d.get("pid", "?")), style=theme.DIM), + _fmt_pct(d.get("cpu_percent", 0)), + _bar(d.get("cpu_percent", 0), 100), + _fmt_time(d.get("cpu_time_user", 0)), + _fmt_time(d.get("cpu_time_system", 0)), + _fmt_time(d.get("cpu_time_iowait", 0)), + _fmt_mb(d.get("pss_mb", 0)), + _fmt_mb(d.get("uss_mb", 0)), + _fmt_mb(d.get("rss_mb", 0)), + _fmt_mb(d.get("vms_mb", 0)), + Text(str(d.get("num_threads", 0)), style=theme.WHITE), + Text(str(d.get("num_children", 0)), style=theme.WHITE), + Text(str(d.get("num_fds", 0)), style=theme.WHITE), + _fmt_mb(d.get("io_read_mb", 0)), + _fmt_mb(d.get("io_write_mb", 0)), + Text(modules, style=theme.BRIGHT_BLUE), + ) + + +def main() -> None: + import sys + + topic = "/dimos/resource_stats" + if len(sys.argv) > 1 and sys.argv[1] == "--topic" and len(sys.argv) > 2: + topic = sys.argv[2] + + ResourceSpyApp(topic_name=topic).run() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 84ba5ea25c..ea64a2fb94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ dependencies = [ "rerun-sdk>=0.20.0", "toolz>=1.1.0", "protobuf>=6.33.5,<7", + "psutil>=7.0.0", ] @@ -76,6 +77,7 @@ humancli = "dimos.utils.cli.human.humanclianim:main" dimos = "dimos.robot.cli.dimos:main" rerun-bridge = "dimos.visualization.rerun.bridge:app" doclinks = "dimos.utils.docs.doclinks:main" +dtop = "dimos.utils.cli.dtop:main" [project.optional-dependencies] misc = [ diff --git a/uv.lock b/uv.lock index bd25dd1d10..2f53ef0e6f 100644 --- a/uv.lock +++ b/uv.lock @@ -1695,6 +1695,7 @@ dependencies = [ { name = "plotext" }, { name = "plum-dispatch" }, { name = "protobuf" }, + { name = "psutil" }, { name = "pydantic" }, { name = "pydantic-settings" }, { name = "python-dotenv" }, @@ -2048,6 +2049,7 @@ requires-dist = [ { name = "portal", marker = "extra == 'misc'" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = "==4.2.0" }, { name = "protobuf", specifier = ">=6.33.5,<7" }, + { name = "psutil", specifier = ">=7.0.0" }, { name = "psycopg2-binary", marker = "extra == 'psql'", specifier = ">=2.9.11" }, { name = "py-spy", marker = "extra == 'dev'" }, { name = "pydantic" }, From cd3615d8383edce3da3c8a1d8bcf6b4c1bb6d6f9 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 28 Feb 2026 17:18:38 +0800 Subject: [PATCH 02/23] fix(dtop): fix cpu_percent always-zero bug and improve TUI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cache psutil.Process objects across snapshots so cpu_percent(interval=None) has a previous sample to diff against. Fix wrong module name in docstring, remove dead _snap_count state, and extend color gradient to cyan→yellow→red. --- dimos/core/worker.py | 9 ++++++++- dimos/utils/cli/dtop.py | 24 +++++++++++++----------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 284553123e..2555c02b02 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -32,6 +32,9 @@ _MB = 1024 * 1024 +# Cache Process objects so cpu_percent(interval=None) has a previous sample. +_proc_cache: dict[int, psutil.Process] = {} + @dataclass(frozen=True) class WorkerStats: @@ -59,7 +62,10 @@ def collect_process_stats( ) -> WorkerStats: """Collect resource stats for a single process by PID.""" try: - proc = psutil.Process(pid) + proc = _proc_cache.get(pid) + if proc is None or not proc.is_running(): + proc = psutil.Process(pid) + _proc_cache[pid] = proc with proc.oneshot(): cpu_pct = proc.cpu_percent(interval=None) ct = proc.cpu_times() @@ -100,6 +106,7 @@ def collect_process_stats( modules=modules or [], ) except (psutil.NoSuchProcess, psutil.AccessDenied): + _proc_cache.pop(pid, None) return WorkerStats(pid=pid, worker_id=worker_id, alive=False, modules=modules or []) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index b4f0a99f5c..cb828b470d 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -15,7 +15,7 @@ """Live TUI for per-worker resource stats over LCM. Usage: - uv run python -m dimos.utils.cli.resourcespy [--topic /dimos/resource_stats] + uv run python -m dimos.utils.cli.dtop [--topic /dimos/resource_stats] """ from __future__ import annotations @@ -32,14 +32,21 @@ from dimos.utils.cli import theme +def _heat(ratio: float) -> str: + """Map 0..1 ratio to a cyan → yellow → red gradient.""" + cyan = Color.parse(theme.CYAN) + yellow = Color.parse(theme.YELLOW) + red = Color.parse(theme.RED) + if ratio <= 0.5: + return cyan.blend(yellow, ratio * 2).hex + return yellow.blend(red, (ratio - 0.5) * 2).hex + + def _bar(value: float, max_val: float, width: int = 12) -> Text: """Render a tiny colored bar.""" ratio = min(value / max_val, 1.0) if max_val > 0 else 0.0 filled = int(ratio * width) - cyan = Color.parse(theme.CYAN) - yellow = Color.parse(theme.YELLOW) - color = cyan.blend(yellow, ratio).hex - return Text("█" * filled + "░" * (width - filled), style=color) + return Text("█" * filled + "░" * (width - filled), style=_heat(ratio)) def _fmt_mb(val: float) -> Text: @@ -49,10 +56,7 @@ def _fmt_mb(val: float) -> Text: def _fmt_pct(val: float) -> Text: - cyan = Color.parse(theme.CYAN) - yellow = Color.parse(theme.YELLOW) - color = cyan.blend(yellow, min(val / 100.0, 1.0)).hex - return Text(f"{val:.0f}%", style=color) + return Text(f"{val:.0f}%", style=_heat(min(val / 100.0, 1.0))) def _fmt_time(seconds: float) -> Text: @@ -94,7 +98,6 @@ def __init__(self, topic_name: str = "/dimos/resource_stats") -> None: self._lcm = PickleLCM(autoconf=True) self._lock = threading.Lock() self._latest: dict[str, Any] | None = None - self._snap_count = 0 def compose(self) -> ComposeResult: table: DataTable = DataTable(zebra_stripes=True, cursor_type=None) # type: ignore[type-arg, arg-type] @@ -128,7 +131,6 @@ async def on_unmount(self) -> None: def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: with self._lock: self._latest = msg - self._snap_count += 1 def _refresh(self) -> None: with self._lock: From 3e0a5ef2f0ea0c2543cc921e85a06be29ad12e07 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sat, 28 Feb 2026 17:24:03 +0800 Subject: [PATCH 03/23] faster interval --- dimos/core/module_coordinator.py | 2 +- dimos/utils/cli/dtop.py | 56 +++++++++++++++++++++----------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 57ae7ae67c..9060c1081d 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -105,7 +105,7 @@ def get_instance(self, module: type[ModuleT]) -> ModuleProxy: def loop( self, resource_logger: ResourceLogger | None = None, - monitor_interval: float = 2.0, + monitor_interval: float = 0.5, ) -> None: _logger: ResourceLogger = resource_logger or LCMResourceLogger() coordinator_pid = os.getpid() diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index cb828b470d..e84b2a8f54 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -21,6 +21,7 @@ from __future__ import annotations import threading +import time from typing import Any from rich.text import Text @@ -98,6 +99,7 @@ def __init__(self, topic_name: str = "/dimos/resource_stats") -> None: self._lcm = PickleLCM(autoconf=True) self._lock = threading.Lock() self._latest: dict[str, Any] | None = None + self._last_msg_time: float = 0.0 def compose(self) -> ComposeResult: table: DataTable = DataTable(zebra_stripes=True, cursor_type=None) # type: ignore[type-arg, arg-type] @@ -131,26 +133,30 @@ async def on_unmount(self) -> None: def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: with self._lock: self._latest = msg + self._last_msg_time = time.monotonic() def _refresh(self) -> None: with self._lock: data = self._latest + last_msg = self._last_msg_time if data is None: return + stale = (time.monotonic() - last_msg) > 2.0 + table = self.query_one(DataTable) table.clear(columns=False) coord = data.get("coordinator", {}) - self._add_row(table, "coordinator", theme.BRIGHT_CYAN, coord, "—") + self._add_row(table, "coordinator", theme.BRIGHT_CYAN, coord, "—", stale) for w in data.get("workers", []): alive = w.get("alive", False) wid = w.get("worker_id", "?") role_style = theme.BRIGHT_GREEN if alive else theme.BRIGHT_RED modules = ", ".join(w.get("modules", [])) or "—" - self._add_row(table, f"worker {wid}", role_style, w, modules) + self._add_row(table, f"worker {wid}", role_style, w, modules, stale) @staticmethod def _add_row( @@ -159,25 +165,37 @@ def _add_row( role_style: str, d: dict[str, Any], modules: str, + stale: bool, ) -> None: + dim = "#606060" + s = dim if stale else None # override style when stale table.add_row( - Text(role, style=role_style), - Text(str(d.get("pid", "?")), style=theme.DIM), - _fmt_pct(d.get("cpu_percent", 0)), - _bar(d.get("cpu_percent", 0), 100), - _fmt_time(d.get("cpu_time_user", 0)), - _fmt_time(d.get("cpu_time_system", 0)), - _fmt_time(d.get("cpu_time_iowait", 0)), - _fmt_mb(d.get("pss_mb", 0)), - _fmt_mb(d.get("uss_mb", 0)), - _fmt_mb(d.get("rss_mb", 0)), - _fmt_mb(d.get("vms_mb", 0)), - Text(str(d.get("num_threads", 0)), style=theme.WHITE), - Text(str(d.get("num_children", 0)), style=theme.WHITE), - Text(str(d.get("num_fds", 0)), style=theme.WHITE), - _fmt_mb(d.get("io_read_mb", 0)), - _fmt_mb(d.get("io_write_mb", 0)), - Text(modules, style=theme.BRIGHT_BLUE), + Text(role, style=s or role_style), + Text(str(d.get("pid", "?")), style=s or theme.BRIGHT_BLACK), + Text( + f"{d.get('cpu_percent', 0):.0f}%", + style=s or _heat(min(d.get("cpu_percent", 0) / 100.0, 1.0)), + ), + _bar(d.get("cpu_percent", 0), 100) if not stale else Text("░" * 12, style=dim), + Text(_fmt_time(d.get("cpu_time_user", 0)).plain, style=s or theme.WHITE), + Text(_fmt_time(d.get("cpu_time_system", 0)).plain, style=s or theme.WHITE), + Text(_fmt_time(d.get("cpu_time_iowait", 0)).plain, style=s or theme.WHITE), + Text(_fmt_mb(d.get("pss_mb", 0)).plain, style=s or _fmt_mb(d.get("pss_mb", 0)).style), + Text(_fmt_mb(d.get("uss_mb", 0)).plain, style=s or _fmt_mb(d.get("uss_mb", 0)).style), + Text(_fmt_mb(d.get("rss_mb", 0)).plain, style=s or _fmt_mb(d.get("rss_mb", 0)).style), + Text(_fmt_mb(d.get("vms_mb", 0)).plain, style=s or _fmt_mb(d.get("vms_mb", 0)).style), + Text(str(d.get("num_threads", 0)), style=s or theme.WHITE), + Text(str(d.get("num_children", 0)), style=s or theme.WHITE), + Text(str(d.get("num_fds", 0)), style=s or theme.WHITE), + Text( + _fmt_mb(d.get("io_read_mb", 0)).plain, + style=s or _fmt_mb(d.get("io_read_mb", 0)).style, + ), + Text( + _fmt_mb(d.get("io_write_mb", 0)).plain, + style=s or _fmt_mb(d.get("io_write_mb", 0)).style, + ), + Text(modules, style=s or theme.BRIGHT_BLUE), ) From 1735bff2a6b9413821d2748a3d3e170a565dc350 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 12:43:47 +0800 Subject: [PATCH 04/23] need to refactor the resource monitor --- dimos/core/module_coordinator.py | 14 +++++------- dimos/core/resource_logger.py | 3 --- dimos/core/test_worker.py | 2 +- dimos/core/worker.py | 10 +-------- dimos/robot/unitree_webrtc/__init__.py | 31 -------------------------- dimos/utils/cli/dtop.py | 29 ++++++++---------------- 6 files changed, 16 insertions(+), 73 deletions(-) delete mode 100644 dimos/robot/unitree_webrtc/__init__.py diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 9060c1081d..182ccc0eee 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -16,7 +16,7 @@ from concurrent.futures import ThreadPoolExecutor import os -import time +import threading from typing import TYPE_CHECKING, Any from dimos.core.global_config import GlobalConfig, global_config @@ -105,21 +105,17 @@ def get_instance(self, module: type[ModuleT]) -> ModuleProxy: def loop( self, resource_logger: ResourceLogger | None = None, - monitor_interval: float = 0.5, + monitor_interval: float = 1.0, ) -> None: _logger: ResourceLogger = resource_logger or LCMResourceLogger() coordinator_pid = os.getpid() # Prime cpu_percent so the first real reading isn't 0.0. collect_process_stats(coordinator_pid) - last_monitor = time.monotonic() + stop = threading.Event() try: - while True: - time.sleep(0.1) - now = time.monotonic() - if now - last_monitor >= monitor_interval: - last_monitor = now - self._log_resource_stats(_logger, coordinator_pid) + while not stop.wait(monitor_interval): + self._log_resource_stats(_logger, coordinator_pid) except KeyboardInterrupt: return finally: diff --git a/dimos/core/resource_logger.py b/dimos/core/resource_logger.py index f16cc76f5c..81074ee4b7 100644 --- a/dimos/core/resource_logger.py +++ b/dimos/core/resource_logger.py @@ -38,7 +38,6 @@ def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> Non pid=coordinator.pid, cpu_pct=coordinator.cpu_percent, pss_mb=round(coordinator.pss_mb, 1), - rss_mb=round(coordinator.rss_mb, 1), threads=coordinator.num_threads, ) for w in workers: @@ -49,8 +48,6 @@ def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> Non alive=w.alive, cpu_pct=w.cpu_percent, pss_mb=round(w.pss_mb, 1), - rss_mb=round(w.rss_mb, 1), - vms_mb=round(w.vms_mb, 1), threads=w.num_threads, children=w.num_children, fds=w.num_fds, diff --git a/dimos/core/test_worker.py b/dimos/core/test_worker.py index a619d254e7..8fa47bd1a5 100644 --- a/dimos/core/test_worker.py +++ b/dimos/core/test_worker.py @@ -179,7 +179,7 @@ def test_collect_stats(create_worker_manager): for s in stats: assert s.alive is True assert s.pid > 0 - assert s.rss_mb > 0 + assert s.pss_mb >= 0 assert s.num_threads >= 1 assert s.num_fds >= 0 assert s.io_read_mb >= 0 diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 2555c02b02..73976cf027 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -46,9 +46,6 @@ class WorkerStats: cpu_time_system: float = 0.0 cpu_time_iowait: float = 0.0 pss_mb: float = 0.0 - uss_mb: float = 0.0 - rss_mb: float = 0.0 - vms_mb: float = 0.0 num_threads: int = 0 num_children: int = 0 num_fds: int = 0 @@ -69,13 +66,11 @@ def collect_process_stats( with proc.oneshot(): cpu_pct = proc.cpu_percent(interval=None) ct = proc.cpu_times() - mem_basic = proc.memory_info() try: mem_full = proc.memory_full_info() pss = getattr(mem_full, "pss", 0) / _MB - uss = getattr(mem_full, "uss", 0) / _MB except (psutil.AccessDenied, AttributeError): - pss = uss = 0.0 + pss = 0.0 try: io = proc.io_counters() io_r = io.read_bytes / _MB @@ -95,9 +90,6 @@ def collect_process_stats( cpu_time_system=ct.system, cpu_time_iowait=getattr(ct, "iowait", 0.0), pss_mb=pss, - uss_mb=uss, - rss_mb=mem_basic.rss / _MB, - vms_mb=mem_basic.vms / _MB, num_threads=proc.num_threads(), num_children=len(proc.children(recursive=True)), num_fds=fds, diff --git a/dimos/robot/unitree_webrtc/__init__.py b/dimos/robot/unitree_webrtc/__init__.py deleted file mode 100644 index 451aa53128..0000000000 --- a/dimos/robot/unitree_webrtc/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Compatibility package for legacy dimos.robot.unitree_webrtc imports.""" - -from importlib import import_module -import sys - -_ALIAS_MODULES = { - "demo_error_on_name_conflicts": "dimos.robot.unitree.demo_error_on_name_conflicts", - "keyboard_teleop": "dimos.robot.unitree.keyboard_teleop", - "mujoco_connection": "dimos.robot.unitree.mujoco_connection", - "type": "dimos.robot.unitree.type", - "unitree_g1_skill_container": "dimos.robot.unitree.g1.skill_container", - "unitree_skill_container": "dimos.robot.unitree.unitree_skill_container", -} - -for alias, target in _ALIAS_MODULES.items(): - sys.modules[f"{__name__}.{alias}"] = import_module(target) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index e84b2a8f54..d34b20144c 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -103,23 +103,19 @@ def __init__(self, topic_name: str = "/dimos/resource_stats") -> None: def compose(self) -> ComposeResult: table: DataTable = DataTable(zebra_stripes=True, cursor_type=None) # type: ignore[type-arg, arg-type] - table.add_column("Role", width=14) - table.add_column("PID", width=8) + table.add_column("Modules", width=30) table.add_column("CPU %", width=8) table.add_column("CPU bar", width=14) table.add_column("User", width=8) table.add_column("Sys", width=8) table.add_column("IOw", width=8) table.add_column("PSS", width=10) - table.add_column("USS", width=10) - table.add_column("RSS", width=10) - table.add_column("VMS", width=10) table.add_column("Thr", width=5) table.add_column("Ch", width=5) table.add_column("FDs", width=5) - table.add_column("IO R", width=10) - table.add_column("IO W", width=10) - table.add_column("Modules", width=30) + table.add_column("IO R/W", width=14) + table.add_column("Role", width=14) + table.add_column("PID", width=8) yield table def on_mount(self) -> None: @@ -170,8 +166,7 @@ def _add_row( dim = "#606060" s = dim if stale else None # override style when stale table.add_row( - Text(role, style=s or role_style), - Text(str(d.get("pid", "?")), style=s or theme.BRIGHT_BLACK), + Text(modules, style=s or theme.BRIGHT_BLUE), Text( f"{d.get('cpu_percent', 0):.0f}%", style=s or _heat(min(d.get("cpu_percent", 0) / 100.0, 1.0)), @@ -181,21 +176,15 @@ def _add_row( Text(_fmt_time(d.get("cpu_time_system", 0)).plain, style=s or theme.WHITE), Text(_fmt_time(d.get("cpu_time_iowait", 0)).plain, style=s or theme.WHITE), Text(_fmt_mb(d.get("pss_mb", 0)).plain, style=s or _fmt_mb(d.get("pss_mb", 0)).style), - Text(_fmt_mb(d.get("uss_mb", 0)).plain, style=s or _fmt_mb(d.get("uss_mb", 0)).style), - Text(_fmt_mb(d.get("rss_mb", 0)).plain, style=s or _fmt_mb(d.get("rss_mb", 0)).style), - Text(_fmt_mb(d.get("vms_mb", 0)).plain, style=s or _fmt_mb(d.get("vms_mb", 0)).style), Text(str(d.get("num_threads", 0)), style=s or theme.WHITE), Text(str(d.get("num_children", 0)), style=s or theme.WHITE), Text(str(d.get("num_fds", 0)), style=s or theme.WHITE), Text( - _fmt_mb(d.get("io_read_mb", 0)).plain, - style=s or _fmt_mb(d.get("io_read_mb", 0)).style, - ), - Text( - _fmt_mb(d.get("io_write_mb", 0)).plain, - style=s or _fmt_mb(d.get("io_write_mb", 0)).style, + f"{d.get('io_read_mb', 0):.0f}/{d.get('io_write_mb', 0):.0f}", + style=s or theme.WHITE, ), - Text(modules, style=s or theme.BRIGHT_BLUE), + Text(role, style=s or role_style), + Text(str(d.get("pid", "?")), style=s or theme.BRIGHT_BLACK), ) From bbb5410e9f6b0ab579208fde71b699e0fe81243d Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 12:57:01 +0800 Subject: [PATCH 05/23] worker vs generic process data types --- dimos/core/module_coordinator.py | 8 +- dimos/core/resource_monitor/__init__.py | 15 +++ .../logger.py} | 8 +- dimos/core/resource_monitor/stats.py | 94 +++++++++++++++++++ dimos/core/worker.py | 82 +--------------- dimos/core/worker_manager.py | 3 +- 6 files changed, 126 insertions(+), 84 deletions(-) create mode 100644 dimos/core/resource_monitor/__init__.py rename dimos/core/{resource_logger.py => resource_monitor/logger.py} (86%) create mode 100644 dimos/core/resource_monitor/stats.py diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 182ccc0eee..450ec74afc 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -21,8 +21,12 @@ from dimos.core.global_config import GlobalConfig, global_config from dimos.core.resource import Resource -from dimos.core.resource_logger import LCMResourceLogger, ResourceLogger -from dimos.core.worker import WorkerStats, collect_process_stats +from dimos.core.resource_monitor import ( + LCMResourceLogger, + ResourceLogger, + WorkerStats, + collect_process_stats, +) from dimos.core.worker_manager import WorkerManager from dimos.utils.logging_config import setup_logger diff --git a/dimos/core/resource_monitor/__init__.py b/dimos/core/resource_monitor/__init__.py new file mode 100644 index 0000000000..8e9d7230a8 --- /dev/null +++ b/dimos/core/resource_monitor/__init__.py @@ -0,0 +1,15 @@ +from dimos.core.resource_monitor.logger import ( + LCMResourceLogger, + ResourceLogger, + StructlogResourceLogger, +) +from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats, collect_process_stats + +__all__ = [ + "LCMResourceLogger", + "ProcessStats", + "ResourceLogger", + "StructlogResourceLogger", + "WorkerStats", + "collect_process_stats", +] diff --git a/dimos/core/resource_logger.py b/dimos/core/resource_monitor/logger.py similarity index 86% rename from dimos/core/resource_logger.py rename to dimos/core/resource_monitor/logger.py index 81074ee4b7..04ffbe3e87 100644 --- a/dimos/core/resource_logger.py +++ b/dimos/core/resource_monitor/logger.py @@ -20,19 +20,19 @@ from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: - from dimos.core.worker import WorkerStats + from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats logger = setup_logger() class ResourceLogger(Protocol): - def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> None: ... + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: ... class StructlogResourceLogger: """Default implementation — logs resource stats via structlog info.""" - def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> None: + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: logger.info( "coordinator", pid=coordinator.pid, @@ -65,7 +65,7 @@ def __init__(self, topic: str = "/dimos/resource_stats") -> None: self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic) - def log_stats(self, coordinator: WorkerStats, workers: list[WorkerStats]) -> None: + def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> None: self._transport.broadcast( None, { diff --git a/dimos/core/resource_monitor/stats.py b/dimos/core/resource_monitor/stats.py new file mode 100644 index 0000000000..17667d3cae --- /dev/null +++ b/dimos/core/resource_monitor/stats.py @@ -0,0 +1,94 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass, field + +import psutil + +_MB = 1024 * 1024 + +# Cache Process objects so cpu_percent(interval=None) has a previous sample. +_proc_cache: dict[int, psutil.Process] = {} + + +@dataclass(frozen=True) +class ProcessStats: + """Resource stats for a single OS process.""" + + pid: int + alive: bool + cpu_percent: float = 0.0 + cpu_time_user: float = 0.0 + cpu_time_system: float = 0.0 + cpu_time_iowait: float = 0.0 + pss_mb: float = 0.0 + num_threads: int = 0 + num_children: int = 0 + num_fds: int = 0 + io_read_mb: float = 0.0 + io_write_mb: float = 0.0 + + +@dataclass(frozen=True) +class WorkerStats(ProcessStats): + """Process stats extended with worker-specific metadata.""" + + worker_id: int = -1 + modules: list[str] = field(default_factory=list) + + +def collect_process_stats(pid: int) -> ProcessStats: + """Collect resource stats for a single process by PID.""" + try: + proc = _proc_cache.get(pid) + if proc is None or not proc.is_running(): + proc = psutil.Process(pid) + _proc_cache[pid] = proc + with proc.oneshot(): + cpu_pct = proc.cpu_percent(interval=None) + ct = proc.cpu_times() + try: + mem_full = proc.memory_full_info() + pss = getattr(mem_full, "pss", 0) / _MB + except (psutil.AccessDenied, AttributeError): + pss = 0.0 + try: + io = proc.io_counters() + io_r = io.read_bytes / _MB + io_w = io.write_bytes / _MB + except (psutil.AccessDenied, AttributeError): + io_r = io_w = 0.0 + try: + fds = proc.num_fds() + except (psutil.AccessDenied, AttributeError): + fds = 0 + return ProcessStats( + pid=pid, + alive=True, + cpu_percent=cpu_pct, + cpu_time_user=ct.user, + cpu_time_system=ct.system, + cpu_time_iowait=getattr(ct, "iowait", 0.0), + pss_mb=pss, + num_threads=proc.num_threads(), + num_children=len(proc.children(recursive=True)), + num_fds=fds, + io_read_mb=io_r, + io_write_mb=io_w, + ) + except (psutil.NoSuchProcess, psutil.AccessDenied): + _proc_cache.pop(pid, None) + return ProcessStats(pid=pid, alive=False) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 73976cf027..3f6ab152d5 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -14,14 +14,13 @@ from __future__ import annotations -from dataclasses import dataclass, field +from dataclasses import asdict import multiprocessing as mp import threading import traceback from typing import TYPE_CHECKING, Any -import psutil - +from dimos.core.resource_monitor import WorkerStats, collect_process_stats from dimos.utils.logging_config import setup_logger from dimos.utils.sequential_ids import SequentialIds @@ -30,78 +29,6 @@ from dimos.core.module import ModuleT -_MB = 1024 * 1024 - -# Cache Process objects so cpu_percent(interval=None) has a previous sample. -_proc_cache: dict[int, psutil.Process] = {} - - -@dataclass(frozen=True) -class WorkerStats: - pid: int - worker_id: int - alive: bool - cpu_percent: float = 0.0 - cpu_time_user: float = 0.0 - cpu_time_system: float = 0.0 - cpu_time_iowait: float = 0.0 - pss_mb: float = 0.0 - num_threads: int = 0 - num_children: int = 0 - num_fds: int = 0 - io_read_mb: float = 0.0 - io_write_mb: float = 0.0 - modules: list[str] = field(default_factory=list) - - -def collect_process_stats( - pid: int, worker_id: int = -1, modules: list[str] | None = None -) -> WorkerStats: - """Collect resource stats for a single process by PID.""" - try: - proc = _proc_cache.get(pid) - if proc is None or not proc.is_running(): - proc = psutil.Process(pid) - _proc_cache[pid] = proc - with proc.oneshot(): - cpu_pct = proc.cpu_percent(interval=None) - ct = proc.cpu_times() - try: - mem_full = proc.memory_full_info() - pss = getattr(mem_full, "pss", 0) / _MB - except (psutil.AccessDenied, AttributeError): - pss = 0.0 - try: - io = proc.io_counters() - io_r = io.read_bytes / _MB - io_w = io.write_bytes / _MB - except (psutil.AccessDenied, AttributeError): - io_r = io_w = 0.0 - try: - fds = proc.num_fds() - except (psutil.AccessDenied, AttributeError): - fds = 0 - return WorkerStats( - pid=pid, - worker_id=worker_id, - alive=True, - cpu_percent=cpu_pct, - cpu_time_user=ct.user, - cpu_time_system=ct.system, - cpu_time_iowait=getattr(ct, "iowait", 0.0), - pss_mb=pss, - num_threads=proc.num_threads(), - num_children=len(proc.children(recursive=True)), - num_fds=fds, - io_read_mb=io_r, - io_write_mb=io_w, - modules=modules or [], - ) - except (psutil.NoSuchProcess, psutil.AccessDenied): - _proc_cache.pop(pid, None) - return WorkerStats(pid=pid, worker_id=worker_id, alive=False, modules=modules or []) - - logger = setup_logger() @@ -235,8 +162,9 @@ def collect_stats(self) -> WorkerStats: modules = [actor._cls.__name__ for actor in self._modules.values()] if self._process is not None and self._process.is_alive(): pid: int = self._process.pid # type: ignore[assignment] - return collect_process_stats(pid, self._worker_id, modules) - return WorkerStats(pid=0, worker_id=self._worker_id, alive=False, modules=modules) + ps = collect_process_stats(pid) + return WorkerStats(**asdict(ps), worker_id=self._worker_id, modules=modules) + return WorkerStats(pid=0, alive=False, worker_id=self._worker_id, modules=modules) def reserve_slot(self) -> None: """Reserve a slot so _select_worker() sees the pending load.""" diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index f9dddd6e4e..7d4eff1d66 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -18,11 +18,12 @@ from typing import TYPE_CHECKING, Any from dimos.core.rpc_client import RPCClient -from dimos.core.worker import Worker, WorkerStats +from dimos.core.worker import Worker from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: from dimos.core.module import ModuleT + from dimos.core.resource_monitor import WorkerStats logger = setup_logger() From 17d9a44eaff3ac9db49fef6e78871e50fd5c0dc4 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:02:30 +0800 Subject: [PATCH 06/23] refactor, fixing voxel tests --- dimos/core/resource_monitor/logger.py | 8 +- dimos/core/resource_monitor/stats.py | 123 +++++++++++++++------- dimos/core/test_worker.py | 6 +- dimos/mapping/test_voxels.py | 39 ++----- dimos/utils/cli/dtop.py | 13 +-- dimos/utils/decorators/__init__.py | 3 +- dimos/utils/decorators/decorators.py | 29 +++++ dimos/utils/decorators/test_decorators.py | 78 +++++++++++++- 8 files changed, 213 insertions(+), 86 deletions(-) diff --git a/dimos/core/resource_monitor/logger.py b/dimos/core/resource_monitor/logger.py index 04ffbe3e87..88f79b6db8 100644 --- a/dimos/core/resource_monitor/logger.py +++ b/dimos/core/resource_monitor/logger.py @@ -37,7 +37,7 @@ def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> No "coordinator", pid=coordinator.pid, cpu_pct=coordinator.cpu_percent, - pss_mb=round(coordinator.pss_mb, 1), + pss_mb=round(coordinator.pss / 1048576, 1), threads=coordinator.num_threads, ) for w in workers: @@ -47,12 +47,12 @@ def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> No pid=w.pid, alive=w.alive, cpu_pct=w.cpu_percent, - pss_mb=round(w.pss_mb, 1), + pss_mb=round(w.pss / 1048576, 1), threads=w.num_threads, children=w.num_children, fds=w.num_fds, - io_r_mb=round(w.io_read_mb, 1), - io_w_mb=round(w.io_write_mb, 1), + io_r_mb=round(w.io_read_bytes / 1048576, 1), + io_w_mb=round(w.io_write_bytes / 1048576, 1), modules=w.modules, ) diff --git a/dimos/core/resource_monitor/stats.py b/dimos/core/resource_monitor/stats.py index 17667d3cae..5cdca1485d 100644 --- a/dimos/core/resource_monitor/stats.py +++ b/dimos/core/resource_monitor/stats.py @@ -15,10 +15,11 @@ from __future__ import annotations from dataclasses import dataclass, field +from typing import TypedDict import psutil -_MB = 1024 * 1024 +from dimos.utils.decorators import ttl_cache # Cache Process objects so cpu_percent(interval=None) has a previous sample. _proc_cache: dict[int, psutil.Process] = {} @@ -34,12 +35,12 @@ class ProcessStats: cpu_time_user: float = 0.0 cpu_time_system: float = 0.0 cpu_time_iowait: float = 0.0 - pss_mb: float = 0.0 + pss: int = 0 num_threads: int = 0 num_children: int = 0 num_fds: int = 0 - io_read_mb: float = 0.0 - io_write_mb: float = 0.0 + io_read_bytes: int = 0 + io_write_bytes: int = 0 @dataclass(frozen=True) @@ -50,45 +51,89 @@ class WorkerStats(ProcessStats): modules: list[str] = field(default_factory=list) +def _get_process(pid: int) -> psutil.Process: + """Return a cached Process object, creating a new one if missing or dead.""" + proc = _proc_cache.get(pid) + if proc is None or not proc.is_running(): + proc = psutil.Process(pid) + _proc_cache[pid] = proc + return proc + + +class CpuStats(TypedDict): + cpu_percent: float + cpu_time_user: float + cpu_time_system: float + cpu_time_iowait: float + + +def _collect_cpu(proc: psutil.Process) -> CpuStats: + """Collect CPU metrics. Call inside oneshot().""" + cpu_pct = proc.cpu_percent(interval=None) + ct = proc.cpu_times() + return CpuStats( + cpu_percent=cpu_pct, + cpu_time_user=ct.user, + cpu_time_system=ct.system, + cpu_time_iowait=getattr(ct, "iowait", 0.0), + ) + + +@ttl_cache(4.0) +def _collect_pss(pid: int) -> int: + """Collect PSS memory in bytes. TTL-cached to avoid expensive smaps reads.""" + try: + proc = _get_process(pid) + mem_full = proc.memory_full_info() + return getattr(mem_full, "pss", 0) + except (psutil.AccessDenied, psutil.NoSuchProcess, AttributeError): + return 0 + + +class IoStats(TypedDict): + io_read_bytes: int + io_write_bytes: int + + +def _collect_io(proc: psutil.Process) -> IoStats: + """Collect IO counters in bytes. Call inside oneshot().""" + try: + io = proc.io_counters() + return IoStats(io_read_bytes=io.read_bytes, io_write_bytes=io.write_bytes) + except (psutil.AccessDenied, AttributeError): + return IoStats(io_read_bytes=0, io_write_bytes=0) + + +class ProcStats(TypedDict): + num_threads: int + num_children: int + num_fds: int + + +def _collect_proc(proc: psutil.Process) -> ProcStats: + """Collect thread/children/fd counts. Call inside oneshot().""" + try: + fds = proc.num_fds() + except (psutil.AccessDenied, AttributeError): + fds = 0 + return ProcStats( + num_threads=proc.num_threads(), + num_children=len(proc.children(recursive=True)), + num_fds=fds, + ) + + def collect_process_stats(pid: int) -> ProcessStats: """Collect resource stats for a single process by PID.""" try: - proc = _proc_cache.get(pid) - if proc is None or not proc.is_running(): - proc = psutil.Process(pid) - _proc_cache[pid] = proc + proc = _get_process(pid) with proc.oneshot(): - cpu_pct = proc.cpu_percent(interval=None) - ct = proc.cpu_times() - try: - mem_full = proc.memory_full_info() - pss = getattr(mem_full, "pss", 0) / _MB - except (psutil.AccessDenied, AttributeError): - pss = 0.0 - try: - io = proc.io_counters() - io_r = io.read_bytes / _MB - io_w = io.write_bytes / _MB - except (psutil.AccessDenied, AttributeError): - io_r = io_w = 0.0 - try: - fds = proc.num_fds() - except (psutil.AccessDenied, AttributeError): - fds = 0 - return ProcessStats( - pid=pid, - alive=True, - cpu_percent=cpu_pct, - cpu_time_user=ct.user, - cpu_time_system=ct.system, - cpu_time_iowait=getattr(ct, "iowait", 0.0), - pss_mb=pss, - num_threads=proc.num_threads(), - num_children=len(proc.children(recursive=True)), - num_fds=fds, - io_read_mb=io_r, - io_write_mb=io_w, - ) + cpu = _collect_cpu(proc) + io = _collect_io(proc) + proc_stats = _collect_proc(proc) + pss = _collect_pss(pid) + return ProcessStats(pid=pid, alive=True, pss=pss, **cpu, **io, **proc_stats) except (psutil.NoSuchProcess, psutil.AccessDenied): _proc_cache.pop(pid, None) + _collect_pss.cache.pop((pid,), None) # type: ignore[attr-defined] return ProcessStats(pid=pid, alive=False) diff --git a/dimos/core/test_worker.py b/dimos/core/test_worker.py index 8fa47bd1a5..ceef212f5d 100644 --- a/dimos/core/test_worker.py +++ b/dimos/core/test_worker.py @@ -179,11 +179,11 @@ def test_collect_stats(create_worker_manager): for s in stats: assert s.alive is True assert s.pid > 0 - assert s.pss_mb >= 0 + assert s.pss >= 0 assert s.num_threads >= 1 assert s.num_fds >= 0 - assert s.io_read_mb >= 0 - assert s.io_write_mb >= 0 + assert s.io_read_bytes >= 0 + assert s.io_write_bytes >= 0 # At least one worker should report module names all_modules = [name for s in stats for name in s.modules] diff --git a/dimos/mapping/test_voxels.py b/dimos/mapping/test_voxels.py index 3fdc2dd102..e0e48c1f57 100644 --- a/dimos/mapping/test_voxels.py +++ b/dimos/mapping/test_voxels.py @@ -82,39 +82,21 @@ def test_carving( ) -> None: lidar_frame1 = moment1.lidar.value assert lidar_frame1 is not None - lidar_frame1_transport: LCMTransport[PointCloud2] = LCMTransport("/prev_lidar", PointCloud2) - lidar_frame1_transport.publish(lidar_frame1) - lidar_frame1_transport.stop() lidar_frame2 = moment2.lidar.value assert lidar_frame2 is not None - # Debug: check XY overlap - pts1 = np.asarray(lidar_frame1.pointcloud.points) - pts2 = np.asarray(lidar_frame2.pointcloud.points) - - voxel_size = mapper.config.voxel_size - xy1 = set(map(tuple, (pts1[:, :2] / voxel_size).astype(int))) - xy2 = set(map(tuple, (pts2[:, :2] / voxel_size).astype(int))) - - overlap = xy1 & xy2 - print(f"\nFrame1 XY columns: {len(xy1)}") - print(f"Frame2 XY columns: {len(xy2)}") - print(f"Overlapping XY columns: {len(overlap)}") - # Carving mapper (default, carve_columns=True) mapper.add_frame(lidar_frame1) mapper.add_frame(lidar_frame2) - - moment2.global_map.set(mapper.get_global_pointcloud2()) - moment2.publish() - count_carving = mapper.size() - # Additive mapper (carve_columns=False) - additive_mapper = VoxelGridMapper(carve_columns=False) - additive_mapper.add_frame(lidar_frame1) - additive_mapper.add_frame(lidar_frame2) - count_additive = additive_mapper.size() + + # Additive count via numpy (avoids second VoxelBlockGrid allocation) + voxel_size = mapper.config.voxel_size + pts1 = np.asarray(lidar_frame1.pointcloud.points) + pts2 = np.asarray(lidar_frame2.pointcloud.points) + combined_vox = np.floor(np.vstack([pts1, pts2]) / voxel_size).astype(np.int64) + count_additive = np.unique(combined_vox, axis=0).shape[0] print("\n=== Carving comparison ===") print(f"Additive (no carving): {count_additive}") @@ -126,13 +108,6 @@ def test_carving( f"Carving should remove some voxels. Additive: {count_additive}, Carving: {count_carving}" ) - additive_global_map: LCMTransport[PointCloud2] = LCMTransport( - "additive_global_map", PointCloud2 - ) - additive_global_map.publish(additive_mapper.get_global_pointcloud2()) - additive_global_map.stop() - additive_mapper.stop() - def test_injest_a_few(mapper: VoxelGridMapper) -> None: data_dir = get_data("unitree_go2_office_walk2") diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index d34b20144c..7cc1cdea65 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -50,10 +50,11 @@ def _bar(value: float, max_val: float, width: int = 12) -> Text: return Text("█" * filled + "░" * (width - filled), style=_heat(ratio)) -def _fmt_mb(val: float) -> Text: - if val >= 1024: - return Text(f"{val / 1024:.1f} GB", style=theme.BRIGHT_YELLOW) - return Text(f"{val:.1f} MB", style=theme.WHITE) +def _fmt_bytes(val: int) -> Text: + mb = val / 1048576 + if mb >= 1024: + return Text(f"{mb / 1024:.1f} GB", style=theme.BRIGHT_YELLOW) + return Text(f"{mb:.1f} MB", style=theme.WHITE) def _fmt_pct(val: float) -> Text: @@ -175,12 +176,12 @@ def _add_row( Text(_fmt_time(d.get("cpu_time_user", 0)).plain, style=s or theme.WHITE), Text(_fmt_time(d.get("cpu_time_system", 0)).plain, style=s or theme.WHITE), Text(_fmt_time(d.get("cpu_time_iowait", 0)).plain, style=s or theme.WHITE), - Text(_fmt_mb(d.get("pss_mb", 0)).plain, style=s or _fmt_mb(d.get("pss_mb", 0)).style), + Text(_fmt_bytes(d.get("pss", 0)).plain, style=s or _fmt_bytes(d.get("pss", 0)).style), Text(str(d.get("num_threads", 0)), style=s or theme.WHITE), Text(str(d.get("num_children", 0)), style=s or theme.WHITE), Text(str(d.get("num_fds", 0)), style=s or theme.WHITE), Text( - f"{d.get('io_read_mb', 0):.0f}/{d.get('io_write_mb', 0):.0f}", + f"{d.get('io_read_bytes', 0) / 1048576:.0f}/{d.get('io_write_bytes', 0) / 1048576:.0f}", style=s or theme.WHITE, ), Text(role, style=s or role_style), diff --git a/dimos/utils/decorators/__init__.py b/dimos/utils/decorators/__init__.py index 79623922a0..d0f91a4939 100644 --- a/dimos/utils/decorators/__init__.py +++ b/dimos/utils/decorators/__init__.py @@ -1,7 +1,7 @@ """Decorators and accumulators for rate limiting and other utilities.""" from .accumulators import Accumulator, LatestAccumulator, RollingAverageAccumulator -from .decorators import CachedMethod, limit, retry, simple_mcache +from .decorators import CachedMethod, limit, retry, simple_mcache, ttl_cache __all__ = [ "Accumulator", @@ -11,4 +11,5 @@ "limit", "retry", "simple_mcache", + "ttl_cache", ] diff --git a/dimos/utils/decorators/decorators.py b/dimos/utils/decorators/decorators.py index 01e9f8b553..10fcc1885f 100644 --- a/dimos/utils/decorators/decorators.py +++ b/dimos/utils/decorators/decorators.py @@ -22,6 +22,7 @@ _CacheResult_co = TypeVar("_CacheResult_co", covariant=True) _CacheReturn = TypeVar("_CacheReturn") +_F = TypeVar("_F", bound=Callable[..., Any]) class CachedMethod(Protocol[_CacheResult_co]): @@ -166,6 +167,34 @@ def invalidate_cache(instance: Any) -> None: return getter +def ttl_cache(seconds: float) -> Callable[[_F], _F]: + """Cache function results by positional args with a time-to-live. + + Expired entries are swept on each access. + """ + + def decorator(func: _F) -> _F: + _cache: dict[tuple[Any, ...], tuple[float, Any]] = {} + + @wraps(func) + def wrapper(*args: Any) -> Any: + now = time.monotonic() + expired = [k for k, (ts, _) in _cache.items() if now - ts >= seconds] + for k in expired: + del _cache[k] + if args in _cache: + _, val = _cache[args] + return val + result = func(*args) + _cache[args] = (now, result) + return result + + wrapper.cache = _cache # type: ignore[attr-defined] + return wrapper # type: ignore[return-value] + + return decorator + + def retry(max_retries: int = 3, on_exception: type[Exception] = Exception, delay: float = 0.0): # type: ignore[no-untyped-def] """ Decorator that retries a function call if it raises an exception. diff --git a/dimos/utils/decorators/test_decorators.py b/dimos/utils/decorators/test_decorators.py index a40a806a80..7950fb0000 100644 --- a/dimos/utils/decorators/test_decorators.py +++ b/dimos/utils/decorators/test_decorators.py @@ -16,7 +16,7 @@ import pytest -from dimos.utils.decorators import RollingAverageAccumulator, limit, retry, simple_mcache +from dimos.utils.decorators import RollingAverageAccumulator, limit, retry, simple_mcache, ttl_cache def test_limit() -> None: @@ -316,3 +316,79 @@ def expensive(self) -> int: obj1.expensive.invalidate_cache(obj1) assert obj1.expensive() == 3 assert obj2.expensive() == 2 # still cached + + +def test_ttl_cache_returns_cached_value() -> None: + """Test that ttl_cache returns cached results within TTL.""" + call_count = 0 + + @ttl_cache(1.0) + def expensive(x: int) -> int: + nonlocal call_count + call_count += 1 + return x * 2 + + assert expensive(5) == 10 + assert call_count == 1 + + # Second call should be cached + assert expensive(5) == 10 + assert call_count == 1 + + # Different args should compute + assert expensive(3) == 6 + assert call_count == 2 + + +def test_ttl_cache_expires() -> None: + """Test that ttl_cache recomputes after TTL expires.""" + call_count = 0 + + @ttl_cache(0.05) + def expensive(x: int) -> int: + nonlocal call_count + call_count += 1 + return x * 2 + + assert expensive(5) == 10 + assert call_count == 1 + + time.sleep(0.1) + + # Should recompute after TTL + assert expensive(5) == 10 + assert call_count == 2 + + +def test_ttl_cache_sweep_on_access() -> None: + """Test that expired entries are swept on the next access.""" + + @ttl_cache(0.05) + def expensive(x: int) -> int: + return x * 2 + + expensive(1) + expensive(2) + assert len(expensive.cache) == 2 # type: ignore[attr-defined] + + time.sleep(0.1) + + # Next call sweeps expired entries + expensive(3) + assert (1,) not in expensive.cache # type: ignore[attr-defined] + assert (2,) not in expensive.cache # type: ignore[attr-defined] + assert (3,) in expensive.cache # type: ignore[attr-defined] + + +def test_ttl_cache_manual_cache_cleanup() -> None: + """Test that .cache dict can be manually cleaned up.""" + + @ttl_cache(10.0) + def expensive(x: int) -> int: + return x * 2 + + expensive(1) + assert (1,) in expensive.cache # type: ignore[attr-defined] + + expensive.cache.pop((1,), None) # type: ignore[attr-defined] + assert (1,) not in expensive.cache # type: ignore[attr-defined] From 349d779ace93d12814123f919876f7171a4b9df3 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:20:07 +0800 Subject: [PATCH 07/23] expensive carving test marked as tool --- dimos/mapping/test_voxels.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dimos/mapping/test_voxels.py b/dimos/mapping/test_voxels.py index e0e48c1f57..5dfa044d26 100644 --- a/dimos/mapping/test_voxels.py +++ b/dimos/mapping/test_voxels.py @@ -77,6 +77,7 @@ def two_perspectives_loop(moment: MomentFactory) -> None: time.sleep(1) +@pytest.mark.tool def test_carving( mapper: VoxelGridMapper, moment1: Go2MapperMoment, moment2: Go2MapperMoment ) -> None: From f99ba10c7230c8123c1e3dda8e52067c92bb44ef Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:31:34 +0800 Subject: [PATCH 08/23] better dtop --- dimos/utils/cli/dtop.py | 216 ++++++++++++++++++++++++++++++---------- 1 file changed, 161 insertions(+), 55 deletions(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 7cc1cdea65..dd528cb544 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -24,10 +24,14 @@ import time from typing import Any +from rich.console import Group, RenderableType +from rich.panel import Panel +from rich.rule import Rule from rich.text import Text from textual.app import App, ComposeResult from textual.color import Color -from textual.widgets import DataTable +from textual.containers import VerticalScroll +from textual.widgets import Static from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic from dimos.utils.cli import theme @@ -80,14 +84,11 @@ class ResourceSpyApp(App): # type: ignore[type-arg] layout: vertical; background: {theme.BACKGROUND}; }} - DataTable {{ + VerticalScroll {{ height: 1fr; - border: solid {theme.BORDER}; - background: {theme.BG}; scrollbar-size: 0 0; }} - DataTable > .datatable--header {{ - color: {theme.ACCENT}; + #panels {{ background: transparent; }} """ @@ -103,21 +104,8 @@ def __init__(self, topic_name: str = "/dimos/resource_stats") -> None: self._last_msg_time: float = 0.0 def compose(self) -> ComposeResult: - table: DataTable = DataTable(zebra_stripes=True, cursor_type=None) # type: ignore[type-arg, arg-type] - table.add_column("Modules", width=30) - table.add_column("CPU %", width=8) - table.add_column("CPU bar", width=14) - table.add_column("User", width=8) - table.add_column("Sys", width=8) - table.add_column("IOw", width=8) - table.add_column("PSS", width=10) - table.add_column("Thr", width=5) - table.add_column("Ch", width=5) - table.add_column("FDs", width=5) - table.add_column("IO R/W", width=14) - table.add_column("Role", width=14) - table.add_column("PID", width=8) - yield table + with VerticalScroll(): + yield Static(id="panels") def on_mount(self) -> None: self._lcm.subscribe(Topic(self._topic_name), self._on_msg) @@ -141,57 +129,175 @@ def _refresh(self) -> None: return stale = (time.monotonic() - last_msg) > 2.0 + dim = "#606060" + border_style = dim if stale else theme.BORDER - table = self.query_one(DataTable) - table.clear(columns=False) + # Collect (role, role_style, data_dict, modules) entries + entries: list[tuple[str, str, dict[str, Any], str]] = [] coord = data.get("coordinator", {}) - self._add_row(table, "coordinator", theme.BRIGHT_CYAN, coord, "—", stale) + entries.append(("coordinator", theme.BRIGHT_CYAN, coord, "")) for w in data.get("workers", []): alive = w.get("alive", False) wid = w.get("worker_id", "?") role_style = theme.BRIGHT_GREEN if alive else theme.BRIGHT_RED - modules = ", ".join(w.get("modules", [])) or "—" - self._add_row(table, f"worker {wid}", role_style, w, modules, stale) + modules = ", ".join(w.get("modules", [])) or "" + entries.append((f"worker {wid}", role_style, w, modules)) + + # Build inner content: sections separated by Rules + parts: list[RenderableType] = [] + for i, (role, rs, d, mods) in enumerate(entries): + if i > 0: + # Titled divider between processes + title = Text( + f" {role}: {mods} " if mods else f" {role} ", style=dim if stale else rs + ) + parts.append(Rule(title=title, style=border_style)) + parts.extend(self._make_lines(d, stale)) + + # First entry title goes on the Panel itself + first_role, first_rs, _, first_mods = entries[0] + panel_title = Text( + f" {first_role}: {first_mods} " if first_mods else f" {first_role} ", + style=dim if stale else first_rs, + ) + + panel = Panel( + Group(*parts), + title=panel_title, + border_style=border_style, + ) + self.query_one("#panels", Static).update(panel) @staticmethod - def _add_row( - table: DataTable, # type: ignore[type-arg] - role: str, - role_style: str, - d: dict[str, Any], - modules: str, - stale: bool, - ) -> None: + def _make_lines(d: dict[str, Any], stale: bool) -> list[Text]: dim = "#606060" - s = dim if stale else None # override style when stale - table.add_row( - Text(modules, style=s or theme.BRIGHT_BLUE), - Text( - f"{d.get('cpu_percent', 0):.0f}%", - style=s or _heat(min(d.get("cpu_percent", 0) / 100.0, 1.0)), - ), - _bar(d.get("cpu_percent", 0), 100) if not stale else Text("░" * 12, style=dim), - Text(_fmt_time(d.get("cpu_time_user", 0)).plain, style=s or theme.WHITE), - Text(_fmt_time(d.get("cpu_time_system", 0)).plain, style=s or theme.WHITE), - Text(_fmt_time(d.get("cpu_time_iowait", 0)).plain, style=s or theme.WHITE), - Text(_fmt_bytes(d.get("pss", 0)).plain, style=s or _fmt_bytes(d.get("pss", 0)).style), - Text(str(d.get("num_threads", 0)), style=s or theme.WHITE), - Text(str(d.get("num_children", 0)), style=s or theme.WHITE), - Text(str(d.get("num_fds", 0)), style=s or theme.WHITE), - Text( - f"{d.get('io_read_bytes', 0) / 1048576:.0f}/{d.get('io_write_bytes', 0) / 1048576:.0f}", - style=s or theme.WHITE, - ), - Text(role, style=s or role_style), - Text(str(d.get("pid", "?")), style=s or theme.BRIGHT_BLACK), + dim2 = "#505050" + + cpu = d.get("cpu_percent", 0) + pss_text = _fmt_bytes(d.get("pss", 0)) + thr = d.get("num_threads", 0) + ch = d.get("num_children", 0) + fds = d.get("num_fds", 0) + + # Line 1: CPU% + bar + PSS + Thr/Child/FDs + line1 = Text() + line1.append("CPU ", style=dim if stale else theme.WHITE) + line1.append(f"{cpu:.0f}%", style=dim if stale else _heat(min(cpu / 100.0, 1.0))) + line1.append(" ") + if stale: + line1.append("░" * 12, style=dim) + else: + line1.append_text(_bar(cpu, 100)) + line1.append(" PSS ", style=dim if stale else theme.WHITE) + line1.append( + pss_text.plain, + style=dim if stale else (pss_text.style or theme.WHITE), ) + line1.append(f" Thr {thr}", style=dim if stale else theme.WHITE) + line1.append(f" Child {ch}", style=dim if stale else theme.WHITE) + line1.append(f" FDs {fds}", style=dim if stale else theme.WHITE) + + # Line 2: CPU times + IO R/W + s2 = dim if stale else dim2 + user_t = _fmt_time(d.get("cpu_time_user", 0)).plain + sys_t = _fmt_time(d.get("cpu_time_system", 0)).plain + iow_t = _fmt_time(d.get("cpu_time_iowait", 0)).plain + io_r = d.get("io_read_bytes", 0) / 1048576 + io_w = d.get("io_write_bytes", 0) / 1048576 + + line2 = Text() + line2.append(f"User {user_t} Sys {sys_t} IOw {iow_t}", style=s2) + line2.append(f" IO R/W {io_r:.0f}/{io_w:.0f} MB", style=s2) + + return [line1, line2] + + +_PREVIEW_DATA: dict[str, Any] = { + "coordinator": { + "cpu_percent": 12.3, + "pss": 47_400_000, + "num_threads": 4, + "num_children": 0, + "num_fds": 32, + "cpu_time_user": 1.2, + "cpu_time_system": 0.3, + "cpu_time_iowait": 0.0, + "io_read_bytes": 12_582_912, + "io_write_bytes": 4_194_304, + "pid": 1234, + }, + "workers": [ + { + "worker_id": 0, + "alive": True, + "modules": ["nav", "lidar"], + "cpu_percent": 34.0, + "pss": 125_829_120, + "num_threads": 8, + "num_children": 2, + "num_fds": 64, + "cpu_time_user": 5.1, + "cpu_time_system": 1.0, + "cpu_time_iowait": 0.2, + "io_read_bytes": 47_185_920, + "io_write_bytes": 12_582_912, + "pid": 1235, + }, + { + "worker_id": 1, + "alive": False, + "modules": ["vision"], + "cpu_percent": 87.0, + "pss": 536_870_912, + "num_threads": 16, + "num_children": 1, + "num_fds": 128, + "cpu_time_user": 42.5, + "cpu_time_system": 8.3, + "cpu_time_iowait": 1.1, + "io_read_bytes": 1_073_741_824, + "io_write_bytes": 536_870_912, + "pid": 1236, + }, + ], +} + + +def _preview() -> None: + """Print a static preview with fake data (no LCM needed).""" + from rich.console import Console + + data = _PREVIEW_DATA + border_style = theme.BORDER + + entries: list[tuple[str, str, dict[str, Any], str]] = [] + entries.append(("coordinator", theme.BRIGHT_CYAN, data["coordinator"], "")) + for w in data["workers"]: + rs = theme.BRIGHT_GREEN if w.get("alive") else theme.BRIGHT_RED + mods = ", ".join(w.get("modules", [])) + entries.append((f"worker {w['worker_id']}", rs, w, mods)) + + parts: list[RenderableType] = [] + for i, (role, rs, d, mods) in enumerate(entries): + if i > 0: + title = Text(f" {role}: {mods} " if mods else f" {role} ", style=rs) + parts.append(Rule(title=title, style=border_style)) + parts.extend(ResourceSpyApp._make_lines(d, stale=False)) + + first_role, first_rs, _, first_mods = entries[0] + panel_title = Text(f" {first_role} ", style=first_rs) + Console().print(Panel(Group(*parts), title=panel_title, border_style=border_style)) def main() -> None: import sys + if "--preview" in sys.argv: + _preview() + return + topic = "/dimos/resource_stats" if len(sys.argv) > 1 and sys.argv[1] == "--topic" and len(sys.argv) > 2: topic = sys.argv[2] From b356dbffa6b2ce500ab3453bed675bc6a4adf026 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:42:51 +0800 Subject: [PATCH 09/23] dtop visuals --- dimos/utils/cli/dtop.py | 179 ++++++++++++++++++++++++++++------------ 1 file changed, 127 insertions(+), 52 deletions(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index dd528cb544..6815d5bba7 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -22,7 +22,7 @@ import threading import time -from typing import Any +from typing import TYPE_CHECKING, Any from rich.console import Group, RenderableType from rich.panel import Panel @@ -36,6 +36,13 @@ from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic from dimos.utils.cli import theme +if TYPE_CHECKING: + from collections.abc import Callable + +# --------------------------------------------------------------------------- +# Color helpers +# --------------------------------------------------------------------------- + def _heat(ratio: float) -> str: """Map 0..1 ratio to a cyan → yellow → red gradient.""" @@ -54,23 +61,81 @@ def _bar(value: float, max_val: float, width: int = 12) -> Text: return Text("█" * filled + "░" * (width - filled), style=_heat(ratio)) -def _fmt_bytes(val: int) -> Text: - mb = val / 1048576 +def _rel_style(value: float, lo: float, hi: float) -> str: + """Color a value by where it sits in the observed [lo, hi] range.""" + if hi <= lo: + return _heat(0.0) + return _heat(min((value - lo) / (hi - lo), 1.0)) + + +# --------------------------------------------------------------------------- +# Metric formatters (plain strings — color applied separately via _rel_style) +# --------------------------------------------------------------------------- + + +def _fmt_pct(v: float) -> str: + return f"{v:.0f}%" + + +def _fmt_mem(v: float) -> str: + mb = v / 1048576 if mb >= 1024: - return Text(f"{mb / 1024:.1f} GB", style=theme.BRIGHT_YELLOW) - return Text(f"{mb:.1f} MB", style=theme.WHITE) + return f"{mb / 1024:.1f} GB" + return f"{mb:.1f} MB" + + +def _fmt_int(v: float) -> str: + return str(int(v)) + + +def _fmt_secs(v: float) -> str: + if v >= 3600: + return f"{v / 3600:.1f}h" + if v >= 60: + return f"{v / 60:.1f}m" + return f"{v:.1f}s" + +def _fmt_io(v: float) -> str: + return f"{v / 1048576:.0f} MB" -def _fmt_pct(val: float) -> Text: - return Text(f"{val:.0f}%", style=_heat(min(val / 100.0, 1.0))) +# --------------------------------------------------------------------------- +# Metric definitions — add a tuple here to add a new field +# (label, dict_key, format_fn) +# --------------------------------------------------------------------------- -def _fmt_time(seconds: float) -> Text: - if seconds >= 3600: - return Text(f"{seconds / 3600:.1f}h", style=theme.WHITE) - if seconds >= 60: - return Text(f"{seconds / 60:.1f}m", style=theme.WHITE) - return Text(f"{seconds:.1f}s", style=theme.WHITE) +_LINE1: list[tuple[str, str, Callable[[float], str]]] = [ + ("CPU", "cpu_percent", _fmt_pct), + ("PSS", "pss", _fmt_mem), + ("Thr", "num_threads", _fmt_int), + ("Child", "num_children", _fmt_int), + ("FDs", "num_fds", _fmt_int), +] + +_LINE2: list[tuple[str, str, Callable[[float], str]]] = [ + ("UserT", "cpu_time_user", _fmt_secs), + ("SysT", "cpu_time_system", _fmt_secs), + ("IOT", "cpu_time_iowait", _fmt_secs), + ("IO read", "io_read_bytes", _fmt_io), + ("IO write", "io_write_bytes", _fmt_io), +] + +_ALL_KEYS = {key for _, key, _ in _LINE1 + _LINE2} + + +def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, float]]: + """(min, max) per metric across all processes (for relative coloring).""" + ranges: dict[str, tuple[float, float]] = {} + for key in _ALL_KEYS: + vals = [d.get(key, 0) for d in data_dicts] + ranges[key] = (min(vals), max(vals)) + return ranges + + +# --------------------------------------------------------------------------- +# App +# --------------------------------------------------------------------------- class ResourceSpyApp(App): # type: ignore[type-arg] @@ -145,16 +210,19 @@ def _refresh(self) -> None: modules = ", ".join(w.get("modules", [])) or "" entries.append((f"worker {wid}", role_style, w, modules)) + # Per-metric max for relative coloring + ranges = _compute_ranges([d for _, _, d, _ in entries]) + # Build inner content: sections separated by Rules parts: list[RenderableType] = [] for i, (role, rs, d, mods) in enumerate(entries): if i > 0: - # Titled divider between processes title = Text( - f" {role}: {mods} " if mods else f" {role} ", style=dim if stale else rs + f" {role}: {mods} " if mods else f" {role} ", + style=dim if stale else rs, ) parts.append(Rule(title=title, style=border_style)) - parts.extend(self._make_lines(d, stale)) + parts.extend(self._make_lines(d, stale, ranges)) # First entry title goes on the Panel itself first_role, first_rs, _, first_mods = entries[0] @@ -171,49 +239,54 @@ def _refresh(self) -> None: self.query_one("#panels", Static).update(panel) @staticmethod - def _make_lines(d: dict[str, Any], stale: bool) -> list[Text]: + def _make_lines( + d: dict[str, Any], + stale: bool, + ranges: dict[str, tuple[float, float]], + ) -> list[Text]: dim = "#606060" - dim2 = "#505050" - - cpu = d.get("cpu_percent", 0) - pss_text = _fmt_bytes(d.get("pss", 0)) - thr = d.get("num_threads", 0) - ch = d.get("num_children", 0) - fds = d.get("num_fds", 0) + label1_style = dim if stale else theme.WHITE + label2_style = dim if stale else theme.BRIGHT_GREEN - # Line 1: CPU% + bar + PSS + Thr/Child/FDs + # Line 1 line1 = Text() - line1.append("CPU ", style=dim if stale else theme.WHITE) - line1.append(f"{cpu:.0f}%", style=dim if stale else _heat(min(cpu / 100.0, 1.0))) - line1.append(" ") - if stale: - line1.append("░" * 12, style=dim) - else: - line1.append_text(_bar(cpu, 100)) - line1.append(" PSS ", style=dim if stale else theme.WHITE) - line1.append( - pss_text.plain, - style=dim if stale else (pss_text.style or theme.WHITE), - ) - line1.append(f" Thr {thr}", style=dim if stale else theme.WHITE) - line1.append(f" Child {ch}", style=dim if stale else theme.WHITE) - line1.append(f" FDs {fds}", style=dim if stale else theme.WHITE) - - # Line 2: CPU times + IO R/W - s2 = dim if stale else dim2 - user_t = _fmt_time(d.get("cpu_time_user", 0)).plain - sys_t = _fmt_time(d.get("cpu_time_system", 0)).plain - iow_t = _fmt_time(d.get("cpu_time_iowait", 0)).plain - io_r = d.get("io_read_bytes", 0) / 1048576 - io_w = d.get("io_write_bytes", 0) / 1048576 - + for label, key, fmt in _LINE1: + val = d.get(key, 0) + lo, hi = ranges[key] + # CPU% uses absolute 0-100 scale; everything else is relative + if key == "cpu_percent": + val_style = dim if stale else _heat(min(val / 100.0, 1.0)) + else: + val_style = dim if stale else _rel_style(val, lo, hi) + line1.append(f"{label} ", style=label1_style) + line1.append(fmt(val), style=val_style) + # CPU bar right after CPU% + if key == "cpu_percent": + line1.append(" ") + if stale: + line1.append("░" * 12, style=dim) + else: + line1.append_text(_bar(val, 100)) + line1.append(" ") + + # Line 2 line2 = Text() - line2.append(f"User {user_t} Sys {sys_t} IOw {iow_t}", style=s2) - line2.append(f" IO R/W {io_r:.0f}/{io_w:.0f} MB", style=s2) + for i, (label, key, fmt) in enumerate(_LINE2): + val = d.get(key, 0) + lo, hi = ranges[key] + val_style = dim if stale else _rel_style(val, lo, hi) + line2.append(f"{label} ", style=label2_style) + line2.append(fmt(val), style=val_style) + if i < len(_LINE2) - 1: + line2.append(" ") return [line1, line2] +# --------------------------------------------------------------------------- +# Preview +# --------------------------------------------------------------------------- + _PREVIEW_DATA: dict[str, Any] = { "coordinator": { "cpu_percent": 12.3, @@ -279,12 +352,14 @@ def _preview() -> None: mods = ", ".join(w.get("modules", [])) entries.append((f"worker {w['worker_id']}", rs, w, mods)) + ranges = _compute_ranges([d for _, _, d, _ in entries]) + parts: list[RenderableType] = [] for i, (role, rs, d, mods) in enumerate(entries): if i > 0: title = Text(f" {role}: {mods} " if mods else f" {role} ", style=rs) parts.append(Rule(title=title, style=border_style)) - parts.extend(ResourceSpyApp._make_lines(d, stale=False)) + parts.extend(ResourceSpyApp._make_lines(d, stale=False, ranges=ranges)) first_role, first_rs, _, first_mods = entries[0] panel_title = Text(f" {first_role} ", style=first_rs) From a5aaf9ef12d48cd3a29798546667a521a2bcab1a Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:47:40 +0800 Subject: [PATCH 10/23] visual fixes --- dimos/utils/cli/dtop.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 6815d5bba7..e649543e46 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -116,12 +116,13 @@ def _fmt_io(v: float) -> str: _LINE2: list[tuple[str, str, Callable[[float], str]]] = [ ("UserT", "cpu_time_user", _fmt_secs), ("SysT", "cpu_time_system", _fmt_secs), - ("IOT", "cpu_time_iowait", _fmt_secs), - ("IO read", "io_read_bytes", _fmt_io), - ("IO write", "io_write_bytes", _fmt_io), + ("ioT", "cpu_time_iowait", _fmt_secs), ] -_ALL_KEYS = {key for _, key, _ in _LINE1 + _LINE2} +# IO r/w is a compound field handled specially in _make_lines +_IO_KEYS = ("io_read_bytes", "io_write_bytes") + +_ALL_KEYS = {key for _, key, _ in _LINE1 + _LINE2} | set(_IO_KEYS) def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, float]]: @@ -195,7 +196,7 @@ def _refresh(self) -> None: stale = (time.monotonic() - last_msg) > 2.0 dim = "#606060" - border_style = dim if stale else theme.BORDER + border_style = dim if stale else "#777777" # Collect (role, role_style, data_dict, modules) entries entries: list[tuple[str, str, dict[str, Any], str]] = [] @@ -215,11 +216,11 @@ def _refresh(self) -> None: # Build inner content: sections separated by Rules parts: list[RenderableType] = [] - for i, (role, rs, d, mods) in enumerate(entries): + for i, (role, _rs, d, mods) in enumerate(entries): if i > 0: title = Text( f" {role}: {mods} " if mods else f" {role} ", - style=dim if stale else rs, + style=dim if stale else theme.WHITE, ) parts.append(Rule(title=title, style=border_style)) parts.extend(self._make_lines(d, stale, ranges)) @@ -271,14 +272,23 @@ def _make_lines( # Line 2 line2 = Text() - for i, (label, key, fmt) in enumerate(_LINE2): + for _i, (label, key, fmt) in enumerate(_LINE2): val = d.get(key, 0) lo, hi = ranges[key] val_style = dim if stale else _rel_style(val, lo, hi) line2.append(f"{label} ", style=label2_style) line2.append(fmt(val), style=val_style) - if i < len(_LINE2) - 1: - line2.append(" ") + line2.append(" ") + + # IO r/w — compound field + io_r = d.get(_IO_KEYS[0], 0) + io_w = d.get(_IO_KEYS[1], 0) + lo_r, hi_r = ranges[_IO_KEYS[0]] + lo_w, hi_w = ranges[_IO_KEYS[1]] + line2.append("IO r/w ", style=label2_style) + line2.append(_fmt_io(io_r), style=dim if stale else _rel_style(io_r, lo_r, hi_r)) + line2.append("/", style=label2_style) + line2.append(_fmt_io(io_w), style=dim if stale else _rel_style(io_w, lo_w, hi_w)) return [line1, line2] @@ -343,7 +353,7 @@ def _preview() -> None: from rich.console import Console data = _PREVIEW_DATA - border_style = theme.BORDER + border_style = "#555555" entries: list[tuple[str, str, dict[str, Any], str]] = [] entries.append(("coordinator", theme.BRIGHT_CYAN, data["coordinator"], "")) From 9690d139e5a90fb5838a061fe0349dbe125b2cec Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:50:32 +0800 Subject: [PATCH 11/23] drop wrap --- dimos/utils/cli/dtop.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index e649543e46..c0a9074a6c 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -216,11 +216,11 @@ def _refresh(self) -> None: # Build inner content: sections separated by Rules parts: list[RenderableType] = [] - for i, (role, _rs, d, mods) in enumerate(entries): + for i, (role, rs, d, mods) in enumerate(entries): if i > 0: title = Text( f" {role}: {mods} " if mods else f" {role} ", - style=dim if stale else theme.WHITE, + style=dim if stale else rs, ) parts.append(Rule(title=title, style=border_style)) parts.extend(self._make_lines(d, stale, ranges)) @@ -247,7 +247,7 @@ def _make_lines( ) -> list[Text]: dim = "#606060" label1_style = dim if stale else theme.WHITE - label2_style = dim if stale else theme.BRIGHT_GREEN + label2_style = label1_style # Line 1 line1 = Text() From bcd21d459819053ca93abcbf17f6c5d69a948aba Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 14:56:35 +0800 Subject: [PATCH 12/23] add "dimos top" --- dimos/robot/cli/dimos.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 22a9e234d9..47a1e777e8 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -168,6 +168,15 @@ def humancli(ctx: typer.Context) -> None: humancli_main() +@main.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def top(ctx: typer.Context) -> None: + """Live resource monitor TUI.""" + from dimos.utils.cli.dtop import main as dtop_main + + sys.argv = ["dtop", *ctx.args] + dtop_main() + + topic_app = typer.Typer(help="Topic commands for pub/sub") main.add_typer(topic_app, name="topic") From eb711ab6890f12119bdb506540dc53c6a4a0d8fc Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 15:19:00 +0800 Subject: [PATCH 13/23] workerstats moved to worker --- dimos/core/resource_monitor/__init__.py | 3 ++- dimos/core/resource_monitor/logger.py | 3 ++- dimos/core/resource_monitor/stats.py | 10 +--------- dimos/core/worker.py | 13 ++++++++++--- dimos/mapping/test_voxels.py | 1 - 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dimos/core/resource_monitor/__init__.py b/dimos/core/resource_monitor/__init__.py index 8e9d7230a8..8138c1b893 100644 --- a/dimos/core/resource_monitor/__init__.py +++ b/dimos/core/resource_monitor/__init__.py @@ -3,7 +3,8 @@ ResourceLogger, StructlogResourceLogger, ) -from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats, collect_process_stats +from dimos.core.resource_monitor.stats import ProcessStats, collect_process_stats +from dimos.core.worker import WorkerStats __all__ = [ "LCMResourceLogger", diff --git a/dimos/core/resource_monitor/logger.py b/dimos/core/resource_monitor/logger.py index 88f79b6db8..0f4deac239 100644 --- a/dimos/core/resource_monitor/logger.py +++ b/dimos/core/resource_monitor/logger.py @@ -20,7 +20,8 @@ from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: - from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats + from dimos.core.resource_monitor.stats import ProcessStats + from dimos.core.worker import WorkerStats logger = setup_logger() diff --git a/dimos/core/resource_monitor/stats.py b/dimos/core/resource_monitor/stats.py index 5cdca1485d..502fc25e1d 100644 --- a/dimos/core/resource_monitor/stats.py +++ b/dimos/core/resource_monitor/stats.py @@ -14,7 +14,7 @@ from __future__ import annotations -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import TypedDict import psutil @@ -43,14 +43,6 @@ class ProcessStats: io_write_bytes: int = 0 -@dataclass(frozen=True) -class WorkerStats(ProcessStats): - """Process stats extended with worker-specific metadata.""" - - worker_id: int = -1 - modules: list[str] = field(default_factory=list) - - def _get_process(pid: int) -> psutil.Process: """Return a cached Process object, creating a new one if missing or dead.""" proc = _proc_cache.get(pid) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 3f6ab152d5..1b339db319 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -11,16 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from __future__ import annotations -from dataclasses import asdict +from dataclasses import asdict, dataclass, field import multiprocessing as mp import threading import traceback from typing import TYPE_CHECKING, Any -from dimos.core.resource_monitor import WorkerStats, collect_process_stats +from dimos.core.resource_monitor import ProcessStats, collect_process_stats from dimos.utils.logging_config import setup_logger from dimos.utils.sequential_ids import SequentialIds @@ -32,6 +31,14 @@ logger = setup_logger() +@dataclass(frozen=True) +class WorkerStats(ProcessStats): + """Process stats extended with worker-specific metadata.""" + + worker_id: int = -1 + modules: list[str] = field(default_factory=list) + + class ActorFuture: """Mimics Dask's ActorFuture - wraps a result with .result() method.""" diff --git a/dimos/mapping/test_voxels.py b/dimos/mapping/test_voxels.py index 5dfa044d26..95e70e1d6d 100644 --- a/dimos/mapping/test_voxels.py +++ b/dimos/mapping/test_voxels.py @@ -92,7 +92,6 @@ def test_carving( mapper.add_frame(lidar_frame2) count_carving = mapper.size() - # Additive count via numpy (avoids second VoxelBlockGrid allocation) voxel_size = mapper.config.voxel_size pts1 = np.asarray(lidar_frame1.pointcloud.points) pts2 = np.asarray(lidar_frame2.pointcloud.points) From fc9e1089d3d041193bd869975f510e6c105310aa Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 17:33:43 +0800 Subject: [PATCH 14/23] circular import fix --- dimos/core/module_coordinator.py | 2 +- dimos/core/resource_monitor/__init__.py | 2 -- dimos/core/worker_manager.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 450ec74afc..2929e50e1f 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -24,7 +24,6 @@ from dimos.core.resource_monitor import ( LCMResourceLogger, ResourceLogger, - WorkerStats, collect_process_stats, ) from dimos.core.worker_manager import WorkerManager @@ -33,6 +32,7 @@ if TYPE_CHECKING: from dimos.core.module import Module, ModuleT from dimos.core.rpc_client import ModuleProxy + from dimos.core.worker import WorkerStats logger = setup_logger() diff --git a/dimos/core/resource_monitor/__init__.py b/dimos/core/resource_monitor/__init__.py index 8138c1b893..f275e68630 100644 --- a/dimos/core/resource_monitor/__init__.py +++ b/dimos/core/resource_monitor/__init__.py @@ -4,13 +4,11 @@ StructlogResourceLogger, ) from dimos.core.resource_monitor.stats import ProcessStats, collect_process_stats -from dimos.core.worker import WorkerStats __all__ = [ "LCMResourceLogger", "ProcessStats", "ResourceLogger", "StructlogResourceLogger", - "WorkerStats", "collect_process_stats", ] diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index 7d4eff1d66..0e22b4a7aa 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -23,7 +23,7 @@ if TYPE_CHECKING: from dimos.core.module import ModuleT - from dimos.core.resource_monitor import WorkerStats + from dimos.core.worker import WorkerStats logger = setup_logger() From 0117581f9119935633dda5acd04d2c4a22543935 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 18:22:55 +0800 Subject: [PATCH 15/23] Fix type narrowing for Process.pid instead of type: ignore --- dimos/core/worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 1b339db319..831df614de 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -168,7 +168,9 @@ def collect_stats(self) -> WorkerStats: """Collect resource stats for this worker's process.""" modules = [actor._cls.__name__ for actor in self._modules.values()] if self._process is not None and self._process.is_alive(): - pid: int = self._process.pid # type: ignore[assignment] + pid = self._process.pid + if pid is None: + return WorkerStats(pid=0, alive=False, worker_id=self._worker_id, modules=modules) ps = collect_process_stats(pid) return WorkerStats(**asdict(ps), worker_id=self._worker_id, modules=modules) return WorkerStats(pid=0, alive=False, worker_id=self._worker_id, modules=modules) From d63342b74f61f4771547232fc60904396dd2260c Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 22:26:29 +0800 Subject: [PATCH 16/23] refactored resourcemonitor --- dimos/core/module_coordinator.py | 40 +++---- dimos/core/resource_monitor/__init__.py | 5 +- dimos/core/resource_monitor/logger.py | 3 +- dimos/core/resource_monitor/monitor.py | 126 ++++++++++++++++++++++ dimos/core/resource_monitor/stats.py | 12 ++- dimos/core/test_worker.py | 24 ++++- dimos/core/worker.py | 33 +++--- dimos/core/worker_manager.py | 7 +- dimos/robot/cli/dimos.py | 1 + dimos/utils/cli/dtop.py | 4 +- dimos/utils/decorators/__init__.py | 3 +- dimos/utils/decorators/decorators.py | 56 ++++++---- dimos/utils/decorators/test_decorators.py | 16 ++- 13 files changed, 243 insertions(+), 87 deletions(-) create mode 100644 dimos/core/resource_monitor/monitor.py diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 2929e50e1f..dc9b773d31 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -15,24 +15,18 @@ from __future__ import annotations from concurrent.futures import ThreadPoolExecutor -import os import threading from typing import TYPE_CHECKING, Any from dimos.core.global_config import GlobalConfig, global_config from dimos.core.resource import Resource -from dimos.core.resource_monitor import ( - LCMResourceLogger, - ResourceLogger, - collect_process_stats, -) from dimos.core.worker_manager import WorkerManager from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: from dimos.core.module import Module, ModuleT + from dimos.core.resource_monitor.monitor import StatsMonitor from dimos.core.rpc_client import ModuleProxy - from dimos.core.worker import WorkerStats logger = setup_logger() @@ -43,16 +37,19 @@ class ModuleCoordinator(Resource): # type: ignore[misc] _n: int | None = None _memory_limit: str = "auto" _deployed_modules: dict[type[Module], ModuleProxy] + _stats_monitor: StatsMonitor | None = None def __init__( self, n: int | None = None, cfg: GlobalConfig = global_config, + monitor_stats: bool = False, ) -> None: self._n = n if n is not None else cfg.n_workers self._memory_limit = cfg.memory_limit self._global_config = cfg self._deployed_modules = {} + self.monitor_stats = monitor_stats def start(self) -> None: n = self._n if self._n is not None else 2 @@ -60,6 +57,10 @@ def start(self) -> None: self._client.start() def stop(self) -> None: + if self._stats_monitor is not None: + self._stats_monitor.stop() + self._stats_monitor = None + for module_class, module in reversed(self._deployed_modules.items()): logger.info("Stopping module...", module=module_class.__name__) try: @@ -106,28 +107,17 @@ def start_all_modules(self) -> None: def get_instance(self, module: type[ModuleT]) -> ModuleProxy: return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return] - def loop( - self, - resource_logger: ResourceLogger | None = None, - monitor_interval: float = 1.0, - ) -> None: - _logger: ResourceLogger = resource_logger or LCMResourceLogger() - coordinator_pid = os.getpid() - # Prime cpu_percent so the first real reading isn't 0.0. - collect_process_stats(coordinator_pid) + def loop(self) -> None: + if self.monitor_stats and self._client is not None: + from dimos.core.resource_monitor.monitor import StatsMonitor + + self._stats_monitor = StatsMonitor(self._client) + self._stats_monitor.start() stop = threading.Event() try: - while not stop.wait(monitor_interval): - self._log_resource_stats(_logger, coordinator_pid) + stop.wait() except KeyboardInterrupt: return finally: self.stop() - - def _log_resource_stats(self, _logger: ResourceLogger, coordinator_pid: int) -> None: - coordinator = collect_process_stats(coordinator_pid) - workers: list[WorkerStats] = [] - if self._client is not None: - workers = self._client.collect_stats() - _logger.log_stats(coordinator, workers) diff --git a/dimos/core/resource_monitor/__init__.py b/dimos/core/resource_monitor/__init__.py index f275e68630..217941a2ec 100644 --- a/dimos/core/resource_monitor/__init__.py +++ b/dimos/core/resource_monitor/__init__.py @@ -3,12 +3,15 @@ ResourceLogger, StructlogResourceLogger, ) -from dimos.core.resource_monitor.stats import ProcessStats, collect_process_stats +from dimos.core.resource_monitor.monitor import StatsMonitor +from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats, collect_process_stats __all__ = [ "LCMResourceLogger", "ProcessStats", "ResourceLogger", + "StatsMonitor", "StructlogResourceLogger", + "WorkerStats", "collect_process_stats", ] diff --git a/dimos/core/resource_monitor/logger.py b/dimos/core/resource_monitor/logger.py index 0f4deac239..88f79b6db8 100644 --- a/dimos/core/resource_monitor/logger.py +++ b/dimos/core/resource_monitor/logger.py @@ -20,8 +20,7 @@ from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: - from dimos.core.resource_monitor.stats import ProcessStats - from dimos.core.worker import WorkerStats + from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats logger = setup_logger() diff --git a/dimos/core/resource_monitor/monitor.py b/dimos/core/resource_monitor/monitor.py new file mode 100644 index 0000000000..49079ed98e --- /dev/null +++ b/dimos/core/resource_monitor/monitor.py @@ -0,0 +1,126 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import asdict +import os +import threading +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +from dimos.core.resource import Resource +from dimos.core.resource_monitor.stats import ( + WorkerStats, + collect_process_stats, +) +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from collections.abc import Sequence + + from dimos.core.resource_monitor.logger import ResourceLogger + +logger = setup_logger() + + +@runtime_checkable +class WorkerInfo(Protocol): + @property + def pid(self) -> int | None: ... + + @property + def worker_id(self) -> int: ... + + @property + def module_names(self) -> list[str]: ... + + +@runtime_checkable +class WorkerSource(Protocol): + @property + def workers(self) -> Sequence[WorkerInfo]: ... + + +class StatsMonitor(Resource): + """Self-contained resource monitor that runs in a daemon thread. + + Collects stats for the coordinator process and all workers, then + forwards them to a ``ResourceLogger``. + """ + + def __init__( + self, + worker_source: WorkerSource, + resource_logger: ResourceLogger | None = None, + interval: float = 1.0, + coordinator_pid: int | None = None, + ) -> None: + self._worker_source = worker_source + self._interval = interval + self._coordinator_pid = coordinator_pid or os.getpid() + self._stop = threading.Event() + self._thread: threading.Thread | None = None + + if resource_logger is not None: + self._logger = resource_logger + else: + from dimos.core.resource_monitor.logger import LCMResourceLogger + + self._logger = LCMResourceLogger() + + def start(self) -> None: + """Start the monitoring daemon thread.""" + # Prime cpu_percent so the first real reading isn't 0.0. + collect_process_stats(self._coordinator_pid) + + self._stop.clear() + self._thread = threading.Thread(target=self._loop, daemon=True) + self._thread.start() + + def stop(self) -> None: + """Signal the monitor thread to stop and wait for it.""" + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=5.0) + self._thread = None + + def _loop(self) -> None: + while not self._stop.wait(self._interval): + try: + self._collect_and_log() + except Exception: + logger.error("StatsMonitor collection failed", exc_info=True) + + def _collect_and_log(self) -> None: + coordinator = collect_process_stats(self._coordinator_pid) + + worker_stats: list[WorkerStats] = [] + for w in self._worker_source.workers: + pid = w.pid + if pid is not None: + ps = collect_process_stats(pid) + worker_stats.append( + WorkerStats(**asdict(ps), worker_id=w.worker_id, modules=w.module_names) + ) + else: + worker_stats.append( + WorkerStats( + pid=0, + alive=False, + worker_id=w.worker_id, + modules=w.module_names, + ) + ) + + self._logger.log_stats(coordinator, worker_stats) diff --git a/dimos/core/resource_monitor/stats.py b/dimos/core/resource_monitor/stats.py index 502fc25e1d..c020c853e0 100644 --- a/dimos/core/resource_monitor/stats.py +++ b/dimos/core/resource_monitor/stats.py @@ -14,7 +14,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import TypedDict import psutil @@ -127,5 +127,13 @@ def collect_process_stats(pid: int) -> ProcessStats: return ProcessStats(pid=pid, alive=True, pss=pss, **cpu, **io, **proc_stats) except (psutil.NoSuchProcess, psutil.AccessDenied): _proc_cache.pop(pid, None) - _collect_pss.cache.pop((pid,), None) # type: ignore[attr-defined] + _collect_pss.cache.pop((pid,), None) return ProcessStats(pid=pid, alive=False) + + +@dataclass(frozen=True) +class WorkerStats(ProcessStats): + """Process stats extended with worker-specific metadata.""" + + worker_id: int = -1 + modules: list[str] = field(default_factory=list) diff --git a/dimos/core/test_worker.py b/dimos/core/test_worker.py index ceef212f5d..a5217f2dd6 100644 --- a/dimos/core/test_worker.py +++ b/dimos/core/test_worker.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + import pytest from dimos.core.core import rpc @@ -20,6 +22,9 @@ from dimos.core.worker_manager import WorkerManager from dimos.msgs.geometry_msgs import Vector3 +if TYPE_CHECKING: + from dimos.core.resource_monitor.stats import WorkerStats + class SimpleModule(Module): output: Out[Vector3] @@ -167,13 +172,30 @@ def test_worker_manager_parallel_deployment(create_worker_manager): @pytest.mark.slow def test_collect_stats(create_worker_manager): + from dimos.core.resource_monitor.monitor import StatsMonitor + manager = create_worker_manager(n_workers=2) module1 = manager.deploy(SimpleModule) module2 = manager.deploy(AnotherModule) module1.start() module2.start() - stats = manager.collect_stats() + # Use a capturing logger to collect stats via StatsMonitor + captured: list[list[WorkerStats]] = [] + + class CapturingLogger: + def log_stats(self, coordinator, workers): + captured.append(workers) + + monitor = StatsMonitor(manager, resource_logger=CapturingLogger(), interval=0.5) + monitor.start() + import time + + time.sleep(1.5) + monitor.stop() + + assert len(captured) >= 1 + stats = captured[-1] assert len(stats) == 2 for s in stats: diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 831df614de..b0dd802841 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -13,13 +13,11 @@ # limitations under the License. from __future__ import annotations -from dataclasses import asdict, dataclass, field import multiprocessing as mp import threading import traceback from typing import TYPE_CHECKING, Any -from dimos.core.resource_monitor import ProcessStats, collect_process_stats from dimos.utils.logging_config import setup_logger from dimos.utils.sequential_ids import SequentialIds @@ -31,14 +29,6 @@ logger = setup_logger() -@dataclass(frozen=True) -class WorkerStats(ProcessStats): - """Process stats extended with worker-specific metadata.""" - - worker_id: int = -1 - modules: list[str] = field(default_factory=list) - - class ActorFuture: """Mimics Dask's ActorFuture - wraps a result with .result() method.""" @@ -164,16 +154,21 @@ def __init__(self) -> None: def module_count(self) -> int: return len(self._modules) + self._reserved - def collect_stats(self) -> WorkerStats: - """Collect resource stats for this worker's process.""" - modules = [actor._cls.__name__ for actor in self._modules.values()] + @property + def pid(self) -> int | None: + """PID of the worker process, or ``None`` if not alive.""" if self._process is not None and self._process.is_alive(): - pid = self._process.pid - if pid is None: - return WorkerStats(pid=0, alive=False, worker_id=self._worker_id, modules=modules) - ps = collect_process_stats(pid) - return WorkerStats(**asdict(ps), worker_id=self._worker_id, modules=modules) - return WorkerStats(pid=0, alive=False, worker_id=self._worker_id, modules=modules) + p: int | None = self._process.pid + return p + return None + + @property + def worker_id(self) -> int: + return self._worker_id + + @property + def module_names(self) -> list[str]: + return [actor._cls.__name__ for actor in self._modules.values()] def reserve_slot(self) -> None: """Reserve a slot so _select_worker() sees the pending load.""" diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index 0e22b4a7aa..4dbb51eb54 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -23,7 +23,6 @@ if TYPE_CHECKING: from dimos.core.module import ModuleT - from dimos.core.worker import WorkerStats logger = setup_logger() @@ -91,9 +90,9 @@ def _deploy( return results - def collect_stats(self) -> list[WorkerStats]: - """Collect resource stats for all worker processes.""" - return [w.collect_stats() for w in self._workers] + @property + def workers(self) -> list[Worker]: + return list(self._workers) def close_all(self) -> None: if self._closed: diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 47a1e777e8..0ebd991a1b 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -117,6 +117,7 @@ def run( blueprint = autoconnect(*map(get_by_name, robot_types)) dimos = blueprint.build(cli_config_overrides=cli_config_overrides) + dimos.monitor_stats = True dimos.loop() diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index c0a9074a6c..fce8e9c757 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Live TUI for per-worker resource stats over LCM. +"""dtop — Live TUI for per-worker resource stats over LCM. Usage: uv run python -m dimos.utils.cli.dtop [--topic /dimos/resource_stats] @@ -139,7 +139,7 @@ def _compute_ranges(data_dicts: list[dict[str, Any]]) -> dict[str, tuple[float, # --------------------------------------------------------------------------- -class ResourceSpyApp(App): # type: ignore[type-arg] +class ResourceSpyApp(App[None]): CSS_PATH = "dimos.tcss" TITLE = "" diff --git a/dimos/utils/decorators/__init__.py b/dimos/utils/decorators/__init__.py index d0f91a4939..6c7e761a7f 100644 --- a/dimos/utils/decorators/__init__.py +++ b/dimos/utils/decorators/__init__.py @@ -1,13 +1,14 @@ """Decorators and accumulators for rate limiting and other utilities.""" from .accumulators import Accumulator, LatestAccumulator, RollingAverageAccumulator -from .decorators import CachedMethod, limit, retry, simple_mcache, ttl_cache +from .decorators import CachedMethod, TtlCacheWrapper, limit, retry, simple_mcache, ttl_cache __all__ = [ "Accumulator", "CachedMethod", "LatestAccumulator", "RollingAverageAccumulator", + "TtlCacheWrapper", "limit", "retry", "simple_mcache", diff --git a/dimos/utils/decorators/decorators.py b/dimos/utils/decorators/decorators.py index 1cdefd5f3b..ddbcbf0f43 100644 --- a/dimos/utils/decorators/decorators.py +++ b/dimos/utils/decorators/decorators.py @@ -13,10 +13,10 @@ # limitations under the License. from collections.abc import Callable -from functools import wraps +from functools import update_wrapper, wraps import threading import time -from typing import Any, Protocol, TypeVar, cast +from typing import Any, Generic, Protocol, TypeVar, cast from .accumulators import Accumulator, LatestAccumulator @@ -167,30 +167,44 @@ def invalidate_cache(instance: Any) -> None: return cast("CachedMethod[_CacheReturn]", getter) -def ttl_cache(seconds: float) -> Callable[[_F], _F]: +class TtlCacheWrapper(Generic[_CacheReturn]): + """Wrapper returned by :func:`ttl_cache`. + + Exposes a ``.cache`` dict that maps ``(args_tuple)`` → + ``(timestamp, value)`` so callers can inspect or evict entries. + """ + + cache: dict[tuple[Any, ...], tuple[float, _CacheReturn]] + + def __init__(self, func: Callable[..., _CacheReturn], seconds: float) -> None: + self._func = func + self._seconds = seconds + self.cache = {} + update_wrapper(self, func) + + def __call__(self, *args: Any) -> _CacheReturn: + now = time.monotonic() + expired = [k for k, (ts, _) in self.cache.items() if now - ts >= self._seconds] + for k in expired: + del self.cache[k] + if args in self.cache: + _, val = self.cache[args] + return val + result = self._func(*args) + self.cache[args] = (now, result) + return result + + +def ttl_cache( + seconds: float, +) -> Callable[[Callable[..., _CacheReturn]], TtlCacheWrapper[_CacheReturn]]: """Cache function results by positional args with a time-to-live. Expired entries are swept on each access. """ - def decorator(func: _F) -> _F: - _cache: dict[tuple[Any, ...], tuple[float, Any]] = {} - - @wraps(func) - def wrapper(*args: Any) -> Any: - now = time.monotonic() - expired = [k for k, (ts, _) in _cache.items() if now - ts >= seconds] - for k in expired: - del _cache[k] - if args in _cache: - _, val = _cache[args] - return val - result = func(*args) - _cache[args] = (now, result) - return result - - wrapper.cache = _cache # type: ignore[attr-defined] - return wrapper # type: ignore[return-value] + def decorator(func: Callable[..., _CacheReturn]) -> TtlCacheWrapper[_CacheReturn]: + return TtlCacheWrapper(func, seconds) return decorator diff --git a/dimos/utils/decorators/test_decorators.py b/dimos/utils/decorators/test_decorators.py index 7950fb0000..4c999f9dcb 100644 --- a/dimos/utils/decorators/test_decorators.py +++ b/dimos/utils/decorators/test_decorators.py @@ -369,15 +369,14 @@ def expensive(x: int) -> int: expensive(1) expensive(2) - assert len(expensive.cache) == 2 # type: ignore[attr-defined] - + assert len(expensive.cache) == 2 time.sleep(0.1) # Next call sweeps expired entries expensive(3) - assert (1,) not in expensive.cache # type: ignore[attr-defined] - assert (2,) not in expensive.cache # type: ignore[attr-defined] - assert (3,) in expensive.cache # type: ignore[attr-defined] + assert (1,) not in expensive.cache + assert (2,) not in expensive.cache + assert (3,) in expensive.cache def test_ttl_cache_manual_cache_cleanup() -> None: @@ -388,7 +387,6 @@ def expensive(x: int) -> int: return x * 2 expensive(1) - assert (1,) in expensive.cache # type: ignore[attr-defined] - - expensive.cache.pop((1,), None) # type: ignore[attr-defined] - assert (1,) not in expensive.cache # type: ignore[attr-defined] + assert (1,) in expensive.cache + expensive.cache.pop((1,), None) + assert (1,) not in expensive.cache From 736808b68713e230b5cc3e4eebca5937f953b5a5 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 23:26:55 +0800 Subject: [PATCH 17/23] small decorator rewrite --- dimos/utils/decorators/__init__.py | 3 +- dimos/utils/decorators/decorators.py | 37 ++++++++++++----------- dimos/utils/decorators/test_decorators.py | 16 +++++----- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/dimos/utils/decorators/__init__.py b/dimos/utils/decorators/__init__.py index 6c7e761a7f..d0f91a4939 100644 --- a/dimos/utils/decorators/__init__.py +++ b/dimos/utils/decorators/__init__.py @@ -1,14 +1,13 @@ """Decorators and accumulators for rate limiting and other utilities.""" from .accumulators import Accumulator, LatestAccumulator, RollingAverageAccumulator -from .decorators import CachedMethod, TtlCacheWrapper, limit, retry, simple_mcache, ttl_cache +from .decorators import CachedMethod, limit, retry, simple_mcache, ttl_cache __all__ = [ "Accumulator", "CachedMethod", "LatestAccumulator", "RollingAverageAccumulator", - "TtlCacheWrapper", "limit", "retry", "simple_mcache", diff --git a/dimos/utils/decorators/decorators.py b/dimos/utils/decorators/decorators.py index ddbcbf0f43..c5626fcaf7 100644 --- a/dimos/utils/decorators/decorators.py +++ b/dimos/utils/decorators/decorators.py @@ -16,12 +16,13 @@ from functools import update_wrapper, wraps import threading import time -from typing import Any, Generic, Protocol, TypeVar, cast +from typing import Any, Generic, ParamSpec, Protocol, TypeVar, cast from .accumulators import Accumulator, LatestAccumulator _CacheResult_co = TypeVar("_CacheResult_co", covariant=True) _CacheReturn = TypeVar("_CacheReturn") +_P = ParamSpec("_P") _F = TypeVar("_F", bound=Callable[..., Any]) @@ -167,44 +168,44 @@ def invalidate_cache(instance: Any) -> None: return cast("CachedMethod[_CacheReturn]", getter) -class TtlCacheWrapper(Generic[_CacheReturn]): - """Wrapper returned by :func:`ttl_cache`. - - Exposes a ``.cache`` dict that maps ``(args_tuple)`` → - ``(timestamp, value)`` so callers can inspect or evict entries. - """ +class _TtlCacheWrapper(Generic[_P, _CacheReturn]): + """Wrapper returned by :func:`ttl_cache`.""" cache: dict[tuple[Any, ...], tuple[float, _CacheReturn]] - def __init__(self, func: Callable[..., _CacheReturn], seconds: float) -> None: + def __init__(self, func: Callable[_P, _CacheReturn], seconds: float) -> None: self._func = func self._seconds = seconds self.cache = {} update_wrapper(self, func) - def __call__(self, *args: Any) -> _CacheReturn: - now = time.monotonic() - expired = [k for k, (ts, _) in self.cache.items() if now - ts >= self._seconds] - for k in expired: - del self.cache[k] + def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _CacheReturn: + self.pop_expired() if args in self.cache: _, val = self.cache[args] return val - result = self._func(*args) - self.cache[args] = (now, result) + result = self._func(*args, **kwargs) + self.cache[args] = (time.monotonic(), result) return result + def pop_expired(self) -> None: + """Remove all expired entries from the cache.""" + now = time.monotonic() + expired = [k for k, (ts, _) in self.cache.items() if now - ts >= self._seconds] + for k in expired: + del self.cache[k] + def ttl_cache( seconds: float, -) -> Callable[[Callable[..., _CacheReturn]], TtlCacheWrapper[_CacheReturn]]: +) -> Callable[[Callable[_P, _CacheReturn]], _TtlCacheWrapper[_P, _CacheReturn]]: """Cache function results by positional args with a time-to-live. Expired entries are swept on each access. """ - def decorator(func: Callable[..., _CacheReturn]) -> TtlCacheWrapper[_CacheReturn]: - return TtlCacheWrapper(func, seconds) + def decorator(func: Callable[_P, _CacheReturn]) -> _TtlCacheWrapper[_P, _CacheReturn]: + return _TtlCacheWrapper(func, seconds) return decorator diff --git a/dimos/utils/decorators/test_decorators.py b/dimos/utils/decorators/test_decorators.py index 4c999f9dcb..9018776fe6 100644 --- a/dimos/utils/decorators/test_decorators.py +++ b/dimos/utils/decorators/test_decorators.py @@ -369,24 +369,24 @@ def expensive(x: int) -> int: expensive(1) expensive(2) - assert len(expensive.cache) == 2 + assert len(expensive._cache) == 2 time.sleep(0.1) # Next call sweeps expired entries expensive(3) - assert (1,) not in expensive.cache - assert (2,) not in expensive.cache - assert (3,) in expensive.cache + assert (1,) not in expensive._cache + assert (2,) not in expensive._cache + assert (3,) in expensive._cache def test_ttl_cache_manual_cache_cleanup() -> None: - """Test that .cache dict can be manually cleaned up.""" + """Test that evict() removes a specific cache entry.""" @ttl_cache(10.0) def expensive(x: int) -> int: return x * 2 expensive(1) - assert (1,) in expensive.cache - expensive.cache.pop((1,), None) - assert (1,) not in expensive.cache + assert (1,) in expensive._cache + expensive.evict(1) + assert (1,) not in expensive._cache From 4d3740cde175603421a299ff09c5990768b727d8 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 23:42:45 +0800 Subject: [PATCH 18/23] global config monitor stats --- dimos/core/global_config.py | 1 + dimos/core/module_coordinator.py | 14 ++++++-------- dimos/robot/cli/dimos.py | 1 - 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index f629122cee..fe559b1313 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -45,6 +45,7 @@ class GlobalConfig(BaseSettings): robot_rotation_diameter: float = 0.6 planner_strategy: NavigationStrategy = "simple" planner_robot_speed: float | None = None + monitor_stats: bool = True model_config = SettingsConfigDict( env_file=".env", diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index dc9b773d31..ba399a84d5 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -43,19 +43,23 @@ def __init__( self, n: int | None = None, cfg: GlobalConfig = global_config, - monitor_stats: bool = False, ) -> None: self._n = n if n is not None else cfg.n_workers self._memory_limit = cfg.memory_limit self._global_config = cfg self._deployed_modules = {} - self.monitor_stats = monitor_stats def start(self) -> None: n = self._n if self._n is not None else 2 self._client = WorkerManager(n_workers=n) self._client.start() + if self._global_config.monitor_stats: + from dimos.core.resource_monitor.monitor import StatsMonitor + + self._stats_monitor = StatsMonitor(self._client) + self._stats_monitor.start() + def stop(self) -> None: if self._stats_monitor is not None: self._stats_monitor.stop() @@ -108,12 +112,6 @@ def get_instance(self, module: type[ModuleT]) -> ModuleProxy: return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return] def loop(self) -> None: - if self.monitor_stats and self._client is not None: - from dimos.core.resource_monitor.monitor import StatsMonitor - - self._stats_monitor = StatsMonitor(self._client) - self._stats_monitor.start() - stop = threading.Event() try: stop.wait() diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 0ebd991a1b..47a1e777e8 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -117,7 +117,6 @@ def run( blueprint = autoconnect(*map(get_by_name, robot_types)) dimos = blueprint.build(cli_config_overrides=cli_config_overrides) - dimos.monitor_stats = True dimos.loop() From f36439301c0afc3422da56a1e79a00a18b09bc4f Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 23:47:41 +0800 Subject: [PATCH 19/23] dtop shows waiting for stats --- dimos/utils/cli/dtop.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index fce8e9c757..7ec33c61c4 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -154,6 +154,12 @@ class ResourceSpyApp(App[None]): height: 1fr; scrollbar-size: 0 0; }} + VerticalScroll.waiting {{ + align: center middle; + }} + .waiting #panels {{ + width: auto; + }} #panels {{ background: transparent; }} @@ -191,8 +197,17 @@ def _refresh(self) -> None: data = self._latest last_msg = self._last_msg_time + scroll = self.query_one(VerticalScroll) if data is None: + scroll.add_class("waiting") + waiting = Panel( + Text("dtop waiting for stats…", style=theme.FOREGROUND, justify="center"), + border_style=theme.CYAN, + expand=False, + ) + self.query_one("#panels", Static).update(waiting) return + scroll.remove_class("waiting") stale = (time.monotonic() - last_msg) > 2.0 dim = "#606060" From d7f19a19852e1a5fedd485cb39896cd2d5144c14 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 23:51:34 +0800 Subject: [PATCH 20/23] stats disabled by default --- dimos/core/global_config.py | 2 +- dimos/core/module_coordinator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index fe559b1313..2153f912b4 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -45,7 +45,7 @@ class GlobalConfig(BaseSettings): robot_rotation_diameter: float = 0.6 planner_strategy: NavigationStrategy = "simple" planner_robot_speed: float | None = None - monitor_stats: bool = True + stats: bool = False model_config = SettingsConfigDict( env_file=".env", diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index ba399a84d5..401b41fa5f 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -54,7 +54,7 @@ def start(self) -> None: self._client = WorkerManager(n_workers=n) self._client.start() - if self._global_config.monitor_stats: + if self._global_config.stats: from dimos.core.resource_monitor.monitor import StatsMonitor self._stats_monitor = StatsMonitor(self._client) From 0fab5be4f21ec3c7707e3c3a8716879a7d03e9c2 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 23:52:21 +0800 Subject: [PATCH 21/23] renamed stats to dtop --- dimos/core/global_config.py | 2 +- dimos/core/module_coordinator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index 2153f912b4..3660a957dc 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -45,7 +45,7 @@ class GlobalConfig(BaseSettings): robot_rotation_diameter: float = 0.6 planner_strategy: NavigationStrategy = "simple" planner_robot_speed: float | None = None - stats: bool = False + dtop: bool = False model_config = SettingsConfigDict( env_file=".env", diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 401b41fa5f..86afb9ebc4 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -54,7 +54,7 @@ def start(self) -> None: self._client = WorkerManager(n_workers=n) self._client.start() - if self._global_config.stats: + if self._global_config.dtop: from dimos.core.resource_monitor.monitor import StatsMonitor self._stats_monitor = StatsMonitor(self._client) From 8b4650804d6f66dfdf4382a0582db4f4ae995c8f Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Sun, 1 Mar 2026 23:54:58 +0800 Subject: [PATCH 22/23] better drop init msg --- dimos/utils/cli/dtop.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 7ec33c61c4..16eae36604 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -201,7 +201,11 @@ def _refresh(self) -> None: if data is None: scroll.add_class("waiting") waiting = Panel( - Text("dtop waiting for stats…", style=theme.FOREGROUND, justify="center"), + Text( + "use `dimos --dtop ...` to emit stats", + style=theme.FOREGROUND, + justify="center", + ), border_style=theme.CYAN, expand=False, ) From 8a6e78adf80c081f608b8cc7accc01ad68e227dd Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 2 Mar 2026 18:14:13 +0800 Subject: [PATCH 23/23] fix ttl_cache tests to match new _TtlCacheWrapper API --- dimos/utils/decorators/test_decorators.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dimos/utils/decorators/test_decorators.py b/dimos/utils/decorators/test_decorators.py index 9018776fe6..98545a2e37 100644 --- a/dimos/utils/decorators/test_decorators.py +++ b/dimos/utils/decorators/test_decorators.py @@ -369,14 +369,14 @@ def expensive(x: int) -> int: expensive(1) expensive(2) - assert len(expensive._cache) == 2 + assert len(expensive.cache) == 2 time.sleep(0.1) # Next call sweeps expired entries expensive(3) - assert (1,) not in expensive._cache - assert (2,) not in expensive._cache - assert (3,) in expensive._cache + assert (1,) not in expensive.cache + assert (2,) not in expensive.cache + assert (3,) in expensive.cache def test_ttl_cache_manual_cache_cleanup() -> None: @@ -387,6 +387,6 @@ def expensive(x: int) -> int: return x * 2 expensive(1) - assert (1,) in expensive._cache - expensive.evict(1) - assert (1,) not in expensive._cache + assert (1,) in expensive.cache + del expensive.cache[(1,)] + assert (1,) not in expensive.cache