diff --git a/dimos/perception/detection2d/detic_2d_det.py b/dimos/perception/detection2d/detic_2d_det.py index bed5700521..ff0a1ad979 100644 --- a/dimos/perception/detection2d/detic_2d_det.py +++ b/dimos/perception/detection2d/detic_2d_det.py @@ -27,7 +27,7 @@ def __init__(self, iou_threshold=0.3, max_age=5): self.iou_threshold = iou_threshold self.max_age = max_age self.next_id = 1 - self.tracks = {} # id -> {bbox, class_id, age, etc} + self.tracks = {} # id -> {bbox, class_id, age, mask, etc} def _calculate_iou(self, bbox1, bbox2): """Calculate IoU between two bboxes in format [x1,y1,x2,y2]""" @@ -46,14 +46,15 @@ def _calculate_iou(self, bbox1, bbox2): return intersection / union if union > 0 else 0 - def update(self, detections): + def update(self, detections, masks): """Update tracker with new detections Args: detections: List of [x1,y1,x2,y2,score,class_id] + masks: List of segmentation masks corresponding to detections Returns: - List of [track_id, bbox, score, class_id] + List of [track_id, bbox, score, class_id, mask] """ if len(detections) == 0: # Age existing tracks @@ -101,15 +102,17 @@ def update(self, detections): self.tracks[track_id]["bbox"] = detections[best_idx][:4] self.tracks[track_id]["score"] = detections[best_idx][4] self.tracks[track_id]["age"] = 0 + self.tracks[track_id]["mask"] = masks[best_idx] matched_indices.add(best_idx) - # Add to results + # Add to results with mask result.append( [ track_id, detections[best_idx][:4], detections[best_idx][4], int(detections[best_idx][5]), + self.tracks[track_id]["mask"], ] ) @@ -127,10 +130,11 @@ def update(self, detections): "score": det[4], "class_id": int(det[5]), "age": 0, + "mask": masks[i], } - # Add to results - result.append([new_id, det[:4], det[4], int(det[5])]) + # Add to results with mask directly from the track + result.append([new_id, det[:4], det[4], int(det[5]), masks[i]]) return result @@ -301,24 +305,26 @@ def process_image(self, image): image: Input image in BGR format (OpenCV) Returns: - tuple: (bboxes, track_ids, class_ids, confidences, names) + tuple: (bboxes, track_ids, class_ids, confidences, names, masks) - bboxes: list of [x1, y1, x2, y2] coordinates - track_ids: list of tracking IDs (or -1 if no tracking) - class_ids: list of class indices - confidences: list of detection confidences - names: list of class names + - masks: list of segmentation masks (numpy arrays) """ # Run inference with Detic outputs = self.predictor(image) instances = outputs["instances"].to("cpu") - # Extract bounding boxes, classes, and scores + # Extract bounding boxes, classes, scores, and masks if len(instances) == 0: - return [], [], [], [], [] + return [], [], [], [], [], [] boxes = instances.pred_boxes.tensor.numpy() class_ids = instances.pred_classes.numpy() scores = instances.scores.numpy() + masks = instances.pred_masks.numpy() # Convert boxes to [x1, y1, x2, y2] format bboxes = [] @@ -331,16 +337,18 @@ def process_image(self, image): # Apply tracking detections = [] + filtered_masks = [] for i, bbox in enumerate(bboxes): if scores[i] >= self.threshold: # Format for tracker: [x1, y1, x2, y2, score, class_id] detections.append(bbox + [scores[i], class_ids[i]]) + filtered_masks.append(masks[i]) if not detections: - return [], [], [], [], [] + return [], [], [], [], [], [] - # Update tracker with detections - track_results = self.tracker.update(detections) + # Update tracker with detections and correctly aligned masks + track_results = self.tracker.update(detections, filtered_masks) # Process tracking results track_ids = [] @@ -348,15 +356,24 @@ def process_image(self, image): tracked_class_ids = [] tracked_scores = [] tracked_names = [] + tracked_masks = [] - for track_id, bbox, score, class_id in track_results: + for track_id, bbox, score, class_id, mask in track_results: track_ids.append(int(track_id)) tracked_bboxes.append(bbox.tolist() if isinstance(bbox, np.ndarray) else bbox) tracked_class_ids.append(int(class_id)) tracked_scores.append(score) tracked_names.append(self.class_names[int(class_id)]) - - return tracked_bboxes, track_ids, tracked_class_ids, tracked_scores, tracked_names + tracked_masks.append(mask) + + return ( + tracked_bboxes, + track_ids, + tracked_class_ids, + tracked_scores, + tracked_names, + tracked_masks, + ) def visualize_results(self, image, bboxes, track_ids, class_ids, confidences, names): """ diff --git a/dimos/perception/object_detection_stream.py b/dimos/perception/object_detection_stream.py index daa9fa8273..7a98a621b8 100644 --- a/dimos/perception/object_detection_stream.py +++ b/dimos/perception/object_detection_stream.py @@ -13,6 +13,12 @@ ) from dimos.types.vector import Vector from typing import Optional, Union +from dimos.types.manipulation import ObjectData + +from dimos.utils.logging_config import setup_logger + +# Initialize logger for the ObjectDetectionStream +logger = setup_logger("dimos.perception.object_detection_stream") class ObjectDetectionStream: @@ -22,6 +28,7 @@ class ObjectDetectionStream: 2. Estimates depth using Metric3D 3. Calculates 3D position and dimensions using camera intrinsics 4. Transforms coordinates to map frame + 5. Draws bounding boxes and segmentation masks on the frame Provides a stream of structured object data with position and rotation information. """ @@ -36,6 +43,7 @@ def __init__( transform_to_map=None, # Optional function to transform coordinates to map frame detector: Optional[Union[Detic2DDetector, Yolo2DDetector]] = None, video_stream: Observable = None, + disable_depth: bool = False, # Flag to disable monocular Metric3D depth estimation ): """ Initialize the ObjectDetectionStream. @@ -53,22 +61,29 @@ def __init__( self.min_confidence = min_confidence self.class_filter = class_filter self.transform_to_map = transform_to_map + self.disable_depth = disable_depth # Initialize object detector self.detector = detector or Detic2DDetector(vocabulary=None, threshold=min_confidence) + # Set up camera intrinsics + self.camera_intrinsics = camera_intrinsics # Initialize depth estimation model - self.depth_model = Metric3D(gt_depth_scale) + self.depth_model = None + if not disable_depth: + self.depth_model = Metric3D(gt_depth_scale) - # Set up camera intrinsics - self.camera_intrinsics = camera_intrinsics - if camera_intrinsics is not None: - self.depth_model.update_intrinsic(camera_intrinsics) + if camera_intrinsics is not None: + self.depth_model.update_intrinsic(camera_intrinsics) - # Create 3x3 camera matrix for calculations - fx, fy, cx, cy = camera_intrinsics - self.camera_matrix = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32) + # Create 3x3 camera matrix for calculations + fx, fy, cx, cy = camera_intrinsics + self.camera_matrix = np.array( + [[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32 + ) + else: + raise ValueError("camera_intrinsics must be provided") else: - raise ValueError("camera_intrinsics must be provided") + logger.info("Depth estimation disabled") # If video_stream is provided, create and store the stream immediately self.stream = None @@ -89,7 +104,9 @@ def create_stream(self, video_stream: Observable) -> Observable: def process_frame(frame): # Detect objects - bboxes, track_ids, class_ids, confidences, names = self.detector.process_image(frame) + bboxes, track_ids, class_ids, confidences, names, masks = self.detector.process_image( + frame + ) # Create visualization viz_frame = frame.copy() @@ -107,36 +124,46 @@ def process_frame(frame): if self.class_filter and class_name not in self.class_filter: continue - # Get depth for this object - depth = calculate_depth_from_bbox(self.depth_model, frame, bbox) - if depth is None: - # Skip objects with invalid depth - continue - - # Calculate object position and rotation - position, rotation = calculate_position_rotation_from_bbox( - bbox, depth, self.camera_intrinsics - ) - - # Get object dimensions - width, height = calculate_object_size_from_bbox(bbox, depth, self.camera_intrinsics) - - # Transform to map frame if a transform function is provided - try: - if self.transform_to_map: - position = Vector([position["x"], position["y"], position["z"]]) - rotation = Vector([rotation["roll"], rotation["pitch"], rotation["yaw"]]) - position, rotation = self.transform_to_map( - position, rotation, source_frame="base_link" - ) - position = dict(x=position.x, y=position.y, z=position.z) - rotation = dict(roll=rotation.x, pitch=rotation.y, yaw=rotation.z) - except Exception as e: - print(f"Error transforming to map frame: {e}") - position, rotation = position, rotation - - # Create object data dictionary - object_data = { + if not self.disable_depth: + # Get depth for this object + depth = calculate_depth_from_bbox(self.depth_model, frame, bbox) + if depth is None: + # Skip objects with invalid depth + continue + # Calculate object position and rotation + position, rotation = calculate_position_rotation_from_bbox( + bbox, depth, self.camera_intrinsics + ) + # Get object dimensions + width, height = calculate_object_size_from_bbox( + bbox, depth, self.camera_intrinsics + ) + + # Transform to map frame if a transform function is provided + try: + if self.transform_to_map: + position = Vector([position["x"], position["y"], position["z"]]) + rotation = Vector( + [rotation["roll"], rotation["pitch"], rotation["yaw"]] + ) + position, rotation = self.transform_to_map( + position, rotation, source_frame="base_link" + ) + position = dict(x=position.x, y=position.y, z=position.z) + rotation = dict(roll=rotation.x, pitch=rotation.y, yaw=rotation.z) + except Exception as e: + logger.error(f"Error transforming to map frame: {e}") + position, rotation = position, rotation + + else: + depth = -1 + position = Vector(0, 0, 0) + rotation = Vector(0, 0, 0) + width = -1 + height = -1 + + # Create a properly typed ObjectData instance + object_data: ObjectData = { "object_id": track_ids[i] if i < len(track_ids) else -1, "bbox": bbox, "depth": depth, @@ -146,6 +173,7 @@ def process_frame(frame): "position": position, "rotation": rotation, "size": {"width": width, "height": height}, + "segmentation_mask": masks[i], } objects.append(object_data) @@ -153,6 +181,25 @@ def process_frame(frame): # Add visualization x1, y1, x2, y2 = map(int, bbox) color = (0, 255, 0) # Green for detected objects + mask_color = (0, 200, 200) # Yellow-green for masks + + # Draw segmentation mask if available + if object_data["segmentation_mask"] is not None: + # Create a colored mask overlay + mask = object_data["segmentation_mask"].astype(np.uint8) + colored_mask = np.zeros_like(viz_frame) + colored_mask[mask > 0] = mask_color + + # Apply the mask with transparency + alpha = 0.5 # transparency factor + mask_area = mask > 0 + viz_frame[mask_area] = cv2.addWeighted( + viz_frame[mask_area], 1 - alpha, colored_mask[mask_area], alpha, 0 + ) + + # Draw mask contour + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + cv2.drawContours(viz_frame, contours, -1, mask_color, 2) # Draw bounding box cv2.rectangle(viz_frame, (x1, y1), (x2, y2), color, 2) diff --git a/dimos/skills/manipulation/force_constraint_skill.py b/dimos/skills/manipulation/force_constraint_skill.py new file mode 100644 index 0000000000..d655627b42 --- /dev/null +++ b/dimos/skills/manipulation/force_constraint_skill.py @@ -0,0 +1,66 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, List, Tuple +from pydantic import Field + +from dimos.skills.skills import AbstractRobotSkill +from dimos.types.manipulation_constraint import ForceConstraint, Vector +from dimos.utils.logging_config import setup_logger + +# Initialize logger +logger = setup_logger("dimos.skills.force_constraint_skill") + + +class ForceConstraintSkill(AbstractRobotSkill): + """ + Skill for generating force constraints for robot manipulation. + """ + + # Constraint parameters + max_force: float = Field(0.0, description="Maximum force in newtons to apply") + min_force: float = Field(0.0, description="Minimum force in newtons to apply") + + # Force direction as (x,y) tuple + force_direction: Optional[Tuple[float, float]] = Field( + None, description="Force direction vector (x,y)" + ) + + # Description + description: str = Field("", description="Description of the force constraint") + + def __call__(self) -> ForceConstraint: + """ + Generate a force constraint based on the parameters. + + Returns: + ForceConstraint: The generated constraint + """ + # Create force direction vector if provided (convert 2D point to 3D vector with z=0) + force_direction_vector = None + if self.force_direction: + force_direction_vector = Vector(self.force_direction[0], self.force_direction[1], 0.0) + + # Create and return the constraint + constraint = ForceConstraint( + max_force=self.max_force, + min_force=self.min_force, + force_direction=force_direction_vector, + description=self.description, + ) + + # Log the constraint creation + logger.info(f"Generated force constraint: {self.description}") + + return constraint diff --git a/dimos/skills/manipulation/manipulate_skill.py b/dimos/skills/manipulation/manipulate_skill.py new file mode 100644 index 0000000000..c28c1d84df --- /dev/null +++ b/dimos/skills/manipulation/manipulate_skill.py @@ -0,0 +1,123 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, List, Tuple, Dict, Any, Union +from pydantic import Field, validator + +from dimos.skills.skills import AbstractRobotSkill +from dimos.types.manipulation_constraint import ( + AbstractConstraint, + ManipulationConstraint, + TranslationConstraint, + RotationConstraint, + ForceConstraint, + ConstraintType, +) +from dimos.utils.logging_config import setup_logger + +# Initialize logger +logger = setup_logger("dimos.skills.manipulate_skill") + + +class Manipulate(AbstractRobotSkill): + """ + Skill for executing manipulation tasks with constraints. + Can be called by an LLM with a list of manipulation constraints. + """ + + # Core parameters + name: str = Field("Manipulation Task", description="Name of the manipulation task") + + description: str = Field("", description="Description of the manipulation task") + + # Target object information + target_object: str = Field( + "", description="Semantic label of the target object (e.g., 'cup', 'box')" + ) + + # Constraints - can be set directly + constraints: List[Union[AbstractConstraint, str]] = Field( + [], + description="List of AbstractConstraint objects or constraint IDs from AgentMemory to apply to the manipulation task", + ) + + # Additional metadata TODO: Maybe put the movement tolerances and other LLM generated parameters here + # metadata: Dict[str, Any] = Field( + # None, + # description="Additional metadata for the manipulation task" + # ) + + def __call__(self) -> Dict[str, Any]: + """ + Execute a manipulation task with the given constraints. + + Returns: + Dict[str, Any]: Result of the manipulation operation + """ + # Create the manipulation constraint object + manipulation_constraint = self._build_manipulation_constraint() + + if not manipulation_constraint or not manipulation_constraint.constraints: + logger.error("No valid constraints provided for manipulation") + return {"success": False, "error": "No valid constraints provided"} + + # Execute the manipulation with the constraint + result = self._execute_manipulation(manipulation_constraint) + + # Log the execution + constraint_types = [ + c.get_constraint_type().value for c in manipulation_constraint.constraints + ] + logger.info(f"Executed manipulation '{self.name}' with constraints: {constraint_types}") + + return result + + def _build_manipulation_constraint(self) -> ManipulationConstraint: + """ + Build a ManipulationConstraint object from the provided parameters. + """ + # Initialize the task manipulation constraint + constraint = ManipulationConstraint( + name=self.name, + description=self.description, + target_object=self.target_object, + # metadata=self.metadata or {} + ) + + # Add constraints directly or resolve from IDs + for c in self.constraints: + if isinstance(c, AbstractConstraint): + constraint.add_constraint(c) + elif isinstance(c, str) and self._robot and hasattr(self._robot, "get_constraint"): + # Try to load constraint from ID + saved_constraint = self._robot.get_constraint( + c + ) # TODO: implement constraint ID retrieval library + if saved_constraint: + constraint.add_constraint(saved_constraint) + + return constraint + + # TODO: Implement + def _execute_manipulation(self, constraint: ManipulationConstraint) -> Dict[str, Any]: + """ + Execute the manipulation with the given constraint. + + Args: + constraint: The manipulation constraint to use + + Returns: + Dict[str, Any]: Result of the manipulation operation + """ + pass diff --git a/dimos/skills/manipulation/rotation_constraint_skill.py b/dimos/skills/manipulation/rotation_constraint_skill.py new file mode 100644 index 0000000000..71dbc22acd --- /dev/null +++ b/dimos/skills/manipulation/rotation_constraint_skill.py @@ -0,0 +1,115 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, List, Tuple +from pydantic import Field + +from dimos.skills.skills import AbstractRobotSkill +from dimos.types.manipulation_constraint import RotationConstraint +from dimos.utils.logging_config import setup_logger +from dimos.types.vector import Vector + +# Initialize logger +logger = setup_logger("dimos.skills.rotation_constraint_skill") + + +class RotationConstraintSkill(AbstractRobotSkill): + """ + Skill for generating rotation constraints for robot manipulation. + """ + + # Constraint parameters + lock_roll: bool = Field(False, description="Whether to lock rotation around the x-axis") + lock_pitch: bool = Field(False, description="Whether to lock rotation around the y-axis") + lock_yaw: bool = Field(False, description="Whether to lock rotation around the z-axis") + + # Simple angle values for rotation (in radians) + start_angle: Optional[float] = Field(None, description="Starting angle in radians") + end_angle: Optional[float] = Field(None, description="Ending angle in radians") + + # Pivot points as (x,y) tuples + pivot_point: Optional[Tuple[float, float]] = Field( + None, description="Pivot point (x,y) for rotation" + ) + + # TODO: Secondary pivot point if needed (for double-point locked rotation) + secondary_pivot_point: Optional[Tuple[float, float]] = Field( + None, description="Secondary pivot point (x,y) for double-pivot rotation" + ) + + # Description + description: str = Field("", description="Description of the rotation constraint") + + def __call__(self) -> RotationConstraint: + """ + Generate a rotation constraint based on the parameters. + + Returns: + RotationConstraint: The generated constraint + """ + # Figure out which axis we're rotating around based on what's not locked + rotation_axis = None + if not self.lock_roll: + rotation_axis = "roll" + elif not self.lock_pitch: + rotation_axis = "pitch" + elif not self.lock_yaw: + rotation_axis = "yaw" + + # start angle vector with angle applied about correct locked axis + start_angle_vector = None + if self.start_angle is not None: + start_angle_vector = Vector( + self.start_angle if rotation_axis == "roll" else 0.0, + self.start_angle if rotation_axis == "pitch" else 0.0, + self.start_angle if rotation_axis == "yaw" else 0.0, + ) + + # end angle vector with angle applied about correct locked axis + end_angle_vector = None + if self.end_angle is not None: + end_angle_vector = Vector( + self.end_angle if rotation_axis == "roll" else 0.0, + self.end_angle if rotation_axis == "pitch" else 0.0, + self.end_angle if rotation_axis == "yaw" else 0.0, + ) + + # Create pivot point vector if provided (convert 2D point to 3D vector with z=0) + pivot_point_vector = None + if self.pivot_point: + pivot_point_vector = Vector(self.pivot_point[0], self.pivot_point[1], 0.0) + + # Create secondary pivot point vector if provided + secondary_pivot_vector = None + if self.secondary_pivot_point: + secondary_pivot_vector = Vector( + self.secondary_pivot_point[0], self.secondary_pivot_point[1], 0.0 + ) + + # Create and return the constraint + constraint = RotationConstraint( + lock_roll=self.lock_roll, + lock_pitch=self.lock_pitch, + lock_yaw=self.lock_yaw, + start_angle=start_angle_vector, + end_angle=end_angle_vector, + pivot_point=pivot_point_vector, + secondary_pivot_point=secondary_pivot_vector, + description=self.description, + ) + + # Log the constraint creation + logger.info(f"Generated rotation constraint: {self.description}") + + return constraint diff --git a/dimos/skills/manipulation/translation_constraint_skill.py b/dimos/skills/manipulation/translation_constraint_skill.py new file mode 100644 index 0000000000..c03dc092cc --- /dev/null +++ b/dimos/skills/manipulation/translation_constraint_skill.py @@ -0,0 +1,97 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, List, Tuple +from pydantic import Field + +from dimos.skills.skills import AbstractRobotSkill +from dimos.types.manipulation_constraint import TranslationConstraint, Vector +from dimos.utils.logging_config import setup_logger + +# Initialize logger +logger = setup_logger("dimos.skills.translation_constraint_skill") + + +class TranslationConstraintSkill(AbstractRobotSkill): + """ + Skill for generating translation constraints for robot manipulation. + """ + + # Constraint parameters + lock_x: bool = Field(False, description="Whether to lock translation along the x-axis") + lock_y: bool = Field(False, description="Whether to lock translation along the y-axis") + lock_z: bool = Field(False, description="Whether to lock translation along the z-axis") + + reference_point: Optional[Tuple[float, float]] = Field( + None, description="Reference point (x,y) on the target object for translation constraining" + ) + + bounds_min: Optional[Tuple[float, float]] = Field( + None, description="Minimum bounds (x,y) for bounded translation" + ) + + bounds_max: Optional[Tuple[float, float]] = Field( + None, description="Maximum bounds (x,y) for bounded translation" + ) + + target_point: Optional[Tuple[float, float]] = Field( + None, description="Final target position (x,y) for translation constraining" + ) + + # Description + description: str = Field("", description="Description of the translation constraint") + + def __call__(self) -> TranslationConstraint: + """ + Generate a translation constraint based on the parameters. + + Returns: + TranslationConstraint: The generated constraint + """ + # Create reference point vector if provided (convert 2D point to 3D vector with z=0) + reference_point = None + if self.reference_point: + reference_point = Vector(self.reference_point[0], self.reference_point[1], 0.0) + + # Create bounds minimum vector if provided + bounds_min = None + if self.bounds_min: + bounds_min = Vector(self.bounds_min[0], self.bounds_min[1], 0.0) + + # Create bounds maximum vector if provided + bounds_max = None + if self.bounds_max: + bounds_max = Vector(self.bounds_max[0], self.bounds_max[1], 0.0) + + # Create relative target vector if provided + target_point = None + if self.target_point: + target_point = Vector(self.target_point[0], self.target_point[1], 0.0) + + # Create and return the constraint + constraint = TranslationConstraint( + lock_x=self.lock_x, + lock_y=self.lock_y, + lock_z=self.lock_z, + reference_point=reference_point, + bounds_min=bounds_min, + bounds_max=bounds_max, + target_point=target_point, + description=self.description, + ) + + # Log the constraint creation + logger.info(f"Generated translation constraint: {self.description}") + + return constraint diff --git a/dimos/types/manipulation.py b/dimos/types/manipulation.py new file mode 100644 index 0000000000..c568a0e80e --- /dev/null +++ b/dimos/types/manipulation.py @@ -0,0 +1,162 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Dict, List, Optional, Any, Union, TypedDict, Tuple +from dataclasses import dataclass, field, fields +from abc import ABC, abstractmethod +import uuid +import numpy as np +import time +from dimos.types.vector import Vector + + +class ConstraintType(Enum): + """Types of manipulation constraints.""" + + TRANSLATION = "translation" + ROTATION = "rotation" + FORCE = "force" + + +@dataclass +class AbstractConstraint(ABC): + """Base class for all manipulation constraints.""" + + description: str = "" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + + +@dataclass +class TranslationConstraint(AbstractConstraint): + """Constraint parameters for translational movement.""" + + lock_x: bool = False + lock_y: bool = False + lock_z: bool = False + reference_point: Optional[Vector] = None + bounds_min: Optional[Vector] = None # For bounded translation + bounds_max: Optional[Vector] = None # For bounded translation + target_point: Optional[Vector] = None # For relative positioning + description: str = "" + + +@dataclass +class RotationConstraint(AbstractConstraint): + """Constraint parameters for rotational movement.""" + + lock_roll: bool = False + lock_pitch: bool = False + lock_yaw: bool = False + start_angle: Optional[Vector] = None # Roll, pitch, yaw start angles + end_angle: Optional[Vector] = None # Roll, pitch, yaw end angles + pivot_point: Optional[Vector] = None # Point of rotation + secondary_pivot_point: Optional[Vector] = None # For double point locked rotation + description: str = "" + + +@dataclass +class ForceConstraint(AbstractConstraint): + """Constraint parameters for force application.""" + + max_force: float = 0.0 # Maximum force in newtons + min_force: float = 0.0 # Minimum force in newtons + force_direction: Optional[Vector] = None # Direction of force application + description: str = "" + + +class ObjectData(TypedDict, total=False): + """Data about an object in the manipulation scene.""" + + object_id: int # Unique ID for the object + bbox: List[float] # Bounding box [x1, y1, x2, y2] + depth: float # Depth in meters from Metric3d + confidence: float # Detection confidence + class_id: int # Class ID from the detector + label: str # Semantic label (e.g., 'cup', 'table') + movement_tolerance: float # 0-1 value indicating how movable the object is + segmentation_mask: np.ndarray # Binary mask of the object's pixels + position: Dict[str, float] # 3D position {x, y, z} + rotation: Dict[str, float] # 3D rotation {roll, pitch, yaw} + size: Dict[str, float] # Object dimensions {width, height} + + +class ManipulationMetadata(TypedDict, total=False): + """Typed metadata for manipulation constraints.""" + + timestamp: float + objects: Dict[str, ObjectData] + + +@dataclass +class ManipulationTaskConstraint: + """Set of constraints for a specific manipulation action.""" + + constraints: List[AbstractConstraint] = field(default_factory=list) + + def add_constraint(self, constraint: AbstractConstraint): + """Add a constraint to this set.""" + if constraint not in self.constraints: + self.constraints.append(constraint) + + def get_constraints(self) -> List[AbstractConstraint]: + """Get all constraints in this set.""" + return self.constraints + + +@dataclass +class ManipulationTask: + """Complete definition of a manipulation task.""" + + description: str + target_object: str # Semantic label of target object + target_point: Optional[Tuple[float, float]] = ( + None # (X,Y) point in pixel-space of the point to manipulate on target object + ) + metadata: ManipulationMetadata = field(default_factory=dict) + timestamp: float = field(default_factory=time.time) + task_id: str = "" + result: Optional[Dict[str, Any]] = None # Any result data from the task execution + constraints: Union[List[AbstractConstraint], ManipulationTaskConstraint, AbstractConstraint] = ( + field(default_factory=list) + ) + + def add_constraint(self, constraint: AbstractConstraint): + """Add a constraint to this manipulation task.""" + # If constraints is a ManipulationTaskConstraint object + if isinstance(self.constraints, ManipulationTaskConstraint): + self.constraints.add_constraint(constraint) + return + + # If constraints is a single AbstractConstraint, convert to list + if isinstance(self.constraints, AbstractConstraint): + self.constraints = [self.constraints, constraint] + return + + # If constraints is a list, append to it + # This will also handle empty lists (the default case) + self.constraints.append(constraint) + + def get_constraints(self) -> List[AbstractConstraint]: + """Get all constraints in this manipulation task.""" + # If constraints is a ManipulationTaskConstraint object + if isinstance(self.constraints, ManipulationTaskConstraint): + return self.constraints.get_constraints() + + # If constraints is a single AbstractConstraint, return as list + if isinstance(self.constraints, AbstractConstraint): + return [self.constraints] + + # If constraints is a list (including empty list), return it + return self.constraints diff --git a/dimos/types/manipulation_constraint.py b/dimos/types/manipulation_constraint.py new file mode 100644 index 0000000000..0446539008 --- /dev/null +++ b/dimos/types/manipulation_constraint.py @@ -0,0 +1,109 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Optional, List, Tuple, Dict, Any, Union +from dataclasses import dataclass +from abc import ABC, abstractmethod +from dimos.types.vector import Vector + + +class ConstraintType(Enum): + """Types of manipulation constraints.""" + + TRANSLATION = "translation" + ROTATION = "rotation" + FORCE = "force" + + +@dataclass +class AbstractConstraint(ABC): + """Base class for all manipulation constraints.""" + + description: str = "" + + +@dataclass +class TranslationConstraint(AbstractConstraint): + """Constraint parameters for translational movement.""" + + lock_x: bool = False + lock_y: bool = False + lock_z: bool = False + reference_point: Optional[Vector] = None + bounds_min: Optional[Vector] = None # For bounded translation + bounds_max: Optional[Vector] = None # For bounded translation + target_point: Optional[Vector] = None # For relative positioning + description: str = "" + + +@dataclass +class RotationConstraint(AbstractConstraint): + """Constraint parameters for rotational movement.""" + + lock_roll: bool = False + lock_pitch: bool = False + lock_yaw: bool = False + start_angle: Optional[Vector] = None # Roll, pitch, yaw start angles + end_angle: Optional[Vector] = None # Roll, pitch, yaw end angles + pivot_point: Optional[Vector] = None # Point of rotation + secondary_pivot_point: Optional[Vector] = None # For double point locked rotation + description: str = "" + + +@dataclass +class ForceConstraint(AbstractConstraint): + """Constraint parameters for force application.""" + + max_force: float = 0.0 # Maximum force in newtons + min_force: float = 0.0 # Minimum force in newtons + force_direction: Optional[Vector] = None # Direction of force application + description: str = "" + + +@dataclass +class ManipulationConstraint: + """Complete set of constraints for a manipulation task.""" + + name: str + description: str + constraints: List[AbstractConstraint] = None + + target_object: str = "" # Semantic label of target object + metadata: Dict[str, Any] = None + + def __post_init__(self): + # Initialize metadata dictionary if None + if self.metadata is None: + self.metadata = {} + + # Initialize constraints list if None + if self.constraints is None: + self.constraints = [] + + def add_constraint(self, constraint: AbstractConstraint): + """Add a constraint to this manipulation task.""" + if self.constraints is None: + self.constraints = [] + + # Add the constraint to the list + if constraint not in self.constraints: + self.constraints.append(constraint) + + def get_constraints_by_type(self, constraint_type: ConstraintType) -> List[AbstractConstraint]: + """Get all constraints of a specific type.""" + if self.constraints is None: + return [] + + return [c for c in self.constraints if c.get_constraint_type() == constraint_type] diff --git a/tests/test_manipulation_agent.py b/tests/test_manipulation_agent.py new file mode 100644 index 0000000000..7218f6b1e5 --- /dev/null +++ b/tests/test_manipulation_agent.py @@ -0,0 +1,344 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import tests.test_header +import os + +import time +from dotenv import load_dotenv +from dimos.agents.claude_agent import ClaudeAgent +from dimos.robot.unitree.unitree_go2 import UnitreeGo2 +from dimos.robot.unitree.unitree_ros_control import UnitreeROSControl +from dimos.robot.unitree.unitree_skills import MyUnitreeSkills +from dimos.web.robot_web_interface import RobotWebInterface +from dimos.skills.observe_stream import ObserveStream +from dimos.skills.kill_skill import KillSkill +from dimos.skills.navigation import NavigateWithText, GetPose, NavigateToGoal +from dimos.skills.visual_navigation_skills import FollowHuman +import reactivex as rx +import reactivex.operators as ops +from dimos.stream.audio.pipelines import tts, stt +import threading +import json +import cv2 +import numpy as np +import os +import datetime +from dimos.types.vector import Vector +from dimos.skills.speak import Speak +from dimos.perception.object_detection_stream import ObjectDetectionStream +from dimos.perception.detection2d.detic_2d_det import Detic2DDetector +from dimos.agents.agent import OpenAIAgent +from dimos.agents.tokenizer.huggingface_tokenizer import HuggingFaceTokenizer +from openai import OpenAI +from dimos.utils.reactive import backpressure +from dimos.stream.video_provider import VideoProvider +from reactivex.subject import Subject, BehaviorSubject +from dimos.utils.logging_config import setup_logger + +# Initialize logger for the agent module +logger = setup_logger("dimos.tests.test_manipulation_agent") + +# Load API key from environment +load_dotenv() + +# Allow command line arguments to control spatial memory parameters +import argparse + + +def parse_arguments(): + parser = argparse.ArgumentParser( + description="Run the robot with optional spatial memory parameters" + ) + parser.add_argument( + "--new-memory", action="store_true", help="Create a new spatial memory from scratch" + ) + parser.add_argument( + "--spatial-memory-dir", type=str, help="Directory for storing spatial memory data" + ) + return parser.parse_args() + + +args = parse_arguments() + +# Initialize robot with spatial memory parameters +robot = UnitreeGo2( + ip=os.getenv("ROBOT_IP"), + ros_control=UnitreeROSControl(), + skills=MyUnitreeSkills(), + mock_connection=False, + spatial_memory_dir=args.spatial_memory_dir, # Will use default if None + new_memory=args.new_memory, +) # Create a new memory if specified + +# Create a subject for agent responses +agent_response_subject = rx.subject.Subject() +agent_response_stream = agent_response_subject.pipe(ops.share()) +local_planner_viz_stream = robot.local_planner_viz_stream.pipe(ops.share()) + +# Initialize object detection stream +min_confidence = 0.6 +class_filter = None # No class filtering +detector = Detic2DDetector(vocabulary=None, threshold=min_confidence) + +# Create video stream from robot's camera +video_stream = backpressure(robot.get_ros_video_stream()) + +# Initialize test video stream +# video_stream = VideoProvider( +# dev_name="UnitreeGo2", +# video_source=f"{os.getcwd()}/assets/trimmed_video_office.mov" +# ).capture_video_as_observable(realtime=False, fps=1) + +# Initialize ObjectDetectionStream with robot +object_detector = ObjectDetectionStream( + camera_intrinsics=robot.camera_intrinsics, + min_confidence=min_confidence, + class_filter=class_filter, + transform_to_map=robot.ros_control.transform_pose, + detector=detector, + video_stream=video_stream, +) + +# Create visualization stream for web interface (detection visualization) +viz_stream = backpressure(object_detector.get_stream()).pipe( + ops.share(), + ops.map(lambda x: x["viz_frame"] if x is not None else None), + ops.filter(lambda x: x is not None), +) + + +# Helper function to draw a manipulation point on a frame +def draw_point_on_frame(frame, x, y): + # Draw a circle at the manipulation point + cv2.circle(frame, (x, y), 10, (0, 0, 255), -1) # Red circle + cv2.circle(frame, (x, y), 12, (255, 255, 255), 2) # White border + + # Add text with coordinates + cv2.putText( + frame, f"({x},{y})", (x + 15, y + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2 + ) + + return frame + + +# Function to add manipulation point to stream frames +def draw_manipulation_point(frame): + try: + if frame is None or latest_manipulation_point.value is None: + return frame + + # Make a copy to avoid modifying the original frame + viz_frame = frame.copy() + + # Get the latest manipulation point coordinates + x, y = latest_manipulation_point.value + + # Draw the point using our helper function + draw_point_on_frame(viz_frame, x, y) + + return viz_frame + except Exception as e: + logger.error(f"Error drawing manipulation point: {e}") + return frame + + +# Create manipulation point visualization stream +manipulation_viz_stream = video_stream.pipe( + ops.map(draw_manipulation_point), ops.filter(lambda x: x is not None), ops.share() +) + +# Get the formatted detection stream +formatted_detection_stream = object_detector.get_formatted_stream().pipe( + ops.filter(lambda x: x is not None) +) + + +# Create a direct mapping that combines detection data with locations +def combine_with_locations(object_detections): + # Get locations from spatial memory + try: + locations = robot.get_spatial_memory().get_robot_locations() + + # Format the locations section + locations_text = "\n\nSaved Robot Locations:\n" + if locations: + for loc in locations: + locations_text += f"- {loc.name}: Position ({loc.position[0]:.2f}, {loc.position[1]:.2f}, {loc.position[2]:.2f}), " + locations_text += f"Rotation ({loc.rotation[0]:.2f}, {loc.rotation[1]:.2f}, {loc.rotation[2]:.2f})\n" + else: + locations_text += "None\n" + + # Simply concatenate the strings + return object_detections + locations_text + except Exception as e: + print(f"Error adding locations: {e}") + return object_detections + + +# Create the combined stream with a simple pipe operation +enhanced_data_stream = formatted_detection_stream.pipe(ops.map(combine_with_locations), ops.share()) + +streams = { + "unitree_video": video_stream, + "local_planner_viz": local_planner_viz_stream, + "object_detection": viz_stream, + "manipulation_point": manipulation_viz_stream, +} +text_streams = { + "agent_responses": agent_response_stream, +} + +web_interface = RobotWebInterface(port=5555, text_streams=text_streams, **streams) + +# stt_node = stt() + +# Read system query from prompt.txt file +with open( + os.path.join(os.path.dirname(os.path.dirname(__file__)), "assets", "agent", "prompt.txt"), "r" +) as f: + system_query = f.read() + +# Create Qwen client +qwen_client = OpenAI( + base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1", + api_key=os.getenv("ALIBABA_API_KEY"), +) + +# Create response subject +response_subject = rx.subject.Subject() + +# Create behavior subjects to store the current frame and latest manipulation point +# BehaviorSubject stores the latest value and provides it to new subscribers +current_frame_subject = BehaviorSubject(None) +latest_manipulation_point = BehaviorSubject(None) # Will store (x, y) tuple + + +# Function to parse manipulation point coordinates from VLM response +def process_manipulation_point(response, frame): + logger.info(f"Processing manipulation point with response: {response}") + try: + # Parse coordinates from response (format: "x,y") + coords = response.strip().split(",") + if len(coords) != 2: + logger.error(f"Invalid coordinate format: {response}") + return + + x, y = int(coords[0]), int(coords[1]) + + # Update the latest manipulation point subject with the new coordinates + latest_manipulation_point.on_next((x, y)) + + # Save a static image with the point for reference + save_manipulation_point_image(frame, x, y) + + except Exception as e: + logger.error(f"Error processing manipulation point: {e}") + + +# Function to save a static image with manipulation point visualization +def save_manipulation_point_image(frame, x, y): + try: + if frame is None: + logger.error("Cannot save manipulation point image: frame is None") + return + + # Create a copy of the frame for static image saving + visualization = frame.copy() + + # Draw the manipulation point + draw_point_on_frame(visualization, x, y) + + # Create directory if it doesn't exist + output_dir = os.path.join(os.getcwd(), "assets", "agent", "manipulation_agent") + os.makedirs(output_dir, exist_ok=True) + + # Save image with timestamp + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + output_path = os.path.join(output_dir, f"manipulation_point_{timestamp}.jpg") + cv2.imwrite(output_path, visualization) + + logger.info(f"Saved manipulation point visualization to {output_path}") + except Exception as e: + logger.error(f"Error saving manipulation point image: {e}") + + +# Subscribe to video stream to capture current frame +# Use BehaviorSubject to store the latest frame for manipulation point visualization +video_stream.subscribe( + on_next=lambda frame: current_frame_subject.on_next( + frame.copy() if frame is not None else None + ), + on_error=lambda error: logger.error(f"Error in video stream: {error}"), +) + +# Create temporary agent for processing +manipulation_vlm = OpenAIAgent( + dev_name="QwenSingleFrameAgent", + openai_client=qwen_client, + model_name="qwen2.5-vl-72b-instruct", + tokenizer=HuggingFaceTokenizer(model_name=f"Qwen/qwen2.5-vl-72b-instruct"), + max_output_tokens_per_request=100, + system_query="Given the input task return ONLY the x,y coordinates of the ONE exact point on the object that would be best to grasp", + input_video_stream=video_stream, + input_query_stream=web_interface.query_stream, +) + +# Subscribe to VLM responses to process manipulation points +manipulation_vlm.get_response_observable().subscribe( + on_next=lambda response: process_manipulation_point(response, current_frame_subject.value), + on_error=lambda error: logger.error(f"Error in VLM response stream: {error}"), +) + + +# Create a ClaudeAgent instance +# manipulation_agent = ClaudeAgent( +# dev_name="test_agent", +# # input_query_stream=stt_node.emit_text(), +# input_query_stream=manipulation_vlm.get_response_observable(), +# input_data_stream=enhanced_data_stream, # Add the enhanced data stream +# skills=robot.get_skills(), +# system_query="system_query", +# model_name="claude-3-7-sonnet-latest", +# thinking_budget_tokens=0 +# ) + +# tts_node = tts() +# tts_node.consume_text(agent.get_response_observable()) + +# robot_skills = robot.get_skills() +# robot_skills.add(ObserveStream) +# robot_skills.add(KillSkill) +# robot_skills.add(NavigateWithText) +# robot_skills.add(FollowHuman) +# robot_skills.add(GetPose) +# # robot_skills.add(Speak) +# robot_skills.add(NavigateToGoal) +# robot_skills.create_instance("ObserveStream", robot=robot, agent=manipulation_agent) +# robot_skills.create_instance("KillSkill", robot=robot, skill_library=robot_skills) +# robot_skills.create_instance("NavigateWithText", robot=robot) +# robot_skills.create_instance("FollowHuman", robot=robot) +# robot_skills.create_instance("GetPose", robot=robot) +# robot_skills.create_instance("NavigateToGoal", robot=robot) +# robot_skills.create_instance("Speak", tts_node=tts_node) + +# Subscribe to agent responses and send them to the subject +# manipulation_agent.get_response_observable().subscribe( +# lambda x: agent_response_subject.on_next(x) +# ) + +print("ObserveStream and Kill skills registered and ready for use") +print("Created memory.txt file") + +web_interface.run() diff --git a/tests/test_object_detection_stream.py b/tests/test_object_detection_stream.py index 261f59382a..9803f30a6b 100644 --- a/tests/test_object_detection_stream.py +++ b/tests/test_object_detection_stream.py @@ -121,6 +121,7 @@ def main(): transform_to_map=robot.ros_control.transform_pose, detector=detector, video_stream=video_stream, + disable_depth=True, ) else: # webcam mode