diff --git a/data/.lfs/xarm_description.tar.gz b/data/.lfs/xarm_description.tar.gz index ed2ddb780f..e7c473e44f 100644 --- a/data/.lfs/xarm_description.tar.gz +++ b/data/.lfs/xarm_description.tar.gz @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:82634d767b1f96474b9273500d3f8123f0df9dd1f7e1d84f369e65b3d257f00f -size 12703173 +oid sha256:0b247baea9b52f583824b0bc90e1e38640e6090aca7f4ae1a1a72a24e5a66fee +size 12706830 diff --git a/dimos/control/blueprints.py b/dimos/control/blueprints.py index 323c850046..ceb3a74fbe 100644 --- a/dimos/control/blueprints.py +++ b/dimos/control/blueprints.py @@ -35,6 +35,11 @@ from dimos.core.transport import LCMTransport from dimos.msgs.geometry_msgs import PoseStamped from dimos.msgs.sensor_msgs import JointState +from dimos.teleop.quest.quest_types import QuestButtons +from dimos.utils.data import LfsPath + +_PIPER_MODEL_PATH = LfsPath("piper_description/mujoco_model/piper_no_gripper_description.xml") +_XARM6_MODEL_PATH = LfsPath("xarm_description/urdf/xarm6/xarm6.urdf") # ============================================================================= # Single Arm Blueprints @@ -391,14 +396,6 @@ # ============================================================================= -def _get_piper_model_path() -> str: - """Get path to Piper MJCF model for IK solver.""" - from dimos.utils.data import get_data - - piper_path = get_data("piper_description") - return str(piper_path / "mujoco_model" / "piper_no_gripper_description.xml") - - # Mock 6-DOF arm with CartesianIK coordinator_cartesian_ik_mock = control_coordinator( tick_rate=100.0, @@ -418,7 +415,7 @@ def _get_piper_model_path() -> str: type="cartesian_ik", joint_names=[f"arm_joint{i + 1}" for i in range(6)], priority=10, - model_path=_get_piper_model_path(), + model_path=_PIPER_MODEL_PATH, ee_joint_id=6, ), ], @@ -452,8 +449,137 @@ def _get_piper_model_path() -> str: type="cartesian_ik", joint_names=[f"arm_joint{i + 1}" for i in range(6)], priority=10, - model_path=_get_piper_model_path(), + model_path=_PIPER_MODEL_PATH, + ee_joint_id=6, + ), + ], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + } +) + + +# ============================================================================= +# Teleop IK Blueprints (VR teleoperation with internal Pinocchio IK) +# ============================================================================= + +# Single XArm6 with TeleopIK +coordinator_teleop_xarm6 = control_coordinator( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[ + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + adapter_type="xarm", + address="192.168.1.210", + auto_enable=True, + ), + ], + tasks=[ + TaskConfig( + name="teleop_xarm", + type="teleop_ik", + joint_names=[f"arm_joint{i + 1}" for i in range(6)], + priority=10, + model_path=_XARM6_MODEL_PATH, + ee_joint_id=6, + hand="right", + ), + ], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", QuestButtons): LCMTransport("/teleop/buttons", QuestButtons), + } +) + +# Single Piper with TeleopIK +coordinator_teleop_piper = control_coordinator( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[ + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + adapter_type="piper", + address="can0", + auto_enable=True, + ), + ], + tasks=[ + TaskConfig( + name="teleop_piper", + type="teleop_ik", + joint_names=[f"arm_joint{i + 1}" for i in range(6)], + priority=10, + model_path=_PIPER_MODEL_PATH, + ee_joint_id=6, + hand="left", + ), + ], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("cartesian_command", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", QuestButtons): LCMTransport("/teleop/buttons", QuestButtons), + } +) + +# Dual arm teleop: XArm6 + Piper with TeleopIK +coordinator_teleop_dual = control_coordinator( + tick_rate=100.0, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[ + HardwareComponent( + hardware_id="xarm_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("xarm_arm", 6), + adapter_type="xarm", + address="192.168.1.210", + auto_enable=True, + ), + HardwareComponent( + hardware_id="piper_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("piper_arm", 6), + adapter_type="piper", + address="can0", + auto_enable=True, + ), + ], + tasks=[ + TaskConfig( + name="teleop_xarm", + type="teleop_ik", + joint_names=[f"xarm_arm_joint{i + 1}" for i in range(6)], + priority=10, + model_path=_XARM6_MODEL_PATH, + ee_joint_id=6, + hand="left", + ), + TaskConfig( + name="teleop_piper", + type="teleop_ik", + joint_names=[f"piper_arm_joint{i + 1}" for i in range(6)], + priority=10, + model_path=_PIPER_MODEL_PATH, ee_joint_id=6, + hand="right", ), ], ).transports( @@ -462,6 +588,7 @@ def _get_piper_model_path() -> str: ("cartesian_command", PoseStamped): LCMTransport( "/coordinator/cartesian_command", PoseStamped ), + ("buttons", QuestButtons): LCMTransport("/teleop/buttons", QuestButtons), } ) @@ -500,6 +627,9 @@ def _get_piper_model_path() -> str: "coordinator_mock", "coordinator_piper", "coordinator_piper_xarm", + # Teleop IK + "coordinator_teleop_dual", + "coordinator_teleop_piper", "coordinator_teleop_xarm6", "coordinator_velocity_xarm6", "coordinator_xarm6", diff --git a/dimos/control/coordinator.py b/dimos/control/coordinator.py index 026a8c87e1..f0db048db6 100644 --- a/dimos/control/coordinator.py +++ b/dimos/control/coordinator.py @@ -44,10 +44,12 @@ from dimos.msgs.sensor_msgs import ( JointState, # noqa: TC001 - needed at runtime for Out[JointState] ) +from dimos.teleop.quest.quest_types import QuestButtons # noqa: TC001 - needed for teleop buttons from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: from collections.abc import Callable + from pathlib import Path from dimos.hardware.manipulators.spec import ManipulatorAdapter @@ -65,20 +67,21 @@ class TaskConfig: Attributes: name: Task name (e.g., "traj_arm") - type: Task type ("trajectory", "servo", "velocity", "cartesian_ik") + type: Task type ("trajectory", "servo", "velocity", "cartesian_ik", "teleop_ik") joint_names: List of joint names this task controls priority: Task priority (higher wins arbitration) - model_path: Path to URDF/MJCF for IK solver (cartesian_ik only) - ee_joint_id: End-effector joint ID in model (cartesian_ik only) + model_path: Path to URDF/MJCF for IK solver (cartesian_ik/teleop_ik only) + ee_joint_id: End-effector joint ID in model (cartesian_ik/teleop_ik only) """ name: str type: str = "trajectory" joint_names: list[str] = field(default_factory=lambda: []) priority: int = 10 - # Cartesian IK specific - model_path: str | None = None + # Cartesian IK / Teleop IK specific + model_path: str | Path | None = None ee_joint_id: int = 6 + hand: str = "" # teleop_ik only: "left" or "right" controller @dataclass @@ -145,6 +148,9 @@ class ControlCoordinator(Module[ControlCoordinatorConfig]): # Uses frame_id as task name for routing cartesian_command: In[PoseStamped] + # Input: Teleop buttons for engage/disengage signaling + buttons: In[QuestButtons] + config: ControlCoordinatorConfig default_config = ControlCoordinatorConfig @@ -168,6 +174,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # Subscription handles for streaming commands self._joint_command_unsub: Callable[[], None] | None = None self._cartesian_command_unsub: Callable[[], None] | None = None + self._buttons_unsub: Callable[[], None] | None = None logger.info(f"ControlCoordinator initialized at {self.config.tick_rate}Hz") @@ -276,6 +283,23 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: ), ) + elif task_type == "teleop_ik": + from dimos.control.tasks.teleop_task import TeleopIKTask, TeleopIKTaskConfig + + if cfg.model_path is None: + raise ValueError(f"TeleopIKTask '{cfg.name}' requires model_path in TaskConfig") + + return TeleopIKTask( + cfg.name, + TeleopIKTaskConfig( + joint_names=cfg.joint_names, + model_path=cfg.model_path, + ee_joint_id=cfg.ee_joint_id, + priority=cfg.priority, + hand=cfg.hand, + ), + ) + else: raise ValueError(f"Unknown task type: {task_type}") @@ -437,14 +461,14 @@ def _on_joint_command(self, msg: JointState) -> None: continue # Route to servo tasks (position control) - if hasattr(task, "set_target_by_name") and msg.position: + if msg.position: positions_by_name = dict(zip(msg.name, msg.position, strict=False)) - task.set_target_by_name(positions_by_name, t_now) # type: ignore[attr-defined] + task.set_target_by_name(positions_by_name, t_now) # Route to velocity tasks (velocity control) - elif hasattr(task, "set_velocities_by_name") and msg.velocity: + elif msg.velocity: velocities_by_name = dict(zip(msg.name, msg.velocity, strict=False)) - task.set_velocities_by_name(velocities_by_name, t_now) # type: ignore[attr-defined] + task.set_velocities_by_name(velocities_by_name, t_now) def _on_cartesian_command(self, msg: PoseStamped) -> None: """Route incoming PoseStamped to CartesianIKTask by task name. @@ -464,11 +488,13 @@ def _on_cartesian_command(self, msg: PoseStamped) -> None: logger.warning(f"Cartesian command for unknown task: {task_name}") return - # Route to CartesianIKTask - if hasattr(task, "set_target_pose"): - task.set_target_pose(msg, t_now) # type: ignore[attr-defined] - else: - logger.warning(f"Task {task_name} does not support set_target_pose") + task.on_cartesian_command(msg, t_now) + + def _on_buttons(self, msg: QuestButtons) -> None: + """Forward button state to all tasks.""" + with self._task_lock: + for task in self._tasks.values(): + task.on_buttons(msg) @rpc def task_invoke( @@ -561,35 +587,34 @@ def start(self) -> None: streaming_types = ("servo", "velocity") has_streaming = any(t.type in streaming_types for t in self.config.tasks) if has_streaming: - # Only subscribe if transport is configured try: - if self.joint_command.transport: - self._joint_command_unsub = self.joint_command.subscribe(self._on_joint_command) - logger.info("Subscribed to joint_command for streaming tasks") - else: - logger.warning( - "Streaming tasks configured but no transport set for joint_command. " - "Use task_invoke RPC or set transport via blueprint." - ) - except Exception as e: - logger.warning(f"Could not subscribe to joint_command: {e}") + self._joint_command_unsub = self.joint_command.subscribe(self._on_joint_command) + logger.info("Subscribed to joint_command for streaming tasks") + except Exception: + logger.warning( + "Streaming tasks configured but could not subscribe to joint_command. " + "Use task_invoke RPC or set transport via blueprint." + ) # Subscribe to cartesian commands if any cartesian_ik tasks configured - has_cartesian_ik = any(t.type == "cartesian_ik" for t in self.config.tasks) + has_cartesian_ik = any(t.type in ("cartesian_ik", "teleop_ik") for t in self.config.tasks) if has_cartesian_ik: try: - if self.cartesian_command.transport: - self._cartesian_command_unsub = self.cartesian_command.subscribe( - self._on_cartesian_command - ) - logger.info("Subscribed to cartesian_command for CartesianIK tasks") - else: - logger.warning( - "CartesianIK tasks configured but no transport set for cartesian_command. " - "Use task_invoke RPC or set transport via blueprint." - ) - except Exception as e: - logger.warning(f"Could not subscribe to cartesian_command: {e}") + self._cartesian_command_unsub = self.cartesian_command.subscribe( + self._on_cartesian_command + ) + logger.info("Subscribed to cartesian_command for CartesianIK/TeleopIK tasks") + except Exception: + logger.warning( + "CartesianIK/TeleopIK tasks configured but could not subscribe to cartesian_command. " + "Use task_invoke RPC or set transport via blueprint." + ) + + # Subscribe to buttons if any teleop_ik tasks configured (engage/disengage) + has_teleop_ik = any(t.type == "teleop_ik" for t in self.config.tasks) + if has_teleop_ik: + self._buttons_unsub = self.buttons.subscribe(self._on_buttons) + logger.info("Subscribed to buttons for engage/disengage") logger.info(f"ControlCoordinator started at {self.config.tick_rate}Hz") @@ -605,6 +630,9 @@ def stop(self) -> None: if self._cartesian_command_unsub: self._cartesian_command_unsub() self._cartesian_command_unsub = None + if self._buttons_unsub: + self._buttons_unsub() + self._buttons_unsub = None if self._tick_loop: self._tick_loop.stop() diff --git a/dimos/control/task.py b/dimos/control/task.py index e9d9c4a584..5782393880 100644 --- a/dimos/control/task.py +++ b/dimos/control/task.py @@ -28,11 +28,15 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Protocol, runtime_checkable +from typing import TYPE_CHECKING, Protocol, runtime_checkable from dimos.control.components import JointName from dimos.hardware.manipulators.spec import ControlMode +if TYPE_CHECKING: + from dimos.msgs.geometry_msgs import Pose, PoseStamped + from dimos.teleop.quest.quest_types import QuestButtons + # ============================================================================= # Data Types # ============================================================================= @@ -287,11 +291,52 @@ def on_preempted(self, by_task: str, joints: frozenset[JointName]) -> None: """ ... + def on_buttons(self, msg: QuestButtons) -> bool: + """Handle button state from teleop controllers.""" + ... + + def on_cartesian_command(self, pose: Pose | PoseStamped, t_now: float) -> bool: + """Handle incoming cartesian command (target or delta pose).""" + ... + + def set_target_by_name(self, positions: dict[str, float], t_now: float) -> bool: + """Handle servo position commands by joint name.""" + ... + + def set_velocities_by_name(self, velocities: dict[str, float], t_now: float) -> bool: + """Handle velocity commands by joint name.""" + ... + + +class BaseControlTask(ControlTask): + """Base class with no-op defaults for optional listener methods. + + Inherit from this to avoid implementing empty methods + in tasks that don't need them. Only override what your task uses. + """ + + def on_buttons(self, msg: QuestButtons) -> bool: + """No-op default.""" + return False + + def on_cartesian_command(self, pose: Pose | PoseStamped, t_now: float) -> bool: + """No-op default.""" + return False + + def set_target_by_name(self, positions: dict[str, float], t_now: float) -> bool: + """No-op default.""" + return False + + def set_velocities_by_name(self, velocities: dict[str, float], t_now: float) -> bool: + """No-op default.""" + return False + __all__ = [ + # Protocol + Base + "BaseControlTask", # Types "ControlMode", - # Protocol "ControlTask", "CoordinatorState", "JointCommandOutput", diff --git a/dimos/control/tasks/__init__.py b/dimos/control/tasks/__init__.py index bbb77dc129..5b869b01f9 100644 --- a/dimos/control/tasks/__init__.py +++ b/dimos/control/tasks/__init__.py @@ -22,6 +22,10 @@ JointServoTask, JointServoTaskConfig, ) +from dimos.control.tasks.teleop_task import ( + TeleopIKTask, + TeleopIKTaskConfig, +) from dimos.control.tasks.trajectory_task import ( JointTrajectoryTask, JointTrajectoryTaskConfig, @@ -40,4 +44,6 @@ "JointTrajectoryTaskConfig", "JointVelocityTask", "JointVelocityTaskConfig", + "TeleopIKTask", + "TeleopIKTaskConfig", ] diff --git a/dimos/control/tasks/cartesian_ik_task.py b/dimos/control/tasks/cartesian_ik_task.py index 7ff5b21e52..6ea5ddc55b 100644 --- a/dimos/control/tasks/cartesian_ik_task.py +++ b/dimos/control/tasks/cartesian_ik_task.py @@ -24,25 +24,31 @@ from __future__ import annotations from dataclasses import dataclass -from pathlib import Path import threading from typing import TYPE_CHECKING, Any import numpy as np -from numpy.linalg import norm, solve -import pinocchio # type: ignore[import-untyped] from dimos.control.task import ( + BaseControlTask, ControlMode, - ControlTask, CoordinatorState, JointCommandOutput, ResourceClaim, ) +from dimos.manipulation.planning.kinematics.pinocchio_ik import ( + PinocchioIK, + check_joint_delta, + get_worst_joint_delta, + pose_to_se3, +) from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: + from pathlib import Path + from numpy.typing import NDArray + import pinocchio # type: ignore[import-untyped] from dimos.msgs.geometry_msgs import Pose, PoseStamped @@ -59,12 +65,7 @@ class CartesianIKTaskConfig: ee_joint_id: End-effector joint ID in the kinematic chain priority: Priority for arbitration (higher wins) timeout: If no command received for this many seconds, go inactive (0 = never) - ik_max_iter: Maximum IK solver iterations - ik_eps: IK convergence threshold (error norm in meters) - ik_damp: IK damping factor for singularity handling (higher = more stable) - ik_dt: IK integration step size max_joint_delta_deg: Maximum allowed joint change per tick (safety limit) - max_velocity: Max joint velocity per IK iteration (rad/s) """ joint_names: list[str] @@ -72,18 +73,13 @@ class CartesianIKTaskConfig: ee_joint_id: int priority: int = 10 timeout: float = 0.5 - ik_max_iter: int = 100 - ik_eps: float = 1e-4 - ik_damp: float = 1e-2 - ik_dt: float = 1.0 max_joint_delta_deg: float = 15.0 # ~1500°/s at 100Hz - max_velocity: float = 2.0 -class CartesianIKTask(ControlTask): +class CartesianIKTask(BaseControlTask): """Cartesian control task with internal Pinocchio IK solver. - Accepts streaming cartesian poses via set_target_pose() and computes IK + Accepts streaming cartesian poses via on_cartesian_command() and computes IK internally to output joint commands. Uses current joint state from CoordinatorState as IK warm-start for fast convergence. @@ -107,7 +103,7 @@ class CartesianIKTask(ControlTask): >>> task.start() >>> >>> # From teleop callback or other source: - >>> task.set_target_pose(pose_stamped, t_now=time.perf_counter()) + >>> task.on_cartesian_command(pose_stamped, t_now=time.perf_counter()) """ def __init__(self, name: str, config: CartesianIKTaskConfig) -> None: @@ -128,29 +124,19 @@ def __init__(self, name: str, config: CartesianIKTaskConfig) -> None: self._joint_names_list = list(config.joint_names) self._num_joints = len(config.joint_names) - # Load Pinocchio model - model_path = Path(config.model_path) - if not model_path.exists(): - raise FileNotFoundError(f"Model file not found: {model_path}") - - if model_path.suffix == ".xml": - self._model = pinocchio.buildModelFromMJCF(str(model_path)) - else: - self._model = pinocchio.buildModelFromUrdf(str(model_path)) - - self._data = self._model.createData() - self._ee_joint_id = config.ee_joint_id + # Create IK solver from model + self._ik = PinocchioIK.from_model_path(config.model_path, config.ee_joint_id) # Validate DOF matches joint names - if self._model.nq != self._num_joints: + if self._ik.nq != self._num_joints: logger.warning( - f"CartesianIKTask {name}: model DOF ({self._model.nq}) != " + f"CartesianIKTask {name}: model DOF ({self._ik.nq}) != " f"joint_names count ({self._num_joints})" ) # Thread-safe target state self._lock = threading.Lock() - self._target_pose: pinocchio.SE3 | None = None + self._target_pose: Pose | PoseStamped | None = None self._last_update_time: float = 0.0 self._active = False @@ -158,7 +144,7 @@ def __init__(self, name: str, config: CartesianIKTaskConfig) -> None: self._last_q_solution: NDArray[np.floating[Any]] | None = None logger.info( - f"CartesianIKTask {name} initialized with model: {model_path}, " + f"CartesianIKTask {name} initialized with model: {config.model_path}, " f"ee_joint_id={config.ee_joint_id}, joints={config.joint_names}" ) @@ -192,7 +178,6 @@ def compute(self, state: CoordinatorState) -> JointCommandOutput | None: with self._lock: if not self._active or self._target_pose is None: return None - # Check timeout if self._config.timeout > 0: time_since_update = state.t_now - self._last_update_time @@ -203,9 +188,10 @@ def compute(self, state: CoordinatorState) -> JointCommandOutput | None: ) self._active = False return None + raw_pose = self._target_pose - target_pose = self._target_pose - + # Convert to SE3 right before use + target_pose = pose_to_se3(raw_pose) # Get current joint positions for IK warm-start q_current = self._get_current_joints(state) if q_current is None: @@ -213,27 +199,30 @@ def compute(self, state: CoordinatorState) -> JointCommandOutput | None: return None # Compute IK - q_solution, converged, final_error = self._solve_ik(target_pose, q_current) - - # Use the solution even if it didn't fully converge - the safety clamp - # will handle any large jumps. This prevents the arm from "sticking" - # when near singularities or workspace boundaries. + q_solution, converged, final_error = self._ik.solve(target_pose, q_current) + # Use the solution even if it didn't fully converge if not converged: logger.debug( f"CartesianIKTask {self._name}: IK did not converge " f"(error={final_error:.4f}), using partial solution" ) - # Safety check: clamp large joint jumps for smooth motion - _is_exact, q_clamped = self._safety_check(q_solution, q_current) + # Safety check: reject if any joint delta exceeds limit + if not check_joint_delta(q_solution, q_current, self._config.max_joint_delta_deg): + worst_idx, worst_deg = get_worst_joint_delta(q_solution, q_current) + logger.warning( + f"CartesianIKTask {self._name}: rejecting motion - " + f"joint {self._joint_names_list[worst_idx]} delta " + f"{worst_deg:.1f}° exceeds limit {self._config.max_joint_delta_deg}°" + ) + return None # Cache solution for next warm-start with self._lock: - self._last_q_solution = q_clamped.copy() - + self._last_q_solution = q_solution.copy() return JointCommandOutput( joint_names=self._joint_names_list, - positions=q_clamped.flatten().tolist(), + positions=q_solution.flatten().tolist(), mode=ControlMode.SERVO_POSITION, ) @@ -254,77 +243,6 @@ def _get_current_joints(self, state: CoordinatorState) -> NDArray[np.floating[An positions.append(pos) return np.array(positions, dtype=np.float64) - def _solve_ik( - self, - target_pose: pinocchio.SE3, - q_init: NDArray[np.floating[Any]], - ) -> tuple[NDArray[np.floating[Any]], bool, float]: - """Solve IK using damped least-squares (Levenberg-Marquardt). - - Args: - target_pose: Target end-effector pose as SE3 - q_init: Initial joint configuration (warm-start) - - Returns: - Tuple of (joint_angles, converged, final_error) - """ - q = q_init.copy() - final_err = float("inf") - - for _ in range(self._config.ik_max_iter): - pinocchio.forwardKinematics(self._model, self._data, q) - iMd = self._data.oMi[self._ee_joint_id].actInv(target_pose) - - err = pinocchio.log(iMd).vector - final_err = float(norm(err)) - if final_err < self._config.ik_eps: - return q, True, final_err - - J = pinocchio.computeJointJacobian(self._model, self._data, q, self._ee_joint_id) - J = -np.dot(pinocchio.Jlog6(iMd.inverse()), J) - v = -J.T.dot(solve(J.dot(J.T) + self._config.ik_damp * np.eye(6), err)) - - # Clamp velocity to prevent explosion near singularities - v_norm = norm(v) - if v_norm > self._config.max_velocity: - v = v * (self._config.max_velocity / v_norm) - - q = pinocchio.integrate(self._model, q, v * self._config.ik_dt) - - return q, False, final_err - - def _safety_check( - self, q_new: NDArray[np.floating[Any]], q_current: NDArray[np.floating[Any]] - ) -> tuple[bool, NDArray[np.floating[Any]]]: - """Check IK solution and clamp if needed. - - Args: - q_new: Proposed joint positions from IK - q_current: Current joint positions - - Returns: - Tuple of (is_safe, clamped_q) - if not safe, returns interpolated position - """ - max_delta_rad = np.radians(self._config.max_joint_delta_deg) - joint_deltas = q_new - q_current - - # Check if any joint exceeds the limit - if np.any(np.abs(joint_deltas) > max_delta_rad): - # Clamp the delta to the max allowed - clamped_delta = np.clip(joint_deltas, -max_delta_rad, max_delta_rad) - q_clamped = q_current + clamped_delta - - worst_joint = int(np.argmax(np.abs(joint_deltas))) - logger.debug( - f"CartesianIKTask {self._name}: clamping joint motion - " - f"joint {self._joint_names_list[worst_joint]} delta " - f"{np.degrees(joint_deltas[worst_joint]):.1f}° -> " - f"{np.degrees(clamped_delta[worst_joint]):.1f}°" - ) - return False, q_clamped - - return True, q_new - def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: """Handle preemption by higher-priority task. @@ -341,10 +259,8 @@ def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: # Task-specific methods # ========================================================================= - def set_target_pose(self, pose: Pose | PoseStamped, t_now: float) -> bool: - """Set target end-effector pose. - - Call this from your teleop callback, visual servoing, or other input source. + def on_cartesian_command(self, pose: Pose | PoseStamped, t_now: float) -> bool: + """Handle incoming cartesian command (target EE pose). Args: pose: Target end-effector pose (Pose or PoseStamped) @@ -353,65 +269,13 @@ def set_target_pose(self, pose: Pose | PoseStamped, t_now: float) -> bool: Returns: True if accepted """ - target_se3 = self._pose_to_se3(pose) - - with self._lock: - self._target_pose = target_se3 - self._last_update_time = t_now - self._active = True - - return True - - def set_target_pose_dict( - self, - pose: dict[str, float], - t_now: float, - ) -> bool: - """Set target from pose dict with position and RPY orientation. - - Args: - pose: {x, y, z, roll, pitch, yaw} in meters/radians - t_now: Current time - - Returns: - True if accepted, False if missing required keys - """ - required_keys = {"x", "y", "z", "roll", "pitch", "yaw"} - if not required_keys.issubset(pose.keys()): - missing = required_keys - set(pose.keys()) - logger.warning(f"CartesianIKTask {self._name}: missing pose keys {missing}") - return False - - position = np.array([pose["x"], pose["y"], pose["z"]]) - rotation = pinocchio.rpy.rpyToMatrix(pose["roll"], pose["pitch"], pose["yaw"]) - target_se3 = pinocchio.SE3(rotation, position) - with self._lock: - self._target_pose = target_se3 + self._target_pose = pose # Store raw, convert to SE3 in compute() self._last_update_time = t_now self._active = True return True - def _pose_to_se3(self, pose: Pose | PoseStamped) -> pinocchio.SE3: - """Convert a Pose message to pinocchio SE3. - - Uses quaternion directly to avoid Euler angle conversion issues. - """ - # Handle both Pose and PoseStamped - if hasattr(pose, "position"): - # Assume Pose or PoseStamped with position/orientation attributes - position = np.array([pose.x, pose.y, pose.z]) - quat = pose.orientation - rotation = pinocchio.Quaternion(quat.w, quat.x, quat.y, quat.z).toRotationMatrix() - else: - # Assume it has x, y, z directly - position = np.array([pose.x, pose.y, pose.z]) - quat = pose.orientation - rotation = pinocchio.Quaternion(quat.w, quat.x, quat.y, quat.z).toRotationMatrix() - - return pinocchio.SE3(rotation, position) - def start(self) -> None: """Activate the task (start accepting and outputting commands).""" with self._lock: @@ -451,8 +315,7 @@ def get_current_ee_pose(self, state: CoordinatorState) -> pinocchio.SE3 | None: if q_current is None: return None - pinocchio.forwardKinematics(self._model, self._data, q_current) - return self._data.oMi[self._ee_joint_id].copy() + return self._ik.forward_kinematics(q_current) def forward_kinematics(self, joint_positions: NDArray[np.floating[Any]]) -> pinocchio.SE3: """Compute end-effector pose from joint positions. @@ -463,8 +326,7 @@ def forward_kinematics(self, joint_positions: NDArray[np.floating[Any]]) -> pino Returns: End-effector pose as SE3 """ - pinocchio.forwardKinematics(self._model, self._data, joint_positions) - return self._data.oMi[self._ee_joint_id].copy() + return self._ik.forward_kinematics(joint_positions) __all__ = [ diff --git a/dimos/control/tasks/servo_task.py b/dimos/control/tasks/servo_task.py index 9c5eb1f446..b69b4dd099 100644 --- a/dimos/control/tasks/servo_task.py +++ b/dimos/control/tasks/servo_task.py @@ -27,8 +27,8 @@ import threading from dimos.control.task import ( + BaseControlTask, ControlMode, - ControlTask, CoordinatorState, JointCommandOutput, ResourceClaim, @@ -53,7 +53,7 @@ class JointServoTaskConfig: timeout: float = 0.5 # 500ms default timeout -class JointServoTask(ControlTask): +class JointServoTask(BaseControlTask): """Streaming joint position control for teleoperation/visual servoing. Accepts target positions via set_target() or set_target_by_name() and diff --git a/dimos/control/tasks/teleop_task.py b/dimos/control/tasks/teleop_task.py new file mode 100644 index 0000000000..0399f52c23 --- /dev/null +++ b/dimos/control/tasks/teleop_task.py @@ -0,0 +1,332 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Teleop cartesian control task with internal Pinocchio IK solver. + +Accepts streaming cartesian delta poses from teleoperation and computes +inverse kinematics internally to output joint commands. Deltas are applied +relative to the EE pose captured at engage time. + +Participates in joint-level arbitration. + +CRITICAL: Uses t_now from CoordinatorState, never calls time.time() +""" + +from __future__ import annotations + +from dataclasses import dataclass +import threading +from typing import TYPE_CHECKING, Any + +import numpy as np +import pinocchio # type: ignore[import-untyped] + +from dimos.control.task import ( + BaseControlTask, + ControlMode, + CoordinatorState, + JointCommandOutput, + ResourceClaim, +) +from dimos.manipulation.planning.kinematics.pinocchio_ik import ( + PinocchioIK, + check_joint_delta, + pose_to_se3, +) +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from pathlib import Path + + from numpy.typing import NDArray + + from dimos.msgs.geometry_msgs import Pose, PoseStamped + from dimos.teleop.quest.quest_types import QuestButtons + +logger = setup_logger() + + +@dataclass +class TeleopIKTaskConfig: + """Configuration for teleop IK task. + + Attributes: + joint_names: List of joint names this task controls (must match model DOF) + model_path: Path to URDF or MJCF file for IK solver + ee_joint_id: End-effector joint ID in the kinematic chain + priority: Priority for arbitration (higher wins) + timeout: If no command received for this many seconds, go inactive (0 = never) + max_joint_delta_deg: Maximum allowed joint change per tick (safety limit) + hand: "left" or "right" — which controller's primary button to listen to + """ + + joint_names: list[str] + model_path: str | Path + ee_joint_id: int + priority: int = 10 + timeout: float = 0.5 + max_joint_delta_deg: float = 5.0 # ~500°/s at 100Hz + hand: str = "" + + +class TeleopIKTask(BaseControlTask): + """Teleop cartesian control task with internal Pinocchio IK solver. + + Accepts streaming cartesian delta poses via on_cartesian_command() and computes IK + internally to output joint commands. Deltas are applied relative to the EE pose + captured at engage time (first compute). + + Uses current joint state from CoordinatorState as IK warm-start for fast convergence. + Outputs JointCommandOutput and participates in joint-level arbitration. + + Example: + >>> from dimos.utils.data import get_data + >>> piper_path = get_data("piper_description") + >>> task = TeleopIKTask( + ... name="teleop_arm", + ... config=TeleopIKTaskConfig( + ... joint_names=["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"], + ... model_path=piper_path / "mujoco_model" / "piper_no_gripper_description.xml", + ... ee_joint_id=6, + ... priority=10, + ... timeout=0.5, + ... ), + ... ) + >>> coordinator.add_task(task) + >>> task.start() + >>> + >>> # From teleop callback: + >>> task.on_cartesian_command(delta_pose, t_now=time.perf_counter()) + """ + + def __init__(self, name: str, config: TeleopIKTaskConfig) -> None: + """Initialize teleop IK task. + + Args: + name: Unique task name + config: Task configuration + """ + if not config.joint_names: + raise ValueError(f"TeleopIKTask '{name}' requires at least one joint") + if not config.model_path: + raise ValueError(f"TeleopIKTask '{name}' requires model_path for IK solver") + + self._name = name + self._config = config + self._joint_names = frozenset(config.joint_names) + self._joint_names_list = list(config.joint_names) + self._num_joints = len(config.joint_names) + + # Create IK solver from model + self._ik = PinocchioIK.from_model_path(config.model_path, config.ee_joint_id) + + # Validate DOF matches joint names + if self._ik.nq != self._num_joints: + logger.warning( + f"TeleopIKTask {name}: model DOF ({self._ik.nq}) != " + f"joint_names count ({self._num_joints})" + ) + + # Thread-safe target state + self._lock = threading.Lock() + self._target_pose: Pose | PoseStamped | None = None + self._last_update_time: float = 0.0 + self._active = False + + # Initial EE pose for delta application + self._initial_ee_pose: pinocchio.SE3 | None = None + self._prev_primary: bool = False + + logger.info( + f"TeleopIKTask {name} initialized with model: {config.model_path}, " + f"ee_joint_id={config.ee_joint_id}, joints={config.joint_names}" + ) + + @property + def name(self) -> str: + """Unique task identifier.""" + return self._name + + def claim(self) -> ResourceClaim: + """Declare resource requirements.""" + return ResourceClaim( + joints=self._joint_names, + priority=self._config.priority, + mode=ControlMode.SERVO_POSITION, + ) + + def is_active(self) -> bool: + """Check if task should run this tick.""" + with self._lock: + return self._active and self._target_pose is not None + + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: + """Compute IK and output joint positions. + + Args: + state: Current coordinator state (contains joint positions for IK warm-start) + + Returns: + JointCommandOutput with positions, or None if inactive/timed out/IK failed + """ + with self._lock: + if not self._active or self._target_pose is None: + return None + + # Timeout safety: stop if teleop stream drops + if self._config.timeout > 0: + time_since_update = state.t_now - self._last_update_time + if time_since_update > self._config.timeout: + logger.warning( + f"TeleopIKTask {self._name} timed out " + f"(no update for {time_since_update:.3f}s)" + ) + self._target_pose = None + self._active = False + return None + raw_pose = self._target_pose + + # Convert to SE3 right before use + delta_se3 = pose_to_se3(raw_pose) + # Capture initial EE pose if not set (first command after engage) + with self._lock: + need_capture = self._initial_ee_pose is None + + if need_capture: + q_current = self._get_current_joints(state) + if q_current is None: + logger.debug( + f"TeleopIKTask {self._name}: cannot capture initial pose, joint state unavailable" + ) + return None + initial_pose = self._ik.forward_kinematics(q_current) + with self._lock: + self._initial_ee_pose = initial_pose + + # Apply delta to initial pose: target = initial + delta + with self._lock: + if self._initial_ee_pose is None: + return None + target_pose = pinocchio.SE3( + delta_se3.rotation @ self._initial_ee_pose.rotation, + self._initial_ee_pose.translation + delta_se3.translation, + ) + + # Get current joint positions for IK warm-start + q_current = self._get_current_joints(state) + if q_current is None: + logger.debug(f"TeleopIKTask {self._name}: missing joint state for IK warm-start") + return None + + # Compute IK + q_solution, converged, final_error = self._ik.solve(target_pose, q_current) + # Use the solution even if it didn't fully converge + if not converged: + logger.debug( + f"TeleopIKTask {self._name}: IK did not converge " + f"(error={final_error:.4f}), using partial solution" + ) + # Safety: reject if any joint would jump too far in one tick + if not check_joint_delta(q_solution, q_current, self._config.max_joint_delta_deg): + logger.warning( + f"TeleopIKTask {self._name}: joint delta exceeds " + f"{self._config.max_joint_delta_deg}°, rejecting solution" + ) + return None + + positions = q_solution.flatten().tolist() + return JointCommandOutput( + joint_names=self._joint_names_list, + positions=positions, + mode=ControlMode.SERVO_POSITION, + ) + + def _get_current_joints(self, state: CoordinatorState) -> NDArray[np.floating[Any]] | None: + """Get current joint positions from coordinator state.""" + positions = [] + for joint_name in self._joint_names_list: + pos = state.joints.get_position(joint_name) + if pos is None: + return None + positions.append(pos) + return np.array(positions) + + def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + """Handle preemption by higher-priority task. + + Args: + by_task: Name of preempting task + joints: Joints that were preempted + """ + if joints & self._joint_names: + logger.warning(f"TeleopIKTask {self._name} preempted by {by_task} on joints {joints}") + + # ========================================================================= + # Task-specific methods + # ========================================================================= + + def on_buttons(self, msg: QuestButtons) -> bool: + """Press-and-hold engage: hold primary button to track, release to stop. + + Checks only the button matching self._config.hand (left_x or right_a). + If hand is not set, listens to both. + """ + hand = self._config.hand + if hand == "left": + primary = msg.left_x + elif hand == "right": + primary = msg.right_a + else: + primary = msg.left_x or msg.right_a + + if primary and not self._prev_primary: + # Rising edge: reset initial pose so compute() recaptures + logger.info(f"TeleopIKTask {self._name}: engage") + with self._lock: + self._initial_ee_pose = None + elif not primary and self._prev_primary: + # Falling edge: stop tracking + logger.info(f"TeleopIKTask {self._name}: disengage") + with self._lock: + self._target_pose = None + self._initial_ee_pose = None + self._prev_primary = primary + return True + + def on_cartesian_command(self, pose: Pose | PoseStamped, t_now: float) -> bool: + """Handle incoming cartesian command (delta pose from teleop)""" + with self._lock: + self._target_pose = pose # Store raw, convert to SE3 in compute() + self._last_update_time = t_now + self._active = True + + return True + + def start(self) -> None: + """Activate the task (start accepting and outputting commands).""" + with self._lock: + self._active = True + logger.info(f"TeleopIKTask {self._name} started") + + def stop(self) -> None: + """Deactivate the task (stop outputting commands).""" + with self._lock: + self._active = False + logger.info(f"TeleopIKTask {self._name} stopped") + + +__all__ = [ + "TeleopIKTask", + "TeleopIKTaskConfig", +] diff --git a/dimos/control/tasks/trajectory_task.py b/dimos/control/tasks/trajectory_task.py index 676f1dac70..4d2eaa188b 100644 --- a/dimos/control/tasks/trajectory_task.py +++ b/dimos/control/tasks/trajectory_task.py @@ -26,8 +26,8 @@ from dataclasses import dataclass from dimos.control.task import ( + BaseControlTask, ControlMode, - ControlTask, CoordinatorState, JointCommandOutput, ResourceClaim, @@ -51,7 +51,7 @@ class JointTrajectoryTaskConfig: priority: int = 10 -class JointTrajectoryTask(ControlTask): +class JointTrajectoryTask(BaseControlTask): """Passive trajectory execution task. Unlike JointTrajectoryController which owns a thread, this task diff --git a/dimos/control/tasks/velocity_task.py b/dimos/control/tasks/velocity_task.py index 23c56b0511..163bc09827 100644 --- a/dimos/control/tasks/velocity_task.py +++ b/dimos/control/tasks/velocity_task.py @@ -29,8 +29,8 @@ import threading from dimos.control.task import ( + BaseControlTask, ControlMode, - ControlTask, CoordinatorState, JointCommandOutput, ResourceClaim, @@ -57,7 +57,7 @@ class JointVelocityTaskConfig: zero_on_timeout: bool = True # Send zeros to stop motion -class JointVelocityTask(ControlTask): +class JointVelocityTask(BaseControlTask): """Streaming joint velocity control for joystick/force feedback. Accepts target velocities via set_velocities() or set_velocities_by_name() diff --git a/dimos/manipulation/planning/kinematics/__init__.py b/dimos/manipulation/planning/kinematics/__init__.py index 588fccbd72..dacd2007cb 100644 --- a/dimos/manipulation/planning/kinematics/__init__.py +++ b/dimos/manipulation/planning/kinematics/__init__.py @@ -43,5 +43,9 @@ DrakeOptimizationIK, ) from dimos.manipulation.planning.kinematics.jacobian_ik import JacobianIK +from dimos.manipulation.planning.kinematics.pinocchio_ik import ( + PinocchioIK, + PinocchioIKConfig, +) -__all__ = ["DrakeOptimizationIK", "JacobianIK"] +__all__ = ["DrakeOptimizationIK", "JacobianIK", "PinocchioIK", "PinocchioIKConfig"] diff --git a/dimos/manipulation/planning/kinematics/pinocchio_ik.py b/dimos/manipulation/planning/kinematics/pinocchio_ik.py new file mode 100644 index 0000000000..4224dda556 --- /dev/null +++ b/dimos/manipulation/planning/kinematics/pinocchio_ik.py @@ -0,0 +1,291 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pinocchio-based inverse kinematics solver. + +Standalone IK solver using Pinocchio for forward kinematics and Jacobian +computation. Uses damped least-squares (Levenberg-Marquardt) for robust +convergence near singularities. + +Unlike JacobianIK (which uses the WorldSpec interface), this solver operates +directly on a Pinocchio model. This makes it suitable for lightweight, +real-time IK in control tasks where a full WorldSpec is not needed. + +Usage: + >>> from dimos.manipulation.planning.kinematics.pinocchio_ik import PinocchioIK + >>> ik = PinocchioIK.from_model_path("robot.urdf", ee_joint_id=6) + >>> q_solution, converged, error = ik.solve(target_se3, q_init) + >>> ee_pose = ik.forward_kinematics(q_solution) +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import numpy as np +from numpy.linalg import norm, solve +import pinocchio # type: ignore[import-untyped] + +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from numpy.typing import NDArray + + from dimos.msgs.geometry_msgs import Pose, PoseStamped + +logger = setup_logger() + + +# ============================================================================= +# Configuration +# ============================================================================= + + +@dataclass +class PinocchioIKConfig: + """Configuration for the Pinocchio IK solver. + + Attributes: + max_iter: Maximum IK solver iterations + eps: Convergence threshold (SE3 log-error norm) + damp: Damping factor for singularity handling (higher = more stable) + dt: Integration step size + max_velocity: Max joint velocity per iteration (rad/s), clamps near singularities + """ + + max_iter: int = 100 + eps: float = 1e-4 + damp: float = 1e-2 + dt: float = 1.0 + max_velocity: float = 10.0 + + +# ============================================================================= +# PinocchioIK Solver +# ============================================================================= + + +class PinocchioIK: + """Pinocchio-based damped least-squares IK solver. + + Loads a URDF or MJCF model and provides: + - solve(): Damped least-squares IK from SE3 target + - forward_kinematics(): FK from joint angles to EE pose + + Thread safety: NOT thread-safe. Each caller should use its own instance + or protect calls with an external lock. Control tasks typically hold a + lock around compute() which covers IK calls. + + Example: + >>> ik = PinocchioIK.from_model_path("robot.urdf", ee_joint_id=6) + >>> target = pose_to_se3(pose_stamped) + >>> q, converged, err = ik.solve(target, q_current) + >>> if converged: + ... ee = ik.forward_kinematics(q) + """ + + def __init__( + self, + model: pinocchio.Model, + data: pinocchio.Data, + ee_joint_id: int, + config: PinocchioIKConfig | None = None, + ) -> None: + """Initialize solver with an existing Pinocchio model. + + Args: + model: Pinocchio model + data: Pinocchio data (created from model) + ee_joint_id: End-effector joint ID in the kinematic chain + config: Solver configuration (uses defaults if None) + """ + self._model = model + self._data = data + self._ee_joint_id = ee_joint_id + self._config = config or PinocchioIKConfig() + + @classmethod + def from_model_path( + cls, + model_path: str | Path, + ee_joint_id: int, + ) -> PinocchioIK: + """Create solver by loading a URDF or MJCF file. + + Args: + model_path: Path to URDF (.urdf) or MJCF (.xml) file + ee_joint_id: End-effector joint ID in the kinematic chain + + Returns: + Configured PinocchioIK instance + + Raises: + FileNotFoundError: If model file doesn't exist + """ + path = Path(str(model_path)) + if not path.exists(): + raise FileNotFoundError(f"Model file not found: {path}") + + if path.suffix == ".xml": + model = pinocchio.buildModelFromMJCF(str(path)) + else: + model = pinocchio.buildModelFromUrdf(str(path)) + + data = model.createData() + return cls(model, data, ee_joint_id) + + @property + def model(self) -> pinocchio.Model: + """The Pinocchio model.""" + return self._model + + @property + def nq(self) -> int: + """Number of configuration variables (DOF).""" + return int(self._model.nq) + + @property + def ee_joint_id(self) -> int: + """End-effector joint ID.""" + return self._ee_joint_id + + # ========================================================================= + # Core IK + # ========================================================================= + + def solve( + self, + target_pose: pinocchio.SE3, + q_init: NDArray[np.floating[Any]], + config: PinocchioIKConfig | None = None, + ) -> tuple[NDArray[np.floating[Any]], bool, float]: + """Solve IK using damped least-squares (Levenberg-Marquardt). + + Args: + target_pose: Target end-effector pose as SE3 + q_init: Initial joint configuration (warm-start) + config: Override solver config for this call (uses instance config if None) + + Returns: + Tuple of (joint_angles, converged, final_error) + """ + cfg = config or self._config + q = q_init.copy() + final_err = float("inf") + + for _ in range(cfg.max_iter): + pinocchio.forwardKinematics(self._model, self._data, q) + iMd = self._data.oMi[self._ee_joint_id].actInv(target_pose) + + err = pinocchio.log(iMd).vector + final_err = float(norm(err)) + if final_err < cfg.eps: + return q, True, final_err + + J = pinocchio.computeJointJacobian(self._model, self._data, q, self._ee_joint_id) + J = -np.dot(pinocchio.Jlog6(iMd.inverse()), J) + v = -J.T.dot(solve(J.dot(J.T) + cfg.damp * np.eye(6), err)) + + # Clamp velocity to prevent explosion near singularities + v_norm = norm(v) + if v_norm > cfg.max_velocity: + v = v * (cfg.max_velocity / v_norm) + + q = pinocchio.integrate(self._model, q, v * cfg.dt) + + return q, False, final_err + + # ========================================================================= + # Forward Kinematics + # ========================================================================= + + def forward_kinematics(self, joint_positions: NDArray[np.floating[Any]]) -> pinocchio.SE3: + """Compute end-effector pose from joint positions. + + Args: + joint_positions: Joint angles in radians + + Returns: + End-effector pose as SE3 + """ + pinocchio.forwardKinematics(self._model, self._data, joint_positions) + return self._data.oMi[self._ee_joint_id].copy() + + +# ============================================================================= +# Pose Conversion Helpers +# ============================================================================= + + +def pose_to_se3(pose: Pose | PoseStamped) -> pinocchio.SE3: + """Convert Pose or PoseStamped to pinocchio SE3""" + + position = np.array([pose.x, pose.y, pose.z]) + quat = pose.orientation + rotation = pinocchio.Quaternion(quat.w, quat.x, quat.y, quat.z).toRotationMatrix() + return pinocchio.SE3(rotation, position) + + +# ============================================================================= +# Safety Utilities +# ============================================================================= + + +def check_joint_delta( + q_new: NDArray[np.floating[Any]], + q_current: NDArray[np.floating[Any]], + max_delta_deg: float, +) -> bool: + """Check if joint position change is within safety limits. + + Args: + q_new: Proposed joint positions (radians) + q_current: Current joint positions (radians) + max_delta_deg: Maximum allowed change per joint (degrees) + + Returns: + True if all joint deltas are within limits + """ + max_delta_rad = np.radians(max_delta_deg) + joint_deltas = np.abs(q_new - q_current) + return bool(np.all(joint_deltas <= max_delta_rad)) + + +def get_worst_joint_delta( + q_new: NDArray[np.floating[Any]], + q_current: NDArray[np.floating[Any]], +) -> tuple[int, float]: + """Find the joint with the largest position change. + + Args: + q_new: Proposed joint positions (radians) + q_current: Current joint positions (radians) + + Returns: + Tuple of (joint_index, delta_in_degrees) + """ + joint_deltas = np.abs(q_new - q_current) + worst_idx = int(np.argmax(joint_deltas)) + return worst_idx, float(np.degrees(joint_deltas[worst_idx])) + + +__all__ = [ + "PinocchioIK", + "PinocchioIKConfig", + "check_joint_delta", + "get_worst_joint_delta", + "pose_to_se3", +] diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index ed8e88504b..4601dff7c9 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -17,7 +17,10 @@ all_blueprints = { "arm-teleop": "dimos.teleop.blueprints:arm_teleop", + "arm-teleop-dual": "dimos.teleop.blueprints:arm_teleop_dual", + "arm-teleop-piper": "dimos.teleop.blueprints:arm_teleop_piper", "arm-teleop-visualizing": "dimos.teleop.blueprints:arm_teleop_visualizing", + "arm-teleop-xarm6": "dimos.teleop.blueprints:arm_teleop_xarm6", "coordinator-basic": "dimos.control.blueprints:coordinator_basic", "coordinator-cartesian-ik-mock": "dimos.control.blueprints:coordinator_cartesian_ik_mock", "coordinator-cartesian-ik-piper": "dimos.control.blueprints:coordinator_cartesian_ik_piper", @@ -27,6 +30,8 @@ "coordinator-mock": "dimos.control.blueprints:coordinator_mock", "coordinator-piper": "dimos.control.blueprints:coordinator_piper", "coordinator-piper-xarm": "dimos.control.blueprints:coordinator_piper_xarm", + "coordinator-teleop-dual": "dimos.control.blueprints:coordinator_teleop_dual", + "coordinator-teleop-piper": "dimos.control.blueprints:coordinator_teleop_piper", "coordinator-teleop-xarm6": "dimos.control.blueprints:coordinator_teleop_xarm6", "coordinator-velocity-xarm6": "dimos.control.blueprints:coordinator_velocity_xarm6", "coordinator-xarm6": "dimos.control.blueprints:coordinator_xarm6", diff --git a/dimos/teleop/blueprints.py b/dimos/teleop/blueprints.py index bca504b1d8..605c471243 100644 --- a/dimos/teleop/blueprints.py +++ b/dimos/teleop/blueprints.py @@ -15,6 +15,11 @@ """Teleop blueprints for testing and deployment.""" +from dimos.control.blueprints import ( + coordinator_teleop_dual, + coordinator_teleop_piper, + coordinator_teleop_xarm6, +) from dimos.core.blueprints import autoconnect from dimos.core.transport import LCMTransport from dimos.msgs.geometry_msgs import PoseStamped @@ -25,7 +30,7 @@ # Quest Teleop Blueprints # ----------------------------------------------------------------------------- -# Arm teleop with toggle-based engage +# Arm teleop with press-and-hold engage arm_teleop = autoconnect( arm_teleop_module(), ).transports( @@ -48,4 +53,62 @@ ) -__all__ = ["arm_teleop", "arm_teleop_visualizing"] +# ----------------------------------------------------------------------------- +# Teleop wired to Coordinator (TeleopIK) +# ----------------------------------------------------------------------------- + +# Single XArm6 teleop: right controller -> xarm6 +# Usage: dimos run arm-teleop-xarm6 + +arm_teleop_xarm6 = autoconnect( + arm_teleop_module(task_names={"right": "teleop_xarm"}), + coordinator_teleop_xarm6, +).transports( + { + ("right_controller_output", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", QuestButtons): LCMTransport("/teleop/buttons", QuestButtons), + } +) + + +# Single Piper teleop: left controller -> piper arm +# Usage: dimos run arm-teleop-piper +arm_teleop_piper = autoconnect( + arm_teleop_module(task_names={"left": "teleop_piper"}), + coordinator_teleop_piper, +).transports( + { + ("left_controller_output", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", QuestButtons): LCMTransport("/teleop/buttons", QuestButtons), + } +) + + +# Dual arm teleop: right -> piper, left -> xarm6 (TeleopIK) +arm_teleop_dual = autoconnect( + arm_teleop_module(task_names={"right": "teleop_piper", "left": "teleop_xarm"}), + coordinator_teleop_dual, +).transports( + { + ("right_controller_output", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("left_controller_output", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", QuestButtons): LCMTransport("/teleop/buttons", QuestButtons), + } +) + + +__all__ = [ + "arm_teleop", + "arm_teleop_dual", + "arm_teleop_piper", + "arm_teleop_visualizing", + "arm_teleop_xarm6", +] diff --git a/dimos/teleop/quest/quest_extensions.py b/dimos/teleop/quest/quest_extensions.py index 1131ab0dec..b121e431f6 100644 --- a/dimos/teleop/quest/quest_extensions.py +++ b/dimos/teleop/quest/quest_extensions.py @@ -16,15 +16,16 @@ """Quest teleop module extensions and subclasses. Available subclasses: - - ArmTeleopModule: Per-hand toggle engage (X/A press to toggle) + - ArmTeleopModule: Per-hand press-and-hold engage (X/A hold to track), task name routing - TwistTeleopModule: Outputs Twist instead of PoseStamped - - VisualizingTeleopModule: Adds Rerun visualization (uses toggle engage) + - VisualizingTeleopModule: Adds Rerun visualization (inherits press-and-hold engage) """ -from dataclasses import dataclass -from typing import Any +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any -from dimos.core import Out from dimos.msgs.geometry_msgs import PoseStamped, TwistStamped from dimos.teleop.quest.quest_teleop_module import Hand, QuestTeleopConfig, QuestTeleopModule from dimos.teleop.utils.teleop_visualization import ( @@ -32,6 +33,9 @@ visualize_pose, ) +if TYPE_CHECKING: + from dimos.core import Out + @dataclass class TwistTeleopConfig(QuestTeleopConfig): @@ -56,18 +60,18 @@ class TwistTeleopModule(QuestTeleopModule): """ default_config = TwistTeleopConfig + config: TwistTeleopConfig left_twist: Out[TwistStamped] right_twist: Out[TwistStamped] def _publish_msg(self, hand: Hand, output_msg: PoseStamped) -> None: """Convert PoseStamped to TwistStamped, apply scaling, and publish.""" - cfg: TwistTeleopConfig = self.config # type: ignore[assignment] twist = TwistStamped( ts=output_msg.ts, frame_id=output_msg.frame_id, - linear=output_msg.position * cfg.linear_scale, - angular=output_msg.orientation.to_euler() * cfg.angular_scale, + linear=output_msg.position * self.config.linear_scale, + angular=output_msg.orientation.to_euler() * self.config.angular_scale, ) if hand == Hand.LEFT: self.left_twist.publish(twist) @@ -75,11 +79,28 @@ def _publish_msg(self, hand: Hand, output_msg: PoseStamped) -> None: self.right_twist.publish(twist) +@dataclass +class ArmTeleopConfig(QuestTeleopConfig): + """Configuration for ArmTeleopModule. + + Attributes: + task_names: Mapping of Hand -> coordinator task name. Used to set + frame_id on output PoseStamped so the coordinator routes each + hand's commands to the correct TeleopIKTask. + """ + + task_names: dict[str, str] = field(default_factory=dict) + + class ArmTeleopModule(QuestTeleopModule): - """Quest teleop with per-hand toggle engage. + """Quest teleop with per-hand press-and-hold engage and task name routing. Each controller's primary button (X for left, A for right) - toggles that hand's engage state independently. + engages that hand while held, disengages on release. + + When task_names is configured, output PoseStamped messages have their + frame_id set to the task name, enabling the coordinator to route + each hand's commands to the correct TeleopIKTask. Outputs: - left_controller_output: PoseStamped (inherited) @@ -87,24 +108,27 @@ class ArmTeleopModule(QuestTeleopModule): - buttons: QuestButtons (inherited) """ + default_config = ArmTeleopConfig + config: ArmTeleopConfig + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - self._prev_primary: dict[Hand, bool] = {Hand.LEFT: False, Hand.RIGHT: False} - def _handle_engage(self) -> None: - """Toggle per-hand engage on primary button rising edge.""" - for hand in Hand: - controller = self._controllers.get(hand) - if controller is None: - continue + self._task_names: dict[Hand, str] = { + Hand[k.upper()]: v for k, v in self.config.task_names.items() + } - pressed = controller.primary - if pressed and not self._prev_primary[hand]: - if self._is_engaged[hand]: - self._disengage(hand) - else: - self._engage(hand) - self._prev_primary[hand] = pressed + def _publish_msg(self, hand: Hand, output_msg: PoseStamped) -> None: + """Stamp frame_id with task name and publish.""" + task_name = self._task_names.get(hand) + if task_name: + output_msg = PoseStamped( + position=output_msg.position, + orientation=output_msg.orientation, + ts=output_msg.ts, + frame_id=task_name, + ) + super()._publish_msg(hand, output_msg) class VisualizingTeleopModule(ArmTeleopModule): @@ -147,6 +171,7 @@ def _get_output_pose(self, hand: Hand) -> PoseStamped | None: visualizing_teleop_module = VisualizingTeleopModule.blueprint __all__ = [ + "ArmTeleopConfig", "ArmTeleopModule", "TwistTeleopModule", "VisualizingTeleopModule", diff --git a/dimos/utils/data.py b/dimos/utils/data.py index 094d4a6a7c..d14ac04730 100644 --- a/dimos/utils/data.py +++ b/dimos/utils/data.py @@ -262,3 +262,75 @@ def get_data(name: str | Path) -> Path: if nested_path: return archive_path / nested_path return archive_path + + +class LfsPath(type(Path())): # type: ignore[misc] + """ + A Path subclass that lazily downloads LFS data when accessed. + + This is useful for both lazy loading and differentiating between LFS paths and regular paths. + + This class wraps pathlib.Path and ensures that get_data() is called + before any meaningful filesystem operation, making LFS data lazy-loaded. + + Usage: + path = LfsPath("sample_data") + # No download yet + + with path.open('rb') as f: # Downloads now if needed + data = f.read() + + # Or use any Path operation: + if path.exists(): # Downloads now if needed + files = list(path.iterdir()) + """ + + def __new__(cls, filename: str | Path) -> "LfsPath": + # Create instance with a placeholder path to satisfy Path.__new__ + # We use "." as a dummy path that always exists + instance: LfsPath = super().__new__(cls, ".") # type: ignore[call-arg] + # Store the actual filename as an instance attribute + object.__setattr__(instance, "_lfs_filename", filename) + object.__setattr__(instance, "_lfs_resolved_cache", None) + return instance + + def _ensure_downloaded(self) -> Path: + """Ensure the LFS data is downloaded and return the resolved path.""" + cache: Path | None = object.__getattribute__(self, "_lfs_resolved_cache") + if cache is None: + filename = object.__getattribute__(self, "_lfs_filename") + cache = get_data(filename) + object.__setattr__(self, "_lfs_resolved_cache", cache) + return cache + + def __getattribute__(self, name: str) -> object: + # During Path.__new__(), _lfs_filename hasn't been set yet. + # Fall through to normal Path behavior until construction is complete. + try: + object.__getattribute__(self, "_lfs_filename") + except AttributeError: + return object.__getattribute__(self, name) + + # After construction, allow access to our internal attributes directly + if name in ("_lfs_filename", "_lfs_resolved_cache", "_ensure_downloaded"): + return object.__getattribute__(self, name) + + # For all other attributes, ensure download first then delegate to resolved path + resolved = object.__getattribute__(self, "_ensure_downloaded")() + return getattr(resolved, name) + + def __str__(self) -> str: + """String representation returns resolved path.""" + return str(self._ensure_downloaded()) + + def __fspath__(self) -> str: + """Return filesystem path, downloading from LFS if needed.""" + return str(self._ensure_downloaded()) + + def __truediv__(self, other: object) -> Path: + """Path division operator - returns resolved path.""" + return self._ensure_downloaded() / other # type: ignore[operator, return-value] + + def __rtruediv__(self, other: object) -> Path: + """Reverse path division operator.""" + return other / self._ensure_downloaded() # type: ignore[operator, return-value] diff --git a/dimos/utils/test_data.py b/dimos/utils/test_data.py index ba135fe255..e5be4307c7 100644 --- a/dimos/utils/test_data.py +++ b/dimos/utils/test_data.py @@ -14,11 +14,13 @@ import hashlib import os +from pathlib import Path import subprocess import pytest from dimos.utils import data +from dimos.utils.data import LfsPath @pytest.mark.heavy @@ -128,3 +130,220 @@ def test_pull_dir() -> None: with file.open("rb") as f: sha256 = hashlib.sha256(f.read()).hexdigest() assert sha256 == expected_hash + + +# ============================================================================ +# LfsPath Tests +# ============================================================================ + + +def test_lfs_path_lazy_creation() -> None: + """Test that creating LfsPath doesn't trigger download.""" + lfs_path = LfsPath("test_data_file") + + # Check that the object is created + assert isinstance(lfs_path, LfsPath) + + # Check that cache is None (not downloaded yet) + cache = object.__getattribute__(lfs_path, "_lfs_resolved_cache") + assert cache is None + + # Check that filename is stored + filename = object.__getattribute__(lfs_path, "_lfs_filename") + assert filename == "test_data_file" + + +def test_lfs_path_safe_attributes() -> None: + """Test that safe attributes don't trigger download.""" + lfs_path = LfsPath("test_data_file") + + # Access safe attributes directly + filename = object.__getattribute__(lfs_path, "_lfs_filename") + cache = object.__getattribute__(lfs_path, "_lfs_resolved_cache") + ensure_fn = object.__getattribute__(lfs_path, "_ensure_downloaded") + + # Verify they exist and cache is still None + assert filename == "test_data_file" + assert cache is None + assert callable(ensure_fn) + + +def test_lfs_path_no_download_on_creation() -> None: + """Test that LfsPath construction doesn't trigger download. + + Path(lfs_path) extracts internal _raw_paths (\".\") and does NOT + call __fspath__, so it won't trigger download. The correct way to + convert is Path(str(lfs_path)), which triggers __str__ -> download. + """ + lfs_path = LfsPath("nonexistent_file") + + # Construction should not trigger download + cache = object.__getattribute__(lfs_path, "_lfs_resolved_cache") + assert cache is None + + # Accessing internal LfsPath attributes should not trigger download + filename = object.__getattribute__(lfs_path, "_lfs_filename") + assert filename == "nonexistent_file" + assert cache is None + + +@pytest.mark.heavy +def test_lfs_path_with_real_file() -> None: + """Test LfsPath with a real small LFS file.""" + # Use a small existing LFS file + filename = "three_paths.png" + lfs_path = LfsPath(filename) + + # Initially, cache should be None + cache = object.__getattribute__(lfs_path, "_lfs_resolved_cache") + assert cache is None + + # Access a Path method - this should trigger download + exists = lfs_path.exists() + + # Now cache should be populated + cache = object.__getattribute__(lfs_path, "_lfs_resolved_cache") + assert cache is not None + assert isinstance(cache, Path) + + # File should exist after download + assert exists is True + + # Should be able to get file stats + stat_result = lfs_path.stat() + assert stat_result.st_size > 0 + + # Should be able to read the file + content = lfs_path.read_bytes() + assert len(content) > 0 + + # Verify it's a PNG file + assert content.startswith(b"\x89PNG") + + +@pytest.mark.heavy +def test_lfs_path_unload_and_reload() -> None: + """Test unloading and reloading an LFS file.""" + filename = "three_paths.png" + data_dir = data.get_data_dir() + file_path = data_dir / filename + + # Clean up if file already exists + if file_path.exists(): + file_path.unlink() + + # Create LfsPath + lfs_path = LfsPath(filename) + + # Verify file doesn't exist yet + assert not file_path.exists() + + # Access the file - this triggers download + content_first = lfs_path.read_bytes() + assert file_path.exists() + + # Get hash of first download + hash_first = hashlib.sha256(content_first).hexdigest() + + # Now unload (delete the file) + file_path.unlink() + assert not file_path.exists() + + # Create a new LfsPath instance for the same file + lfs_path_2 = LfsPath(filename) + + # Access the file again - should re-download + content_second = lfs_path_2.read_bytes() + assert file_path.exists() + + # Get hash of second download + hash_second = hashlib.sha256(content_second).hexdigest() + + # Hashes should match (same file downloaded) + assert hash_first == hash_second + + # Content should be identical + assert content_first == content_second + + +@pytest.mark.heavy +def test_lfs_path_operations() -> None: + """Test various Path operations with LfsPath.""" + filename = "three_paths.png" + lfs_path = LfsPath(filename) + + # Test is_file + assert lfs_path.is_file() is True + assert lfs_path.is_dir() is False + + # Test absolute path + abs_path = lfs_path.absolute() + assert abs_path.is_absolute() + + # Test resolve + resolved = lfs_path.resolve() + assert resolved.is_absolute() + + # Test string conversion + path_str = str(lfs_path) + assert isinstance(path_str, str) + assert filename in path_str + + # Test __fspath__ + fspath_result = os.fspath(lfs_path) + assert isinstance(fspath_result, str) + assert filename in fspath_result + + +@pytest.mark.heavy +def test_lfs_path_division_operator() -> None: + """Test path division operator with LfsPath.""" + # Use a directory for testing + lfs_path = LfsPath("three_paths.png") + + # Test truediv - this should trigger download and return resolved path + result = lfs_path / "subpath" + assert isinstance(result, Path) + + # The result should be the resolved path with subpath appended + assert "three_paths.png" in str(result) + + +@pytest.mark.heavy +def test_lfs_path_multiple_instances() -> None: + """Test that multiple LfsPath instances for same file work correctly.""" + filename = "three_paths.png" + + # Create two separate instances + lfs_path_1 = LfsPath(filename) + lfs_path_2 = LfsPath(filename) + + # Both should start with None cache + cache_1 = object.__getattribute__(lfs_path_1, "_lfs_resolved_cache") + cache_2 = object.__getattribute__(lfs_path_2, "_lfs_resolved_cache") + assert cache_1 is None + assert cache_2 is None + + # Access file through first instance + content_1 = lfs_path_1.read_bytes() + + # First instance should have cache + cache_1 = object.__getattribute__(lfs_path_1, "_lfs_resolved_cache") + assert cache_1 is not None + + # Second instance cache should still be None (separate instance) + cache_2 = object.__getattribute__(lfs_path_2, "_lfs_resolved_cache") + assert cache_2 is None + + # Access through second instance + content_2 = lfs_path_2.read_bytes() + + # Now second instance should also have cache + cache_2 = object.__getattribute__(lfs_path_2, "_lfs_resolved_cache") + assert cache_2 is not None + + # Content should be the same + assert content_1 == content_2 + + # Both caches should point to the same file + assert cache_1 == cache_2