Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions dimos/perception/detection2d/detic_2d_det.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, iou_threshold=0.3, max_age=5):
self.iou_threshold = iou_threshold
self.max_age = max_age
self.next_id = 1
self.tracks = {} # id -> {bbox, class_id, age, etc}
self.tracks = {} # id -> {bbox, class_id, age, mask, etc}

def _calculate_iou(self, bbox1, bbox2):
"""Calculate IoU between two bboxes in format [x1,y1,x2,y2]"""
Expand All @@ -46,14 +46,15 @@ def _calculate_iou(self, bbox1, bbox2):

return intersection / union if union > 0 else 0

def update(self, detections):
def update(self, detections, masks):
"""Update tracker with new detections

Args:
detections: List of [x1,y1,x2,y2,score,class_id]
masks: List of segmentation masks corresponding to detections

Returns:
List of [track_id, bbox, score, class_id]
List of [track_id, bbox, score, class_id, mask]
"""
if len(detections) == 0:
# Age existing tracks
Expand Down Expand Up @@ -101,15 +102,17 @@ def update(self, detections):
self.tracks[track_id]["bbox"] = detections[best_idx][:4]
self.tracks[track_id]["score"] = detections[best_idx][4]
self.tracks[track_id]["age"] = 0
self.tracks[track_id]["mask"] = masks[best_idx]
matched_indices.add(best_idx)

# Add to results
# Add to results with mask
result.append(
[
track_id,
detections[best_idx][:4],
detections[best_idx][4],
int(detections[best_idx][5]),
self.tracks[track_id]["mask"],
]
)

Expand All @@ -127,10 +130,11 @@ def update(self, detections):
"score": det[4],
"class_id": int(det[5]),
"age": 0,
"mask": masks[i],
}

# Add to results
result.append([new_id, det[:4], det[4], int(det[5])])
# Add to results with mask directly from the track
result.append([new_id, det[:4], det[4], int(det[5]), masks[i]])

return result

Expand Down Expand Up @@ -301,24 +305,26 @@ def process_image(self, image):
image: Input image in BGR format (OpenCV)

Returns:
tuple: (bboxes, track_ids, class_ids, confidences, names)
tuple: (bboxes, track_ids, class_ids, confidences, names, masks)
- bboxes: list of [x1, y1, x2, y2] coordinates
- track_ids: list of tracking IDs (or -1 if no tracking)
- class_ids: list of class indices
- confidences: list of detection confidences
- names: list of class names
- masks: list of segmentation masks (numpy arrays)
"""
# Run inference with Detic
outputs = self.predictor(image)
instances = outputs["instances"].to("cpu")

# Extract bounding boxes, classes, and scores
# Extract bounding boxes, classes, scores, and masks
if len(instances) == 0:
return [], [], [], [], []
return [], [], [], [], [], []

boxes = instances.pred_boxes.tensor.numpy()
class_ids = instances.pred_classes.numpy()
scores = instances.scores.numpy()
masks = instances.pred_masks.numpy()

# Convert boxes to [x1, y1, x2, y2] format
bboxes = []
Expand All @@ -331,32 +337,43 @@ def process_image(self, image):

# Apply tracking
detections = []
filtered_masks = []
for i, bbox in enumerate(bboxes):
if scores[i] >= self.threshold:
# Format for tracker: [x1, y1, x2, y2, score, class_id]
detections.append(bbox + [scores[i], class_ids[i]])
filtered_masks.append(masks[i])

if not detections:
return [], [], [], [], []
return [], [], [], [], [], []

# Update tracker with detections
track_results = self.tracker.update(detections)
# Update tracker with detections and correctly aligned masks
track_results = self.tracker.update(detections, filtered_masks)

# Process tracking results
track_ids = []
tracked_bboxes = []
tracked_class_ids = []
tracked_scores = []
tracked_names = []
tracked_masks = []

for track_id, bbox, score, class_id in track_results:
for track_id, bbox, score, class_id, mask in track_results:
track_ids.append(int(track_id))
tracked_bboxes.append(bbox.tolist() if isinstance(bbox, np.ndarray) else bbox)
tracked_class_ids.append(int(class_id))
tracked_scores.append(score)
tracked_names.append(self.class_names[int(class_id)])

return tracked_bboxes, track_ids, tracked_class_ids, tracked_scores, tracked_names
tracked_masks.append(mask)

return (
tracked_bboxes,
track_ids,
tracked_class_ids,
tracked_scores,
tracked_names,
tracked_masks,
)

def visualize_results(self, image, bboxes, track_ids, class_ids, confidences, names):
"""
Expand Down
127 changes: 87 additions & 40 deletions dimos/perception/object_detection_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
)
from dimos.types.vector import Vector
from typing import Optional, Union
from dimos.types.manipulation import ObjectData

from dimos.utils.logging_config import setup_logger

# Initialize logger for the ObjectDetectionStream
logger = setup_logger("dimos.perception.object_detection_stream")


class ObjectDetectionStream:
Expand All @@ -22,6 +28,7 @@ class ObjectDetectionStream:
2. Estimates depth using Metric3D
3. Calculates 3D position and dimensions using camera intrinsics
4. Transforms coordinates to map frame
5. Draws bounding boxes and segmentation masks on the frame

Provides a stream of structured object data with position and rotation information.
"""
Expand All @@ -36,6 +43,7 @@ def __init__(
transform_to_map=None, # Optional function to transform coordinates to map frame
detector: Optional[Union[Detic2DDetector, Yolo2DDetector]] = None,
video_stream: Observable = None,
disable_depth: bool = False, # Flag to disable monocular Metric3D depth estimation
):
"""
Initialize the ObjectDetectionStream.
Expand All @@ -53,22 +61,29 @@ def __init__(
self.min_confidence = min_confidence
self.class_filter = class_filter
self.transform_to_map = transform_to_map
self.disable_depth = disable_depth
# Initialize object detector
self.detector = detector or Detic2DDetector(vocabulary=None, threshold=min_confidence)
# Set up camera intrinsics
self.camera_intrinsics = camera_intrinsics

# Initialize depth estimation model
self.depth_model = Metric3D(gt_depth_scale)
self.depth_model = None
if not disable_depth:
self.depth_model = Metric3D(gt_depth_scale)

# Set up camera intrinsics
self.camera_intrinsics = camera_intrinsics
if camera_intrinsics is not None:
self.depth_model.update_intrinsic(camera_intrinsics)
if camera_intrinsics is not None:
self.depth_model.update_intrinsic(camera_intrinsics)

# Create 3x3 camera matrix for calculations
fx, fy, cx, cy = camera_intrinsics
self.camera_matrix = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32)
# Create 3x3 camera matrix for calculations
fx, fy, cx, cy = camera_intrinsics
self.camera_matrix = np.array(
[[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32
)
else:
raise ValueError("camera_intrinsics must be provided")
else:
raise ValueError("camera_intrinsics must be provided")
logger.info("Depth estimation disabled")

# If video_stream is provided, create and store the stream immediately
self.stream = None
Expand All @@ -89,7 +104,9 @@ def create_stream(self, video_stream: Observable) -> Observable:

def process_frame(frame):
# Detect objects
bboxes, track_ids, class_ids, confidences, names = self.detector.process_image(frame)
bboxes, track_ids, class_ids, confidences, names, masks = self.detector.process_image(
frame
)

# Create visualization
viz_frame = frame.copy()
Expand All @@ -107,36 +124,46 @@ def process_frame(frame):
if self.class_filter and class_name not in self.class_filter:
continue

# Get depth for this object
depth = calculate_depth_from_bbox(self.depth_model, frame, bbox)
if depth is None:
# Skip objects with invalid depth
continue

# Calculate object position and rotation
position, rotation = calculate_position_rotation_from_bbox(
bbox, depth, self.camera_intrinsics
)

# Get object dimensions
width, height = calculate_object_size_from_bbox(bbox, depth, self.camera_intrinsics)

# Transform to map frame if a transform function is provided
try:
if self.transform_to_map:
position = Vector([position["x"], position["y"], position["z"]])
rotation = Vector([rotation["roll"], rotation["pitch"], rotation["yaw"]])
position, rotation = self.transform_to_map(
position, rotation, source_frame="base_link"
)
position = dict(x=position.x, y=position.y, z=position.z)
rotation = dict(roll=rotation.x, pitch=rotation.y, yaw=rotation.z)
except Exception as e:
print(f"Error transforming to map frame: {e}")
position, rotation = position, rotation

# Create object data dictionary
object_data = {
if not self.disable_depth:
# Get depth for this object
depth = calculate_depth_from_bbox(self.depth_model, frame, bbox)
if depth is None:
# Skip objects with invalid depth
continue
# Calculate object position and rotation
position, rotation = calculate_position_rotation_from_bbox(
bbox, depth, self.camera_intrinsics
)
# Get object dimensions
width, height = calculate_object_size_from_bbox(
bbox, depth, self.camera_intrinsics
)

# Transform to map frame if a transform function is provided
try:
if self.transform_to_map:
position = Vector([position["x"], position["y"], position["z"]])
rotation = Vector(
[rotation["roll"], rotation["pitch"], rotation["yaw"]]
)
position, rotation = self.transform_to_map(
position, rotation, source_frame="base_link"
)
position = dict(x=position.x, y=position.y, z=position.z)
rotation = dict(roll=rotation.x, pitch=rotation.y, yaw=rotation.z)
except Exception as e:
logger.error(f"Error transforming to map frame: {e}")
position, rotation = position, rotation

else:
depth = -1
position = Vector(0, 0, 0)
rotation = Vector(0, 0, 0)
width = -1
height = -1

# Create a properly typed ObjectData instance
object_data: ObjectData = {
"object_id": track_ids[i] if i < len(track_ids) else -1,
"bbox": bbox,
"depth": depth,
Expand All @@ -146,13 +173,33 @@ def process_frame(frame):
"position": position,
"rotation": rotation,
"size": {"width": width, "height": height},
"segmentation_mask": masks[i],
}

objects.append(object_data)

# Add visualization
x1, y1, x2, y2 = map(int, bbox)
color = (0, 255, 0) # Green for detected objects
mask_color = (0, 200, 200) # Yellow-green for masks

# Draw segmentation mask if available
if object_data["segmentation_mask"] is not None:
# Create a colored mask overlay
mask = object_data["segmentation_mask"].astype(np.uint8)
colored_mask = np.zeros_like(viz_frame)
colored_mask[mask > 0] = mask_color

# Apply the mask with transparency
alpha = 0.5 # transparency factor
mask_area = mask > 0
viz_frame[mask_area] = cv2.addWeighted(
viz_frame[mask_area], 1 - alpha, colored_mask[mask_area], alpha, 0
)

# Draw mask contour
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(viz_frame, contours, -1, mask_color, 2)

# Draw bounding box
cv2.rectangle(viz_frame, (x1, y1), (x2, y2), color, 2)
Expand Down
66 changes: 66 additions & 0 deletions dimos/skills/manipulation/force_constraint_skill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, List, Tuple
from pydantic import Field

from dimos.skills.skills import AbstractRobotSkill
from dimos.types.manipulation_constraint import ForceConstraint, Vector
from dimos.utils.logging_config import setup_logger

# Initialize logger
logger = setup_logger("dimos.skills.force_constraint_skill")


class ForceConstraintSkill(AbstractRobotSkill):
    """
    Skill for generating force constraints for robot manipulation.

    Builds a ForceConstraint from the configured force limits and an optional
    2D force direction. The (x, y) direction is lifted into a 3D ``Vector``
    with a zero z-component before being attached to the constraint.
    """

    # Constraint parameters (newtons)
    max_force: float = Field(0.0, description="Maximum force in newtons to apply")
    min_force: float = Field(0.0, description="Minimum force in newtons to apply")

    # Force direction as (x,y) tuple; None means no preferred direction
    force_direction: Optional[Tuple[float, float]] = Field(
        None, description="Force direction vector (x,y)"
    )

    # Human-readable description of the constraint
    description: str = Field("", description="Description of the force constraint")

    def __call__(self) -> ForceConstraint:
        """
        Generate a force constraint based on the parameters.

        Returns:
            ForceConstraint: The generated constraint carrying the force
            limits, the optional 3D direction vector, and the description.
        """
        # Create force direction vector if provided (convert 2D point to 3D
        # vector with z=0). Use an explicit None check rather than truthiness
        # so the intent "no direction supplied" is unambiguous and a
        # legitimate zero direction can never be silently dropped.
        force_direction_vector = None
        if self.force_direction is not None:
            force_direction_vector = Vector(self.force_direction[0], self.force_direction[1], 0.0)

        # Create and return the constraint
        constraint = ForceConstraint(
            max_force=self.max_force,
            min_force=self.min_force,
            force_direction=force_direction_vector,
            description=self.description,
        )

        # Log the constraint creation
        logger.info(f"Generated force constraint: {self.description}")

        return constraint
Loading