Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/source/overview/sim/solvers/opw_solver.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Analytical IK for OPW-parameterized 6-DOF manipulators
* Supports both parallel and offset axes, with custom axis flipping
* Fast batch computation for multiple target poses
* Configurable for CPU (py_opw_kinematics) and GPU (warp) backends
* Configurable for CPU and GPU backends
* Flexible configuration via `OPWSolverCfg`
* Strict enforcement of joint limits
* Forward kinematics (FK) and multiple IK solution branches
Expand Down Expand Up @@ -107,5 +107,4 @@ solver = OPWSolver(cfg, device="cuda")
## References

* [OPW Kinematics Paper](https://doi.org/10.1109/TRO.2017.2776312)
* [py_opw_kinematics Documentation](https://github.com/UM-ARM-Lab/py_opw_kinematics)
* [warp Documentation](https://github.com/NVIDIA/warp)
182 changes: 13 additions & 169 deletions embodichain/lab/sim/solvers/opw_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import torch
import numpy as np
import warp as wp
import polars as pl

from itertools import product
from typing import Union, Tuple, Any, Literal, TYPE_CHECKING
from scipy.spatial.transform import Rotation
Expand All @@ -30,15 +33,6 @@
wp_vec6f,
)
from embodichain.utils.device_utils import standardize_device_string
import warp as wp
import polars as pl

try:
from py_opw_kinematics import KinematicModel, Robot, EulerConvention
except ImportError:
raise ImportError(
"py_opw_kinematics not installed. Install with `pip install py_opw_kinematics==0.1.6`"
)


if TYPE_CHECKING:
Expand Down Expand Up @@ -121,45 +115,18 @@ def __init__(self, cfg: OPWSolverCfg, device: str = "cpu", **kwargs):

"""
super().__init__(cfg=cfg, device=device, **kwargs)
if self.device.type == "cpu":
self._init_py_opw_kinematics_solver(cfg, **kwargs)
else:
self._init_warp_solver(cfg, **kwargs)
# Note: the warp-based solver is currently the only active backend and
# is initialized regardless of the selected device.
self._init_warp_solver(cfg, **kwargs)
self.set_tcp(np.eye(4))

def _init_py_opw_kinematics_solver(self, cfg: OPWSolverCfg, **kwargs) -> None:
self.kinematic_model = KinematicModel(
a1=cfg.a1,
a2=cfg.a2,
b=cfg.b,
c1=cfg.c1,
c2=cfg.c2,
c3=cfg.c3,
c4=cfg.c4,
offsets=cfg.offsets,
flip_axes=cfg.flip_axes,
has_parallelogram=cfg.has_parallelogram,
)
self.euler_convention = EulerConvention("ZYX", extrinsic=False, degrees=False)
self.opw_robot = Robot(
self.kinematic_model, self.euler_convention, ee_rotation=(0, 0, 0)
)
if self.pk_serial_chain != "":
fk_dict = self.pk_serial_chain.forward_kinematics(
th=np.zeros(6), end_only=False
)
root_tf = fk_dict[list(fk_dict.keys())[0]]

self.root_base_xpos = root_tf.get_matrix().cpu().numpy()

def set_tcp(self, xpos: np.ndarray):
super().set_tcp(xpos)
if self.device.type != "cpu":
self._tcp_warp = wp.mat44f(self.tcp_xpos)
tcp_inv = np.eye(4, dtype=float)
tcp_inv[:3, :3] = self.tcp_xpos[:3, :3].T
tcp_inv[:3, 3] = -tcp_inv[:3, :3].T @ self.tcp_xpos[:3, 3]
self._tcp_inv_warp = wp.mat44f(tcp_inv)
self._tcp_warp = wp.mat44f(self.tcp_xpos)
tcp_inv = np.eye(4, dtype=float)
tcp_inv[:3, :3] = self.tcp_xpos[:3, :3].T
tcp_inv[:3, 3] = -tcp_inv[:3, :3].T @ self.tcp_xpos[:3, 3]
self._tcp_inv_warp = wp.mat44f(tcp_inv)

def _init_warp_solver(self, cfg: OPWSolverCfg, **kwargs):
self.params = OPWparam()
Expand Down Expand Up @@ -255,14 +222,7 @@ def get_ik(
- target_joints (torch.Tensor): Computed target joint positions, shape (n_sample, num_joints).
- success (torch.Tensor): Boolean tensor indicating IK solution validity for each environment, shape (n_sample,).
"""
if self.device.type == "cpu":
return self.get_ik_py_opw(
target_xpos, qpos_seed, return_all_solutions, **kwargs
)
else:
return self.get_ik_warp(
target_xpos, qpos_seed, return_all_solutions, **kwargs
)
return self.get_ik_warp(target_xpos, qpos_seed, return_all_solutions, **kwargs)

def get_ik_warp(
self,
Expand Down Expand Up @@ -331,7 +291,7 @@ def get_ik_warp(
dtype=float,
device=standardize_device_string(self.device),
)
joint_weight = kwargs.get("joint_weight", torch.zeros(size=(DOF,), dtype=float))
joint_weight = kwargs.get("joint_weight", torch.ones(size=(DOF,), dtype=float))
joint_weight_wp = wp_vec6f(
joint_weight[0],
joint_weight[1],
Expand Down Expand Up @@ -362,122 +322,6 @@ def get_ik_warp(
best_ik_valid = wp.to_torch(best_ik_valid_wp)
return best_ik_valid, best_ik_result

def get_ik_py_opw(
self,
target_xpos: torch.Tensor,
qpos_seed: torch.Tensor,
return_all_solutions: bool = False,
**kwargs,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""Compute target joint positions using OPW inverse kinematics.

Args:
target_xpos (torch.Tensor): Current end-effector position, shape (n_sample, 3).
qpos_seed (torch.Tensor): Current joint positions, shape (n_sample, num_joints).
return_all_solutions (bool, optional): Whether to return all IK solutions or just the best one. Defaults to False.
**kwargs: Additional keyword arguments for future extensions.

Returns:
Tuple[torch.Tensor, torch.Tensor]:
- target_joints (torch.Tensor): Computed target joint positions, shape (n_sample, num_joints).
- success (torch.Tensor): Boolean tensor indicating IK solution validity for each environment, shape (n_sample,).
"""
# TODO: opw solver can only get one solution at a time
DOF = 6
if qpos_seed is not None:
if isinstance(qpos_seed, torch.Tensor):
qpos_seed_np = qpos_seed.detach().cpu().numpy()
else:
qpos_seed_np = np.array(qpos_seed)
else:
qpos_seed_np = np.zeros(DOF)

if isinstance(target_xpos, torch.Tensor):
target_xpos = target_xpos.detach().cpu().numpy()

if target_xpos.shape == (4, 4):
target_xpos_batch = target_xpos[None, :, :]
else:
target_xpos_batch = target_xpos

# TODO: support root base transform
# target_xpos = self.root_base_xpos @ target_xpos
# compute_xpos = target_xpos @ np.linalg.inv(self.tcp_xpos)

# TODO: single version
# if target_xpos.ndim == 3:
# target_xpos = target_xpos[0]
# position = np.array(compute_xpos[:3, 3]) * 1000
# rotation = Rotation.from_matrix(compute_xpos[:3, :3])
# rotation = rotation.as_euler("ZYX")
# solutions = self.opw_robot.inverse((position, rotation))
# if len(solutions) == 0:
# logger.log_warning("OPWSolver failed: No solutions found.")
# if return_all_solutions:
# return torch.tensor([False]), torch.zeros((1, 1, 6))
# else:
# return torch.tensor([False]), torch.zeros((1, 6))

# ret, qpos = self._select_optimal_solution(
# qpos_seed_np, solutions, weights=None, return_all_valid=return_all_solutions
# )
# if not ret or len(qpos) == 0:
# logger.log_warning("No valid solutions found within joint limits.")
# if return_all_solutions:
# return torch.tensor([False]), torch.zeros((1, 1, 6))
# else:
# return torch.tensor([False]), torch.zeros((1, 6))

# if return_all_solutions:
# # qpos: (N, 6) -> (1, N, 6)
# qpos_tensor = torch.from_numpy(qpos).float().unsqueeze(0)
# else:
# # qpos: (6,) -> (1, 6)
# qpos_tensor = torch.from_numpy(qpos).float().reshape(1, 6)

x_list = []
y_list = []
z_list = []
a_list = []
b_list = []
c_list = []
for xpos in target_xpos_batch:
compute_xpos = xpos @ np.linalg.inv(self.tcp_xpos)
position = np.array(compute_xpos[:3, 3]) * 1000
rotation = Rotation.from_matrix(compute_xpos[:3, :3])
rotation = rotation.as_euler("ZYX")
x_list.append(position[0])
y_list.append(position[1])
z_list.append(position[2])
a_list.append(rotation[0])
b_list.append(rotation[1])
c_list.append(rotation[2])
poses = pl.DataFrame(
{
"X": x_list,
"Y": y_list,
"Z": z_list,
"A": a_list,
"B": b_list,
"C": c_list,
}
)
qpos_seed_np = qpos_seed_np.reshape(-1)[:DOF]
res = self.opw_robot.batch_inverse(current_joints=qpos_seed_np, poses=poses)
solutions = res.to_numpy().copy()
is_success = np.any(np.logical_not(np.isnan(solutions)), axis=1)
for i in range(solutions.shape[0]):
for j in range(solutions.shape[1]):
solutions[i, j] = normalize_to_pi(solutions[i, j])

if return_all_solutions:
logger.log_warning(
"return_all_solutions=True is not supported in OPWSolverCPUMode. Returning the best solution only."
)
qpos_tensor = torch.tensor(solutions, dtype=torch.float32, device=self.device)
qpos_tensor = qpos_tensor.reshape(-1, 1, DOF)
return torch.tensor(is_success), qpos_tensor

def _calculate_dynamic_weights(
self, current_joints, joint_limits, base_weights=None
) -> np.ndarray:
Expand Down
5 changes: 3 additions & 2 deletions embodichain/utils/warp/kinematics/opw_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

@wp.func
def normalize_to_pi(angle: float) -> float:
angle = (angle + wp.pi) % (2.0 * wp.pi) - wp.pi
return angle
# TODO: Cannot work in warp.
# return (angle + wp.pi) % (2.0 * wp.pi) - wp.pi
return wp.atan2(wp.sin(angle), wp.cos(angle))


@wp.func
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ dependencies = [
"pin-pink",
"casadi",
"qpsolvers[osqp]==4.8.1",
"py_opw_kinematics==0.1.6",
"pytorch_kinematics==0.7.6",
"polars==1.31.0",
"PyYAML>=6.0",
Expand Down
79 changes: 57 additions & 22 deletions tests/sim/solvers/test_opw_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,45 @@
from embodichain.lab.sim.robots import CobotMagicCfg


def grid_sample_qpos_from_limits(
    qpos_limits: torch.Tensor,
    steps_per_joint: int = 4,
    device=None,
    max_samples: int = 4096,
) -> torch.Tensor:
    """Generate grid samples for qpos from qpos_limits.

    Args:
        qpos_limits: tensor of shape (1, n, 2) or (n, 2) where each row is [low, high].
        steps_per_joint: number of evenly spaced values per joint, including both
            limits (defaults to 4).
        device: torch device to place the samples on. Defaults to the device of
            ``qpos_limits``.
        max_samples: cap the number of returned samples (take first N if grid is larger).

    Returns:
        Tensor of shape (N, n) where N <= max_samples.
    """
    if device is None:
        device = qpos_limits.device

    # Accept both the batched (1, n, 2) and flat (n, 2) layouts.
    limits = qpos_limits.squeeze(0) if qpos_limits.dim() == 3 else qpos_limits
    lows = limits[:, 0].to(device)
    highs = limits[:, 1].to(device)

    # One evenly spaced linspace per joint between its [low, high] limits.
    grids = [
        torch.linspace(low.item(), high.item(), steps_per_joint, device=device)
        for low, high in zip(lows, highs)
    ]

    # Cartesian product of the per-joint grids -> (steps_per_joint**n, n) samples.
    mesh = torch.meshgrid(*grids, indexing="ij")
    stacked = torch.stack([m.reshape(-1) for m in mesh], dim=1)

    # Truncate overly large grids; slicing past the end is a no-op, so the
    # explicit size check in the original was redundant.
    return stacked[:max_samples]


# Base test class for OPWSolver
class BaseSolverTest:
sim = None # Define as a class attribute
Expand Down Expand Up @@ -75,42 +114,38 @@ def setup_simulation(self, sim_device):
def test_ik(self, arm_name: str):
# Test inverse kinematics (IK) with a 1x4x4 homogeneous matrix pose and a joint_seed

test_qpos = torch.tensor(
[[0.0, np.pi / 4, -np.pi / 4, 0.0, np.pi / 4, 0.0]],
dtype=torch.float32,
device=self.robot.device,
qpos_limit = self.robot.get_qpos_limits(name=arm_name)
# generate a small grid of qpos samples from the joint limits (low/high)
sample_qpos = grid_sample_qpos_from_limits(
qpos_limit, steps_per_joint=8, device=self.robot.device, max_samples=65536
Comment on lines +118 to +120
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test generates up to 65,536 IK/FK samples per arm (steps_per_joint=8 on a 6-DOF grid, then capped). That is a large workload for a unit test and can significantly slow CI or lead to timeouts. Consider lowering steps_per_joint / max_samples, or sampling a smaller randomized subset that still covers the edge cases you care about.

Suggested change
# generate a small grid of qpos samples from the joint limits (low/high)
sample_qpos = grid_sample_qpos_from_limits(
qpos_limit, steps_per_joint=8, device=self.robot.device, max_samples=65536
# generate a grid of qpos samples from the joint limits (low/high)
# Use a reduced grid size to keep the unit test lightweight for CI.
sample_qpos = grid_sample_qpos_from_limits(
qpos_limit, steps_per_joint=4, device=self.robot.device, max_samples=2048

Copilot uses AI. Check for mistakes.
)
sample_qpos = sample_qpos[None, :, :]

fk_xpos = self.robot.compute_fk(qpos=test_qpos, name=arm_name, to_matrix=True)
fk_xpos_xyzquat = self.robot.compute_fk(
qpos=test_qpos, name=arm_name, to_matrix=False
fk_xpos = self.robot.compute_batch_fk(
qpos=sample_qpos, name=arm_name, to_matrix=True
)
fk_xpos_xyzquat = self.robot.compute_batch_fk(
qpos=sample_qpos, name=arm_name, to_matrix=False
)

res, ik_qpos = self.robot.compute_ik(
pose=fk_xpos, joint_seed=test_qpos, name=arm_name
res, ik_qpos = self.robot.compute_batch_ik(
pose=fk_xpos, joint_seed=sample_qpos, name=arm_name
)
Comment on lines +131 to 133
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ik_qpos returned from compute_batch_ik here has shape (n_envs, n_batch, dof) and later in this test it is passed as joint_seed to the non-batched compute_ik(...) call. That API expects a per-env seed (n_envs, dof), so this relies on downstream .reshape(-1) behavior and can add a lot of unnecessary data movement. Consider extracting a single seed (e.g., ik_qpos[:, 0, :]) before reusing it as a non-batched seed.

Copilot uses AI. Check for mistakes.

res, ik_qpos_xyzquat = self.robot.compute_ik(
pose=fk_xpos_xyzquat, joint_seed=test_qpos, name=arm_name
res, ik_qpos_xyzquat = self.robot.compute_batch_ik(
pose=fk_xpos_xyzquat, joint_seed=sample_qpos, name=arm_name
)

assert torch.allclose(
ik_qpos, ik_qpos_xyzquat, atol=1e-4, rtol=1e-4
), "IK results do not match for different pose formats"

res, ik_qpos = self.robot.compute_ik(pose=fk_xpos, name=arm_name)

if ik_qpos_xyzquat.dim() == 3:
ik_xpos = self.robot.compute_fk(
qpos=ik_qpos_xyzquat[0][0], name=arm_name, to_matrix=True
)
else:
ik_xpos = self.robot.compute_fk(
qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True
)
ik_xpos = self.robot.compute_batch_fk(
qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True
)

assert torch.allclose(
test_qpos, ik_qpos, atol=5e-3, rtol=5e-3
sample_qpos, ik_qpos, atol=5e-3, rtol=5e-3
), f"FK and IK qpos do not match for {arm_name}"

assert torch.allclose(
Expand Down