Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added dimos/manipulation/__init__.py
Empty file.
404 changes: 404 additions & 0 deletions dimos/manipulation/manipulation_history.py

Large diffs are not rendered by default.

292 changes: 292 additions & 0 deletions dimos/manipulation/manipulation_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
# Copyright 2025 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
ManipulationInterface provides a unified interface for accessing manipulation history.

This module defines the ManipulationInterface class, which serves as an access point
for the robot's manipulation history, agent-generated constraints, and manipulation
metadata streams.
"""

from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass
import os
import time
from datetime import datetime
from reactivex.disposable import Disposable
from dimos.perception.object_detection_stream import ObjectDetectionStream
from dimos.types.manipulation import (
AbstractConstraint,
TranslationConstraint,
RotationConstraint,
ForceConstraint,
ManipulationTaskConstraint,
ManipulationTask,
ManipulationMetadata,
ObjectData,
)
from dimos.manipulation.manipulation_history import (
ManipulationHistory,
ManipulationHistoryEntry,
)
from dimos.utils.logging_config import setup_logger

logger = setup_logger("dimos.robot.manipulation_interface")


class ManipulationInterface:
"""
Interface for accessing and managing robot manipulation data.

This class provides a unified interface for managing manipulation tasks and constraints.
It maintains a list of constraints generated by the Agent and provides methods to
add and manage manipulation tasks.
"""

def __init__(
self,
output_dir: str,
new_memory: bool = False,
perception_stream: ObjectDetectionStream = None,
):
"""
Initialize a new ManipulationInterface instance.

Args:
output_dir: Directory for storing manipulation data
new_memory: If True, creates a new manipulation history from scratch
perception_stream: ObjectDetectionStream instance for real-time object data
"""
self.output_dir = output_dir

# Create manipulation history directory
manipulation_dir = os.path.join(output_dir, "manipulation_history")
os.makedirs(manipulation_dir, exist_ok=True)

# Initialize manipulation history
self.manipulation_history: ManipulationHistory = ManipulationHistory(
output_dir=manipulation_dir, new_memory=new_memory
)

# List of constraints generated by the Agent via constraint generation skills
self.agent_constraints: List[AbstractConstraint] = []

# Initialize object detection stream and related properties
self.perception_stream = perception_stream
self.latest_objects: List[ObjectData] = []
self.stream_subscription: Optional[Disposable] = None

# Set up subscription to perception stream if available
self._setup_perception_subscription()

logger.info("ManipulationInterface initialized")

def add_constraint(self, constraint: AbstractConstraint) -> None:
"""
Add a constraint generated by the Agent via a constraint generation skill.

Args:
constraint: The constraint to add to agent_constraints
"""
self.agent_constraints.append(constraint)
logger.info(f"Added agent constraint: {constraint}")

def get_constraints(self) -> List[AbstractConstraint]:
"""
Get all constraints generated by the Agent via constraint generation skills.

Returns:
List of all constraints created by the Agent
"""
return self.agent_constraints

def get_constraint(self, constraint_id: str) -> Optional[AbstractConstraint]:
"""
Get a specific constraint by its ID.

Args:
constraint_id: ID of the constraint to retrieve

Returns:
The matching constraint or None if not found
"""
# Find constraint with matching ID
for constraint in self.agent_constraints:
if constraint.id == constraint_id:
return constraint

logger.warning(f"Constraint with ID {constraint_id} not found")
return None

def add_manipulation_task(
self, task: ManipulationTask, manipulation_response: Optional[str] = None
) -> None:
"""
Add a manipulation task to ManipulationHistory.

Args:
task: The ManipulationTask to add
manipulation_response: Optional response from the motion planner/executor

"""
# Add task to history
self.manipulation_history.add_entry(
task=task, result=None, notes=None, manipulation_response=manipulation_response
)

def get_manipulation_task(self, task_id: str) -> Optional[ManipulationTask]:
"""
Get a manipulation task by its ID.

Args:
task_id: ID of the task to retrieve

Returns:
The task object or None if not found
"""
return self.history.get_manipulation_task(task_id)

def get_all_manipulation_tasks(self) -> List[ManipulationTask]:
"""
Get all manipulation tasks.

Returns:
List of all manipulation tasks
"""
return self.history.get_all_manipulation_tasks()

def update_task_status(
self, task_id: str, status: str, result: Optional[Dict[str, Any]] = None
) -> Optional[ManipulationTask]:
"""
Update the status and result of a manipulation task.

Args:
task_id: ID of the task to update
status: New status for the task (e.g., 'completed', 'failed')
result: Optional dictionary with result data

Returns:
The updated task or None if task not found
"""
return self.history.update_task_status(task_id, status, result)

# === Perception stream methods ===

def _setup_perception_subscription(self):
"""
Set up subscription to perception stream if available.
"""
if self.perception_stream:
# Subscribe to the stream and update latest_objects
self.stream_subscription = self.perception_stream.get_stream().subscribe(
on_next=self._update_latest_objects,
on_error=lambda e: logger.error(f"Error in perception stream: {e}"),
)
logger.info("Subscribed to perception stream")

def _update_latest_objects(self, data):
"""
Update the latest detected objects.

Args:
data: Data from the object detection stream
"""
if "objects" in data:
self.latest_objects = data["objects"]

def get_latest_objects(self) -> List[ObjectData]:
"""
Get the latest detected objects from the stream.

Returns:
List of the most recently detected objects
"""
return self.latest_objects

def get_object_by_id(self, object_id: int) -> Optional[ObjectData]:
"""
Get a specific object by its tracking ID.

Args:
object_id: Tracking ID of the object

Returns:
The object data or None if not found
"""
for obj in self.latest_objects:
if obj["object_id"] == object_id:
return obj
return None

def get_objects_by_label(self, label: str) -> List[ObjectData]:
"""
Get all objects with a specific label.

Args:
label: Class label to filter objects by

Returns:
List of objects matching the label
"""
return [obj for obj in self.latest_objects if obj["label"] == label]

def set_perception_stream(self, perception_stream):
"""
Set or update the perception stream.

Args:
perception_stream: The PerceptionStream instance
"""
# Clean up existing subscription if any
self.cleanup_perception_subscription()

# Set new stream and subscribe
self.perception_stream = perception_stream
self._setup_perception_subscription()

def cleanup_perception_subscription(self):
"""
Clean up the stream subscription.
"""
if self.stream_subscription:
self.stream_subscription.dispose()
self.stream_subscription = None

# === Utility methods ===

def clear_history(self) -> None:
"""
Clear all manipulation history data and agent constraints.
"""
self.manipulation_history.clear()
self.agent_constraints.clear()
logger.info("Cleared manipulation history and agent constraints")

def __str__(self) -> str:
"""
String representation of the manipulation interface.

Returns:
String representation with key stats
"""
has_stream = self.perception_stream is not None
return f"ManipulationInterface(history={self.manipulation_history}, agent_constraints={len(self.agent_constraints)}, perception_stream={has_stream}, detected_objects={len(self.latest_objects)})"

def __del__(self):
"""
Clean up resources on deletion.
"""
self.cleanup_perception_subscription()
Loading
Loading