Skip to content
Merged
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ if ! has nix_direnv_version || ! nix_direnv_version 3.0.6; then
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.6/direnvrc" "sha256-RYcUJaRMf8oF5LznDrlCXbkOQrywm0HDv1VjYGaJGdM="
fi
use flake .
dotenv
41 changes: 35 additions & 6 deletions dimos/agents2/skills/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dimos.msgs.geometry_msgs.Vector3 import make_vector3
from dimos.utils.transform_utils import euler_to_quaternion, quaternion_to_euler
from dimos.utils.logging_config import setup_logger
from dimos.navigation.bt_navigator.navigator import NavigatorState
from reactivex.disposable import Disposable, CompositeDisposable

logger = setup_logger(__file__)
Expand Down Expand Up @@ -162,13 +163,41 @@ def _navigate_to_object(self, query: str) -> Optional[str]:

logger.info(f"Found {query} at {bbox}")

success = self._robot.navigate_to_object(bbox)
# Start tracking - BBoxNavigationModule automatically generates goals
self._robot.object_tracker.track(bbox)

if not success:
logger.warning(f"Failed to navigate to '{query}' at {bbox}")
return None

return "Successfully navigated to object from query '{query}'."
start_time = time.time()
timeout = 30.0
goal_set = False

while time.time() - start_time < timeout:
# Check if navigator finished
if self._robot.navigator.get_state() == NavigatorState.IDLE and goal_set:
logger.info("Waiting for goal result")
time.sleep(1.0)
if not self._robot.navigator.is_goal_reached():
logger.info(f"Goal cancelled, tracking '{query}' failed")
self._robot.object_tracker.stop_track()
return None
else:
logger.info(f"Reached '{query}'")
self._robot.object_tracker.stop_track()
return f"Successfully arrived at '{query}'"

# If goal set and tracking lost, just continue (tracker will resume or timeout)
if goal_set and not self._robot.object_tracker.is_tracking():
continue

# BBoxNavigationModule automatically sends goals when tracker publishes
# Just check if we have any detections to mark goal_set
if self._robot.object_tracker.is_tracking():
goal_set = True

time.sleep(0.25)

logger.warning(f"Navigation to '{query}' timed out after {timeout}s")
self._robot.object_tracker.stop_track()
return None

def _get_bbox_for_current_frame(self, query: str) -> Optional[BBox]:
if self._latest_image is None:
Expand Down
23 changes: 20 additions & 3 deletions dimos/agents2/skills/test_navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,20 @@ def test_take_a_look_around(fake_robot, create_navigation_agent, mocker):


def test_go_to_object(fake_robot, create_navigation_agent, mocker):
fake_robot.navigate_to_object.return_value = True
fake_robot.object_tracker = mocker.MagicMock()
fake_robot.object_tracker.is_tracking.side_effect = [True, True, True, True] # Tracking active
fake_robot.navigator = mocker.MagicMock()

# Simulate navigation states: FOLLOWING_PATH -> IDLE (goal reached)
from dimos.navigation.bt_navigator.navigator import NavigatorState

fake_robot.navigator.get_state.side_effect = [
NavigatorState.FOLLOWING_PATH,
NavigatorState.FOLLOWING_PATH,
NavigatorState.IDLE,
]
fake_robot.navigator.is_goal_reached.return_value = True

mocker.patch(
"dimos.agents2.skills.navigation.NavigationSkillContainer._navigate_by_tagged_location",
return_value=None,
Expand All @@ -45,19 +58,23 @@ def test_go_to_object(fake_robot, create_navigation_agent, mocker):
"dimos.agents2.skills.navigation.NavigationSkillContainer._navigate_using_semantic_map",
return_value=None,
)
mocker.patch("dimos.agents2.skills.navigation.time.sleep")

agent = create_navigation_agent(fixture="test_go_to_object.json")

agent.query("go to the chair")

fake_robot.navigate_to_object.assert_called_once()
actual_bbox = fake_robot.navigate_to_object.call_args[0][0]
fake_robot.object_tracker.track.assert_called_once()
actual_bbox = fake_robot.object_tracker.track.call_args[0][0]
expected_bbox = (82, 51, 163, 159)

for actual_val, expected_val in zip(actual_bbox, expected_bbox):
assert abs(actual_val - expected_val) <= 5, (
f"BBox {actual_bbox} not within ±5 of {expected_bbox}"
)

fake_robot.object_tracker.stop_track.assert_called_once()


def test_go_to_semantic_location(fake_robot, create_navigation_agent, mocker):
mocker.patch(
Expand Down
3 changes: 3 additions & 0 deletions dimos/msgs/vision_msgs/Detection2DArray.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@

class Detection2DArray(LCMDetection2DArray):
msg_name = "vision_msgs.Detection2DArray"

# for _get_field_type() to work when decoding in _decode_one()
__annotations__ = LCMDetection2DArray.__annotations__
66 changes: 66 additions & 0 deletions dimos/navigation/bbox_navigation.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just going towards the target for self.goal_distance m. It is not using any sort of distance estimate of the target, this is a todo right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we need clean modules that are composable. This only vectors toward a bounding box

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The distance estimation is handled on @leshy side since he has ground truth object locations via his localization

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dimos.core import Module, In, Out, rpc
from dimos.msgs.vision_msgs import Detection2DArray
from dimos.msgs.geometry_msgs import PoseStamped, Vector3, Quaternion
from dimos_lcm.sensor_msgs import CameraInfo
from dimos.utils.logging_config import setup_logger
import logging

logger = setup_logger(__name__, level=logging.DEBUG)


class BBoxNavigationModule(Module):
"""Minimal module that converts 2D bbox center to navigation goals."""

detection2d: In[Detection2DArray] = None
camera_info: In[CameraInfo] = None
goal_request: Out[PoseStamped] = None

def __init__(self, goal_distance: float = 1.0):
super().__init__()
self.goal_distance = goal_distance
self.camera_intrinsics = None

@rpc
def start(self):
self.camera_info.subscribe(
lambda msg: setattr(self, "camera_intrinsics", [msg.K[0], msg.K[4], msg.K[2], msg.K[5]])
)
self.detection2d.subscribe(self._on_detection)

def _on_detection(self, det: Detection2DArray):
if det.detections_length == 0 or not self.camera_intrinsics:
return
fx, fy, cx, cy = self.camera_intrinsics
center_x, center_y = (
det.detections[0].bbox.center.position.x,
det.detections[0].bbox.center.position.y,
)
x, y, z = (
(center_x - cx) / fx * self.goal_distance,
(center_y - cy) / fy * self.goal_distance,
self.goal_distance,
)
goal = PoseStamped(
position=Vector3(z, -x, -y),
orientation=Quaternion(0, 0, 0, 1),
frame_id=det.header.frame_id,
)
logger.debug(
f"BBox center: ({center_x:.1f}, {center_y:.1f}) → "
f"Goal pose: ({z:.2f}, {-x:.2f}, {-y:.2f}) in frame '{det.header.frame_id}'"
)
self.goal_request.publish(goal)
Loading
Loading