From ba6601edfd4670be31e41b83061665d0a8546ff8 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Fri, 2 Jan 2026 06:56:01 +0200 Subject: [PATCH 1/4] remove old navigation modules --- data/.lfs/astar_corner_general.png.tar.gz | 3 - data/.lfs/astar_general.png.tar.gz | 3 - dimos/core/global_config.py | 2 - dimos/hardware/sensors/camera/module.py | 12 +- dimos/hardware/sensors/camera/spec.py | 2 +- dimos/mapping/occupancy/test_path_mask.py | 4 +- .../mapping/occupancy/test_path_resampling.py | 4 +- dimos/navigation/bt_navigator/__init__.py | 1 - dimos/navigation/bt_navigator/navigator.py | 358 --------------- .../bt_navigator/recovery_server.py | 118 ----- dimos/navigation/global_planner/astar.py | 52 --- .../global_planner/general_astar.py | 215 ---------- dimos/navigation/global_planner/planner.py | 107 ----- dimos/navigation/global_planner/types.py | 17 - dimos/navigation/local_planner/__init__.py | 4 - .../local_planner/holonomic_local_planner.py | 276 ------------ .../navigation/local_planner/local_planner.py | 212 --------- .../local_planner/test_base_local_planner.py | 406 ------------------ .../replanning_a_star/global_planner.py | 6 +- .../goal_validator.py | 256 ----------- .../min_cost_astar.py | 2 +- .../min_cost_astar_cpp.cpp | 0 .../min_cost_astar_ext.pyi | 0 .../test_goal_validator.py | 2 +- .../test_min_cost_astar.py} | 32 +- dimos/robot/all_blueprints.py | 2 - .../unitree_webrtc/unitree_g1_blueprints.py | 24 +- setup.py | 4 +- 28 files changed, 37 insertions(+), 2087 deletions(-) delete mode 100644 data/.lfs/astar_corner_general.png.tar.gz delete mode 100644 data/.lfs/astar_general.png.tar.gz delete mode 100644 dimos/navigation/bt_navigator/__init__.py delete mode 100644 dimos/navigation/bt_navigator/navigator.py delete mode 100644 dimos/navigation/bt_navigator/recovery_server.py delete mode 100644 dimos/navigation/global_planner/astar.py delete mode 100644 dimos/navigation/global_planner/general_astar.py delete mode 100644 dimos/navigation/global_planner/planner.py delete mode 100644 dimos/navigation/global_planner/types.py delete mode 100644 dimos/navigation/local_planner/__init__.py delete mode 100644 dimos/navigation/local_planner/holonomic_local_planner.py delete mode 100644 dimos/navigation/local_planner/local_planner.py delete mode 100644 dimos/navigation/local_planner/test_base_local_planner.py rename dimos/navigation/{bt_navigator => replanning_a_star}/goal_validator.py (51%) rename dimos/navigation/{global_planner => replanning_a_star}/min_cost_astar.py (99%) rename dimos/navigation/{global_planner => replanning_a_star}/min_cost_astar_cpp.cpp (100%) rename dimos/navigation/{global_planner => replanning_a_star}/min_cost_astar_ext.pyi (100%) rename dimos/navigation/{bt_navigator => replanning_a_star}/test_goal_validator.py (95%) rename dimos/navigation/{global_planner/test_astar.py => replanning_a_star/test_min_cost_astar.py} (74%) diff --git a/data/.lfs/astar_corner_general.png.tar.gz b/data/.lfs/astar_corner_general.png.tar.gz deleted file mode 100644 index 1303bf22d6..0000000000 --- a/data/.lfs/astar_corner_general.png.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:9790641ad053f4566fb867f9d91450306328cf26f76368bf2639d985be3ae27a -size 5696 diff --git a/data/.lfs/astar_general.png.tar.gz b/data/.lfs/astar_general.png.tar.gz deleted file mode 100644 index 86fe9101d6..0000000000 --- a/data/.lfs/astar_general.png.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:4d4d426bda849551a0555b958792d7bc738b71c9573b477555f5d55c425395cb -size 12109 diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index dfa54830ac..bfb553a45d 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -19,7 +19,6 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from dimos.mapping.occupancy.path_map import NavigationStrategy -from dimos.navigation.global_planner.types import AStarAlgorithm ViewerBackend: TypeAlias = Literal["rerun-web", "rerun-native", "foxglove"] @@ -48,7 +47,6 @@ class GlobalConfig(BaseSettings): robot_width: float = 0.3 robot_rotation_diameter: float = 0.6 planner_strategy: NavigationStrategy = "simple" - astar_algorithm: AStarAlgorithm = "min_cost" planner_robot_speed: float | None = None model_config = SettingsConfigDict( diff --git a/dimos/hardware/sensors/camera/module.py b/dimos/hardware/sensors/camera/module.py index 6d34a52fde..4232fe1f49 100644 --- a/dimos/hardware/sensors/camera/module.py +++ b/dimos/hardware/sensors/camera/module.py @@ -17,7 +17,6 @@ import queue import time -from dimos_lcm.sensor_msgs import CameraInfo import reactivex as rx from reactivex import operators as ops from reactivex.disposable import Disposable @@ -30,6 +29,7 @@ from dimos.hardware.sensors.camera.webcam import Webcam from dimos.msgs.geometry_msgs import Quaternion, Transform, Vector3 from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier from dimos.spec import perception as spec # type: ignore[no-redef] @@ -48,6 +48,7 @@ class CameraModuleConfig(ModuleConfig): frame_id: str = "camera_link" transform: Transform | None = field(default_factory=default_transform) hardware: Callable[[], CameraHardware] | CameraHardware = Webcam # type: ignore[type-arg] + frequency: float = 5.0 class CameraModule(Module[CameraModuleConfig], spec.Camera): @@ -84,7 +85,7 @@ def start(self): # type: ignore[no-untyped-def] # camera_info_stream = self.camera_info_stream(frequency=5.0) - def publish_info(camera_info: CameraInfo): # type: ignore[no-untyped-def] + def publish_info(camera_info: CameraInfo) -> None: self.camera_info.publish(camera_info) if self.config.transform is None: @@ -103,7 +104,7 @@ def publish_info(camera_info: CameraInfo): # type: ignore[no-untyped-def] self.tf.publish(camera_link, camera_optical) self._camera_info_subscription = self.camera_info_stream().subscribe(publish_info) # type: ignore[assignment] - self._module_subscription = stream.subscribe(self.image.publish) # type: ignore[attr-defined] + self._module_subscription = stream.subscribe(self.color_image.publish) # type: ignore[attr-defined] @skill(stream=Stream.passive, output=Output.image, reducer=Reducer.latest) # type: ignore[arg-type] def video_stream(self) -> Image: # type: ignore[misc] @@ -149,3 +150,8 @@ def stop(self): # type: ignore[no-untyped-def] if self.hardware and hasattr(self.hardware, "stop"): self.hardware.stop() super().stop() + + +camera_module = CameraModule.blueprint + +__all__ = ["CameraModule", "camera_module"] diff --git a/dimos/hardware/sensors/camera/spec.py b/dimos/hardware/sensors/camera/spec.py index fa6d0d082c..95aed1ee43 100644 --- a/dimos/hardware/sensors/camera/spec.py +++ b/dimos/hardware/sensors/camera/spec.py @@ -15,10 +15,10 @@ from abc import ABC, abstractmethod, abstractproperty from typing import Generic, Protocol, TypeVar -from dimos_lcm.sensor_msgs import CameraInfo from reactivex.observable import Observable from dimos.msgs.sensor_msgs import Image +from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.protocol.service import Configurable # type: ignore[attr-defined] diff --git a/dimos/mapping/occupancy/test_path_mask.py b/dimos/mapping/occupancy/test_path_mask.py index 539270dcbe..dede997946 100644 --- a/dimos/mapping/occupancy/test_path_mask.py +++ b/dimos/mapping/occupancy/test_path_mask.py @@ -22,7 +22,7 @@ from dimos.msgs.geometry_msgs import Pose from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.sensor_msgs import Image -from dimos.navigation.global_planner.astar import astar +from dimos.navigation.replanning_a_star.min_cost_astar import min_cost_astar from dimos.utils.data import get_data @@ -37,7 +37,7 @@ def test_make_path_mask(occupancy_gradient, pose_index, max_length, expected_ima start = Vector3(4.0, 2.0, 0) goal_pose = Pose(6.15, 10.0, 0, 0, 0, 0, 1) expected = Image.from_file(get_data(expected_image)) - path = astar("min_cost", occupancy_gradient, goal_pose.position, start, use_cpp=False) + path = min_cost_astar(occupancy_gradient, goal_pose.position, start, use_cpp=False) path = smooth_resample_path(path, goal_pose, 0.1) robot_width = 0.4 path_mask = make_path_mask(occupancy_gradient, path, robot_width, pose_index, max_length) diff --git a/dimos/mapping/occupancy/test_path_resampling.py b/dimos/mapping/occupancy/test_path_resampling.py index 1ab8241a45..c23f71cf89 100644 --- a/dimos/mapping/occupancy/test_path_resampling.py +++ b/dimos/mapping/occupancy/test_path_resampling.py @@ -22,7 +22,7 @@ from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid from dimos.msgs.sensor_msgs.Image import Image -from dimos.navigation.global_planner.astar import astar +from dimos.navigation.replanning_a_star.min_cost_astar import min_cost_astar from dimos.utils.data import get_data @@ -36,7 +36,7 @@ def test_resample_path(costmap, method) -> None: start = Vector3(4.0, 2.0, 0) goal_pose = Pose(6.15, 10.0, 0, 0, 0, 0, 1) expected = Image.from_file(get_data(f"resample_path_{method}.png")) - path = astar("min_cost", costmap, goal_pose.position, start, use_cpp=False) + path = min_cost_astar(costmap, goal_pose.position, start, use_cpp=False) match method: case "simple": diff --git a/dimos/navigation/bt_navigator/__init__.py b/dimos/navigation/bt_navigator/__init__.py deleted file mode 100644 index cfd252ff6a..0000000000 --- a/dimos/navigation/bt_navigator/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .navigator import BehaviorTreeNavigator diff --git a/dimos/navigation/bt_navigator/navigator.py b/dimos/navigation/bt_navigator/navigator.py deleted file mode 100644 index c0a554444e..0000000000 --- a/dimos/navigation/bt_navigator/navigator.py +++ /dev/null @@ -1,358 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Navigator module for coordinating global and local planning. -""" - -from collections.abc import Callable -import threading -import time - -from dimos_lcm.std_msgs import Bool, String -from reactivex.disposable import Disposable - -from dimos.core import In, Module, Out, rpc -from dimos.core.rpc_client import RpcCall -from dimos.mapping.occupancy.gradient import gradient -from dimos.mapping.occupancy.inflation import simple_inflate -from dimos.msgs.geometry_msgs import PoseStamped -from dimos.msgs.nav_msgs import OccupancyGrid -from dimos.navigation.base import NavigationInterface, NavigationState -from dimos.navigation.bt_navigator.goal_validator import find_safe_goal -from dimos.navigation.bt_navigator.recovery_server import RecoveryServer -from dimos.protocol.tf import TF -from dimos.utils.logging_config import setup_logger -from dimos.utils.transform_utils import apply_transform - -logger = setup_logger() - - -class BehaviorTreeNavigator(Module, NavigationInterface): - """ - Navigator module for coordinating navigation tasks. - - Manages the state machine for navigation, coordinates between global - and local planners, and monitors goal completion. - - Inputs: - - odom: Current robot odometry - - Outputs: - - goal: Goal pose for global planner - """ - - # LCM inputs - odom: In[PoseStamped] - goal_request: In[PoseStamped] # Input for receiving goal requests - global_costmap: In[OccupancyGrid] - - # LCM outputs - target: Out[PoseStamped] - goal_reached: Out[Bool] - navigation_state: Out[String] - - def __init__( # type: ignore[no-untyped-def] - self, - publishing_frequency: float = 1.0, - reset_local_planner: Callable[[], None] | None = None, - check_goal_reached: Callable[[], bool] | None = None, - **kwargs, - ) -> None: - """Initialize the Navigator. - - Args: - publishing_frequency: Frequency to publish goals to global planner (Hz) - goal_tolerance: Distance threshold to consider goal reached (meters) - """ - super().__init__(**kwargs) - - # Parameters - self.publishing_frequency = publishing_frequency - self.publishing_period = 1.0 / publishing_frequency - - # State machine - self.state = NavigationState.IDLE - self.state_lock = threading.Lock() - - # Current goal - self.current_goal: PoseStamped | None = None - self.original_goal: PoseStamped | None = None - self.goal_lock = threading.Lock() - - # Goal reached state - self._goal_reached = False - - # Latest data - self.latest_odom: PoseStamped | None = None - self.latest_costmap: OccupancyGrid | None = None - - # Control thread - self.control_thread: threading.Thread | None = None - self.stop_event = threading.Event() - - # TF listener - self.tf = TF() - - # Local planner - self.reset_local_planner = reset_local_planner - self.check_goal_reached = check_goal_reached - - # Recovery server for stuck detection - self.recovery_server = RecoveryServer(stuck_duration=5.0) - - logger.info("Navigator initialized with stuck detection") - - @rpc - def set_HolonomicLocalPlanner_reset(self, callable: RpcCall) -> None: - self.reset_local_planner = callable - self.reset_local_planner.set_rpc(self.rpc) # type: ignore[arg-type] - - @rpc - def set_HolonomicLocalPlanner_is_goal_reached(self, callable: RpcCall) -> None: - self.check_goal_reached = callable - self.check_goal_reached.set_rpc(self.rpc) # type: ignore[arg-type] - - @rpc - def start(self) -> None: - super().start() - - # Subscribe to inputs - unsub = self.odom.subscribe(self._on_odom) - self._disposables.add(Disposable(unsub)) - - unsub = self.goal_request.subscribe(self._on_goal_request) - self._disposables.add(Disposable(unsub)) - - unsub = self.global_costmap.subscribe(self._on_costmap) - self._disposables.add(Disposable(unsub)) - - # Start control thread - self.stop_event.clear() - self.control_thread = threading.Thread(target=self._control_loop, daemon=True) - self.control_thread.start() - - logger.info("Navigator started") - - @rpc - def stop(self) -> None: - """Clean up resources including stopping the control thread.""" - - self.stop_navigation() - - self.stop_event.set() - if self.control_thread and self.control_thread.is_alive(): - self.control_thread.join(timeout=2.0) - - super().stop() - - @rpc - def cancel_goal(self) -> bool: - """ - Cancel the current navigation goal. - - Returns: - True if goal was cancelled, False if no goal was active - """ - self.stop_navigation() - return True - - @rpc - def set_goal(self, goal: PoseStamped) -> bool: - """ - Set a new navigation goal. - - Args: - goal: Target pose to navigate to - - Returns: - non-blocking: True if goal was accepted, False otherwise - blocking: True if goal was reached, False otherwise - """ - transformed_goal = self._transform_goal_to_odom_frame(goal) - if not transformed_goal: - logger.error("Failed to transform goal to odometry frame") - return False - - with self.goal_lock: - self.current_goal = transformed_goal - self.original_goal = transformed_goal - - self._goal_reached = False - - with self.state_lock: - self.state = NavigationState.FOLLOWING_PATH - - return True - - @rpc - def get_state(self) -> NavigationState: - """Get the current state of the navigator.""" - return self.state - - def _on_odom(self, msg: PoseStamped) -> None: - """Handle incoming odometry messages.""" - self.latest_odom = msg - - if self.state == NavigationState.FOLLOWING_PATH: - self.recovery_server.update_odom(msg) - - def _on_goal_request(self, msg: PoseStamped) -> None: - """Handle incoming goal requests.""" - self.set_goal(msg) - - def _on_costmap(self, msg: OccupancyGrid) -> None: - """Handle incoming costmap messages.""" - self.latest_costmap = msg - - def _transform_goal_to_odom_frame(self, goal: PoseStamped) -> PoseStamped | None: - """Transform goal pose to the odometry frame.""" - if not goal.frame_id: - return goal - - if not self.latest_odom: - logger.error("No odometry data available to transform goal") - return None - - odom_frame = self.latest_odom.frame_id - if goal.frame_id == odom_frame: - return goal - - try: - transform = None - max_retries = 3 - - for attempt in range(max_retries): - transform = self.tf.get( - parent_frame=odom_frame, - child_frame=goal.frame_id, - ) - - if transform: - break - - if attempt < max_retries - 1: - logger.warning( - f"Transform attempt {attempt + 1}/{max_retries} failed, retrying..." - ) - time.sleep(1.0) - else: - logger.error( - f"Could not find transform from '{goal.frame_id}' to '{odom_frame}' after {max_retries} attempts" - ) - return None - - pose = apply_transform(goal, transform) # type: ignore[arg-type] - transformed_goal = PoseStamped( - position=pose.position, - orientation=pose.orientation, - frame_id=odom_frame, - ts=goal.ts, - ) - return transformed_goal - - except Exception as e: - logger.error(f"Failed to transform goal: {e}") - return None - - def _control_loop(self) -> None: - """Main control loop running in separate thread.""" - while not self.stop_event.is_set(): - with self.state_lock: - current_state = self.state - self.navigation_state.publish(String(data=current_state.value)) - - if current_state == NavigationState.FOLLOWING_PATH: - with self.goal_lock: - goal = self.current_goal - original_goal = self.original_goal - - if goal is not None and self.latest_costmap is not None: - # Check if robot is stuck - if self.recovery_server.check_stuck(): - logger.warning("Robot is stuck! Cancelling goal and resetting.") - self.cancel_goal() - continue - - costmap = gradient(simple_inflate(self.latest_costmap, 0.1), max_distance=1.0) - - # Find safe goal position - safe_goal_pos = find_safe_goal( - costmap, - original_goal.position, # type: ignore[union-attr] - algorithm="bfs", - cost_threshold=60, - min_clearance=0.25, - max_search_distance=5.0, - ) - - # Create new goal with safe position - if safe_goal_pos: - safe_goal = PoseStamped( - position=safe_goal_pos, - orientation=goal.orientation, - frame_id=goal.frame_id, - ts=goal.ts, - ) - self.target.publish(safe_goal) - self.current_goal = safe_goal - else: - logger.warning("Could not find safe goal position, cancelling goal") - self.cancel_goal() - - # Check if goal is reached - if self.check_goal_reached(): # type: ignore[misc] - reached_msg = Bool() - reached_msg.data = True - self.goal_reached.publish(reached_msg) - self.stop_navigation() - self._goal_reached = True - logger.info("Goal reached, resetting local planner") - - elif current_state == NavigationState.RECOVERY: - with self.state_lock: - self.state = NavigationState.IDLE - - time.sleep(self.publishing_period) - - @rpc - def is_goal_reached(self) -> bool: - """Check if the current goal has been reached. - - Returns: - True if goal was reached, False otherwise - """ - return self._goal_reached - - def stop_navigation(self) -> None: - """Stop navigation and return to IDLE state.""" - with self.goal_lock: - self.current_goal = None - - self._goal_reached = False - - with self.state_lock: - self.state = NavigationState.IDLE - - self.reset_local_planner() # type: ignore[misc] - self.recovery_server.reset() # Reset recovery server when stopping - - logger.info("Navigator stopped") - - -behavior_tree_navigator = BehaviorTreeNavigator.blueprint - -__all__ = ["BehaviorTreeNavigator", "behavior_tree_navigator"] diff --git a/dimos/navigation/bt_navigator/recovery_server.py b/dimos/navigation/bt_navigator/recovery_server.py deleted file mode 100644 index a8c10fccc4..0000000000 --- a/dimos/navigation/bt_navigator/recovery_server.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Recovery server for handling stuck detection and recovery behaviors. -""" - -from dimos.msgs.geometry_msgs import PoseStamped -from dimos.utils.logging_config import setup_logger -from dimos.utils.transform_utils import get_distance - -logger = setup_logger() - - -class RecoveryServer: - """ - Recovery server for detecting stuck situations and executing recovery behaviors. - - Currently implements stuck detection based on time without significant movement. - Will be extended with actual recovery behaviors in the future. - """ - - def __init__( - self, - position_threshold: float = 0.2, - stuck_duration: float = 3.0, - ) -> None: - """Initialize the recovery server. - - Args: - position_threshold: Minimum distance to travel to reset stuck timer (meters) - stuck_duration: Time duration without significant movement to consider stuck (seconds) - """ - self.position_threshold = position_threshold - self.stuck_duration = stuck_duration - - # Store last position that exceeded threshold - self.last_moved_pose = None - self.last_moved_time = None - self.current_odom = None - - logger.info( - f"RecoveryServer initialized with position_threshold={position_threshold}, " - f"stuck_duration={stuck_duration}" - ) - - def update_odom(self, odom: PoseStamped) -> None: - """Update the odometry data for stuck detection. - - Args: - odom: Current robot odometry with timestamp - """ - if odom is None: - return - - # Store current odom for checking stuck - self.current_odom = odom # type: ignore[assignment] - - # Initialize on first update - if self.last_moved_pose is None: - self.last_moved_pose = odom # type: ignore[assignment] - self.last_moved_time = odom.ts # type: ignore[assignment] - return - - # Calculate distance from the reference position (last significant movement) - distance = get_distance(odom, self.last_moved_pose) - - # If robot has moved significantly from the reference, update reference - if distance > self.position_threshold: - self.last_moved_pose = odom - self.last_moved_time = odom.ts - - def check_stuck(self) -> bool: - """Check if the robot is stuck based on time without movement. - - Returns: - True if robot appears to be stuck, False otherwise - """ - if self.last_moved_time is None: - return False - - # Need current odom to check - if self.current_odom is None: - return False - - # Calculate time since last significant movement - current_time = self.current_odom.ts - time_since_movement = current_time - self.last_moved_time - - # Check if stuck based on duration without movement - is_stuck = time_since_movement > self.stuck_duration - - if is_stuck: - logger.warning( - f"Robot appears stuck! No movement for {time_since_movement:.1f} seconds" - ) - - return is_stuck - - def reset(self) -> None: - """Reset the recovery server state.""" - self.last_moved_pose = None - self.last_moved_time = None - self.current_odom = None - logger.debug("RecoveryServer reset") diff --git a/dimos/navigation/global_planner/astar.py b/dimos/navigation/global_planner/astar.py deleted file mode 100644 index d722451cfa..0000000000 --- a/dimos/navigation/global_planner/astar.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from dimos.msgs.geometry_msgs import VectorLike -from dimos.msgs.nav_msgs import OccupancyGrid, Path -from dimos.navigation.global_planner.general_astar import general_astar -from dimos.navigation.global_planner.min_cost_astar import min_cost_astar -from dimos.navigation.global_planner.types import AStarAlgorithm -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -def astar( - algorithm: AStarAlgorithm, - costmap: OccupancyGrid, - goal: VectorLike, - start: VectorLike, - use_cpp: bool = True, -) -> Path | None: - """ - A* path planning algorithm from start to goal position. - - Args: - algorithm: The A* algorithm variant to use ("general" or "min_cost") - costmap: Costmap object containing the environment - goal: Goal position as any vector-like object - start: Start position as any vector-like object (default: origin [0,0]) - use_cpp: Use C++ implementation for min_cost algorithm if available (default: True) - - Returns: - Path object containing waypoints, or None if no path found - """ - - match algorithm: - case "general": - return general_astar(costmap, goal, start) - case "min_cost": - return min_cost_astar(costmap, goal, start, use_cpp=use_cpp) - case _: - raise NotImplementedError() diff --git a/dimos/navigation/global_planner/general_astar.py b/dimos/navigation/global_planner/general_astar.py deleted file mode 100644 index eea68fd269..0000000000 --- a/dimos/navigation/global_planner/general_astar.py +++ /dev/null @@ -1,215 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import heapq - -from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, VectorLike -from dimos.msgs.nav_msgs import CostValues, OccupancyGrid, Path -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -def general_astar( - costmap: OccupancyGrid, - goal: VectorLike, - start: VectorLike = (0.0, 0.0), - cost_threshold: int = 90, - unknown_penalty: float = 0.8, -) -> Path | None: - """ - A* path planning algorithm from start to goal position. - - Args: - costmap: Costmap object containing the environment - goal: Goal position as any vector-like object - start: Start position as any vector-like object (default: origin [0,0]) - cost_threshold: Cost threshold above which a cell is considered an obstacle - - Returns: - Path object containing waypoints, or None if no path found - """ - - # Convert world coordinates to grid coordinates directly using vector-like inputs - start_vector = costmap.world_to_grid(start) - goal_vector = costmap.world_to_grid(goal) - logger.debug(f"ASTAR {costmap} {start_vector} -> {goal_vector}") - - # Store positions as tuples for dictionary keys - start_tuple = (int(start_vector.x), int(start_vector.y)) - goal_tuple = (int(goal_vector.x), int(goal_vector.y)) - - # Check if goal is out of bounds - if not (0 <= goal_tuple[0] < costmap.width and 0 <= goal_tuple[1] < costmap.height): - return None - - # Define possible movements (8-connected grid with diagonal movements) - directions = [ - (0, 1), - (1, 0), - (0, -1), - (-1, 0), - (1, 1), - (1, -1), - (-1, 1), - (-1, -1), - ] - - # Cost for each movement (straight vs diagonal) - sc = 1.0 # Straight cost - dc = 1.42 # Diagonal cost (approximately sqrt(2)) - movement_costs = [sc, sc, sc, sc, dc, dc, dc, dc] - - # A* algorithm implementation - open_set = [] # type: ignore[var-annotated] # Priority queue for nodes to explore - closed_set = set() # Set of explored nodes - - # Dictionary to store cost from start and parents for each node - g_score = {start_tuple: 0} - parents = {} # type: ignore[var-annotated] - - # Heuristic function (Octile distance for 8-connected grid) - def heuristic(x1, y1, x2, y2): # type: ignore[no-untyped-def] - dx = abs(x2 - x1) - dy = abs(y2 - y1) - # Octile distance: optimal for 8-connected grids with diagonal movement - return (dx + dy) + (dc - 2 * sc) * min(dx, dy) - - # Start with the starting node - f_score = g_score[start_tuple] + heuristic( # type: ignore[no-untyped-call] - start_tuple[0], start_tuple[1], goal_tuple[0], goal_tuple[1] - ) - heapq.heappush(open_set, (f_score, start_tuple)) - - # Track nodes already in open set to avoid duplicates - open_set_hash = {start_tuple} - - while open_set: - # Get the node with the lowest f_score - _current_f, current = heapq.heappop(open_set) - current_x, current_y = current - - # Remove from open set hash - if current in open_set_hash: - open_set_hash.remove(current) - - # Skip if already processed (can happen with duplicate entries) - if current in closed_set: - continue - - # Check if we've reached the goal - if current == goal_tuple: - # Reconstruct the path - waypoints = [] - while current in parents: - world_point = costmap.grid_to_world(current) - # Create PoseStamped with identity quaternion (no orientation) - pose = PoseStamped( - frame_id="world", - position=[world_point.x, world_point.y, 0.0], - orientation=Quaternion(0, 0, 0, 1), # Identity quaternion - ) - waypoints.append(pose) - current = parents[current] - - # Add the start position - start_world_point = costmap.grid_to_world(start_tuple) - start_pose = PoseStamped( - frame_id="world", - position=[start_world_point.x, start_world_point.y, 0.0], - orientation=Quaternion(0, 0, 0, 1), - ) - waypoints.append(start_pose) - - # Reverse the path (start to goal) - waypoints.reverse() - - # Add the goal position if it's not already included - goal_point = costmap.grid_to_world(goal_tuple) - - if ( - not waypoints - or (waypoints[-1].x - goal_point.x) ** 2 + (waypoints[-1].y - goal_point.y) ** 2 - > 1e-10 - ): - goal_pose = PoseStamped( - frame_id="world", - position=[goal_point.x, goal_point.y, 0.0], - orientation=Quaternion(0, 0, 0, 1), - ) - waypoints.append(goal_pose) - - return Path(frame_id="world", poses=waypoints) - - # Add current node to closed set - closed_set.add(current) - - # Explore neighbors - for i, (dx, dy) in enumerate(directions): - neighbor_x, neighbor_y = current_x + dx, current_y + dy - neighbor = (neighbor_x, neighbor_y) - - # Check if the neighbor is valid - if not (0 <= neighbor_x < costmap.width and 0 <= neighbor_y < costmap.height): - continue - - # Check if the neighbor is already explored - if neighbor in closed_set: - continue - - # Get the neighbor's cost value - neighbor_val = costmap.grid[neighbor_y, neighbor_x] - - # Skip if it's a hard obstacle - if neighbor_val >= cost_threshold: - continue - - # Calculate movement cost with penalties - # Unknown cells get half the penalty of obstacles - if neighbor_val == CostValues.UNKNOWN: # Unknown cell (-1) - # Unknown cells have a moderate traversal cost (half of obstacle threshold) - cell_cost = cost_threshold * unknown_penalty - elif neighbor_val == CostValues.FREE: # Free space (0) - # Free cells have minimal cost - cell_cost = 0.0 - else: - # Other cells use their actual cost value (1-99) - cell_cost = neighbor_val - - # Calculate cost penalty based on cell cost (higher cost = higher penalty) - # This encourages the planner to prefer lower-cost paths - cost_penalty = cell_cost / CostValues.OCCUPIED # Normalized penalty (divide by 100) - - tentative_g_score = g_score[current] + movement_costs[i] * (1.0 + cost_penalty) - - # Get the current g_score for the neighbor or set to infinity if not yet explored - neighbor_g_score = g_score.get(neighbor, float("inf")) - - # If this path to the neighbor is better than any previous one - if tentative_g_score < neighbor_g_score: - # Update the neighbor's scores and parent - parents[neighbor] = current - g_score[neighbor] = tentative_g_score # type: ignore[assignment] - f_score = tentative_g_score + heuristic( # type: ignore[no-untyped-call] - neighbor_x, neighbor_y, goal_tuple[0], goal_tuple[1] - ) - - # Add the neighbor to the open set with its f_score - # Only add if not already in open set to reduce duplicates - if neighbor not in open_set_hash: - heapq.heappush(open_set, (f_score, neighbor)) - open_set_hash.add(neighbor) - - # If we get here, no path was found - return None diff --git a/dimos/navigation/global_planner/planner.py b/dimos/navigation/global_planner/planner.py deleted file mode 100644 index 7dbd731854..0000000000 --- a/dimos/navigation/global_planner/planner.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from reactivex.disposable import Disposable - -from dimos.core import In, Module, Out, rpc -from dimos.core.global_config import GlobalConfig -from dimos.mapping.occupancy.path_map import make_navigation_map -from dimos.mapping.occupancy.path_resampling import simple_resample_path -from dimos.msgs.geometry_msgs import Pose, PoseStamped -from dimos.msgs.nav_msgs import OccupancyGrid, Path -from dimos.navigation.global_planner.astar import astar -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -class AstarPlanner(Module): - # LCM inputs - target: In[PoseStamped] - global_costmap: In[OccupancyGrid] - odom: In[PoseStamped] - - # LCM outputs - path: Out[Path] - - _global_config: GlobalConfig - - def __init__( - self, - global_config: GlobalConfig | None = None, - ) -> None: - super().__init__() - - self.latest_costmap: OccupancyGrid | None = None - self.latest_odom: PoseStamped | None = None - - self._global_config = global_config or GlobalConfig() - - @rpc - def start(self) -> None: - super().start() - - self._disposables.add(Disposable(self.target.subscribe(self._on_target))) - self._disposables.add(Disposable(self.global_costmap.subscribe(self._on_costmap))) - self._disposables.add(Disposable(self.odom.subscribe(self._on_odom))) - - @rpc - def stop(self) -> None: - super().stop() - - def _on_costmap(self, msg: OccupancyGrid) -> None: - self.latest_costmap = msg - - def _on_odom(self, msg: PoseStamped) -> None: - self.latest_odom = msg - - def _on_target(self, goal_pose: PoseStamped) -> None: - if self.latest_costmap is None or self.latest_odom is None: - logger.warning("Cannot plan: missing costmap or odometry data") - return - - path = self.plan(goal_pose) - if path: - self.path.publish(path) - - def plan(self, goal_pose: Pose) -> Path | None: - """Plan a path from current position to goal.""" - - if self.latest_costmap is None or self.latest_odom is None: - logger.warning("Cannot plan: missing costmap or odometry data") - return None - - logger.debug(f"Planning path to goal {goal_pose}") - - robot_pos = self.latest_odom.position - - costmap = make_navigation_map( - self.latest_costmap, - self._global_config.robot_width, - strategy=self._global_config.planner_strategy, - ) - - path = astar(self._global_config.astar_algorithm, costmap, goal_pose.position, robot_pos) - - if not path: - logger.warning("No path found to the goal.") - return None - - return simple_resample_path(path, goal_pose, 0.1) - - -astar_planner = AstarPlanner.blueprint - -__all__ = ["AstarPlanner", "astar_planner"] diff --git a/dimos/navigation/global_planner/types.py b/dimos/navigation/global_planner/types.py deleted file mode 100644 index 427547a9e1..0000000000 --- a/dimos/navigation/global_planner/types.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Literal, TypeAlias - -AStarAlgorithm: TypeAlias = Literal["general", "min_cost"] diff --git a/dimos/navigation/local_planner/__init__.py b/dimos/navigation/local_planner/__init__.py deleted file mode 100644 index b08e97b1aa..0000000000 --- a/dimos/navigation/local_planner/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from dimos.navigation.local_planner.holonomic_local_planner import HolonomicLocalPlanner -from dimos.navigation.local_planner.local_planner import BaseLocalPlanner - -__all__ = ["BaseLocalPlanner", "HolonomicLocalPlanner"] diff --git a/dimos/navigation/local_planner/holonomic_local_planner.py b/dimos/navigation/local_planner/holonomic_local_planner.py deleted file mode 100644 index 212c625dda..0000000000 --- a/dimos/navigation/local_planner/holonomic_local_planner.py +++ /dev/null @@ -1,276 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Gradient-Augmented Look-Ahead Pursuit (GLAP) holonomic local planner. -""" - -import numpy as np - -from dimos.core import rpc -from dimos.msgs.geometry_msgs import Twist, Vector3 -from dimos.navigation.local_planner.local_planner import BaseLocalPlanner -from dimos.utils.transform_utils import get_distance, normalize_angle, quaternion_to_euler - - -class HolonomicLocalPlanner(BaseLocalPlanner): - """ - Gradient-Augmented Look-Ahead Pursuit (GLAP) holonomic local planner. - - This planner combines path following with obstacle avoidance using - costmap gradients to produce smooth holonomic velocity commands. - - Args: - lookahead_dist: Look-ahead distance in meters (default: 1.0) - k_rep: Repulsion gain for obstacle avoidance (default: 1.0) - alpha: Low-pass filter coefficient [0-1] (default: 0.5) - v_max: Maximum velocity per component in m/s (default: 0.8) - goal_tolerance: Distance threshold to consider goal reached (default: 0.5) - control_frequency: Control loop frequency in Hz (default: 10.0) - """ - - def __init__( # type: ignore[no-untyped-def] - self, - lookahead_dist: float = 1.0, - k_rep: float = 0.5, - k_angular: float = 0.75, - alpha: float = 0.5, - v_max: float = 0.8, - goal_tolerance: float = 0.5, - orientation_tolerance: float = 0.2, - control_frequency: float = 10.0, - **kwargs, - ) -> None: - """Initialize the GLAP planner with specified parameters.""" - super().__init__( - goal_tolerance=goal_tolerance, - orientation_tolerance=orientation_tolerance, - control_frequency=control_frequency, - **kwargs, - ) - - # Algorithm parameters - self.lookahead_dist = lookahead_dist - self.k_rep = k_rep - self.alpha = alpha - self.v_max = v_max - self.k_angular = k_angular - - # Previous velocity for filtering (vx, vy, vtheta) - self.v_prev = np.array([0.0, 0.0, 0.0]) - - @rpc - def start(self) -> None: - super().start() - - @rpc - def stop(self) -> None: - super().stop() - - def compute_velocity(self) -> Twist | None: - """ - Compute velocity commands using GLAP algorithm. - - Returns: - Twist with linear and angular velocities in robot frame - """ - if self.latest_odom is None or self.latest_path is None or self.latest_costmap is None: - return None - - pose = np.array([self.latest_odom.position.x, self.latest_odom.position.y]) - - euler = quaternion_to_euler(self.latest_odom.orientation) - robot_yaw = euler.z - - path_points = [] - for pose_stamped in self.latest_path.poses: - path_points.append([pose_stamped.position.x, pose_stamped.position.y]) - - if len(path_points) == 0: - return None - - path = np.array(path_points) - - costmap = self.latest_costmap.grid - - v_follow_odom = self._compute_path_following(pose, path) - - v_rep_odom = self._compute_obstacle_repulsion(pose, costmap) - - v_odom = v_follow_odom + v_rep_odom - - # Transform velocity from odom frame to robot frame - cos_yaw = np.cos(robot_yaw) - sin_yaw = np.sin(robot_yaw) - - v_robot_x = cos_yaw * v_odom[0] + sin_yaw * v_odom[1] - v_robot_y = -sin_yaw * v_odom[0] + cos_yaw * v_odom[1] - - # Compute angular velocity - closest_idx, _ = self._find_closest_point_on_path(pose, path) - - # Check if we're near the final goal - goal_pose = self.latest_path.poses[-1] - distance_to_goal = get_distance(self.latest_odom, goal_pose) - - if distance_to_goal < self.goal_tolerance: - # Near goal - rotate to match final goal orientation - goal_euler = quaternion_to_euler(goal_pose.orientation) - desired_yaw = goal_euler.z - else: - # Not near goal - align with path direction - lookahead_point = self._find_lookahead_point(path, closest_idx) - dx = lookahead_point[0] - pose[0] - dy = lookahead_point[1] - pose[1] - desired_yaw = np.arctan2(dy, dx) - - yaw_error = normalize_angle(desired_yaw - robot_yaw) - k_angular = self.k_angular - v_theta = k_angular * yaw_error - - # Slow down linear velocity when turning - # Scale linear velocity based on angular velocity magnitude - angular_speed = abs(v_theta) - max_angular_speed = self.v_max - - # Calculate speed reduction factor (1.0 when not turning, 0.2 when at max turn rate) - turn_slowdown = 1.0 - 0.8 * min(angular_speed / max_angular_speed, 1.0) - - # Apply speed reduction to linear velocities - v_robot_x = np.clip(v_robot_x * turn_slowdown, -self.v_max, self.v_max) - v_robot_y = np.clip(v_robot_y * turn_slowdown, -self.v_max, self.v_max) - v_theta = np.clip(v_theta, -self.v_max, self.v_max) - - v_raw = np.array([v_robot_x, v_robot_y, v_theta]) - v_filtered = self.alpha * v_raw + (1 - self.alpha) * self.v_prev - self.v_prev = v_filtered - - return Twist( - linear=Vector3(v_filtered[0], v_filtered[1], 0.0), - angular=Vector3(0.0, 0.0, v_filtered[2]), - ) - - def _compute_path_following(self, pose: np.ndarray, path: np.ndarray) -> np.ndarray: # type: ignore[type-arg] - """ - Compute path following velocity using pure pursuit. - - Args: - pose: Current robot position [x, y] - path: Path waypoints as Nx2 array - - Returns: - Path following velocity vector [vx, vy] - """ - closest_idx, _ = self._find_closest_point_on_path(pose, path) - - carrot = self._find_lookahead_point(path, closest_idx) - - direction = carrot - pose - distance = np.linalg.norm(direction) - - if distance < 1e-6: - return np.zeros(2) - - v_follow = self.v_max * direction / distance - - return v_follow # type: ignore[no-any-return] - - def _compute_obstacle_repulsion(self, pose: np.ndarray, costmap: np.ndarray) -> np.ndarray: # type: ignore[type-arg] - """ - Compute obstacle repulsion velocity from costmap gradient. - - Args: - pose: Current robot position [x, y] - costmap: 2D costmap array - - Returns: - Repulsion velocity vector [vx, vy] - """ - grid_point = self.latest_costmap.world_to_grid(pose) # type: ignore[union-attr] - grid_x = int(grid_point.x) - grid_y = int(grid_point.y) - - height, width = costmap.shape - if not (1 <= grid_x < width - 1 and 1 <= grid_y < height - 1): - return np.zeros(2) - - # Compute gradient using central differences - # Note: costmap is in row-major order (y, x) - gx = (costmap[grid_y, grid_x + 1] - costmap[grid_y, grid_x - 1]) / ( - 2.0 * self.latest_costmap.resolution # type: ignore[union-attr] - ) - gy = (costmap[grid_y + 1, grid_x] - costmap[grid_y - 1, grid_x]) / ( - 2.0 * self.latest_costmap.resolution # type: ignore[union-attr] - ) - - # Gradient points towards higher cost, so negate for repulsion - v_rep = -self.k_rep * np.array([gx, gy]) - - return v_rep - - def _find_closest_point_on_path( - self, - pose: np.ndarray, # type: ignore[type-arg] - path: np.ndarray, # type: ignore[type-arg] - ) -> tuple[int, np.ndarray]: # type: ignore[type-arg] - """ - Find the closest point on the path to current pose. - - Args: - pose: Current position [x, y] - path: Path waypoints as Nx2 array - - Returns: - Tuple of (closest_index, closest_point) - """ - distances = np.linalg.norm(path - pose, axis=1) - closest_idx = np.argmin(distances) - return closest_idx, path[closest_idx] # type: ignore[return-value] - - def _find_lookahead_point(self, path: np.ndarray, start_idx: int) -> np.ndarray: # type: ignore[type-arg] - """ - Find look-ahead point on path at specified distance. - - Args: - path: Path waypoints as Nx2 array - start_idx: Starting index for search - - Returns: - Look-ahead point [x, y] - """ - accumulated_dist = 0.0 - - for i in range(start_idx, len(path) - 1): - segment_dist = np.linalg.norm(path[i + 1] - path[i]) - - if accumulated_dist + segment_dist >= self.lookahead_dist: - remaining_dist = self.lookahead_dist - accumulated_dist - t = remaining_dist / segment_dist - carrot = path[i] + t * (path[i + 1] - path[i]) - return carrot # type: ignore[no-any-return] - - accumulated_dist += segment_dist # type: ignore[assignment] - - return path[-1] # type: ignore[no-any-return] - - def _clip(self, v: np.ndarray) -> np.ndarray: # type: ignore[type-arg] - """Instance method to clip velocity with access to v_max.""" - return np.clip(v, -self.v_max, self.v_max) - - -holonomic_local_planner = HolonomicLocalPlanner.blueprint - -__all__ = ["HolonomicLocalPlanner", "holonomic_local_planner"] diff --git a/dimos/navigation/local_planner/local_planner.py b/dimos/navigation/local_planner/local_planner.py deleted file mode 100644 index ab2d9101eb..0000000000 --- a/dimos/navigation/local_planner/local_planner.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Base Local Planner Module for robot navigation. -Subscribes to local costmap, odometry, and path, publishes movement commands. -""" - -from abc import abstractmethod -import threading -import time - -from reactivex.disposable import Disposable - -from dimos.core import In, Module, Out, rpc -from dimos.mapping.occupancy.gradient import gradient -from dimos.mapping.pointclouds.occupancy import general_occupancy -from dimos.msgs.geometry_msgs import PoseStamped, Twist -from dimos.msgs.nav_msgs import OccupancyGrid, Path -from dimos.robot.unitree_webrtc.type.lidar import LidarMessage -from dimos.utils.logging_config import setup_logger -from dimos.utils.transform_utils import get_distance, normalize_angle, quaternion_to_euler - -logger = setup_logger() - - -class BaseLocalPlanner(Module): - """ - local planner module for obstacle avoidance and path following. - - Subscribes to: - - /lidar: Local lidar for obstacle detection - - /odom: Robot odometry for current pose - - /path: Path to follow (continuously updated at ~1Hz) - - Publishes: - - /cmd_vel: Velocity commands for robot movement - """ - - # LCM inputs - lidar: In[LidarMessage] - odom: In[PoseStamped] - path: In[Path] - - # LCM outputs - cmd_vel: Out[Twist] - - def __init__( # type: ignore[no-untyped-def] - self, - goal_tolerance: float = 0.5, - orientation_tolerance: float = 0.2, - control_frequency: float = 10.0, - **kwargs, - ) -> None: - """Initialize the local planner module. - - Args: - goal_tolerance: Distance threshold to consider goal reached (meters) - orientation_tolerance: Orientation threshold to consider goal reached (radians) - control_frequency: Frequency for control loop (Hz) - """ - super().__init__(**kwargs) - - # Parameters - self.goal_tolerance = goal_tolerance - self.orientation_tolerance = orientation_tolerance - self.control_frequency = control_frequency - self.control_period = 1.0 / control_frequency - - # Latest data - self.latest_costmap: OccupancyGrid | None = None - self.latest_odom: PoseStamped | None = None - self.latest_path: Path | None = None - - # Control thread - self.planning_thread: threading.Thread | None = None - self.stop_planning = threading.Event() - - logger.info("Local planner module initialized") - - @rpc - def start(self) -> None: - super().start() - - self._disposables.add(Disposable(self.lidar.subscribe(self._on_lidar))) - self._disposables.add(Disposable(self.odom.subscribe(self._on_odom))) - self._disposables.add(Disposable(self.path.subscribe(self._on_path))) - - @rpc - def stop(self) -> None: - self.cancel_planning() - super().stop() - - def _on_lidar(self, msg: LidarMessage) -> None: - self.latest_costmap = gradient( - general_occupancy( - msg, - resolution=0.05, - min_height=0.15, - max_height=0.6, - ), - max_distance=0.25, - ) - - def _on_odom(self, msg: PoseStamped) -> None: - self.latest_odom = msg - - def _on_path(self, msg: Path) -> None: - self.latest_path = msg - - if msg and len(msg.poses) > 0: - if self.planning_thread is None or not self.planning_thread.is_alive(): - self._start_planning_thread() - - def _start_planning_thread(self) -> None: - self.stop_planning.clear() - self.planning_thread = threading.Thread(target=self._follow_path_loop, daemon=True) - self.planning_thread.start() - - def _follow_path_loop(self) -> None: - while not self.stop_planning.is_set(): - if self.is_goal_reached(): - self.stop_planning.set() - stop_cmd = Twist() - self.cmd_vel.publish(stop_cmd) - break - - # Compute and publish velocity - self._plan() - - time.sleep(self.control_period) - - def _plan(self) -> None: - """Compute and publish velocity command.""" - cmd_vel = self.compute_velocity() - - if cmd_vel is not None: - self.cmd_vel.publish(cmd_vel) - - @abstractmethod - def compute_velocity(self) -> Twist | None: - """ - Compute velocity commands based on current costmap, odometry, and path. - Must be implemented by derived classes. - - Returns: - Twist message with linear and angular velocity commands, or None if no command - """ - pass - - @rpc - def is_goal_reached(self) -> bool: - """ - Check if the robot has reached the goal position and orientation. - - Returns: - True if goal is reached within tolerance, False otherwise - """ - if self.latest_odom is None or self.latest_path is None: - return False - - if len(self.latest_path.poses) == 0: - return True - - goal_pose = self.latest_path.poses[-1] - distance = get_distance(self.latest_odom, goal_pose) - - # Check distance tolerance - if distance >= self.goal_tolerance: - return False - - # Check orientation tolerance - current_euler = quaternion_to_euler(self.latest_odom.orientation) - goal_euler = quaternion_to_euler(goal_pose.orientation) - - # Calculate yaw difference and normalize to [-pi, pi] - yaw_error = normalize_angle(goal_euler.z - current_euler.z) - - return bool(abs(yaw_error) < self.orientation_tolerance) - - @rpc - def reset(self) -> None: - """Reset the local planner state, clearing the current path.""" - # Clear the latest path - self.latest_path = None - self.latest_odom = None - self.latest_costmap = None - self.cancel_planning() - logger.info("Local planner reset") - - @rpc - def cancel_planning(self) -> None: - """Stop the local planner and any running threads.""" - if self.planning_thread and self.planning_thread.is_alive(): - self.stop_planning.set() - self.planning_thread.join(timeout=1.0) - self.planning_thread = None - stop_cmd = Twist() - self.cmd_vel.publish(stop_cmd) diff --git a/dimos/navigation/local_planner/test_base_local_planner.py b/dimos/navigation/local_planner/test_base_local_planner.py deleted file mode 100644 index 59ce7cbda4..0000000000 --- a/dimos/navigation/local_planner/test_base_local_planner.py +++ /dev/null @@ -1,406 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Unit tests for the GLAP (Gradient-Augmented Look-Ahead Pursuit) holonomic local planner. -""" - -import numpy as np -import pytest - -from dimos.msgs.geometry_msgs import Pose, PoseStamped, Quaternion -from dimos.msgs.nav_msgs import OccupancyGrid, Path -from dimos.navigation.local_planner.holonomic_local_planner import HolonomicLocalPlanner - - -class TestHolonomicLocalPlanner: - """Test suite for HolonomicLocalPlanner.""" - - @pytest.fixture - def planner(self): - """Create a planner instance for testing.""" - planner = HolonomicLocalPlanner( - lookahead_dist=1.5, - k_rep=1.0, - alpha=1.0, # No filtering for deterministic tests - v_max=1.0, - goal_tolerance=0.5, - control_frequency=10.0, - ) - yield planner - # TODO: This should call `planner.stop()` but that causes errors. - # Calling just this for now to fix thread leaks. - planner._close_module() - - @pytest.fixture - def empty_costmap(self): - """Create an empty costmap (all free space).""" - costmap = OccupancyGrid( - grid=np.zeros((100, 100), dtype=np.int8), resolution=0.1, origin=Pose() - ) - costmap.origin.position.x = -5.0 - costmap.origin.position.y = -5.0 - return costmap - - def test_straight_path_no_obstacles(self, planner, empty_costmap) -> None: - """Test that planner follows straight path with no obstacles.""" - # Set current position at origin - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - - # Create straight path along +X - path = Path() - for i in range(10): - ps = PoseStamped() - ps.position.x = float(i) - ps.position.y = 0.0 - ps.orientation.w = 1.0 # Identity quaternion - path.poses.append(ps) - planner.latest_path = path - - # Set empty costmap - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Should move along +X - assert vel is not None - assert vel.linear.x > 0.9 # Close to v_max - assert abs(vel.linear.y) < 0.1 # Near zero - assert abs(vel.angular.z) < 0.1 # Small angular velocity when aligned with path - - def test_obstacle_gradient_repulsion(self, planner) -> None: - """Test that obstacle gradients create repulsive forces.""" - # Set position at origin - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - - # Simple path forward - path = Path() - ps = PoseStamped() - ps.position.x = 5.0 - ps.position.y = 0.0 - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - # Create costmap with gradient pointing south (higher cost north) - costmap_grid = np.zeros((100, 100), dtype=np.int8) - for i in range(100): - costmap_grid[i, :] = max(0, 50 - i) # Gradient from north to south - - planner.latest_costmap = OccupancyGrid(grid=costmap_grid, resolution=0.1, origin=Pose()) - planner.latest_costmap.origin.position.x = -5.0 - planner.latest_costmap.origin.position.y = -5.0 - - # Compute velocity - vel = planner.compute_velocity() - - # Should have positive Y component (pushed north by gradient) - assert vel is not None - assert vel.linear.y > 0.1 # Repulsion pushes north - - def test_lowpass_filter(self) -> None: - """Test that low-pass filter smooths velocity commands.""" - # Create planner with alpha=0.5 for filtering - planner = HolonomicLocalPlanner( - lookahead_dist=1.0, - k_rep=0.0, # No repulsion - alpha=0.5, # 50% filtering - v_max=1.0, - ) - - # Setup similar to straight path test - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - - path = Path() - ps = PoseStamped() - ps.position.x = 5.0 - ps.position.y = 0.0 - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = OccupancyGrid( - grid=np.zeros((100, 100), dtype=np.int8), resolution=0.1, origin=Pose() - ) - planner.latest_costmap.origin.position.x = -5.0 - planner.latest_costmap.origin.position.y = -5.0 - - # First call - previous velocity is zero - vel1 = planner.compute_velocity() - assert vel1 is not None - - # Store first velocity - first_vx = vel1.linear.x - - # Second call - should be filtered - vel2 = planner.compute_velocity() - assert vel2 is not None - - # With alpha=0.5 and same conditions: - # v2 = 0.5 * v_raw + 0.5 * v1 - # The filtering effect should be visible - # v2 should be between v1 and the raw velocity - assert vel2.linear.x != first_vx # Should be different due to filtering - assert 0 < vel2.linear.x <= planner.v_max # Should still be positive and within limits - planner._close_module() - - def test_no_path(self, planner, empty_costmap) -> None: - """Test that planner returns None when no path is available.""" - planner.latest_odom = PoseStamped() - planner.latest_costmap = empty_costmap - planner.latest_path = Path() # Empty path - - vel = planner.compute_velocity() - assert vel is None - - def test_no_odometry(self, planner, empty_costmap) -> None: - """Test that planner returns None when no odometry is available.""" - planner.latest_odom = None - planner.latest_costmap = empty_costmap - - path = Path() - ps = PoseStamped() - ps.position.x = 1.0 - ps.position.y = 0.0 - path.poses.append(ps) - planner.latest_path = path - - vel = planner.compute_velocity() - assert vel is None - - def test_no_costmap(self, planner) -> None: - """Test that planner returns None when no costmap is available.""" - planner.latest_odom = PoseStamped() - planner.latest_costmap = None - - path = Path() - ps = PoseStamped() - ps.position.x = 1.0 - ps.position.y = 0.0 - path.poses.append(ps) - planner.latest_path = path - - vel = planner.compute_velocity() - assert vel is None - - def test_goal_reached(self, planner, empty_costmap) -> None: - """Test velocity when robot is at goal.""" - # Set robot at goal position - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 5.0 - planner.latest_odom.position.y = 0.0 - - # Path with single point at robot position - path = Path() - ps = PoseStamped() - ps.position.x = 5.0 - ps.position.y = 0.0 - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Should have near-zero velocity - assert vel is not None - assert abs(vel.linear.x) < 0.1 - assert abs(vel.linear.y) < 0.1 - - def test_velocity_saturation(self, planner, empty_costmap) -> None: - """Test that velocities are capped at v_max.""" - # Set robot far from goal to maximize commanded velocity - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - - # Create path far away - path = Path() - ps = PoseStamped() - ps.position.x = 100.0 # Very far - ps.position.y = 0.0 - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Velocity should be saturated at v_max - assert vel is not None - assert abs(vel.linear.x) <= planner.v_max + 0.01 # Small tolerance - assert abs(vel.linear.y) <= planner.v_max + 0.01 - assert abs(vel.angular.z) <= planner.v_max + 0.01 - - def test_lookahead_interpolation(self, planner, empty_costmap) -> None: - """Test that lookahead point is correctly interpolated on path.""" - # Set robot at origin - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - - # Create path with waypoints closer than lookahead distance - path = Path() - for i in range(5): - ps = PoseStamped() - ps.position.x = i * 0.5 # 0.5m spacing - ps.position.y = 0.0 - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Should move forward along path - assert vel is not None - assert vel.linear.x > 0.5 # Moving forward - assert abs(vel.linear.y) < 0.1 # Staying on path - - def test_curved_path_following(self, planner, empty_costmap) -> None: - """Test following a curved path.""" - # Set robot at origin - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - - # Create curved path (quarter circle) - path = Path() - for i in range(10): - angle = (np.pi / 2) * (i / 9.0) # 0 to 90 degrees - ps = PoseStamped() - ps.position.x = 2.0 * np.cos(angle) - ps.position.y = 2.0 * np.sin(angle) - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Should have both X and Y components for curved motion - assert vel is not None - # Test general behavior: should be moving (not exact values) - assert vel.linear.x > 0 # Moving forward (any positive value) - assert vel.linear.y > 0 # Turning left (any positive value) - # Ensure we have meaningful movement, not just noise - total_linear = np.sqrt(vel.linear.x**2 + vel.linear.y**2) - assert total_linear > 0.1 # Some reasonable movement - - def test_robot_frame_transformation(self, empty_costmap) -> None: - """Test that velocities are correctly transformed to robot frame.""" - # Create planner with no filtering for deterministic test - planner = HolonomicLocalPlanner( - lookahead_dist=1.0, - k_rep=0.0, # No repulsion - alpha=1.0, # No filtering - v_max=1.0, - ) - - # Set robot at origin but rotated 90 degrees (facing +Y in odom frame) - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - # Quaternion for 90 degree rotation around Z - planner.latest_odom.orientation = Quaternion(0.0, 0.0, 0.7071068, 0.7071068) - - # Create path along +X axis in odom frame - path = Path() - for i in range(5): - ps = PoseStamped() - ps.position.x = float(i) - ps.position.y = 0.0 - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Robot is facing +Y, path is along +X - # So in robot frame: forward is +Y direction, path is to the right - assert vel is not None - # Test relative magnitudes and signs rather than exact values - # Path is to the right, so Y velocity should be negative - assert vel.linear.y < 0 # Should move right (negative Y in robot frame) - # Should turn to align with path - assert vel.angular.z < 0 # Should turn right (negative angular velocity) - # X velocity should be relatively small compared to Y - assert abs(vel.linear.x) < abs(vel.linear.y) # Lateral movement dominates - planner._close_module() - - def test_angular_velocity_computation(self, empty_costmap) -> None: - """Test that angular velocity is computed to align with path.""" - planner = HolonomicLocalPlanner( - lookahead_dist=2.0, - k_rep=0.0, # No repulsion - alpha=1.0, # No filtering - v_max=1.0, - ) - - # Robot at origin facing +X - planner.latest_odom = PoseStamped() - planner.latest_odom.position.x = 0.0 - planner.latest_odom.position.y = 0.0 - planner.latest_odom.orientation.w = 1.0 # Identity quaternion - - # Create path at 45 degrees - path = Path() - for i in range(5): - ps = PoseStamped() - ps.position.x = float(i) - ps.position.y = float(i) # Diagonal path - ps.orientation.w = 1.0 - path.poses.append(ps) - planner.latest_path = path - - planner.latest_costmap = empty_costmap - - # Compute velocity - vel = planner.compute_velocity() - - # Path is at 45 degrees, robot facing 0 degrees - # Should have positive angular velocity to turn left - assert vel is not None - # Test general behavior without exact thresholds - assert vel.linear.x > 0 # Moving forward (any positive value) - assert vel.linear.y > 0 # Moving left (holonomic, any positive value) - assert vel.angular.z > 0 # Turning left (positive angular velocity) - # Verify the robot is actually moving with reasonable speed - total_linear = np.sqrt(vel.linear.x**2 + vel.linear.y**2) - assert total_linear > 0.1 # Some meaningful movement - # Since path is diagonal, X and Y should be similar magnitude - assert ( - abs(vel.linear.x - vel.linear.y) < max(vel.linear.x, vel.linear.y) * 0.5 - ) # Within 50% of each other - planner._close_module() diff --git a/dimos/navigation/replanning_a_star/global_planner.py b/dimos/navigation/replanning_a_star/global_planner.py index 5dd36d426f..8dc1a42ccf 100644 --- a/dimos/navigation/replanning_a_star/global_planner.py +++ b/dimos/navigation/replanning_a_star/global_planner.py @@ -30,9 +30,9 @@ from dimos.msgs.nav_msgs.Path import Path from dimos.msgs.sensor_msgs import Image from dimos.navigation.base import NavigationState -from dimos.navigation.bt_navigator.goal_validator import find_safe_goal -from dimos.navigation.global_planner.astar import astar +from dimos.navigation.replanning_a_star.goal_validator import find_safe_goal from dimos.navigation.replanning_a_star.local_planner import LocalPlanner, StopMessage +from dimos.navigation.replanning_a_star.min_cost_astar import min_cost_astar from dimos.navigation.replanning_a_star.navigation_map import NavigationMap from dimos.navigation.replanning_a_star.position_tracker import PositionTracker from dimos.navigation.replanning_a_star.replan_limiter import ReplanLimiter @@ -314,7 +314,7 @@ def _find_wide_path(self, goal: Vector3, robot_pos: Vector3) -> Path | None: for size in sizes_to_try: costmap = self._navigation_map.make_gradient_costmap(size) - path = astar(self._global_config.astar_algorithm, costmap, goal, robot_pos) + path = min_cost_astar(costmap, goal, robot_pos) if path and path.poses: logger.info(f"Found path {size}x robot width.") return path diff --git a/dimos/navigation/bt_navigator/goal_validator.py b/dimos/navigation/replanning_a_star/goal_validator.py similarity index 51% rename from dimos/navigation/bt_navigator/goal_validator.py rename to dimos/navigation/replanning_a_star/goal_validator.py index 59fb6e71ff..5cd093e955 100644 --- a/dimos/navigation/bt_navigator/goal_validator.py +++ b/dimos/navigation/replanning_a_star/goal_validator.py @@ -16,7 +16,6 @@ import numpy as np -from dimos.mapping.occupancy.gradient import gradient from dimos.msgs.geometry_msgs import Vector3, VectorLike from dimos.msgs.nav_msgs import CostValues, OccupancyGrid @@ -64,28 +63,6 @@ def find_safe_goal( max_search_distance, connectivity_check_radius, ) - elif algorithm == "spiral": - return _find_safe_goal_spiral( - costmap, - goal, - cost_threshold, - min_clearance, - max_search_distance, - connectivity_check_radius, - ) - elif algorithm == "voronoi": - return _find_safe_goal_voronoi( - costmap, goal, cost_threshold, min_clearance, max_search_distance - ) - elif algorithm == "gradient_descent": - return _find_safe_goal_gradient( - costmap, - goal, - cost_threshold, - min_clearance, - max_search_distance, - connectivity_check_radius, - ) else: raise ValueError(f"Unknown algorithm: {algorithm}") @@ -221,239 +198,6 @@ def _find_safe_goal_bfs_contiguous( return None -def _find_safe_goal_spiral( - costmap: OccupancyGrid, - goal: VectorLike, - cost_threshold: int, - min_clearance: float, - max_search_distance: float, - connectivity_check_radius: int, -) -> Vector3 | None: - """ - Spiral search pattern from goal outward. - - Pros: - - Simple and predictable pattern - - Memory efficient - - Good for uniformly distributed obstacles - - Cons: - - May not find the absolute closest safe position - - Can miss nearby safe spots due to spiral pattern - """ - - # Convert goal to grid coordinates - goal_grid = costmap.world_to_grid(goal) - cx, cy = int(goal_grid.x), int(goal_grid.y) - - # Convert distances to grid cells - clearance_cells = int(np.ceil(min_clearance / costmap.resolution)) - max_radius = int(np.ceil(max_search_distance / costmap.resolution)) - - # Spiral outward - for radius in range(0, max_radius + 1): - if radius == 0: - # Check center point - if _is_position_safe( - costmap, cx, cy, cost_threshold, clearance_cells, connectivity_check_radius - ): - return costmap.grid_to_world((cx, cy)) - else: - # Check points on the square perimeter at this radius - points = [] - - # Top and bottom edges - for x in range(cx - radius, cx + radius + 1): - points.append((x, cy - radius)) # Top - points.append((x, cy + radius)) # Bottom - - # Left and right edges (excluding corners to avoid duplicates) - for y in range(cy - radius + 1, cy + radius): - points.append((cx - radius, y)) # Left - points.append((cx + radius, y)) # Right - - # Check each point - for x, y in points: - if 0 <= x < costmap.width and 0 <= y < costmap.height: - if _is_position_safe( - costmap, x, y, cost_threshold, clearance_cells, connectivity_check_radius - ): - return costmap.grid_to_world((x, y)) - - return None - - -def _find_safe_goal_voronoi( - costmap: OccupancyGrid, - goal: VectorLike, - cost_threshold: int, - min_clearance: float, - max_search_distance: float, -) -> Vector3 | None: - """ - Find safe position using Voronoi diagram (ridge points equidistant from obstacles). - - Pros: - - Finds positions maximally far from obstacles - - Good for narrow passages - - Natural safety margin - - Cons: - - More computationally expensive - - May find positions unnecessarily far from obstacles - - Requires scipy for efficient implementation - """ - - from scipy import ndimage # type: ignore[import-untyped] - from skimage.morphology import skeletonize # type: ignore[import-not-found] - - # Convert goal to grid coordinates - goal_grid = costmap.world_to_grid(goal) - gx, gy = int(goal_grid.x), int(goal_grid.y) - - # Create binary obstacle map - free_map = (costmap.grid < cost_threshold) & (costmap.grid != CostValues.UNKNOWN) - - # Compute distance transform - distance_field = ndimage.distance_transform_edt(free_map) - - # Find skeleton/medial axis (approximation of Voronoi diagram) - skeleton = skeletonize(free_map) - - # Filter skeleton points by minimum clearance - clearance_cells = int(np.ceil(min_clearance / costmap.resolution)) - valid_skeleton = skeleton & (distance_field >= clearance_cells) - - if not np.any(valid_skeleton): - # Fall back to BFS if no valid skeleton points - return _find_safe_goal_bfs( - costmap, goal, cost_threshold, min_clearance, max_search_distance, 3 - ) - - # Find nearest valid skeleton point to goal - skeleton_points = np.argwhere(valid_skeleton) - if len(skeleton_points) == 0: - return None - - # Calculate distances from goal to all skeleton points - distances = np.sqrt((skeleton_points[:, 1] - gx) ** 2 + (skeleton_points[:, 0] - gy) ** 2) - - # Filter by max search distance - max_search_cells = max_search_distance / costmap.resolution - valid_indices = distances <= max_search_cells - - if not np.any(valid_indices): - return None - - # Find closest valid point - valid_distances = distances[valid_indices] - valid_points = skeleton_points[valid_indices] - closest_idx = np.argmin(valid_distances) - best_y, best_x = valid_points[closest_idx] - - return costmap.grid_to_world((best_x, best_y)) - - -def _find_safe_goal_gradient( - costmap: OccupancyGrid, - goal: VectorLike, - cost_threshold: int, - min_clearance: float, - max_search_distance: float, - connectivity_check_radius: int, -) -> Vector3 | None: - """ - Use gradient descent on the costmap to find a safe position. - - Pros: - - Naturally flows away from obstacles - - Works well with gradient costmaps - - Can handle complex cost distributions - - Cons: - - Can get stuck in local minima - - Requires a gradient costmap - - May not find globally optimal position - """ - - # Convert goal to grid coordinates - goal_grid = costmap.world_to_grid(goal) - x, y = goal_grid.x, goal_grid.y - - # Convert distances to grid cells - clearance_cells = int(np.ceil(min_clearance / costmap.resolution)) - max_search_cells = int(np.ceil(max_search_distance / costmap.resolution)) - - # Create gradient if needed (assuming costmap might already be a gradient) - if np.all((costmap.grid == 0) | (costmap.grid == 100) | (costmap.grid == -1)): - # Binary map, create gradient - gradient_map = gradient( - costmap, obstacle_threshold=cost_threshold, max_distance=min_clearance * 2 - ) - grid = gradient_map.grid - else: - grid = costmap.grid - - # Gradient descent with momentum - momentum = 0.9 - learning_rate = 1.0 - vx, vy = 0.0, 0.0 - - best_x, best_y = None, None - best_cost = float("inf") - - for iteration in range(100): # Max iterations - ix, iy = int(x), int(y) - - # Check if current position is valid - if 0 <= ix < costmap.width and 0 <= iy < costmap.height: - current_cost = grid[iy, ix] - - # Check distance from original goal - dist = np.sqrt((x - goal_grid.x) ** 2 + (y - goal_grid.y) ** 2) - if dist > max_search_cells: - break - - # Check if position is safe - if _is_position_safe( - costmap, ix, iy, cost_threshold, clearance_cells, connectivity_check_radius - ): - if current_cost < best_cost: - best_x, best_y = ix, iy - best_cost = current_cost - - # If cost is very low, we found a good spot - if current_cost < 10: - break - - # Compute gradient using finite differences - gx, gy = 0.0, 0.0 - - if 0 < ix < costmap.width - 1: - gx = (grid[iy, min(ix + 1, costmap.width - 1)] - grid[iy, max(ix - 1, 0)]) / 2.0 - - if 0 < iy < costmap.height - 1: - gy = (grid[min(iy + 1, costmap.height - 1), ix] - grid[max(iy - 1, 0), ix]) / 2.0 - - # Update with momentum - vx = momentum * vx - learning_rate * gx - vy = momentum * vy - learning_rate * gy - - # Update position - x += vx - y += vy - - # Add small random noise to escape local minima - if iteration % 20 == 0: - x += np.random.randn() * 0.5 - y += np.random.randn() * 0.5 - - if best_x is not None and best_y is not None: - return costmap.grid_to_world((best_x, best_y)) - - return None - - def _is_position_safe( costmap: OccupancyGrid, x: int, diff --git a/dimos/navigation/global_planner/min_cost_astar.py b/dimos/navigation/replanning_a_star/min_cost_astar.py similarity index 99% rename from dimos/navigation/global_planner/min_cost_astar.py rename to dimos/navigation/replanning_a_star/min_cost_astar.py index 6580070957..c3430e64d9 100644 --- a/dimos/navigation/global_planner/min_cost_astar.py +++ b/dimos/navigation/replanning_a_star/min_cost_astar.py @@ -20,7 +20,7 @@ # Try to import C++ extension for faster pathfinding try: - from dimos.navigation.global_planner.min_cost_astar_ext import ( + from dimos.navigation.replanning_a_star.min_cost_astar_ext import ( min_cost_astar_cpp as _astar_cpp, ) diff --git a/dimos/navigation/global_planner/min_cost_astar_cpp.cpp b/dimos/navigation/replanning_a_star/min_cost_astar_cpp.cpp similarity index 100% rename from dimos/navigation/global_planner/min_cost_astar_cpp.cpp rename to dimos/navigation/replanning_a_star/min_cost_astar_cpp.cpp diff --git a/dimos/navigation/global_planner/min_cost_astar_ext.pyi b/dimos/navigation/replanning_a_star/min_cost_astar_ext.pyi similarity index 100% rename from dimos/navigation/global_planner/min_cost_astar_ext.pyi rename to dimos/navigation/replanning_a_star/min_cost_astar_ext.pyi diff --git a/dimos/navigation/bt_navigator/test_goal_validator.py b/dimos/navigation/replanning_a_star/test_goal_validator.py similarity index 95% rename from dimos/navigation/bt_navigator/test_goal_validator.py rename to dimos/navigation/replanning_a_star/test_goal_validator.py index b80331e46b..4cda9de863 100644 --- a/dimos/navigation/bt_navigator/test_goal_validator.py +++ b/dimos/navigation/replanning_a_star/test_goal_validator.py @@ -17,7 +17,7 @@ from dimos.msgs.geometry_msgs import Vector3 from dimos.msgs.nav_msgs.OccupancyGrid import CostValues, OccupancyGrid -from dimos.navigation.bt_navigator.goal_validator import find_safe_goal +from dimos.navigation.replanning_a_star.goal_validator import find_safe_goal from dimos.utils.data import get_data diff --git a/dimos/navigation/global_planner/test_astar.py b/dimos/navigation/replanning_a_star/test_min_cost_astar.py similarity index 74% rename from dimos/navigation/global_planner/test_astar.py rename to dimos/navigation/replanning_a_star/test_min_cost_astar.py index ad895fcca7..86c5575434 100644 --- a/dimos/navigation/global_planner/test_astar.py +++ b/dimos/navigation/replanning_a_star/test_min_cost_astar.py @@ -23,7 +23,7 @@ from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid from dimos.msgs.sensor_msgs.Image import Image -from dimos.navigation.global_planner.astar import astar +from dimos.navigation.replanning_a_star.min_cost_astar import min_cost_astar from dimos.utils.data import get_data @@ -37,37 +37,23 @@ def costmap_three_paths() -> PointCloud: return voronoi_gradient(OccupancyGrid(np.load(get_data("three_paths.npy"))), max_distance=1.5) -@pytest.mark.parametrize( - "mode,expected_image", - [ - ("general", "astar_general.png"), - ("min_cost", "astar_min_cost.png"), - ], -) -def test_astar(costmap, mode, expected_image) -> None: +def test_astar(costmap) -> None: start = Vector3(4.0, 2.0) goal = Vector3(6.15, 10.0) - expected = Image.from_file(get_data(expected_image)) + expected = Image.from_file(get_data("astar_min_cost.png")) - path = astar(mode, costmap, goal, start, use_cpp=False) + path = min_cost_astar(costmap, goal, start, use_cpp=False) actual = visualize_occupancy_grid(costmap, "rainbow", path) np.testing.assert_array_equal(actual.data, expected.data) -@pytest.mark.parametrize( - "mode,expected_image", - [ - ("general", "astar_corner_general.png"), - ("min_cost", "astar_corner_min_cost.png"), - ], -) -def test_astar_corner(costmap_three_paths, mode, expected_image) -> None: +def test_astar_corner(costmap_three_paths) -> None: start = Vector3(2.8, 3.35) goal = Vector3(6.35, 4.25) - expected = Image.from_file(get_data(expected_image)) + expected = Image.from_file(get_data("astar_corner_min_cost.png")) - path = astar(mode, costmap_three_paths, goal, start, use_cpp=False) + path = min_cost_astar(costmap_three_paths, goal, start, use_cpp=False) actual = visualize_occupancy_grid(costmap_three_paths, "rainbow", path) np.testing.assert_array_equal(actual.data, expected.data) @@ -78,14 +64,14 @@ def test_astar_python_and_cpp(costmap) -> None: goal = Vector3(6.15, 10.0) start_time = time.perf_counter() - path_python = astar("min_cost", costmap, goal, start, use_cpp=False) + path_python = min_cost_astar("min_cost", costmap, goal, start, use_cpp=False) elapsed_time_python = time.perf_counter() - start_time print(f"\nastar Python took {elapsed_time_python:.6f} seconds") assert path_python is not None assert len(path_python.poses) > 0 start_time = time.perf_counter() - path_cpp = astar("min_cost", costmap, goal, start, use_cpp=True) + path_cpp = min_cost_astar("min_cost", costmap, goal, start, use_cpp=True) elapsed_time_cpp = time.perf_counter() - start_time print(f"astar C++ took {elapsed_time_cpp:.6f} seconds") assert path_cpp is not None diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index e30d42dc32..c7db7bb2e9 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -57,7 +57,6 @@ all_modules = { "astar_planner": "dimos.navigation.global_planner.planner", - "behavior_tree_navigator": "dimos.navigation.bt_navigator.navigator", "camera_module": "dimos.hardware.camera.module", "depth_module": "dimos.robot.unitree_webrtc.depth_module", "detection_2d": "dimos.perception.detection2d.module2D", @@ -67,7 +66,6 @@ "g1_skills": "dimos.robot.unitree_webrtc.unitree_g1_skill_container", "google_maps_skill": "dimos.agents.skills.google_maps_skill_container", "gps_nav_skill": "dimos.agents.skills.gps_nav_skill", - "holonomic_local_planner": "dimos.navigation.local_planner.holonomic_local_planner", "human_input": "dimos.agents.cli.human", "keyboard_teleop": "dimos.robot.unitree_webrtc.keyboard_teleop", "llm_agent": "dimos.agents.agent", diff --git a/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py b/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py index 71b0551a70..3c11d32f0a 100644 --- a/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_g1_blueprints.py @@ -34,6 +34,8 @@ from dimos.hardware.sensors.camera import zed from dimos.hardware.sensors.camera.module import camera_module # type: ignore[attr-defined] from dimos.hardware.sensors.camera.webcam import Webcam +from dimos.mapping.costmapper import cost_mapper +from dimos.mapping.voxels import voxel_mapper from dimos.msgs.geometry_msgs import ( PoseStamped, Quaternion, @@ -45,16 +47,8 @@ from dimos.msgs.sensor_msgs import Image, PointCloud2 from dimos.msgs.std_msgs import Bool from dimos.msgs.vision_msgs import Detection2DArray -from dimos.navigation.bt_navigator.navigator import ( - behavior_tree_navigator, -) -from dimos.navigation.frontier_exploration import ( - wavefront_frontier_explorer, -) -from dimos.navigation.global_planner.planner import astar_planner -from dimos.navigation.local_planner.holonomic_local_planner import ( - holonomic_local_planner, -) +from dimos.navigation.frontier_exploration import wavefront_frontier_explorer +from dimos.navigation.replanning_a_star.module import replanning_a_star_planner from dimos.navigation.rosnav import ros_nav from dimos.perception.detection.detectors.person.yolo import YoloPersonDetector from dimos.perception.detection.module3D import Detection3DModule, detection3d_module @@ -66,7 +60,6 @@ from dimos.robot.unitree.connection.g1 import g1_connection from dimos.robot.unitree.connection.g1sim import g1_sim_connection from dimos.robot.unitree_webrtc.keyboard_teleop import keyboard_teleop -from dimos.robot.unitree_webrtc.type.map import mapper from dimos.robot.unitree_webrtc.unitree_g1_skill_container import g1_skills from dimos.utils.monitoring import utilization from dimos.web.websocket_vis.websocket_vis_module import websocket_vis @@ -87,11 +80,8 @@ camera_info=zed.CameraInfo.SingleWebcam, ), ), - # SLAM and mapping - mapper(voxel_size=0.5, global_publish_interval=2.5), - # Navigation stack - astar_planner(), - holonomic_local_planner(), + voxel_mapper(voxel_size=0.1), + cost_mapper(), wavefront_frontier_explorer(), # Visualization websocket_vis(), @@ -132,7 +122,7 @@ basic_sim = autoconnect( _basic_no_nav, g1_sim_connection(), - behavior_tree_navigator(), + replanning_a_star_planner(), ) _perception_and_memory = autoconnect( diff --git a/setup.py b/setup.py index 82ba9c8114..013ff731a8 100644 --- a/setup.py +++ b/setup.py @@ -20,8 +20,8 @@ # C++ extensions ext_modules = [ Pybind11Extension( - "dimos.navigation.global_planner.min_cost_astar_ext", - [os.path.join("dimos", "navigation", "global_planner", "min_cost_astar_cpp.cpp")], + "dimos.navigation.replanning_a_star.min_cost_astar_ext", + [os.path.join("dimos", "navigation", "replanning_a_star", "min_cost_astar_cpp.cpp")], extra_compile_args=[ "-O3", # Maximum optimization "-march=native", # Optimize for current CPU From 8c037fbc31ffe6397d7cb00d735c9f96b02082b5 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Fri, 2 Jan 2026 08:30:25 +0200 Subject: [PATCH 2/4] fix bad args --- dimos/navigation/replanning_a_star/test_min_cost_astar.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dimos/navigation/replanning_a_star/test_min_cost_astar.py b/dimos/navigation/replanning_a_star/test_min_cost_astar.py index 86c5575434..9cc0cad29a 100644 --- a/dimos/navigation/replanning_a_star/test_min_cost_astar.py +++ b/dimos/navigation/replanning_a_star/test_min_cost_astar.py @@ -64,14 +64,14 @@ def test_astar_python_and_cpp(costmap) -> None: goal = Vector3(6.15, 10.0) start_time = time.perf_counter() - path_python = min_cost_astar("min_cost", costmap, goal, start, use_cpp=False) + path_python = min_cost_astar(costmap, goal, start, use_cpp=False) elapsed_time_python = time.perf_counter() - start_time print(f"\nastar Python took {elapsed_time_python:.6f} seconds") assert path_python is not None assert len(path_python.poses) > 0 start_time = time.perf_counter() - path_cpp = min_cost_astar("min_cost", costmap, goal, start, use_cpp=True) + path_cpp = min_cost_astar(costmap, goal, start, use_cpp=True) elapsed_time_cpp = time.perf_counter() - start_time print(f"astar C++ took {elapsed_time_cpp:.6f} seconds") assert path_cpp is not None From e87c4ba264da795624bb050be29acb915abcf06f Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Fri, 2 Jan 2026 08:32:49 +0200 Subject: [PATCH 3/4] Update dimos/robot/all_blueprints.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- dimos/robot/all_blueprints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index c7db7bb2e9..a128cc3c3f 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -56,7 +56,7 @@ all_modules = { - "astar_planner": "dimos.navigation.global_planner.planner", + "replanning_a_star_planner": "dimos.navigation.replanning_a_star.module", "camera_module": "dimos.hardware.camera.module", "depth_module": "dimos.robot.unitree_webrtc.depth_module", "detection_2d": "dimos.perception.detection2d.module2D", From ef4ccfe963070dd8539395c8535831f6aba1c900 Mon Sep 17 00:00:00 2001 From: paul-nechifor <1262969+paul-nechifor@users.noreply.github.com> Date: Fri, 2 Jan 2026 06:41:31 +0000 Subject: [PATCH 4/4] CI code cleanup --- dimos/robot/all_blueprints.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index a128cc3c3f..7dbbc9c67a 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -73,7 +73,6 @@ "navigation_skill": "dimos.agents.skills.navigation", "object_tracking": "dimos.perception.object_tracker", "osm_skill": "dimos.agents.skills.osm", - "replanning_a_star_planner": "dimos.navigation.replanning_a_star.module", "ros_nav": "dimos.navigation.rosnav", "spatial_memory": "dimos.perception.spatial_perception", "speak_skill": "dimos.agents.skills.speak_skill",