diff --git a/docs/source/overview/sim/solvers/opw_solver.md b/docs/source/overview/sim/solvers/opw_solver.md index f4dd1bf1..8f7bff07 100644 --- a/docs/source/overview/sim/solvers/opw_solver.md +++ b/docs/source/overview/sim/solvers/opw_solver.md @@ -7,7 +7,7 @@ * Analytical IK for OPW-parameterized 6-DOF manipulators * Supports both parallel and offset axes, with custom axis flipping * Fast batch computation for multiple target poses -* Configurable for CPU (py_opw_kinematics) and GPU (warp) backends +* Configurable for CPU and GPU backends * Flexible configuration via `OPWSolverCfg` * Strict enforcement of joint limits * Forward kinematics (FK) and multiple IK solution branches @@ -107,5 +107,4 @@ solver = OPWSolver(cfg, device="cuda") ## References * [OPW Kinematics Paper](https://doi.org/10.1109/TRO.2017.2776312) -* [py_opw_kinematics Documentation](https://github.com/UM-ARM-Lab/py_opw_kinematics) * [warp Documentation](https://github.com/NVIDIA/warp) diff --git a/embodichain/lab/sim/solvers/opw_solver.py b/embodichain/lab/sim/solvers/opw_solver.py index 1252ec38..4d8f9047 100644 --- a/embodichain/lab/sim/solvers/opw_solver.py +++ b/embodichain/lab/sim/solvers/opw_solver.py @@ -16,6 +16,9 @@ import torch import numpy as np +import warp as wp +import polars as pl + from itertools import product from typing import Union, Tuple, Any, Literal, TYPE_CHECKING from scipy.spatial.transform import Rotation @@ -30,15 +33,6 @@ wp_vec6f, ) from embodichain.utils.device_utils import standardize_device_string -import warp as wp -import polars as pl - -try: - from py_opw_kinematics import KinematicModel, Robot, EulerConvention -except ImportError: - raise ImportError( - "py_opw_kinematics not installed. Install with `pip install py_opw_kinematics==0.1.6`" - ) if TYPE_CHECKING: @@ -121,45 +115,18 @@ def __init__(self, cfg: OPWSolverCfg, device: str = "cpu", **kwargs): """ super().__init__(cfg=cfg, device=device, **kwargs) - if self.device.type == "cpu": - self._init_py_opw_kinematics_solver(cfg, **kwargs) - else: - self._init_warp_solver(cfg, **kwargs) + # Note: the warp-based solver is currently the only active backend and + # is initialized regardless of the selected device. + self._init_warp_solver(cfg, **kwargs) self.set_tcp(np.eye(4)) - def _init_py_opw_kinematics_solver(self, cfg: OPWSolverCfg, **kwargs) -> None: - self.kinematic_model = KinematicModel( - a1=cfg.a1, - a2=cfg.a2, - b=cfg.b, - c1=cfg.c1, - c2=cfg.c2, - c3=cfg.c3, - c4=cfg.c4, - offsets=cfg.offsets, - flip_axes=cfg.flip_axes, - has_parallelogram=cfg.has_parallelogram, - ) - self.euler_convention = EulerConvention("ZYX", extrinsic=False, degrees=False) - self.opw_robot = Robot( - self.kinematic_model, self.euler_convention, ee_rotation=(0, 0, 0) - ) - if self.pk_serial_chain != "": - fk_dict = self.pk_serial_chain.forward_kinematics( - th=np.zeros(6), end_only=False - ) - root_tf = fk_dict[list(fk_dict.keys())[0]] - - self.root_base_xpos = root_tf.get_matrix().cpu().numpy() - def set_tcp(self, xpos: np.ndarray): super().set_tcp(xpos) - if self.device.type != "cpu": - self._tcp_warp = wp.mat44f(self.tcp_xpos) - tcp_inv = np.eye(4, dtype=float) - tcp_inv[:3, :3] = self.tcp_xpos[:3, :3].T - tcp_inv[:3, 3] = -tcp_inv[:3, :3].T @ self.tcp_xpos[:3, 3] - self._tcp_inv_warp = wp.mat44f(tcp_inv) + self._tcp_warp = wp.mat44f(self.tcp_xpos) + tcp_inv = np.eye(4, dtype=float) + tcp_inv[:3, :3] = self.tcp_xpos[:3, :3].T + tcp_inv[:3, 3] = -tcp_inv[:3, :3].T @ self.tcp_xpos[:3, 3] + self._tcp_inv_warp = wp.mat44f(tcp_inv) def _init_warp_solver(self, cfg: OPWSolverCfg, **kwargs): self.params = OPWparam() @@ -255,14 +222,7 @@ def get_ik( - target_joints (torch.Tensor): Computed target joint positions, shape (n_sample, num_joints). - success (torch.Tensor): Boolean tensor indicating IK solution validity for each environment, shape (n_sample,). """ - if self.device.type == "cpu": - return self.get_ik_py_opw( - target_xpos, qpos_seed, return_all_solutions, **kwargs - ) - else: - return self.get_ik_warp( - target_xpos, qpos_seed, return_all_solutions, **kwargs - ) + return self.get_ik_warp(target_xpos, qpos_seed, return_all_solutions, **kwargs) def get_ik_warp( self, @@ -331,7 +291,7 @@ def get_ik_warp( dtype=float, device=standardize_device_string(self.device), ) - joint_weight = kwargs.get("joint_weight", torch.zeros(size=(DOF,), dtype=float)) + joint_weight = kwargs.get("joint_weight", torch.ones(size=(DOF,), dtype=float)) joint_weight_wp = wp_vec6f( joint_weight[0], joint_weight[1], @@ -362,122 +322,6 @@ def get_ik_warp( best_ik_valid = wp.to_torch(best_ik_valid_wp) return best_ik_valid, best_ik_result - def get_ik_py_opw( - self, - target_xpos: torch.Tensor, - qpos_seed: torch.Tensor, - return_all_solutions: bool = False, - **kwargs, - ) -> Tuple[torch.Tensor, torch.Tensor]: - """Compute target joint positions using OPW inverse kinematics. - - Args: - target_xpos (torch.Tensor): Current end-effector position, shape (n_sample, 3). - qpos_seed (torch.Tensor): Current joint positions, shape (n_sample, num_joints). - return_all_solutions (bool, optional): Whether to return all IK solutions or just the best one. Defaults to False. - **kwargs: Additional keyword arguments for future extensions. - - Returns: - Tuple[torch.Tensor, torch.Tensor]: - - target_joints (torch.Tensor): Computed target joint positions, shape (n_sample, num_joints). - - success (torch.Tensor): Boolean tensor indicating IK solution validity for each environment, shape (n_sample,). - """ - # TODO: opw solver can only get one solution at a time - DOF = 6 - if qpos_seed is not None: - if isinstance(qpos_seed, torch.Tensor): - qpos_seed_np = qpos_seed.detach().cpu().numpy() - else: - qpos_seed_np = np.array(qpos_seed) - else: - qpos_seed_np = np.zeros(DOF) - - if isinstance(target_xpos, torch.Tensor): - target_xpos = target_xpos.detach().cpu().numpy() - - if target_xpos.shape == (4, 4): - target_xpos_batch = target_xpos[None, :, :] - else: - target_xpos_batch = target_xpos - - # TODO: support root base transform - # target_xpos = self.root_base_xpos @ target_xpos - # compute_xpos = target_xpos @ np.linalg.inv(self.tcp_xpos) - - # TODO: single version - # if target_xpos.ndim == 3: - # target_xpos = target_xpos[0] - # position = np.array(compute_xpos[:3, 3]) * 1000 - # rotation = Rotation.from_matrix(compute_xpos[:3, :3]) - # rotation = rotation.as_euler("ZYX") - # solutions = self.opw_robot.inverse((position, rotation)) - # if len(solutions) == 0: - # logger.log_warning("OPWSolver failed: No solutions found.") - # if return_all_solutions: - # return torch.tensor([False]), torch.zeros((1, 1, 6)) - # else: - # return torch.tensor([False]), torch.zeros((1, 6)) - - # ret, qpos = self._select_optimal_solution( - # qpos_seed_np, solutions, weights=None, return_all_valid=return_all_solutions - # ) - # if not ret or len(qpos) == 0: - # logger.log_warning("No valid solutions found within joint limits.") - # if return_all_solutions: - # return torch.tensor([False]), torch.zeros((1, 1, 6)) - # else: - # return torch.tensor([False]), torch.zeros((1, 6)) - - # if return_all_solutions: - # # qpos: (N, 6) -> (1, N, 6) - # qpos_tensor = torch.from_numpy(qpos).float().unsqueeze(0) - # else: - # # qpos: (6,) -> (1, 6) - # qpos_tensor = torch.from_numpy(qpos).float().reshape(1, 6) - - x_list = [] - y_list = [] - z_list = [] - a_list = [] - b_list = [] - c_list = [] - for xpos in target_xpos_batch: - compute_xpos = xpos @ np.linalg.inv(self.tcp_xpos) - position = np.array(compute_xpos[:3, 3]) * 1000 - rotation = Rotation.from_matrix(compute_xpos[:3, :3]) - rotation = rotation.as_euler("ZYX") - x_list.append(position[0]) - y_list.append(position[1]) - z_list.append(position[2]) - a_list.append(rotation[0]) - b_list.append(rotation[1]) - c_list.append(rotation[2]) - poses = pl.DataFrame( - { - "X": x_list, - "Y": y_list, - "Z": z_list, - "A": a_list, - "B": b_list, - "C": c_list, - } - ) - qpos_seed_np = qpos_seed_np.reshape(-1)[:DOF] - res = self.opw_robot.batch_inverse(current_joints=qpos_seed_np, poses=poses) - solutions = res.to_numpy().copy() - is_success = np.any(np.logical_not(np.isnan(solutions)), axis=1) - for i in range(solutions.shape[0]): - for j in range(solutions.shape[1]): - solutions[i, j] = normalize_to_pi(solutions[i, j]) - - if return_all_solutions: - logger.log_warning( - "return_all_solutions=True is not supported in OPWSolverCPUMode. Returning the best solution only." - ) - qpos_tensor = torch.tensor(solutions, dtype=torch.float32, device=self.device) - qpos_tensor = qpos_tensor.reshape(-1, 1, DOF) - return torch.tensor(is_success), qpos_tensor - def _calculate_dynamic_weights( self, current_joints, joint_limits, base_weights=None ) -> np.ndarray: diff --git a/embodichain/utils/warp/kinematics/opw_solver.py b/embodichain/utils/warp/kinematics/opw_solver.py index e70cb286..1f1cf459 100644 --- a/embodichain/utils/warp/kinematics/opw_solver.py +++ b/embodichain/utils/warp/kinematics/opw_solver.py @@ -25,8 +25,9 @@ @wp.func def normalize_to_pi(angle: float) -> float: - angle = (angle + wp.pi) % (2.0 * wp.pi) - wp.pi - return angle + # TODO: Cannot work in warp. + # return (angle + wp.pi) % (2.0 * wp.pi) - wp.pi + return wp.atan2(wp.sin(angle), wp.cos(angle)) @wp.func diff --git a/pyproject.toml b/pyproject.toml index fc4b8163..7b8e5ffe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,6 @@ dependencies = [ "pin-pink", "casadi", "qpsolvers[osqp]==4.8.1", - "py_opw_kinematics==0.1.6", "pytorch_kinematics==0.7.6", "polars==1.31.0", "PyYAML>=6.0", diff --git a/tests/sim/solvers/test_opw_solver.py b/tests/sim/solvers/test_opw_solver.py index 15027e8e..fe04f4b4 100644 --- a/tests/sim/solvers/test_opw_solver.py +++ b/tests/sim/solvers/test_opw_solver.py @@ -23,6 +23,45 @@ from embodichain.lab.sim.robots import CobotMagicCfg +def grid_sample_qpos_from_limits( + qpos_limits: torch.Tensor, + steps_per_joint: int = 4, + device=None, + max_samples: int = 4096, +) -> torch.Tensor: + """Generate grid samples for qpos from qpos_limits. + + Args: + qpos_limits: tensor of shape (1, n, 2) or (n, 2) where each row is [low, high]. + steps_per_joint: number of values per joint (defaults to 2: low and high). + device: torch device to place the samples on. + max_samples: cap the number of returned samples (take first N if grid is larger). + + Returns: + Tensor of shape (N, n) where N <= max_samples. + """ + if device is None: + device = qpos_limits.device + + limits = qpos_limits.squeeze(0) if qpos_limits.dim() == 3 else qpos_limits + lows = limits[:, 0].to(device) + highs = limits[:, 1].to(device) + + # create per-joint linspaces + grids = [ + torch.linspace(l.item(), h.item(), steps_per_joint, device=device) + for l, h in zip(lows, highs) + ] + + # meshgrid and stack + mesh = torch.meshgrid(*grids, indexing="ij") + stacked = torch.stack([m.reshape(-1) for m in mesh], dim=1) + + if stacked.shape[0] > max_samples: + return stacked[:max_samples] + return stacked + + # Base test class for OPWSolver class BaseSolverTest: sim = None # Define as a class attribute @@ -75,42 +114,38 @@ def setup_simulation(self, sim_device): def test_ik(self, arm_name: str): # Test inverse kinematics (IK) with a 1x4x4 homogeneous matrix pose and a joint_seed - test_qpos = torch.tensor( - [[0.0, np.pi / 4, -np.pi / 4, 0.0, np.pi / 4, 0.0]], - dtype=torch.float32, - device=self.robot.device, + qpos_limit = self.robot.get_qpos_limits(name=arm_name) + # generate a small grid of qpos samples from the joint limits (low/high) + sample_qpos = grid_sample_qpos_from_limits( + qpos_limit, steps_per_joint=8, device=self.robot.device, max_samples=65536 ) + sample_qpos = sample_qpos[None, :, :] - fk_xpos = self.robot.compute_fk(qpos=test_qpos, name=arm_name, to_matrix=True) - fk_xpos_xyzquat = self.robot.compute_fk( - qpos=test_qpos, name=arm_name, to_matrix=False + fk_xpos = self.robot.compute_batch_fk( + qpos=sample_qpos, name=arm_name, to_matrix=True + ) + fk_xpos_xyzquat = self.robot.compute_batch_fk( + qpos=sample_qpos, name=arm_name, to_matrix=False ) - res, ik_qpos = self.robot.compute_ik( - pose=fk_xpos, joint_seed=test_qpos, name=arm_name + res, ik_qpos = self.robot.compute_batch_ik( + pose=fk_xpos, joint_seed=sample_qpos, name=arm_name ) - res, ik_qpos_xyzquat = self.robot.compute_ik( - pose=fk_xpos_xyzquat, joint_seed=test_qpos, name=arm_name + res, ik_qpos_xyzquat = self.robot.compute_batch_ik( + pose=fk_xpos_xyzquat, joint_seed=sample_qpos, name=arm_name ) assert torch.allclose( ik_qpos, ik_qpos_xyzquat, atol=1e-4, rtol=1e-4 ), "IK results do not match for different pose formats" - res, ik_qpos = self.robot.compute_ik(pose=fk_xpos, name=arm_name) - - if ik_qpos_xyzquat.dim() == 3: - ik_xpos = self.robot.compute_fk( - qpos=ik_qpos_xyzquat[0][0], name=arm_name, to_matrix=True - ) - else: - ik_xpos = self.robot.compute_fk( - qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True - ) + ik_xpos = self.robot.compute_batch_fk( + qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True + ) assert torch.allclose( - test_qpos, ik_qpos, atol=5e-3, rtol=5e-3 + sample_qpos, ik_qpos, atol=5e-3, rtol=5e-3 ), f"FK and IK qpos do not match for {arm_name}" assert torch.allclose(