From 28df1d74b132525f2c7aee1d4e69e97833e49b9f Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Tue, 30 Sep 2025 06:48:22 +0300 Subject: [PATCH 1/4] fix all thread leaks --- dimos/agents2/skills/conftest.py | 16 ++ .../skills/google_maps_skill_container.py | 1 + dimos/agents2/skills/gps_nav_skill.py | 1 + dimos/agents2/skills/navigation.py | 8 +- dimos/agents2/skills/osm.py | 1 + .../agents2/temp/test_unitree_agent_query.py | 24 ++- .../temp/test_unitree_skill_container.py | 139 ++++++++--------- dimos/agents2/test_mock_agent.py | 2 + dimos/core/__init__.py | 38 ++++- dimos/core/module.py | 10 +- dimos/core/test_core.py | 11 +- dimos/mapping/osm/query.py | 20 +-- .../local_planner/test_base_local_planner.py | 3 + dimos/navigation/visual/query.py | 47 ++++++ dimos/perception/detection2d/conftest.py | 35 ++++- dimos/perception/detection2d/test_module.py | 16 +- .../detection2d/test_yolo_2d_det.py | 18 ++- dimos/perception/detection2d/yolo_2d_det.py | 14 ++ dimos/protocol/service/lcmservice.py | 10 +- dimos/protocol/skill/coordinator.py | 6 + dimos/protocol/skill/skill.py | 20 ++- dimos/protocol/skill/test_coordinator.py | 4 +- dimos/protocol/tf/test_tf.py | 2 + dimos/robot/unitree_webrtc/type/test_map.py | 2 + .../unitree_webrtc/unitree_skill_container.py | 1 - dimos/types/test_timestamped.py | 97 ++++++------ dimos/types/timestamped.py | 36 ++--- dimos/utils/generic.py | 15 ++ dimos/utils/reactive.py | 42 +++++- dimos/utils/test_reactive.py | 141 +++++++++++------- dimos/utils/testing.py | 16 +- 31 files changed, 544 insertions(+), 252 deletions(-) create mode 100644 dimos/navigation/visual/query.py diff --git a/dimos/agents2/skills/conftest.py b/dimos/agents2/skills/conftest.py index 79283fc1fc..63f64ca5ee 100644 --- a/dimos/agents2/skills/conftest.py +++ b/dimos/agents2/skills/conftest.py @@ -15,6 +15,7 @@ import pytest import reactivex as rx from functools import partial +from reactivex.scheduler import ThreadPoolScheduler from dimos.agents2.skills.gps_nav_skill 
import GpsNavSkillContainer from dimos.agents2.skills.navigation import NavigationSkillContainer @@ -26,6 +27,21 @@ from dimos.msgs.sensor_msgs import Image +@pytest.fixture(autouse=True) +def cleanup_threadpool_scheduler(monkeypatch): + # TODO: get rid of this global threadpool + """Clean up and recreate the global ThreadPoolScheduler after each test.""" + # Disable ChromaDB telemetry to avoid leaking threads + monkeypatch.setenv("CHROMA_ANONYMIZED_TELEMETRY", "False") + yield + from dimos.utils import threadpool + + # Shutdown the global scheduler's executor + threadpool.scheduler.executor.shutdown(wait=True) + # Recreate it for the next test + threadpool.scheduler = ThreadPoolScheduler(max_workers=threadpool.get_max_workers()) + + @pytest.fixture def fake_robot(mocker): return mocker.MagicMock() diff --git a/dimos/agents2/skills/google_maps_skill_container.py b/dimos/agents2/skills/google_maps_skill_container.py index 652fa02aa5..167782fd74 100644 --- a/dimos/agents2/skills/google_maps_skill_container.py +++ b/dimos/agents2/skills/google_maps_skill_container.py @@ -52,6 +52,7 @@ def __enter__(self) -> "GoogleMapsSkillContainer": def __exit__(self, exc_type, exc_val, exc_tb): self._disposables.dispose() + self.stop() return False def _on_gps_location(self, location: LatLon) -> None: diff --git a/dimos/agents2/skills/gps_nav_skill.py b/dimos/agents2/skills/gps_nav_skill.py index b35e0bf77c..dd29e7189d 100644 --- a/dimos/agents2/skills/gps_nav_skill.py +++ b/dimos/agents2/skills/gps_nav_skill.py @@ -56,6 +56,7 @@ def __enter__(self) -> "GpsNavSkillContainer": def __exit__(self, exc_type, exc_val, exc_tb): self._disposables.dispose() + self.stop() return False def _on_gps_location(self, location: LatLon) -> None: diff --git a/dimos/agents2/skills/navigation.py b/dimos/agents2/skills/navigation.py index 519a3f5bcb..8bd7a030b2 100644 --- a/dimos/agents2/skills/navigation.py +++ b/dimos/agents2/skills/navigation.py @@ -17,11 +17,13 @@ import cv2 from reactivex import 
Observable +from dimos.models.vl.qwen import QwenVlModel from dimos.msgs.sensor_msgs import Image +from dimos.navigation.visual.query import get_object_bbox_from_image from dimos.protocol.skill.skill import SkillContainer, skill from dimos.robot.robot import UnitreeRobot from dimos.types.robot_location import RobotLocation -from dimos.models.qwen.video_query import BBox, get_bbox_from_qwen_frame +from dimos.models.qwen.video_query import BBox from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.geometry_msgs.Vector3 import make_vector3 from dimos.utils.transform_utils import euler_to_quaternion, quaternion_to_euler @@ -46,6 +48,7 @@ def __init__(self, robot: UnitreeRobot, video_stream: Observable[Image]): self._video_stream = video_stream self._similarity_threshold = 0.23 self._started = False + self._vl_model = QwenVlModel() def __enter__(self) -> "NavigationSkillContainer": unsub = self._video_stream.subscribe(self._on_video) @@ -55,6 +58,7 @@ def __enter__(self) -> "NavigationSkillContainer": def __exit__(self, exc_type, exc_val, exc_tb): self._disposables.dispose() + self.stop() return False def _on_video(self, image: Image) -> None: @@ -174,7 +178,7 @@ def _get_bbox_for_current_frame(self, query: str) -> Optional[BBox]: if frame is None: return None - return get_bbox_from_qwen_frame(frame, object_name=query) + return get_object_bbox_from_image(self._vl_model, frame, query) def _navigate_using_semantic_map(self, query: str) -> str: results = self._robot.spatial_memory.query_by_text(query) diff --git a/dimos/agents2/skills/osm.py b/dimos/agents2/skills/osm.py index bc59b24599..c76242fe87 100644 --- a/dimos/agents2/skills/osm.py +++ b/dimos/agents2/skills/osm.py @@ -52,6 +52,7 @@ def __enter__(self) -> "OsmSkillContainer": def __exit__(self, exc_type, exc_val, exc_tb): self._disposables.dispose() + self.stop() return False def _on_gps_location(self, location: LatLon) -> None: diff --git a/dimos/agents2/temp/test_unitree_agent_query.py 
b/dimos/agents2/temp/test_unitree_agent_query.py index 19446d8cf2..81cf263739 100644 --- a/dimos/agents2/temp/test_unitree_agent_query.py +++ b/dimos/agents2/temp/test_unitree_agent_query.py @@ -101,6 +101,9 @@ def test_sync_query_with_thread(): agent.register_skills(container) agent.start() + # Track the thread we might create + loop_thread = None + # The agent's event loop should be running in the Module's thread # Let's check if it's running if agent._loop and agent._loop.is_running(): @@ -113,8 +116,8 @@ def run_loop(): asyncio.set_event_loop(agent._loop) agent._loop.run_forever() - thread = threading.Thread(target=run_loop, daemon=True) - thread.start() + loop_thread = threading.Thread(target=run_loop, daemon=False, name="EventLoopThread") + loop_thread.start() time.sleep(1) # Give loop time to start logger.info("Started event loop in thread") @@ -129,9 +132,24 @@ def run_loop(): traceback.print_exc() - # Clean up + # Clean up properly + # First stop the agent (this should stop its internal loop if any) agent.stop() + # Then stop the manually created event loop thread if we created one + if loop_thread and loop_thread.is_alive(): + logger.info("Stopping manually created event loop thread...") + # Stop the event loop + if agent._loop and agent._loop.is_running(): + agent._loop.call_soon_threadsafe(agent._loop.stop) + # Wait for thread to finish + loop_thread.join(timeout=5) + if loop_thread.is_alive(): + logger.warning("Thread did not stop cleanly within timeout") + + # Finally close the container + container._close_module() + # def test_with_real_module_system(): # """Test using the real DimOS module system (like in test_agent.py).""" diff --git a/dimos/agents2/temp/test_unitree_skill_container.py b/dimos/agents2/temp/test_unitree_skill_container.py index ede701f9e6..3b127e2ca0 100644 --- a/dimos/agents2/temp/test_unitree_skill_container.py +++ b/dimos/agents2/temp/test_unitree_skill_container.py @@ -19,6 +19,7 @@ """ import sys +import time from pathlib 
import Path # Add parent directories to path @@ -39,19 +40,25 @@ def test_skill_container_creation(): # Create container without robot (for testing) container = UnitreeSkillContainer(robot=None) - # Get available skills from the container - skills = container.skills() - - print(f"Number of skills registered: {len(skills)}") - print("\nAvailable skills:") - for name, skill_config in list(skills.items())[:10]: # Show first 10 - print( - f" - {name}: {skill_config.description if hasattr(skill_config, 'description') else 'No description'}" - ) - if len(skills) > 10: - print(f" ... and {len(skills) - 10} more skills") - - return container, skills + try: + # Get available skills from the container + skills = container.skills() + + print(f"Number of skills registered: {len(skills)}") + print("\nAvailable skills:") + for name, skill_config in list(skills.items())[:10]: # Show first 10 + print( + f" - {name}: {skill_config.description if hasattr(skill_config, 'description') else 'No description'}" + ) + if len(skills) > 10: + print(f" ... 
and {len(skills) - 10} more skills") + + return container, skills + finally: + # Ensure proper cleanup + container._close_module() + # Small delay to allow threads to finish cleanup + time.sleep(0.1) def test_agent_with_skills(): @@ -60,24 +67,33 @@ def test_agent_with_skills(): # Create skill container container = UnitreeSkillContainer(robot=None) + agent = None - # Create agent with configuration passed directly - agent = Agent( - system_prompt="You are a helpful robot assistant that can control a Unitree Go2 robot.", - model=Model.GPT_4O_MINI, - provider=Provider.OPENAI, - ) + try: + # Create agent with configuration passed directly + agent = Agent( + system_prompt="You are a helpful robot assistant that can control a Unitree Go2 robot.", + model=Model.GPT_4O_MINI, + provider=Provider.OPENAI, + ) - # Register skills - agent.register_skills(container) + # Register skills + agent.register_skills(container) - print("Agent created and skills registered successfully!") + print("Agent created and skills registered successfully!") - # Get tools to verify - tools = agent.get_tools() - print(f"Agent has access to {len(tools)} tools") + # Get tools to verify + tools = agent.get_tools() + print(f"Agent has access to {len(tools)} tools") - return agent + return agent + finally: + # Ensure proper cleanup in order + if agent: + agent.stop() + container._close_module() + # Small delay to allow threads to finish cleanup + time.sleep(0.1) def test_skill_schemas(): @@ -85,55 +101,26 @@ def test_skill_schemas(): print("\n=== Testing Skill Schemas ===") container = UnitreeSkillContainer(robot=None) - skills = container.skills() - - # Check a few key skills (using snake_case names now) - skill_names = ["move", "wait", "stand_up", "sit", "front_flip", "dance1"] - - for name in skill_names: - if name in skills: - skill_config = skills[name] - print(f"\n{name} skill:") - print(f" Config: {skill_config}") - if hasattr(skill_config, "schema"): - print( - f" Schema keys: 
{skill_config.schema.keys() if skill_config.schema else 'None'}" - ) - else: - print(f"\nWARNING: Skill '{name}' not found!") - - -def main(): - """Run all tests.""" - print("=" * 60) - print("Testing UnitreeSkillContainer with agents2 Framework") - print("=" * 60) try: - # Test 1: Container creation - container, skills = test_skill_container_creation() - - # Test 2: Agent with skills - agent = test_agent_with_skills() - - # Test 3: Skill schemas - test_skill_schemas() - - # Test 4: Simple query (async) - # asyncio.run(test_simple_query()) - print("\n=== Async query test skipped (would require running agent) ===") - - print("\n" + "=" * 60) - print("All tests completed successfully!") - print("=" * 60) - - except Exception as e: - print(f"\nERROR during testing: {e}") - import traceback - - traceback.print_exc() - sys.exit(1) - - -if __name__ == "__main__": - main() + skills = container.skills() + + # Check a few key skills (using snake_case names now) + skill_names = ["move", "wait", "stand_up", "sit", "front_flip", "dance1"] + + for name in skill_names: + if name in skills: + skill_config = skills[name] + print(f"\n{name} skill:") + print(f" Config: {skill_config}") + if hasattr(skill_config, "schema"): + print( + f" Schema keys: {skill_config.schema.keys() if skill_config.schema else 'None'}" + ) + else: + print(f"\nWARNING: Skill '{name}' not found!") + finally: + # Ensure proper cleanup + container._close_module() + # Small delay to allow threads to finish cleanup + time.sleep(0.1) diff --git a/dimos/agents2/test_mock_agent.py b/dimos/agents2/test_mock_agent.py index 4331b48c30..3609803f11 100644 --- a/dimos/agents2/test_mock_agent.py +++ b/dimos/agents2/test_mock_agent.py @@ -128,6 +128,8 @@ def test_image_tool_call(): ] assert len(human_messages_with_images) >= 0 # May have image messages agent.stop() + test_skill_module.stop() + dimos.close_all() @pytest.mark.tool diff --git a/dimos/core/__init__.py b/dimos/core/__init__.py index 12043300ae..2049e7aae7 
100644 --- a/dimos/core/__init__.py +++ b/dimos/core/__init__.py @@ -77,13 +77,16 @@ def __getattr__(self, name: str): original_method = getattr(self.actor_class, name, None) def rpc_call(*args, **kwargs): - result, unsub_fn = self.rpc.call_sync(f"{self.remote_name}/{name}", (args, kwargs)) - self._unsub_fns.append(unsub_fn) - - # TODO: This is ugly. + # For stop/close/shutdown, use call_nowait to avoid deadlock + # (the remote side stops its RPC service before responding) if name in ("stop", "close", "shutdown"): + if self.rpc: + self.rpc.call_nowait(f"{self.remote_name}/{name}", (args, kwargs)) self.stop_client() + return None + result, unsub_fn = self.rpc.call_sync(f"{self.remote_name}/{name}", (args, kwargs)) + self._unsub_fns.append(unsub_fn) return result # Copy docstring and other attributes from original method @@ -176,8 +179,33 @@ def check_worker_memory(): ) def close_all(): - dask_client.shutdown() + import time + + # Get the event loop before shutting down + loop = dask_client.loop + + # Close cluster and client local_cluster.close() + dask_client.close() + + # Stop the Tornado IOLoop to clean up IO loop and Profile threads + if loop and hasattr(loop, "add_callback") and hasattr(loop, "stop"): + try: + loop.add_callback(loop.stop) + except Exception: + pass + + # Shutdown the Dask offload thread pool + try: + from distributed.utils import _offload_executor + + if _offload_executor: + _offload_executor.shutdown(wait=False) + except Exception: + pass + + # Give threads a moment to clean up + time.sleep(0.1) dask_client.deploy = deploy dask_client.check_worker_memory = check_worker_memory diff --git a/dimos/core/module.py b/dimos/core/module.py index f7786bd55e..f91c64e019 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -23,6 +23,7 @@ get_origin, get_type_hints, ) +from reactivex.disposable import CompositeDisposable from dask.distributed import Actor, get_worker @@ -74,12 +75,14 @@ class ModuleBase(Configurable[ModuleConfig], 
SkillContainer): _tf: Optional[TFSpec] = None _loop: Optional[asyncio.AbstractEventLoop] = None _loop_thread: Optional[threading.Thread] + _disposables: CompositeDisposable default_config = ModuleConfig def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._loop, self._loop_thread = get_loop() + self._disposables = CompositeDisposable() # we can completely override comms protocols if we want try: # here we attempt to figure out if we are running on a dask worker @@ -94,10 +97,13 @@ def __init__(self, *args, **kwargs): def _close_module(self): self._close_rpc() if hasattr(self, "_loop") and self._loop_thread: - self._loop.call_soon_threadsafe(self._loop.stop) + if self._loop_thread.is_alive(): + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop_thread.join(timeout=2) self._loop = None - self._loop_thread.join(timeout=2) self._loop_thread = None + if hasattr(self, "_disposables"): + self._disposables.dispose() def _close_rpc(self): # Using hasattr is needed because SkillCoordinator skips ModuleBase.__init__ and self.rpc is never set. 
diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index d581375d8d..1acf87f078 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -29,6 +29,7 @@ from dimos.msgs.geometry_msgs import Vector3 from dimos.robot.unitree_webrtc.type.lidar import LidarMessage from dimos.robot.unitree_webrtc.type.odometry import Odometry +from reactivex.disposable import Disposable assert dimos @@ -55,7 +56,8 @@ def _odom(msg): print("RCV:", (time.perf_counter() - msg.pubtime) * 1000, msg) self.mov.publish(msg.position) - self.odometry.subscribe(_odom) + unsub = self.odometry.subscribe(_odom) + self._disposables.add(Disposable(unsub)) def _lidar(msg): self.lidar_msg_count += 1 @@ -64,7 +66,8 @@ def _lidar(msg): else: print("RCV: unknown time", msg) - self.lidar.subscribe(_lidar) + unsub = self.lidar.subscribe(_lidar) + self._disposables.add(Disposable(unsub)) def test_classmethods(): @@ -84,7 +87,7 @@ def test_classmethods(): # Check that we have the expected RPC methods assert "navigate_to" in class_rpcs, "navigate_to should be in rpcs" assert "start" in class_rpcs, "start should be in rpcs" - assert len(class_rpcs) == 5 + assert len(class_rpcs) == 6 # Check that the values are callable assert callable(class_rpcs["navigate_to"]), "navigate_to should be callable" @@ -96,6 +99,8 @@ def test_classmethods(): ) assert hasattr(class_rpcs["start"], "__rpc__"), "start should have __rpc__ attribute" + nav._close_module() + @pytest.mark.module def test_basic_deployment(dimos): diff --git a/dimos/mapping/osm/query.py b/dimos/mapping/osm/query.py index 1b9e0b9bf7..d4e7d97280 100644 --- a/dimos/mapping/osm/query.py +++ b/dimos/mapping/osm/query.py @@ -13,12 +13,12 @@ # limitations under the License. 
import re -import json -from typing import Any, Optional, Tuple +from typing import Optional, Tuple from dimos.mapping.osm.osm import MapImage from dimos.mapping.types import LatLon from dimos.models.vl.base import VlModel +from dimos.utils.generic import extract_json_from_llm_response from dimos.utils.logging_config import setup_logger @@ -47,24 +47,10 @@ def query_for_one_position_and_context( response = vl_model.query(map_image.image.data, full_query) try: - doc = _extract_json_from_response(response) + doc = extract_json_from_llm_response(response) return map_image.pixel_to_latlon(tuple(doc["coordinates"])), str(doc["description"]) except Exception: pass # TODO: Try more simplictic methods to parse. return None - - -def _extract_json_from_response(response: str) -> Any: - start_idx = response.find("{") - end_idx = response.rfind("}") + 1 - - if start_idx >= 0 and end_idx > start_idx: - json_str = response[start_idx:end_idx] - try: - return json.loads(json_str) - except Exception: - pass - - return None diff --git a/dimos/navigation/local_planner/test_base_local_planner.py b/dimos/navigation/local_planner/test_base_local_planner.py index 97183dda40..dc76bca83a 100644 --- a/dimos/navigation/local_planner/test_base_local_planner.py +++ b/dimos/navigation/local_planner/test_base_local_planner.py @@ -162,6 +162,7 @@ def test_lowpass_filter(self): # v2 should be between v1 and the raw velocity assert vel2.linear.x != first_vx # Should be different due to filtering assert 0 < vel2.linear.x <= planner.v_max # Should still be positive and within limits + planner._close_module() def test_no_path(self, planner, empty_costmap): """Test that planner returns None when no path is available.""" @@ -356,6 +357,7 @@ def test_robot_frame_transformation(self, empty_costmap): assert vel.angular.z < 0 # Should turn right (negative angular velocity) # X velocity should be relatively small compared to Y assert abs(vel.linear.x) < abs(vel.linear.y) # Lateral movement dominates + 
planner._close_module() def test_angular_velocity_computation(self, empty_costmap): """Test that angular velocity is computed to align with path.""" @@ -401,3 +403,4 @@ def test_angular_velocity_computation(self, empty_costmap): assert ( abs(vel.linear.x - vel.linear.y) < max(vel.linear.x, vel.linear.y) * 0.5 ) # Within 50% of each other + planner._close_module() diff --git a/dimos/navigation/visual/query.py b/dimos/navigation/visual/query.py new file mode 100644 index 0000000000..42ec9623ef --- /dev/null +++ b/dimos/navigation/visual/query.py @@ -0,0 +1,47 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional +import numpy as np +from dimos.models.qwen.video_query import BBox +from dimos.models.vl.base import VlModel +from dimos.utils.generic import extract_json_from_llm_response + + +def get_object_bbox_from_image( + vl_model: VlModel, image_data: np.ndarray, object_description: str +) -> Optional[BBox]: + if not isinstance(image_data, np.ndarray): + raise ValueError("Frame must be a numpy array") + + prompt = ( + f"Look at this image and find the '{object_description}'. " + "Return ONLY a JSON object with format: {'name': 'object_name', 'bbox': [x1, y1, x2, y2]} " + "where x1,y1 is the top-left and x2,y2 is the bottom-right corner of the bounding box. If not found, return None." 
+ ) + + response = vl_model.query(image_data, prompt) + + result = extract_json_from_llm_response(response) + if not result: + return None + + try: + ret = tuple(map(float, result["bbox"])) + if len(ret) == 4: + return ret + except Exception: + pass + + return None diff --git a/dimos/perception/detection2d/conftest.py b/dimos/perception/detection2d/conftest.py index 93d771b373..52b6b15bb2 100644 --- a/dimos/perception/detection2d/conftest.py +++ b/dimos/perception/detection2d/conftest.py @@ -77,7 +77,7 @@ def moment(): tf = TF() tf.publish(*transforms) - return { + yield { "odom_frame": odom_frame, "lidar_frame": lidar_frame, "image_frame": image_frame, @@ -86,28 +86,38 @@ def moment(): "tf": tf, } + # Cleanup + tf.stop() + @pytest.fixture(scope="session") def publish_lcm(): def publish(moment: Moment): lcm.autoconf() + transports = [] + lidar_frame_transport: LCMTransport = LCMTransport("/lidar", LidarMessage) lidar_frame_transport.publish(moment.get("lidar_frame")) + transports.append(lidar_frame_transport) image_frame_transport: LCMTransport = LCMTransport("/image", Image) image_frame_transport.publish(moment.get("image_frame")) + transports.append(image_frame_transport) odom_frame_transport: LCMTransport = LCMTransport("/odom", Odometry) odom_frame_transport.publish(moment.get("odom_frame")) + transports.append(odom_frame_transport) camera_info_transport: LCMTransport = LCMTransport("/camera_info", CameraInfo) camera_info_transport.publish(moment.get("camera_info")) + transports.append(camera_info_transport) annotations = moment.get("annotations") if annotations: annotations_transport: LCMTransport = LCMTransport("/annotations", ImageAnnotations) annotations_transport.publish(annotations) + transports.append(annotations_transport) detections = moment.get("detections") if detections: @@ -116,27 +126,40 @@ def publish(moment: Moment): f"/detected/pointcloud/{i}", PointCloud2 ) detections_transport.publish(detection.pointcloud) + 
transports.append(detections_transport) detections_image_transport: LCMTransport = LCMTransport( f"/detected/image/{i}", Image ) detections_image_transport.publish(detection.cropped_image()) + transports.append(detections_image_transport) + + # Cleanup all transports immediately after publishing + for transport in transports: + if transport._started: + transport.lcm.stop() return publish @pytest.fixture(scope="session") def detections2d(moment: Moment): - return Detection2DModule().process_image_frame(moment["image_frame"]) + module = Detection2DModule() + yield module.process_image_frame(moment["image_frame"]) + module._close_module() @pytest.fixture(scope="session") def detections3d(moment: Moment): - detections2d = Detection2DModule().process_image_frame(moment["image_frame"]) + module2d = Detection2DModule() + detections2d = module2d.process_image_frame(moment["image_frame"]) camera_transform = moment["tf"].get("camera_optical", "world") if camera_transform is None: raise ValueError("No camera_optical transform in tf") - return Detection3DModule(camera_info=moment["camera_info"]).process_frame( - detections2d, moment["lidar_frame"], camera_transform - ) + module3d = Detection3DModule(camera_info=moment["camera_info"]) + + yield module3d.process_frame(detections2d, moment["lidar_frame"], camera_transform) + + module2d._close_module() + module3d._close_module() diff --git a/dimos/perception/detection2d/test_module.py b/dimos/perception/detection2d/test_module.py index 51ab6384a4..09ce3d1769 100644 --- a/dimos/perception/detection2d/test_module.py +++ b/dimos/perception/detection2d/test_module.py @@ -38,7 +38,8 @@ def test_module2d(moment: Moment, publish_lcm): - detections2d = Detection2DModule().process_image_frame(moment["image_frame"]) + module = Detection2DModule() + detections2d = module.process_image_frame(moment["image_frame"]) print(detections2d) @@ -65,18 +66,20 @@ def test_module2d(moment: Moment, publish_lcm): annotations = 
detections2d.to_image_annotations() publish_lcm({"annotations": annotations, **moment}) + module._close_module() + def test_module3d(moment: Moment, publish_lcm): - detections2d = Detection2DModule().process_image_frame(moment["image_frame"]) + module2d = Detection2DModule() + detections2d = module2d.process_image_frame(moment["image_frame"]) pointcloud = moment["lidar_frame"] camera_transform = moment["tf"].get("camera_optical", "world") if camera_transform is None: raise ValueError("No camera_optical transform in tf") annotations = detections2d.to_image_annotations() - detections3d = Detection3DModule(camera_info=moment["camera_info"]).process_frame( - detections2d, pointcloud, camera_transform - ) + module3d = Detection3DModule(camera_info=moment["camera_info"]) + detections3d = module3d.process_frame(detections2d, pointcloud, camera_transform) publish_lcm( { @@ -134,6 +137,9 @@ def test_module3d(moment: Moment, publish_lcm): assert repr_dict["dist"] == "0.88m" assert repr_dict["points"] == "81" + module2d._close_module() + module3d._close_module() + @pytest.mark.tool def test_module3d_replay(dimos_cluster): diff --git a/dimos/perception/detection2d/test_yolo_2d_det.py b/dimos/perception/detection2d/test_yolo_2d_det.py index 4240625744..07ecb1baeb 100644 --- a/dimos/perception/detection2d/test_yolo_2d_det.py +++ b/dimos/perception/detection2d/test_yolo_2d_det.py @@ -20,6 +20,7 @@ import pytest import reactivex as rx from reactivex import operators as ops +from reactivex.scheduler import ThreadPoolScheduler from dimos.perception.detection2d.yolo_2d_det import Yolo2DDetector from dimos.stream.video_provider import VideoProvider @@ -38,6 +39,8 @@ def test_yolo_detector_initialization(self): def test_yolo_detector_process_image(self): """Test YOLO detector can process video frames and return detection results.""" + # Create a dedicated scheduler for this test to avoid thread leaks + test_scheduler = ThreadPoolScheduler(max_workers=6) try: # Import data inside method 
to avoid pytest fixture confusion from dimos.utils.data import get_data @@ -48,7 +51,9 @@ def test_yolo_detector_process_image(self): # Create video provider and directly get a video stream observable assert os.path.exists(video_path), f"Test video not found: {video_path}" - video_provider = VideoProvider(dev_name="test_video", video_source=video_path) + video_provider = VideoProvider( + dev_name="test_video", video_source=video_path, pool_scheduler=test_scheduler + ) # Process more frames for thorough testing video_stream = video_provider.capture_video_as_observable(realtime=False, fps=15) @@ -113,6 +118,9 @@ def on_completed(): # Clean up subscription subscription.dispose() video_provider.dispose_all() + detector.cleanup() + # Shutdown the scheduler to clean up threads + test_scheduler.executor.shutdown(wait=True) # Check that we got detection results if len(results) == 0: pytest.skip("Skipping test due to error: Failed to get any detection results") @@ -170,7 +178,15 @@ def on_completed(): ) except Exception as e: + # Ensure cleanup happens even on exception + if "detector" in locals(): + detector.cleanup() + if "video_provider" in locals(): + video_provider.dispose_all() pytest.skip(f"Skipping test due to error: {e}") + finally: + # Always shutdown the scheduler + test_scheduler.executor.shutdown(wait=True) if __name__ == "__main__": diff --git a/dimos/perception/detection2d/yolo_2d_det.py b/dimos/perception/detection2d/yolo_2d_det.py index b9b04165cd..d40d5c2b15 100644 --- a/dimos/perception/detection2d/yolo_2d_det.py +++ b/dimos/perception/detection2d/yolo_2d_det.py @@ -104,6 +104,20 @@ def visualize_results(self, image, bboxes, track_ids, class_ids, confidences, na """ return plot_results(image, bboxes, track_ids, class_ids, confidences, names) + def cleanup(self): + """ + Clean up resources used by the detector, including tracker threads. 
+ """ + if hasattr(self.model, "predictor") and self.model.predictor is not None: + predictor = self.model.predictor + if hasattr(predictor, "trackers") and predictor.trackers: + for tracker in predictor.trackers: + if hasattr(tracker, "tracker") and hasattr(tracker.tracker, "gmc"): + gmc = tracker.tracker.gmc + if hasattr(gmc, "executor") and gmc.executor is not None: + gmc.executor.shutdown(wait=True) + self.model.predictor = None + def main(): """Example usage of the Yolo2DDetector class.""" diff --git a/dimos/protocol/service/lcmservice.py b/dimos/protocol/service/lcmservice.py index b34dd7f9ab..bc3f7317b7 100644 --- a/dimos/protocol/service/lcmservice.py +++ b/dimos/protocol/service/lcmservice.py @@ -251,11 +251,11 @@ def start(self): print(f"Error checking system configuration: {e}") self._stop_event.clear() - self._thread = threading.Thread(target=self._loop) + self._thread = threading.Thread(target=self._lcm_loop) self._thread.daemon = True self._thread.start() - def _loop(self) -> None: + def _lcm_loop(self) -> None: """LCM message handling loop.""" while not self._stop_event.is_set(): try: @@ -271,7 +271,11 @@ def stop(self): """Stop the LCM loop.""" self._stop_event.set() if self._thread is not None: - self._thread.join() + # Only join if we're not the LCM thread (avoid "cannot join current thread") + if threading.current_thread() != self._thread: + self._thread.join(timeout=1.0) + if self._thread.is_alive(): + logger.warning("LCM thread did not stop cleanly within timeout") # Clean up LCM instance if we created it if not self.config.lcm: diff --git a/dimos/protocol/skill/coordinator.py b/dimos/protocol/skill/coordinator.py index a20f1f4b33..cfc889fabc 100644 --- a/dimos/protocol/skill/coordinator.py +++ b/dimos/protocol/skill/coordinator.py @@ -331,6 +331,12 @@ def stop(self) -> None: if self._transport_unsub_fn: self._transport_unsub_fn() + # Stop all registered skill containers + for container in self._static_containers: + container.stop() + for 
container in self._dynamic_containers: + container.stop() + def len(self) -> int: return len(self._skills) diff --git a/dimos/protocol/skill/skill.py b/dimos/protocol/skill/skill.py index f2e6662823..9d63689527 100644 --- a/dimos/protocol/skill/skill.py +++ b/dimos/protocol/skill/skill.py @@ -156,6 +156,7 @@ def dynamic_skills(self): def __str__(self) -> str: return f"SkillContainer({self.__class__.__name__})" + @rpc def stop(self): if self._skill_transport: self._skill_transport.stop() @@ -165,9 +166,22 @@ def stop(self): self._skill_thread_pool.shutdown(wait=True) self._skill_thread_pool = None - if hasattr(self, "_close_rpc"): + # If this container is also a Module, close the module properly + if hasattr(self, "_close_module"): + self._close_module() + elif hasattr(self, "_close_rpc"): self._close_rpc() + if hasattr(self, "_loop") and hasattr(self, "_loop_thread") and self._loop_thread: + if self._loop_thread.is_alive(): + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop_thread.join(timeout=2) + self._loop = None + self._loop_thread = None + + if hasattr(self, "_disposables"): + self._disposables.dispose() + # TODO: figure out standard args/kwargs passing format, # use same interface as skill coordinator call_skill method @threaded @@ -225,11 +239,13 @@ def call_skill( @rpc def skills(self) -> dict[str, SkillConfig]: # Avoid recursion by excluding this property itself + # Also exclude known properties that shouldn't be accessed + excluded = {"skills", "tf", "rpc", "skill_transport"} return { name: getattr(self, name)._skill_config for name in dir(self) if not name.startswith("_") - and name != "skills" + and name not in excluded and hasattr(getattr(self, name), "_skill_config") } diff --git a/dimos/protocol/skill/test_coordinator.py b/dimos/protocol/skill/test_coordinator.py index 9d27af5ecf..3dad606227 100644 --- a/dimos/protocol/skill/test_coordinator.py +++ b/dimos/protocol/skill/test_coordinator.py @@ -126,8 +126,9 @@ async def 
test_coordinator_parallel_calls(): @pytest.mark.asyncio async def test_coordinator_generator(): + container = SkillContainerTest() skillCoordinator = SkillCoordinator() - skillCoordinator.register_skills(SkillContainerTest()) + skillCoordinator.register_skills(container) skillCoordinator.start() # here we call a skill that generates a sequence of messages @@ -144,4 +145,5 @@ async def test_coordinator_generator(): print("coordinator loop finished") print(skillCoordinator) + container._close_module() skillCoordinator.stop() diff --git a/dimos/protocol/tf/test_tf.py b/dimos/protocol/tf/test_tf.py index 2b5f99852f..4d39e8764e 100644 --- a/dimos/protocol/tf/test_tf.py +++ b/dimos/protocol/tf/test_tf.py @@ -52,6 +52,8 @@ def test_tf_ros_example(): assert end_effector_global_pose.translation.x == pytest.approx(1.366, abs=1e-3) assert end_effector_global_pose.translation.y == pytest.approx(0.366, abs=1e-3) + tf.stop() + def test_tf_main(): """Test TF broadcasting and querying between two TF instances. 
diff --git a/dimos/robot/unitree_webrtc/type/test_map.py b/dimos/robot/unitree_webrtc/type/test_map.py index e28df7ad8d..ef2418c7f4 100644 --- a/dimos/robot/unitree_webrtc/type/test_map.py +++ b/dimos/robot/unitree_webrtc/type/test_map.py @@ -96,3 +96,5 @@ def publish(self, msg): # Verify map has points assert len(pointcloud.points) > 0 print(f"Map contains {len(pointcloud.points)} points") + + map._close_module() diff --git a/dimos/robot/unitree_webrtc/unitree_skill_container.py b/dimos/robot/unitree_webrtc/unitree_skill_container.py index abf8967da0..4db720be81 100644 --- a/dimos/robot/unitree_webrtc/unitree_skill_container.py +++ b/dimos/robot/unitree_webrtc/unitree_skill_container.py @@ -51,7 +51,6 @@ def __init__(self, robot: Optional[UnitreeGo2] = None): # Dynamically generate skills from UNITREE_WEBRTC_CONTROLS self._generate_unitree_skills() - super().__init__() def _generate_unitree_skills(self): """Dynamically generate skills from the UNITREE_WEBRTC_CONTROLS list.""" diff --git a/dimos/types/test_timestamped.py b/dimos/types/test_timestamped.py index 327e97d68b..44f359dbc7 100644 --- a/dimos/types/test_timestamped.py +++ b/dimos/types/test_timestamped.py @@ -17,6 +17,7 @@ import pytest from reactivex import operators as ops +from reactivex.scheduler import ThreadPoolScheduler from dimos.msgs.sensor_msgs import Image from dimos.types.timestamped import ( @@ -268,52 +269,62 @@ def test_time_window_collection(): def test_timestamp_alignment(): - speed = 5.0 - - # ensure that lfs package is downloaded - get_data("unitree_office_walk") - - raw_frames = [] - - def spy(image): - raw_frames.append(image.ts) - print(image.ts) - return image - - # sensor reply of raw video frames - video_raw = ( - testing.TimedSensorReplay( - "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + # Create a dedicated scheduler for this test to avoid thread leaks + test_scheduler = ThreadPoolScheduler(max_workers=6) + try: + speed = 5.0 + + # ensure that lfs 
package is downloaded + get_data("unitree_office_walk") + + raw_frames = [] + + def spy(image): + raw_frames.append(image.ts) + print(image.ts) + return image + + # sensor reply of raw video frames + video_raw = ( + testing.TimedSensorReplay( + "unitree_office_walk/video", autocast=lambda x: Image.from_numpy(x).to_rgb() + ) + .stream(speed) + .pipe(ops.take(30)) ) - .stream(speed) - .pipe(ops.take(30)) - ) - processed_frames = [] + processed_frames = [] - def process_video_frame(frame): - processed_frames.append(frame.ts) - time.sleep(0.5 / speed) - return frame + def process_video_frame(frame): + processed_frames.append(frame.ts) + time.sleep(0.5 / speed) + return frame - # fake reply of some 0.5s processor of video frames that drops messages - fake_video_processor = backpressure(video_raw.pipe(ops.map(spy))).pipe( - ops.map(process_video_frame) - ) - - aligned_frames = align_timestamped(fake_video_processor, video_raw).pipe(ops.to_list()).run() - - assert len(raw_frames) == 30 - assert len(processed_frames) > 2 - assert len(aligned_frames) > 2 + # fake reply of some 0.5s processor of video frames that drops messages + fake_video_processor = backpressure( + video_raw.pipe(ops.map(spy)), scheduler=test_scheduler + ).pipe(ops.map(process_video_frame)) - # Due to async processing, the last frame might not be aligned before completion - assert len(aligned_frames) >= len(processed_frames) - 1 - - for value in aligned_frames: - [primary, secondary] = value - diff = abs(primary.ts - secondary.ts) - print( - f"Aligned pair: primary={primary.ts:.6f}, secondary={secondary.ts:.6f}, diff={diff:.6f}s" + aligned_frames = ( + align_timestamped(fake_video_processor, video_raw).pipe(ops.to_list()).run() ) - assert diff <= 0.05 + + assert len(raw_frames) == 30 + assert len(processed_frames) > 2 + assert len(aligned_frames) > 2 + + # Due to async processing, the last frame might not be aligned before completion + assert len(aligned_frames) >= len(processed_frames) - 1 + + for value 
in aligned_frames: + [primary, secondary] = value + diff = abs(primary.ts - secondary.ts) + print( + f"Aligned pair: primary={primary.ts:.6f}, secondary={secondary.ts:.6f}, diff={diff:.6f}s" + ) + assert diff <= 0.05 + finally: + # Always shutdown the scheduler to clean up threads + test_scheduler.executor.shutdown(wait=True) + # Give threads time to finish cleanup + time.sleep(0.2) diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 36f86b2ebb..0e5427d0b6 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -251,13 +251,18 @@ def align_timestamped( match_tolerance: float = 0.05, # seconds ) -> Observable[Tuple[PRIMARY, SECONDARY]]: from reactivex import create + from reactivex.disposable import CompositeDisposable def subscribe(observer, scheduler=None): secondary_collection: TimestampedBufferCollection[SECONDARY] = TimestampedBufferCollection( buffer_size ) - # Subscribe to secondary to populate the buffer - secondary_sub = secondary_observable.subscribe(secondary_collection.add) + # Subscribe to secondary to populate the buffer with proper error/complete handling + secondary_sub = secondary_observable.subscribe( + on_next=secondary_collection.add, + on_error=lambda e: None, # Silently ignore errors from secondary + on_completed=lambda: None, # Silently ignore completion from secondary + ) def on_primary(primary_item: PRIMARY): secondary_item = secondary_collection.find_closest( @@ -271,12 +276,8 @@ def on_primary(primary_item: PRIMARY): on_next=on_primary, on_error=observer.on_error, on_completed=observer.on_completed ) - # Return cleanup function - def dispose(): - secondary_sub.dispose() - primary_sub.dispose() - - return dispose + # Return cleanup disposable + return CompositeDisposable(secondary_sub, primary_sub) return create(subscribe) @@ -303,15 +304,21 @@ def align_timestamped_multiple( from reactivex import create def subscribe(observer, scheduler=None): + from reactivex.disposable import CompositeDisposable 
+ # Create a buffer collection for each secondary observable secondary_collections: list[TimestampedBufferCollection[SECONDARY]] = [ TimestampedBufferCollection(buffer_size) for _ in secondary_observables ] - # Subscribe to all secondary observables + # Subscribe to all secondary observables with proper error/complete handling secondary_subs = [] for i, secondary_obs in enumerate(secondary_observables): - sub = secondary_obs.subscribe(secondary_collections[i].add) + sub = secondary_obs.subscribe( + on_next=secondary_collections[i].add, + on_error=lambda e: None, # Silently ignore errors from secondary + on_completed=lambda: None, # Silently ignore completion from secondary + ) secondary_subs.append(sub) def on_primary(primary_item: PRIMARY): @@ -329,12 +336,7 @@ def on_primary(primary_item: PRIMARY): on_next=on_primary, on_error=observer.on_error, on_completed=observer.on_completed ) - # Return cleanup function - def dispose(): - for sub in secondary_subs: - sub.dispose() - primary_sub.dispose() - - return dispose + # Return cleanup disposable + return CompositeDisposable(primary_sub, *secondary_subs) return create(subscribe) diff --git a/dimos/utils/generic.py b/dimos/utils/generic.py index 2fce0aca7c..3bd84bb845 100644 --- a/dimos/utils/generic.py +++ b/dimos/utils/generic.py @@ -13,6 +13,7 @@ # limitations under the License. import os +import json from typing import Any, Optional @@ -33,3 +34,17 @@ def truncate_display_string(arg: Any, max: Optional[int] = None) -> str: return string return string[:max_chars] + "...(truncated)..." 
+ + +def extract_json_from_llm_response(response: str) -> Any: + start_idx = response.find("{") + end_idx = response.rfind("}") + 1 + + if start_idx >= 0 and end_idx > start_idx: + json_str = response[start_idx:end_idx] + try: + return json.loads(json_str) + except Exception: + pass + + return None diff --git a/dimos/utils/reactive.py b/dimos/utils/reactive.py index 3318eef2ec..abc8512dfb 100644 --- a/dimos/utils/reactive.py +++ b/dimos/utils/reactive.py @@ -83,14 +83,42 @@ def dispose(self) -> None: def getter_ondemand(observable: Observable[T], timeout: Optional[float] = 30.0) -> T: def getter(): + result = [] + error = [] + event = threading.Event() + + def on_next(value): + result.append(value) + event.set() + + def on_error(e): + error.append(e) + event.set() + + def on_completed(): + event.set() + + # Subscribe and wait for first value + subscription = observable.pipe(ops.first()).subscribe( + on_next=on_next, on_error=on_error, on_completed=on_completed + ) + try: - # Wait for first value with optional timeout - value = observable.pipe( - ops.first(), *([ops.timeout(timeout)] if timeout is not None else []) - ).run() - return value - except Exception as e: - raise Exception(f"No value received after {timeout} seconds") from e + if timeout is not None: + if not event.wait(timeout): + raise TimeoutError(f"No value received after {timeout} seconds") + else: + event.wait() + + if error: + raise error[0] + + if not result: + raise Exception("Observable completed without emitting a value") + + return result[0] + finally: + subscription.dispose() return getter diff --git a/dimos/utils/test_reactive.py b/dimos/utils/test_reactive.py index 2823850e03..21f2bd7894 100644 --- a/dimos/utils/test_reactive.py +++ b/dimos/utils/test_reactive.py @@ -17,6 +17,7 @@ import numpy as np import reactivex as rx from reactivex import operators as ops +from reactivex.scheduler import ThreadPoolScheduler from typing import Callable, TypeVar, Any from reactivex.disposable import 
Disposable from dimos.utils.reactive import ( @@ -74,54 +75,62 @@ def _dispose(): def test_backpressure_handling(): - received_fast = [] - received_slow = [] - # Create an observable that emits numpy arrays instead of integers - source = dispose_spy( - rx.interval(0.1).pipe(ops.map(lambda i: np.array([i, i + 1, i + 2])), ops.take(50)) - ) - - # Wrap with backpressure handling - safe_source = backpressure(source) - - # Fast sub - subscription1 = safe_source.subscribe(lambda x: received_fast.append(x)) - - # Slow sub (shouldn't block above) - subscription2 = safe_source.subscribe(lambda x: (time.sleep(0.25), received_slow.append(x))) - - time.sleep(2.5) - - subscription1.dispose() - assert not source.is_disposed(), "Observable should not be disposed yet" - subscription2.dispose() - time.sleep(0.1) - assert source.is_disposed(), "Observable should be disposed" - - # Check results - print("Fast observer received:", len(received_fast), [arr[0] for arr in received_fast]) - print("Slow observer received:", len(received_slow), [arr[0] for arr in received_slow]) - - # Fast observer should get all or nearly all items - assert len(received_fast) > 15, ( - f"Expected fast observer to receive most items, got {len(received_fast)}" - ) - - # Slow observer should get fewer items due to backpressure handling - assert len(received_slow) < len(received_fast), ( - "Slow observer should receive fewer items than fast observer" - ) - # Specifically, processing at 0.25s means ~4 items per second, so expect 8-10 items - assert 7 <= len(received_slow) <= 11, f"Expected 7-11 items, got {len(received_slow)}" - - # The slow observer should skip items (not process them in sequence) - # We test this by checking that the difference between consecutive arrays is sometimes > 1 - has_skips = False - for i in range(1, len(received_slow)): - if received_slow[i][0] - received_slow[i - 1][0] > 1: - has_skips = True - break - assert has_skips, "Slow observer should skip items due to backpressure" + # 
Create a dedicated scheduler for this test to avoid thread leaks + test_scheduler = ThreadPoolScheduler(max_workers=8) + try: + received_fast = [] + received_slow = [] + # Create an observable that emits numpy arrays instead of integers + source = dispose_spy( + rx.interval(0.1).pipe(ops.map(lambda i: np.array([i, i + 1, i + 2])), ops.take(50)) + ) + + # Wrap with backpressure handling + safe_source = backpressure(source, scheduler=test_scheduler) + + # Fast sub + subscription1 = safe_source.subscribe(lambda x: received_fast.append(x)) + + # Slow sub (shouldn't block above) + subscription2 = safe_source.subscribe(lambda x: (time.sleep(0.25), received_slow.append(x))) + + time.sleep(2.5) + + subscription1.dispose() + assert not source.is_disposed(), "Observable should not be disposed yet" + subscription2.dispose() + # Wait longer to ensure background threads finish processing + # (the slow subscriber sleeps for 0.25s, so we need to wait at least that long) + time.sleep(0.5) + assert source.is_disposed(), "Observable should be disposed" + + # Check results + print("Fast observer received:", len(received_fast), [arr[0] for arr in received_fast]) + print("Slow observer received:", len(received_slow), [arr[0] for arr in received_slow]) + + # Fast observer should get all or nearly all items + assert len(received_fast) > 15, ( + f"Expected fast observer to receive most items, got {len(received_fast)}" + ) + + # Slow observer should get fewer items due to backpressure handling + assert len(received_slow) < len(received_fast), ( + "Slow observer should receive fewer items than fast observer" + ) + # Specifically, processing at 0.25s means ~4 items per second, so expect 8-10 items + assert 7 <= len(received_slow) <= 11, f"Expected 7-11 items, got {len(received_slow)}" + + # The slow observer should skip items (not process them in sequence) + # We test this by checking that the difference between consecutive arrays is sometimes > 1 + has_skips = False + for i in range(1, 
len(received_slow)): + if received_slow[i][0] - received_slow[i - 1][0] > 1: + has_skips = True + break + assert has_skips, "Slow observer should skip items due to backpressure" + finally: + # Always shutdown the scheduler to clean up threads + test_scheduler.executor.shutdown(wait=True) def test_getter_streaming_blocking(): @@ -145,6 +154,7 @@ def test_getter_streaming_blocking(): assert getter()[0] >= 4, f"Expected array with first value >= 4, got {getter()}" getter.dispose() + time.sleep(0.3) # Wait for background interval timer threads to finish assert source.is_disposed(), "Observable should be disposed" @@ -153,6 +163,7 @@ def test_getter_streaming_blocking_timeout(): with pytest.raises(Exception): getter = getter_streaming(source, timeout=0.1) getter.dispose() + time.sleep(0.3) # Wait for background interval timer threads to finish assert source.is_disposed() @@ -177,6 +188,7 @@ def test_getter_streaming_nonblocking(): assert getter() >= 4, f"Expected value >= 4, got {getter()}" getter.dispose() + time.sleep(0.3) # Wait for background interval timer threads to finish assert source.is_disposed(), "Observable should be disposed" @@ -188,15 +200,32 @@ def test_getter_streaming_nonblocking_timeout(): assert not source.is_disposed(), "is not disposed, this is a job of the caller" + # Clean up the subscription to avoid thread leak + getter.dispose() + time.sleep(0.3) # Wait for background threads to finish + assert source.is_disposed(), "Observable should be disposed after cleanup" + def test_getter_ondemand(): - source = dispose_spy(rx.interval(0.1).pipe(ops.take(50))) - getter = getter_ondemand(source) - assert source.is_disposed(), "Observable should be disposed" - assert min_time(getter, 0.05) == 0, f"Expected to get the first value of 0, got {getter()}" - assert source.is_disposed(), "Observable should be disposed" - assert getter() == 0, f"Expected to get the first value of 0, got {getter()}" - assert source.is_disposed(), "Observable should be disposed" + # 
Create a controlled scheduler to avoid thread leaks from rx.interval + test_scheduler = ThreadPoolScheduler(max_workers=4) + try: + source = dispose_spy(rx.interval(0.1, scheduler=test_scheduler).pipe(ops.take(50))) + getter = getter_ondemand(source) + assert source.is_disposed(), "Observable should be disposed" + result = min_time(getter, 0.05) + assert result == 0, f"Expected to get the first value of 0, got {result}" + # Wait for background threads to clean up + time.sleep(0.3) + assert source.is_disposed(), "Observable should be disposed" + result2 = getter() + assert result2 == 0, f"Expected to get the first value of 0, got {result2}" + assert source.is_disposed(), "Observable should be disposed" + # Wait for threads to finish + time.sleep(0.3) + finally: + # Explicitly shutdown the scheduler to clean up threads + test_scheduler.executor.shutdown(wait=True) def test_getter_ondemand_timeout(): @@ -205,6 +234,8 @@ def test_getter_ondemand_timeout(): with pytest.raises(Exception): getter() assert source.is_disposed(), "Observable should be disposed" + # Wait for background interval timer threads to finish + time.sleep(0.3) def test_callback_to_observable(): diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 8930b2f0e9..a7c1541d87 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -312,26 +312,36 @@ def _subscribe(observer, scheduler=None): observer.on_next(first_data) disp = CompositeDisposable() + completed = [False] # Use list to allow mutation in nested function def emit_next(prev_timestamp): + if completed[0]: + return + try: ts, data = next(iterator) except StopIteration: + completed[0] = True observer.on_completed() return delay = max(0.0, ts - prev_timestamp) / speed def _action(sc, _state=None): - observer.on_next(data) - emit_next(ts) # schedule the following sample + if not completed[0]: + observer.on_next(data) + emit_next(ts) # schedule the following sample # Schedule the next emission relative to previous timestamp 
disp.add(scheduler.schedule_relative(delay, _action)) emit_next(prev_ts) - return disp + def dispose(): + completed[0] = True + disp.dispose() + + return Disposable(dispose) from reactivex import create From 089e6f6eace6cd114c6532bec41e8a4055775041 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 1 Oct 2025 04:12:30 +0300 Subject: [PATCH 2/4] fail tests which leak --- .gitignore | 2 - dimos/conftest.py | 66 +++++------------ dimos/core/module.py | 3 + dimos/perception/detection2d/conftest.py | 91 ++++++++++++------------ 4 files changed, 67 insertions(+), 95 deletions(-) diff --git a/.gitignore b/.gitignore index adc50a7ef6..c8e4eb199d 100644 --- a/.gitignore +++ b/.gitignore @@ -43,5 +43,3 @@ data/* !data/.lfs/ FastSAM-x.pt yolo11n.pt - -/thread_monitor_report.csv diff --git a/dimos/conftest.py b/dimos/conftest.py index 551175a272..74d77fe2a9 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -15,9 +15,6 @@ import asyncio import threading import pytest -import csv -import os -from datetime import datetime @pytest.fixture @@ -27,57 +24,30 @@ def event_loop(): loop.close() +_seen_threads = set() +_seen_threads_lock = threading.RLock() + + @pytest.fixture(autouse=True) def monitor_threads(request): - test_name = request.node.name - test_module = request.node.module.__name__ - initial_threads = threading.active_count() - initial_thread_names = [t.name for t in threading.enumerate()] - start_time = datetime.now() - yield - end_time = datetime.now() - final_threads = threading.active_count() - final_thread_names = [t.name for t in threading.enumerate()] + threads = [t for t in threading.enumerate() if t.name != "MainThread"] - new_threads = [t for t in final_thread_names if t not in initial_thread_names] - dead_threads = [t for t in initial_thread_names if t not in final_thread_names] - leaked_threads = final_threads - initial_threads + if not threads: + return - csv_file = "thread_monitor_report.csv" - file_exists = os.path.isfile(csv_file) + with 
_seen_threads_lock: + new_leaks = [t for t in threads if t.ident not in _seen_threads] + for t in threads: + _seen_threads.add(t.ident) - with open(csv_file, "a", newline="") as f: - writer = csv.writer(f) + if not new_leaks: + return - if not file_exists: - writer.writerow( - [ - "timestamp", - "test_module", - "test_name", - "initial_threads", - "final_threads", - "thread_change", - "leaked_threads", - "new_thread_names", - "closed_thread_names", - "duration_seconds", - ] - ) + thread_names = [t.name for t in new_leaks] - writer.writerow( - [ - start_time.isoformat(), - test_module, - test_name, - initial_threads, - final_threads, - final_threads - initial_threads, - leaked_threads, - "|".join(new_threads) if new_threads else "", - "|".join(dead_threads) if dead_threads else "", - (end_time - start_time).total_seconds(), - ] - ) + pytest.fail( + f"Non-closed threads before or during this test. The thread names: {thread_names}. " + "Please look at the first test that fails and fix that." 
+ ) diff --git a/dimos/core/module.py b/dimos/core/module.py index f91c64e019..e235ba16e0 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -102,6 +102,9 @@ def _close_module(self): self._loop_thread.join(timeout=2) self._loop = None self._loop_thread = None + if hasattr(self, "_tf") and self._tf is not None: + self._tf.stop() + self._tf = None if hasattr(self, "_disposables"): self._disposables.dispose() diff --git a/dimos/perception/detection2d/conftest.py b/dimos/perception/detection2d/conftest.py index 52b6b15bb2..c471965287 100644 --- a/dimos/perception/detection2d/conftest.py +++ b/dimos/perception/detection2d/conftest.py @@ -53,7 +53,7 @@ def dimos_cluster(): dimos.stop() -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def moment(): data_dir = "unitree_go2_lidar_corrected" get_data(data_dir) @@ -90,66 +90,67 @@ def moment(): tf.stop() -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def publish_lcm(): def publish(moment: Moment): lcm.autoconf() transports = [] - lidar_frame_transport: LCMTransport = LCMTransport("/lidar", LidarMessage) - lidar_frame_transport.publish(moment.get("lidar_frame")) - transports.append(lidar_frame_transport) - - image_frame_transport: LCMTransport = LCMTransport("/image", Image) - image_frame_transport.publish(moment.get("image_frame")) - transports.append(image_frame_transport) - - odom_frame_transport: LCMTransport = LCMTransport("/odom", Odometry) - odom_frame_transport.publish(moment.get("odom_frame")) - transports.append(odom_frame_transport) - - camera_info_transport: LCMTransport = LCMTransport("/camera_info", CameraInfo) - camera_info_transport.publish(moment.get("camera_info")) - transports.append(camera_info_transport) - - annotations = moment.get("annotations") - if annotations: - annotations_transport: LCMTransport = LCMTransport("/annotations", ImageAnnotations) - annotations_transport.publish(annotations) - transports.append(annotations_transport) - - detections 
= moment.get("detections") - if detections: - for i, detection in enumerate(detections): - detections_transport: LCMTransport = LCMTransport( - f"/detected/pointcloud/{i}", PointCloud2 - ) - detections_transport.publish(detection.pointcloud) - transports.append(detections_transport) - - detections_image_transport: LCMTransport = LCMTransport( - f"/detected/image/{i}", Image - ) - detections_image_transport.publish(detection.cropped_image()) - transports.append(detections_image_transport) - - # Cleanup all transports immediately after publishing - for transport in transports: - if transport._started: - transport.lcm.stop() + try: + lidar_frame_transport: LCMTransport = LCMTransport("/lidar", LidarMessage) + lidar_frame_transport.publish(moment.get("lidar_frame")) + transports.append(lidar_frame_transport) + + image_frame_transport: LCMTransport = LCMTransport("/image", Image) + image_frame_transport.publish(moment.get("image_frame")) + transports.append(image_frame_transport) + + odom_frame_transport: LCMTransport = LCMTransport("/odom", Odometry) + odom_frame_transport.publish(moment.get("odom_frame")) + transports.append(odom_frame_transport) + + camera_info_transport: LCMTransport = LCMTransport("/camera_info", CameraInfo) + camera_info_transport.publish(moment.get("camera_info")) + transports.append(camera_info_transport) + + annotations = moment.get("annotations") + if annotations: + annotations_transport: LCMTransport = LCMTransport("/annotations", ImageAnnotations) + annotations_transport.publish(annotations) + transports.append(annotations_transport) + + detections = moment.get("detections") + if detections: + for i, detection in enumerate(detections): + detections_transport: LCMTransport = LCMTransport( + f"/detected/pointcloud/{i}", PointCloud2 + ) + detections_transport.publish(detection.pointcloud) + transports.append(detections_transport) + + detections_image_transport: LCMTransport = LCMTransport( + f"/detected/image/{i}", Image + ) + 
detections_image_transport.publish(detection.cropped_image()) + transports.append(detections_image_transport) + finally: + # Cleanup all transports immediately after publishing + for transport in transports: + if transport._started: + transport.lcm.stop() return publish -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def detections2d(moment: Moment): module = Detection2DModule() yield module.process_image_frame(moment["image_frame"]) module._close_module() -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def detections3d(moment: Moment): module2d = Detection2DModule() detections2d = module2d.process_image_frame(moment["image_frame"]) From cdebf1844a8972ec3a408a9287aabfd84e1305bb Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Wed, 1 Oct 2025 05:04:36 +0300 Subject: [PATCH 3/4] ignore lcm, heavy, and ros --- dimos/conftest.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dimos/conftest.py b/dimos/conftest.py index 74d77fe2a9..e2a8a3ec36 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -27,9 +27,16 @@ def event_loop(): _seen_threads = set() _seen_threads_lock = threading.RLock() +_skip_for = ["lcm", "heavy", "ros"] + @pytest.fixture(autouse=True) def monitor_threads(request): + # Skip monitoring for tests marked with specified markers + if any(request.node.get_closest_marker(marker) for marker in _skip_for): + yield + return + yield threads = [t for t in threading.enumerate() if t.name != "MainThread"] From f69a6305803d127601a0c688cb00dbb60599aeef Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 2 Oct 2025 04:08:06 +0300 Subject: [PATCH 4/4] review comment --- dimos/perception/detection2d/test_yolo_2d_det.py | 4 ++-- dimos/perception/detection2d/yolo_2d_det.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dimos/perception/detection2d/test_yolo_2d_det.py b/dimos/perception/detection2d/test_yolo_2d_det.py index 07ecb1baeb..c04152a1d7 100644 --- 
a/dimos/perception/detection2d/test_yolo_2d_det.py +++ b/dimos/perception/detection2d/test_yolo_2d_det.py @@ -118,7 +118,7 @@ def on_completed(): # Clean up subscription subscription.dispose() video_provider.dispose_all() - detector.cleanup() + detector.stop() # Shutdown the scheduler to clean up threads test_scheduler.executor.shutdown(wait=True) # Check that we got detection results @@ -180,7 +180,7 @@ def on_completed(): except Exception as e: # Ensure cleanup happens even on exception if "detector" in locals(): - detector.cleanup() + detector.stop() if "video_provider" in locals(): video_provider.dispose_all() pytest.skip(f"Skipping test due to error: {e}") diff --git a/dimos/perception/detection2d/yolo_2d_det.py b/dimos/perception/detection2d/yolo_2d_det.py index d40d5c2b15..bc8b0bc577 100644 --- a/dimos/perception/detection2d/yolo_2d_det.py +++ b/dimos/perception/detection2d/yolo_2d_det.py @@ -26,7 +26,6 @@ from dimos.utils.data import get_data from dimos.utils.gpu_utils import is_cuda_available from dimos.utils.logging_config import setup_logger -from dimos.utils.path_utils import get_project_root logger = setup_logger("dimos.perception.detection2d.yolo_2d_det") @@ -104,7 +103,7 @@ def visualize_results(self, image, bboxes, track_ids, class_ids, confidences, na """ return plot_results(image, bboxes, track_ids, class_ids, confidences, names) - def cleanup(self): + def stop(self): """ Clean up resources used by the detector, including tracker threads. """