From 75950509221c07a4562315609c69d316e3e6a325 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 1 Oct 2025 08:19:38 +0300 Subject: [PATCH 1/6] shared mem again --- dimos/robot/unitree_webrtc/unitree_go2.py | 24 ++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 0194652900..02013e36ef 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -419,7 +419,9 @@ def _deploy_connection(self): self.connection.lidar.transport = core.LCMTransport("/lidar", LidarMessage) self.connection.odom.transport = core.LCMTransport("/odom", PoseStamped) self.connection.gps_location.transport = core.pLCMTransport("/gps_location") - self.connection.video.transport = core.LCMTransport("/go2/color_image", Image) + self.connection.video.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) self.connection.movecmd.transport = core.LCMTransport("/cmd_vel", Twist) self.connection.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) self.connection.camera_pose.transport = core.LCMTransport("/go2/camera_pose", PoseStamped) @@ -496,7 +498,12 @@ def _deploy_visualization(self): self.websocket_vis.path.connect(self.global_planner.path) self.websocket_vis.global_costmap.connect(self.mapper.global_costmap) - self.foxglove_bridge = FoxgloveBridge() + self.foxglove_bridge = FoxgloveBridge( + shm_channels=[ + "/go2/color_image#sensor_msgs.Image", + "/go2/depth_image#sensor_msgs.Image", + ] + ) # TODO: This should be moved. def _set_goal(goal: LatLon): @@ -515,7 +522,10 @@ def _deploy_perception(self): output_dir=self.spatial_memory_dir, ) - self.spatial_memory_module.video.transport = core.LCMTransport("/go2/color_image", Image) + color_image_default_capacity = 1920 * 1080 * 4 + self.spatial_memory_module.video.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) self.spatial_memory_module.odom.transport = core.LCMTransport( "/go2/camera_pose", PoseStamped ) @@ -547,8 +557,12 @@ def _deploy_camera(self): self.depth_module = self.dimos.deploy(DepthModule, gt_depth_scale=gt_depth_scale) # Set up transports - self.depth_module.color_image.transport = core.LCMTransport("/go2/color_image", Image) - self.depth_module.depth_image.transport = core.LCMTransport("/go2/depth_image", Image) + self.depth_module.color_image.transport = core.pSHMTransport( + "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) + self.depth_module.depth_image.transport = core.pSHMTransport( + "/go2/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE + ) self.depth_module.camera_info.transport = core.LCMTransport("/go2/camera_info", CameraInfo) logger.info("Camera module deployed and connected") From 3008b94835ce2c0c3d26ccc3a0b912468a0b00b2 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 1 Oct 2025 22:09:04 +0300 Subject: [PATCH 2/6] clean up --- dimos/core/module.py | 5 +++++ dimos/navigation/bt_navigator/navigator.py | 1 - .../navigation/local_planner/local_planner.py | 8 ++++++-- dimos/perception/spatial_perception.py | 20 +++++++++++++------ dimos/robot/unitree_webrtc/unitree_go2.py | 20 +++++++++++++++++-- 5 files changed, 43 insertions(+), 11 deletions(-) diff --git a/dimos/core/module.py b/dimos/core/module.py index f7786bd55e..ed973991f3 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -23,6 +23,7 @@ get_origin, get_type_hints, ) +from reactivex.disposable import CompositeDisposable from dask.distributed import Actor, get_worker @@ -74,12 +75,14 @@ class ModuleBase(Configurable[ModuleConfig], SkillContainer): _tf: Optional[TFSpec] = None _loop: Optional[asyncio.AbstractEventLoop] = None _loop_thread: Optional[threading.Thread] + _disposables: CompositeDisposable default_config = ModuleConfig def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._loop, self._loop_thread = get_loop() + self._disposables = CompositeDisposable() # we can completely override comms protocols if we want try: # here we attempt to figure out if we are running on a dask worker @@ -98,6 +101,8 @@ def _close_module(self): self._loop = None self._loop_thread.join(timeout=2) self._loop_thread = None + if hasattr(self, "_disposables"): + self._disposables.dispose() def _close_rpc(self): # Using hasattr is needed because SkillCoordinator skips ModuleBase.__init__ and self.rpc is never set. diff --git a/dimos/navigation/bt_navigator/navigator.py b/dimos/navigation/bt_navigator/navigator.py index d64971c4a3..9bde5d9ca5 100644 --- a/dimos/navigation/bt_navigator/navigator.py +++ b/dimos/navigation/bt_navigator/navigator.py @@ -27,7 +27,6 @@ from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.nav_msgs import OccupancyGrid from dimos_lcm.std_msgs import String -from dimos.navigation.local_planner.local_planner import BaseLocalPlanner from dimos.navigation.bt_navigator.goal_validator import find_safe_goal from dimos.navigation.bt_navigator.recovery_server import RecoveryServer from dimos.protocol.tf import TF diff --git a/dimos/navigation/local_planner/local_planner.py b/dimos/navigation/local_planner/local_planner.py index 0eac75ea6d..ec415e6e51 100644 --- a/dimos/navigation/local_planner/local_planner.py +++ b/dimos/navigation/local_planner/local_planner.py @@ -25,7 +25,7 @@ from typing import Optional from dimos.core import Module, In, Out, rpc -from dimos.msgs.geometry_msgs import Twist, Vector3, PoseStamped +from dimos.msgs.geometry_msgs import Twist, PoseStamped from dimos.msgs.nav_msgs import OccupancyGrid, Path from dimos.utils.logging_config import setup_logger from dimos.utils.transform_utils import get_distance, quaternion_to_euler, normalize_angle @@ -198,6 +198,10 @@ def stop(self): self.planning_thread = None stop_cmd = Twist() self.cmd_vel.publish(stop_cmd) - self._close_module() logger.info("Local planner stopped") + + @rpc + def stop_planner_module(self): + self.stop() + self._close_module() diff --git a/dimos/perception/spatial_perception.py b/dimos/perception/spatial_perception.py index e0504faa75..8205eaba0a 100644 --- a/dimos/perception/spatial_perception.py +++ b/dimos/perception/spatial_perception.py @@ -23,13 +23,14 @@ import numpy as np import cv2 -from reactivex import Observable, disposable, just, interval +from reactivex import Observable, disposable, interval from reactivex import operators as ops from datetime import datetime +from reactivex.disposable import Disposable -from dimos.core import In, Module, Out, rpc +from dimos.core import In, Module, rpc from dimos.msgs.sensor_msgs import Image -from dimos.msgs.geometry_msgs import Vector3, Quaternion, Pose, PoseStamped +from dimos.msgs.geometry_msgs import Vector3, Pose, PoseStamped from dimos.utils.logging_config import setup_logger from dimos.agents.memory.spatial_vector_db import SpatialVectorDB from dimos.agents.memory.image_embedding import ImageEmbeddingProvider @@ -190,14 +191,21 @@ def set_video(image_msg: Image): def set_odom(odom_msg: PoseStamped): self._latest_odom = odom_msg - self.video.subscribe(set_video) - self.odom.subscribe(set_odom) + unsub = self.video.subscribe(set_video) + self._disposables.add(Disposable(unsub)) + unsub = self.odom.subscribe(set_odom) + self._disposables.add(Disposable(unsub)) # Start periodic processing using interval - interval(self._process_interval).subscribe(lambda _: self._process_frame()) + unsub = interval(self._process_interval).subscribe(lambda _: self._process_frame()) + self._disposables.add(Disposable(unsub)) logger.info("SpatialMemory module started and subscribed to LCM streams") + @rpc + def stop(self): + self._close_module() + def _process_frame(self): """Process the latest frame with pose data if available.""" if self._latest_video_frame is None or self._latest_odom is None: diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 02013e36ef..5bbc649463 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -410,6 +410,21 @@ def start(self): logger.info("UnitreeGo2 initialized and started") logger.info(f"WebSocket visualization available at http://localhost:{self.websocket_port}") + def stop(self): + # self.connection.stop() + # self.mapper.stop() + # self.global_planner.stop() + # self.local_planner.stop() + # self.navigator.stop() + # self.frontier_explorer.stop() + # self.websocket_vis.stop() + # self.foxglove_bridge.stop() + self.spatial_memory_module.stop() + # self.depth_module.stop() + # self.object_tracker.stop() + self.dimos.close_all() + self.lcm.stop() + def _deploy_connection(self): """Deploy and configure the connection module.""" self.connection = self.dimos.deploy( @@ -522,7 +537,6 @@ def _deploy_perception(self): output_dir=self.spatial_memory_dir, ) - color_image_default_capacity = 1920 * 1080 * 4 self.spatial_memory_module.video.transport = core.pSHMTransport( "/go2/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE ) @@ -764,9 +778,11 @@ def main(): try: while True: - time.sleep(1) + time.sleep(0.1) except KeyboardInterrupt: logger.info("Shutting down...") + finally: + robot.stop() if __name__ == "__main__": From cd8715b6b5422e21d346b6efe7d7526ed01dcb68 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 1 Oct 2025 22:50:34 +0300 Subject: [PATCH 3/6] move constants --- dimos/constants.py | 12 ++++++++++++ dimos/robot/unitree_webrtc/unitree_g1.py | 16 +++++++++++++--- dimos/robot/unitree_webrtc/unitree_go2.py | 13 +------------ 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/dimos/constants.py b/dimos/constants.py index e7062f0a1e..86b3a39aa1 100644 --- a/dimos/constants.py +++ b/dimos/constants.py @@ -15,3 +15,15 @@ from pathlib import Path DIMOS_PROJECT_ROOT = Path(__file__).parent.parent + +""" +Constants for shared memory +Usually, auto-detection for size would be preferred. Sadly, though, channels are made +and frozen *before* the first frame is received. +Therefore, a maximum capacity for color image and depth image transfer should be defined +ahead of time. +""" +# Default color image size: 1920x1080 frame x 3 (RGB) x uint8 +DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3 +# Default depth image size: 1280x720 frame * 4 (float32 size) +DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4 diff --git a/dimos/robot/unitree_webrtc/unitree_g1.py b/dimos/robot/unitree_webrtc/unitree_g1.py index 18f9f95ae0..6a25b3527a 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1.py +++ b/dimos/robot/unitree_webrtc/unitree_g1.py @@ -24,6 +24,7 @@ from typing import Optional from dimos import core +from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE from dimos.core import Module, In, Out, rpc from dimos.msgs.geometry_msgs import PoseStamped, Twist, TwistStamped from dimos.msgs.nav_msgs.Odometry import Odometry @@ -241,8 +242,12 @@ def _deploy_camera(self): ) # Configure ZED LCM transports (same for both real and fake) - self.zed_camera.color_image.transport = core.LCMTransport("/zed/color_image", Image) - self.zed_camera.depth_image.transport = core.LCMTransport("/zed/depth_image", Image) + self.zed_camera.color_image.transport = core.pSHMTransport( + "/zed/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ) + self.zed_camera.depth_image.transport = core.pSHMTransport( + "/zed/depth_image", default_capacity=DEFAULT_CAPACITY_DEPTH_IMAGE + ) self.zed_camera.camera_info.transport = core.LCMTransport("/zed/camera_info", CameraInfo) self.zed_camera.pose.transport = core.LCMTransport("/zed/pose", PoseStamped) @@ -257,7 +262,12 @@ def _deploy_visualization(self): # Note: robot_pose connection removed since odom was removed from G1ConnectionModule # Deploy Foxglove bridge - self.foxglove_bridge = FoxgloveBridge() + self.foxglove_bridge = FoxgloveBridge( + shm_channels=[ + "/zed/color_image#sensor_msgs.Image", + "/zed/depth_image#sensor_msgs.Image", + ] + ) def _deploy_joystick(self): """Deploy joystick control module.""" diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 5bbc649463..00172f6b02 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -25,6 +25,7 @@ from reactivex import Observable from dimos import core +from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE, DEFAULT_CAPACITY_DEPTH_IMAGE from dimos.core import In, Module, Out, rpc from dimos.mapping.types import LatLon from dimos.msgs.std_msgs import Header @@ -81,18 +82,6 @@ warnings.filterwarnings("ignore", message="coroutine.*was never awaited") warnings.filterwarnings("ignore", message="H264Decoder.*failed to decode") -""" -Constants for shared memory -Usually, auto-detection for size would be preferred. Sadly, though, channels are made -and frozen *before* the first frame is received. -Therefore, a maximum capacity for color image and depth image transfer should be defined -ahead of time. -""" -# Default color image size: 1920x1080 frame x 3 (RGB) x uint8 -DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3 -# Default depth image size: 1280x720 frame * 4 (float32 size) -DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4 - class FakeRTC: """Fake WebRTC connection for testing with recorded data.""" From b601b79e8024d74debe49cc0a523b9c1c94e9428 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 31 Aug 2025 06:42:09 +0300 Subject: [PATCH 4/6] add utilization --- dimos/robot/unitree_webrtc/unitree_g1.py | 2 +- dimos/robot/unitree_webrtc/unitree_go2.py | 6 + dimos/utils/monitoring.py | 244 ++++++++++++++++++++++ 3 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 dimos/utils/monitoring.py diff --git a/dimos/robot/unitree_webrtc/unitree_g1.py b/dimos/robot/unitree_webrtc/unitree_g1.py index 6a25b3527a..0011eac760 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1.py +++ b/dimos/robot/unitree_webrtc/unitree_g1.py @@ -28,7 +28,7 @@ from dimos.core import Module, In, Out, rpc from dimos.msgs.geometry_msgs import PoseStamped, Twist, TwistStamped from dimos.msgs.nav_msgs.Odometry import Odometry -from dimos.msgs.sensor_msgs import Image, CameraInfo, PointCloud2 +from dimos.msgs.sensor_msgs import CameraInfo, PointCloud2 from dimos.msgs.tf2_msgs.TFMessage import TFMessage from dimos.protocol import pubsub from dimos.protocol.pubsub.lcmpubsub import LCM diff --git a/dimos/robot/unitree_webrtc/unitree_go2.py b/dimos/robot/unitree_webrtc/unitree_go2.py index 00172f6b02..0babf1d029 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2.py +++ b/dimos/robot/unitree_webrtc/unitree_go2.py @@ -46,6 +46,7 @@ from dimos.protocol.pubsub.lcmpubsub import LCM, Topic from dimos.protocol.tf import TF from dimos.robot.foxglove_bridge import FoxgloveBridge +from dimos.utils.monitoring import UtilizationModule from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule from dimos.navigation.global_planner import AstarPlanner from dimos.navigation.local_planner.holonomic_local_planner import HolonomicLocalPlanner @@ -351,6 +352,7 @@ def __init__( self.spatial_memory_module = None self.depth_module = None self.object_tracker = None + self.utilization_module = None self._setup_directories() @@ -411,6 +413,7 @@ def stop(self): self.spatial_memory_module.stop() # self.depth_module.stop() # self.object_tracker.stop() + self.utilization_module.stop() self.dimos.close_all() self.lcm.stop() @@ -541,6 +544,8 @@ def _deploy_perception(self): frame_id="camera_link", ) + self.utilization_module = self.dimos.deploy(UtilizationModule) + # Set up transports self.object_tracker.detection2darray.transport = core.LCMTransport( "/go2/detection2d", Detection2DArray @@ -590,6 +595,7 @@ def _start_modules(self): self.spatial_memory_module.start() self.depth_module.start() self.object_tracker.start() + self.utilization_module.start() # Initialize skills after connection is established if self.skill_library is not None: diff --git a/dimos/utils/monitoring.py b/dimos/utils/monitoring.py new file mode 100644 index 0000000000..7f3f379457 --- /dev/null +++ b/dimos/utils/monitoring.py @@ -0,0 +1,244 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Note, to enable ps-spy to run without sudo you need: + + echo 0 | sudo tee /proc/sys/kernel/yama/ptrace_scope +""" + +import subprocess +import threading +import re +import os +import shutil +from functools import lru_cache +from typing import Optional +from distributed.client import Client + +from distributed import get_client +from dimos.core import Module, rpc +from dimos.utils.logging_config import setup_logger + + +logger = setup_logger(__file__) + + +def print_data_table(data): + headers = ["active_percent", "gil_percent", "n_threads", "pid", "worker_id"] + + # Determine column widths + col_widths = [] + for h in headers: + max_len = max(len(str(d[h])) for d in data) + col_widths.append(max(len(h), max_len)) + + # Print header + header_row = " | ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers)) + print("-" * len(header_row)) + print(header_row) + print("-" * len(header_row)) + + # Print rows + for row in data: + print(" | ".join(str(row[h]).ljust(col_widths[i]) for i, h in enumerate(headers))) + + +class UtilizationThread(threading.Thread): + _module: "UtilizationModule" + _stop_event: threading.Event + _monitors: dict + + def __init__(self, module): + super().__init__(daemon=True) + self._module = module + self._stop_event = threading.Event() + self._monitors = {} + + def run(self): + while not self._stop_event.is_set(): + workers = self._module.client.scheduler_info()["workers"] + pids = {pid: None for pid in get_worker_pids()} + for worker, info in workers.items(): + pid = get_pid_by_port(worker.rsplit(":", 1)[-1]) + if pid is None: + continue + pids[pid] = info["id"] + data = [] + for pid, worker_id in pids.items(): + if pid not in self._monitors: + self._monitors[pid] = GilMonitorThread(pid) + self._monitors[pid].start() + gil, active, n_threads = self._monitors[pid].get_values() + data.append( + { + "worker_id": worker_id, + "pid": pid, + "gil_percent": gil, + "active_percent": active, + "n_threads": n_threads, + } + ) + data.sort(key=lambda x: x["pid"]) + print_data_table(data) + self._stop_event.wait(1) + + def stop(self): + self._stop_event.set() + for monitor in self._monitors.values(): + monitor.stop() + monitor.join(timeout=2) + + +class UtilizationModule(Module): + client: Optional[Client] + _utilization_thread: Optional[UtilizationThread] + + def __init__(self): + super().__init__() + self.client = None + self._utilization_thread = None + + if not os.getenv("MEASURE_GIL_UTILIZATION"): + logger.info("Set `MEASURE_GIL_UTILIZATION=true` to print GIL utilization.") + return + + if not _can_use_py_spy(): + logger.warning( + "Cannot start UtilizationModule because in order to run py-spy without " + "being root you need to enable this:\n" + "\n" + " echo 0 | sudo tee /proc/sys/kernel/yama/ptrace_scope" + ) + return + + if not shutil.which("py-spy"): + logger.warning("Cannot start UtilizationModule because `py-spy` is not installed.") + return + + self.client = get_client() + self._utilization_thread = UtilizationThread(self) + + @rpc + def start(self): + if self._utilization_thread: + self._utilization_thread.start() + + @rpc + def stop(self): + if self._utilization_thread: + self._utilization_thread.stop() + self._utilization_thread.join(timeout=2) + self._close_module() + + +def _can_use_py_spy(): + try: + with open("/proc/sys/kernel/yama/ptrace_scope") as f: + value = f.read().strip() + return value == "0" + except Exception: + pass + return False + + +@lru_cache(maxsize=None) +def get_pid_by_port(port: int) -> int | None: + try: + result = subprocess.run( + ["lsof", "-ti", f":{port}"], capture_output=True, text=True, check=True + ) + pid_str = result.stdout.strip() + return int(pid_str) if pid_str else None + except subprocess.CalledProcessError: + return None + + +def get_worker_pids(): + pids = [] + for pid in os.listdir("/proc"): + if not pid.isdigit(): + continue + try: + with open(f"/proc/{pid}/cmdline", "r") as f: + cmdline = f.read().replace("\x00", " ") + if "spawn_main" in cmdline: + pids.append(int(pid)) + except (FileNotFoundError, PermissionError): + continue + return pids + + +class GilMonitorThread(threading.Thread): + pid: int + _latest_values: tuple[float, float, int] + _stop_event: threading.Event + _lock: threading.Lock + + def __init__(self, pid): + super().__init__(daemon=True) + self.pid = pid + self._latest_values = (-1.0, -1.0, -1) + self._stop_event = threading.Event() + self._lock = threading.Lock() + + def run(self): + command = ["py-spy", "top", "--pid", str(self.pid), "--rate", "100"] + process = None + try: + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, # Line-buffered output + ) + + for line in iter(process.stdout.readline, ""): + if self._stop_event.is_set(): + break + + if "GIL:" not in line: + continue + + match = re.search( + r"GIL:\s*([\d.]+?)%,\s*Active:\s*([\d.]+?)%,\s*Threads:\s*(\d+)", line + ) + if not match: + continue + + try: + gil_percent = float(match.group(1)) + active_percent = float(match.group(2)) + num_threads = int(match.group(3)) + + with self._lock: + self._latest_values = (gil_percent, active_percent, num_threads) + except (ValueError, IndexError) as e: + pass + except Exception as e: + logger.error(f"An error occurred in GilMonitorThread for PID {self.pid}: {e}") + raise + finally: + if process: + process.terminate() + process.wait(timeout=1) + self._stop_event.set() + + def get_values(self): + with self._lock: + return self._latest_values + + def stop(self): + self._stop_event.set() From d1d12dbeb70c532276fcfa3b00ecf9a8b51068e5 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 2 Oct 2025 00:51:07 +0300 Subject: [PATCH 5/6] prettier box --- dimos/utils/monitoring.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dimos/utils/monitoring.py b/dimos/utils/monitoring.py index 7f3f379457..ee032cca23 100644 --- a/dimos/utils/monitoring.py +++ b/dimos/utils/monitoring.py @@ -37,6 +37,7 @@ def print_data_table(data): headers = ["active_percent", "gil_percent", "n_threads", "pid", "worker_id"] + numeric_headers = {"active_percent", "gil_percent", "n_threads", "pid"} # Determine column widths col_widths = [] @@ -44,15 +45,24 @@ def print_data_table(data): max_len = max(len(str(d[h])) for d in data) col_widths.append(max(len(h), max_len)) - # Print header - header_row = " | ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers)) - print("-" * len(header_row)) + # Print header with DOS box characters + header_row = " │ ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers)) + border_parts = ["─" * w for w in col_widths] + border_line = "─┼─".join(border_parts) + print(border_line) print(header_row) - print("-" * len(header_row)) + print(border_line) # Print rows for row in data: - print(" | ".join(str(row[h]).ljust(col_widths[i]) for i, h in enumerate(headers))) + formatted_cells = [] + for i, h in enumerate(headers): + value = str(row[h]) + if h in numeric_headers: + formatted_cells.append(value.rjust(col_widths[i])) + else: + formatted_cells.append(value.ljust(col_widths[i])) + print(" │ ".join(formatted_cells)) class UtilizationThread(threading.Thread): From cb6d1e502b19010a5ef06bd28d743e52bccc9d7d Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 2 Oct 2025 01:30:47 +0300 Subject: [PATCH 6/6] add monitoring back --- dimos/core/__init__.py | 5 +++ dimos/utils/actor_registry.py | 85 +++++++++++++++++++++++++++++++++++ dimos/utils/monitoring.py | 58 +++++++++++++++++++++--- 3 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 dimos/utils/actor_registry.py diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 12043300ae..1a1ed349e1 100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -10,6 +10,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleBase, ModuleConfig from dimos.core.stream import In, Out, RemoteIn, RemoteOut, Transport +from dimos.utils.actor_registry import ActorRegistry from dimos.core.transport import ( LCMTransport, SHMTransport, @@ -120,6 +121,9 @@ def deploy( worker = actor.set_ref(actor).result() print((f"deployed: {colors.green(actor)} @ {colors.blue('worker ' + str(worker))}")) + # Register actor deployment in shared memory + ActorRegistry.update(str(actor), str(worker)) + return RPCClient(actor, actor_class) def check_worker_memory(): @@ -176,6 +180,7 @@ def check_worker_memory(): ) def close_all(): + ActorRegistry.clear() dask_client.shutdown() local_cluster.close() diff --git a/dimos/utils/actor_registry.py b/dimos/utils/actor_registry.py new file mode 100644 index 0000000000..3f1133fa4d --- /dev/null +++ b/dimos/utils/actor_registry.py @@ -0,0 +1,85 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared memory registry for tracking actor deployments across processes.""" + +import json +from multiprocessing import shared_memory +from typing import Dict + + +class ActorRegistry: + """Shared memory registry of actor deployments.""" + + SHM_NAME = "dimos_actor_registry" + SHM_SIZE = 65536 # 64KB should be enough for most deployments + + @staticmethod + def update(actor_name: str, worker_id: str): + """Update registry with new actor deployment.""" + try: + shm = shared_memory.SharedMemory(name=ActorRegistry.SHM_NAME) + except FileNotFoundError: + shm = shared_memory.SharedMemory( + name=ActorRegistry.SHM_NAME, create=True, size=ActorRegistry.SHM_SIZE + ) + + # Read existing data + data = ActorRegistry._read_from_shm(shm) + + # Update with new actor + data[actor_name] = worker_id + + # Write back + ActorRegistry._write_to_shm(shm, data) + shm.close() + + @staticmethod + def get_all() -> Dict[str, str]: + """Get all actor->worker mappings.""" + try: + shm = shared_memory.SharedMemory(name=ActorRegistry.SHM_NAME) + data = ActorRegistry._read_from_shm(shm) + shm.close() + return data + except FileNotFoundError: + return {} + + @staticmethod + def clear(): + """Clear the registry and free shared memory.""" + try: + shm = shared_memory.SharedMemory(name=ActorRegistry.SHM_NAME) + ActorRegistry._write_to_shm(shm, {}) + shm.close() + shm.unlink() + except FileNotFoundError: + pass + + @staticmethod + def _read_from_shm(shm) -> Dict[str, str]: + """Read JSON data from shared memory.""" + raw = bytes(shm.buf[:]).rstrip(b"\x00") + if not raw: + return {} + return json.loads(raw.decode("utf-8")) + + @staticmethod + def _write_to_shm(shm, data: Dict[str, str]): + """Write JSON data to shared memory.""" + json_bytes = json.dumps(data).encode("utf-8") + if len(json_bytes) > ActorRegistry.SHM_SIZE: + raise ValueError("Registry data too large for shared memory") + shm.buf[: len(json_bytes)] = json_bytes + shm.buf[len(json_bytes) :] = b"\x00" * (ActorRegistry.SHM_SIZE - len(json_bytes)) diff --git a/dimos/utils/monitoring.py b/dimos/utils/monitoring.py index ee032cca23..9c562dc0e7 100644 --- a/dimos/utils/monitoring.py +++ b/dimos/utils/monitoring.py @@ -29,6 +29,7 @@ from distributed import get_client from dimos.core import Module, rpc +from dimos.utils.actor_registry import ActorRegistry from dimos.utils.logging_config import setup_logger @@ -36,8 +37,25 @@ def print_data_table(data): - headers = ["active_percent", "gil_percent", "n_threads", "pid", "worker_id"] - numeric_headers = {"active_percent", "gil_percent", "n_threads", "pid"} + headers = [ + "cpu_percent", + "active_percent", + "gil_percent", + "n_threads", + "pid", + "worker_id", + "modules", + ] + numeric_headers = {"cpu_percent", "active_percent", "gil_percent", "n_threads", "pid"} + + # Add registered modules. + modules = ActorRegistry.get_all() + for worker in data: + worker["modules"] = ", ".join( + module_name.split("-", 1)[0] + for module_name, worker_id_str in modules.items() + if worker_id_str == str(worker["worker_id"]) + ) # Determine column widths col_widths = [] @@ -90,9 +108,10 @@ def run(self): if pid not in self._monitors: self._monitors[pid] = GilMonitorThread(pid) self._monitors[pid].start() - gil, active, n_threads = self._monitors[pid].get_values() + cpu, gil, active, n_threads = self._monitors[pid].get_values() data.append( { + "cpu_percent": cpu, "worker_id": worker_id, "pid": pid, "gil_percent": gil, @@ -101,6 +120,7 @@ def run(self): } ) data.sort(key=lambda x: x["pid"]) + self._fix_missing_ids(data) print_data_table(data) self._stop_event.wait(1) @@ -110,6 +130,16 @@ def stop(self): monitor.stop() monitor.join(timeout=2) + def _fix_missing_ids(self, data): + """ + Some worker IDs are None. But if we order the workers by PID and all + non-None ids are in order, then we can deduce that the None ones are the + missing indices. + """ + if all(x["worker_id"] in (i, None) for i, x in enumerate(data)): + for i, worker in enumerate(data): + worker["worker_id"] = i + class UtilizationModule(Module): client: Optional[Client] @@ -192,14 +222,14 @@ def get_worker_pids(): class GilMonitorThread(threading.Thread): pid: int - _latest_values: tuple[float, float, int] + _latest_values: tuple[float, float, float, int] _stop_event: threading.Event _lock: threading.Lock def __init__(self, pid): super().__init__(daemon=True) self.pid = pid - self._latest_values = (-1.0, -1.0, -1) + self._latest_values = (-1.0, -1.0, -1.0, -1) self._stop_event = threading.Event() self._lock = threading.Lock() @@ -229,12 +259,18 @@ def run(self): continue try: + cpu_percent = _get_cpu_percent(self.pid) gil_percent = float(match.group(1)) active_percent = float(match.group(2)) num_threads = int(match.group(3)) with self._lock: - self._latest_values = (gil_percent, active_percent, num_threads) + self._latest_values = ( + cpu_percent, + gil_percent, + active_percent, + num_threads, + ) except (ValueError, IndexError) as e: pass except Exception as e: @@ -252,3 +288,13 @@ def get_values(self): def stop(self): self._stop_event.set() + + +def _get_cpu_percent(pid: int) -> float: + try: + result = subprocess.run( + ["ps", "-p", str(pid), "-o", "%cpu="], capture_output=True, text=True, check=True + ) + return float(result.stdout.strip()) + except Exception: + return -1.0