Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 35 additions & 26 deletions examples/precision.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,23 @@
HOST = "127.0.0.1"
PORT = 5001

HOME_ANGLES = [90.0, -90.0, 180.0, 0.0, 0.0, 180.0]
HOME_TOLERANCE_DEG = 2.0

with Robot(host=HOST, port=PORT, normalize_logs=True) as robot:
rbt = robot.create_sync_client(timeout=2.0)
rbt.wait_ready(timeout=5.0)
rbt.simulator(True)

# Select tool, and home only if not already near the home pose
rbt.select_tool("SSG-48")
rbt.tool.calibrate()
rbt.home(wait=True)
current = rbt.angles()
if (
current is None
or max(abs(a - h) for a, h in zip(current, HOME_ANGLES)) > HOME_TOLERANCE_DEG
):
rbt.home(wait=True)

PRECISION_POSE = [0, -250, 350, -90, 0, -90]
rbt.move_j(pose=PRECISION_POSE, speed=0.5, wait=True)
Expand All @@ -33,44 +42,44 @@

# Approach pencil: move_j to 100mm above, descend linearly, grab, retract
PENCIL_ABOVE = [-90, -81.6, 161.8, 0, -69.4, 180]
rbt.move_j(angles=PENCIL_ABOVE, speed=0.3, wait=True)
rbt.move_l([0, 0, -100, 0, 0, 0], rel=True, speed=0.2, wait=True)
rbt.move_j(angles=PENCIL_ABOVE, speed=0.8, wait=True)
rbt.move_l([0, 0, -93, 0, 0, 0], rel=True, speed=0.4, wait=True)
rbt.tool.close(wait=True)
rbt.move_l([0, 0, 100, 0, 0, 0], rel=True, speed=0.2, wait=True)
rbt.move_j(pose=PRECISION_POSE, speed=0.3, wait=True)
rbt.move_l([0, 0, 93, 0, 0, 0], rel=True, speed=0.4, wait=True)
rbt.move_j(pose=PRECISION_POSE, speed=0.8, wait=True)

# Offset TCP to pencil tip (~100mm exposed below gripper)
rbt.set_tcp_offset(0, 0, -100)
# Offset TCP to pencil tip (~100mm exposed below gripper). The pencil is
# clamped perpendicular to the gripper's jaw-closing direction, hanging
# along tool -X — that's the axis the offset goes on, not Z.
rbt.set_tcp_offset(-100, 0, 0)

# Pencil tip traces straight lines (linear precision demo)
# Forward/back (tool Z = world -Y at this pose)
rbt.move_l([0, 0, 100, 0, 0, 0], speed=0.3, frame="TRF", rel=True, wait=True)
rbt.move_l([0, 0, -200, 0, 0, 0], speed=0.3, frame="TRF", rel=True, wait=True)
rbt.move_l([0, 0, 100, 0, 0, 0], speed=0.3, frame="TRF", rel=True, wait=True)
# Side to side (tool Y = world -X at this pose)
rbt.move_l([0, 60, 0, 0, 0, 0], speed=0.3, frame="TRF", rel=True, wait=True)
rbt.move_l([0, -120, 0, 0, 0, 0], speed=0.3, frame="TRF", rel=True, wait=True)
rbt.move_l([0, 60, 0, 0, 0, 0], speed=0.3, frame="TRF", rel=True, wait=True)
rbt.move_l([0, 0, 100, 0, 0, 0], speed=0.8, frame="TRF", rel=True, wait=True)
rbt.move_l([0, 0, -200, 0, 0, 0], speed=0.8, frame="TRF", rel=True, wait=True)
rbt.move_l([0, 0, 100, 0, 0, 0], speed=0.8, frame="TRF", rel=True, wait=True)

# Precision TRF rotations — pencil tip stays stationary while wrist rotates
SWEEP = 20
# Precision TRF rotations — pencil tip stays stationary while wrist rotates.
# 40° is the largest sweep that keeps every axis IK-reachable from this pose
# with the 100mm pencil offset.
SWEEP = 40
for axis in range(3):
delta = [0, 0, 0, 0, 0, 0]
delta[3 + axis] = -SWEEP
rbt.move_l(delta, speed=0.5, frame="TRF", rel=True, wait=True)
rbt.move_l(delta, speed=0.8, frame="TRF", rel=True, wait=True)
delta[3 + axis] = SWEEP
rbt.move_l(delta, speed=0.5, frame="TRF", rel=True, wait=True)
rbt.move_l(delta, speed=0.5, frame="TRF", rel=True, wait=True)
rbt.move_l(delta, speed=0.8, frame="TRF", rel=True, wait=True)
rbt.move_l(delta, speed=0.8, frame="TRF", rel=True, wait=True)
delta[3 + axis] = -SWEEP
rbt.move_l(delta, speed=0.5, frame="TRF", rel=True, wait=True)
rbt.move_l(delta, speed=0.8, frame="TRF", rel=True, wait=True)

# Place pencil back: descend linearly, release, retract
rbt.set_tcp_offset(0, 0, 0)
rbt.move_j(angles=PENCIL_ABOVE, speed=0.3, wait=True)
rbt.move_l([0, 0, -100, 0, 0, 0], rel=True, speed=0.2, wait=True)
rbt.move_j(angles=PENCIL_ABOVE, speed=0.8, wait=True)
rbt.move_l([0, 0, -93, 0, 0, 0], rel=True, speed=0.4, wait=True)
rbt.tool.open(wait=True)
rbt.move_l([0, 0, 100, 0, 0, 0], rel=True, speed=0.2, wait=True)
rbt.move_l([0, 0, 93, 0, 0, 0], rel=True, speed=0.4, wait=True)

rbt.move_j(pose=PRECISION_POSE, speed=0.3, wait=True)
rbt.home(wait=True)
# Return to home position (joint move, not the full homing sequence)
rbt.move_j(pose=PRECISION_POSE, speed=0.8, wait=True)
rbt.move_j(angles=HOME_ANGLES, speed=0.8, wait=True)
print("Done!")
7 changes: 5 additions & 2 deletions parol6/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ def _trace(self, msg, *args, **kwargs):
NEAR_MM_TOL_MM: float = 2.0 # Proximity threshold for considering positions "near" (mm)
ENTRY_MM_TOL_MM: float = 5.0 # Entry trajectory threshold for smooth motion (mm)

# Trajectory path sampling (fixed samples for TOPP-RA input)
PATH_SAMPLES: int = int(os.getenv("PAROL6_PATH_SAMPLES", "50"))
# Trajectory path sampling (IK waypoints fed to TOPP-RA's piecewise-linear path).
# 200 halves per-tick acceleration jitter on the wrist relative to the old 50,
# converging by ~500. Higher values cost one extra IK solve per sample at
# trajectory build time.
PATH_SAMPLES: int = int(os.getenv("PAROL6_PATH_SAMPLES", "200"))

# Centralized loop interval (seconds).
INTERVAL_S: float = max(1e-6, 1.0 / max(CONTROL_RATE_HZ, 1.0))
Expand Down
70 changes: 69 additions & 1 deletion parol6/motion/trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from enum import Enum

import numpy as np
from numba import njit
from numpy.typing import NDArray
from ruckig import InputParameter, OutputParameter, Result, Ruckig # type: ignore[unresolved-import, ty:unresolved-import]

Expand Down Expand Up @@ -99,6 +100,71 @@ def from_string(cls, name: str) -> ProfileType:
return cls.TOPPRA


# Step is anomalous if its magnitude exceeds this multiple of the chain's
# median step. Relative threshold keeps the rule invariant to sample density,
# speed, and move type. Insensitive in [10, 20+]; real LM hops exceed 20×.
_IK_OUTLIER_RATIO: float = 10.0

# Symmetric padding around each outlier run; absorbs LM seed-bleed into the
# samples just after the hop. FK deviation scales linearly with pad.
_IK_OUTLIER_PADDING: int = 4


@njit(cache=True)
def _smooth_singularity_outliers(positions: NDArray[np.float64]) -> int:
"""In-place repair of LM-IK branch hops near wrist singularities.

pinokin's LM picks IK solutions without a continuity preference, so at
J5 ≈ 0 it can jump several degrees off the natural chain in one or two
samples. Detected as steps > _IK_OUTLIER_RATIO × the chain's median
step; replaced by linear interpolation over the run plus padding.
"""
n = positions.shape[0]
dims = positions.shape[1]
if n < 3:
return 0

diffs = np.empty(n - 1)
for i in range(n - 1):
s = 0.0
for c in range(dims):
v = positions[i + 1, c] - positions[i, c]
s += v * v
diffs[i] = np.sqrt(s)

median_step = np.median(diffs)
if median_step == 0.0:
return 0
threshold = _IK_OUTLIER_RATIO * median_step

n_patched = 0
i = 1
while i < n - 1:
if diffs[i - 1] <= threshold and diffs[i] <= threshold:
i += 1
continue
j = i
while j < n - 1 and (diffs[j - 1] > threshold or diffs[j] > threshold):
j += 1
lo = i - _IK_OUTLIER_PADDING
if lo < 1:
lo = 1
hi = j + _IK_OUTLIER_PADDING
if hi > n - 1:
hi = n - 1
span = hi - (lo - 1)
inv_span = 1.0 / span
for k in range(lo, hi):
alpha = (k - (lo - 1)) * inv_span
for c in range(dims):
positions[k, c] = positions[lo - 1, c] + alpha * (
positions[hi, c] - positions[lo - 1, c]
)
n_patched += 1
i = hi + 1
return n_patched


@dataclass
class JointPath:
"""
Expand Down Expand Up @@ -190,7 +256,9 @@ def from_poses(
)

if result.all_valid:
return cls(positions=result.joint_positions)
positions = np.asarray(result.joint_positions, dtype=np.float64)
_smooth_singularity_outliers(positions)
return cls(positions=positions)

valid = np.array(result.valid, dtype=np.bool_)
# Count consecutive valid from start
Expand Down
9 changes: 8 additions & 1 deletion parol6/server/transports/mock_serial_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,15 @@ def tick_simulation(
if not self._connected:
return

# Use the controller's fixed control interval rather than wallclock
# dt so the simulator behaves like the firmware (which reads encoders
# on its own fixed-rate timer). Wallclock dt inherits the host's
# scheduling jitter, which causes the simulator to under- or
# over-advance Position_in around slow ticks — visible as ghost
# spikes in recorded velocities even though the commanded trajectory
# is bit-identical across runs.
now = time.perf_counter()
dt = now - self._state.last_update
dt = cfg.INTERVAL_S
self._state.last_update = now

# Snap gripper position if teleport requested
Expand Down
9 changes: 9 additions & 0 deletions parol6/utils/warmup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
_pose_to_tangent_jit,
_tangent_to_pose_jit,
)
from parol6.motion.trajectory import _smooth_singularity_outliers
from parol6.protocol.wire import (
_pack_bitfield,
_pack_positions,
Expand Down Expand Up @@ -295,6 +296,14 @@ def warmup_jit() -> float:
# parol6/commands/servo_commands.py
_max_vel_ratio_jit(dummy_6f, dummy_6f)

# parol6/motion/trajectory.py — non-trivial array exercises every branch
# (diff loop, median, bad-detection, interp loop).
dummy_chain = np.zeros((10, 6), dtype=np.float64)
for i in range(10):
dummy_chain[i] = i * 0.01
dummy_chain[5, 3] += 1.0 # synthetic outlier so the interp loop compiles
_smooth_singularity_outliers(dummy_chain)

elapsed = time.perf_counter() - start
logger.info(f"\tJIT warmup completed in {elapsed * 1000:.1f}ms")
return elapsed
130 changes: 130 additions & 0 deletions tests/unit/test_motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

from parol6.config import INTERVAL_S, LIMITS, steps_to_rad
from parol6.motion import JointPath, ProfileType, Trajectory, TrajectoryBuilder
from parol6.motion.trajectory import (
_IK_OUTLIER_PADDING,
_IK_OUTLIER_RATIO,
_smooth_singularity_outliers,
)


class TestJointPath:
Expand Down Expand Up @@ -231,3 +236,128 @@ def test_getitem_returns_step(self):
assert np.array_equal(traj[0], steps[0])
assert np.array_equal(traj[5], steps[5])
assert np.array_equal(traj[-1], steps[-1])


class TestSmoothSingularityOutliers:
"""Tests for _smooth_singularity_outliers: LM-IK branch-hop repair."""

@staticmethod
def _smooth_chain(n: int = 40, step: float = 0.01) -> np.ndarray:
"""A 6-DOF chain with uniform per-sample step magnitude."""
positions = np.zeros((n, 6), dtype=np.float64)
for i in range(n):
positions[i] = i * step
return positions

def test_smooth_path_unchanged(self):
positions = self._smooth_chain()
original = positions.copy()
n = _smooth_singularity_outliers(positions)
assert n == 0
assert np.array_equal(positions, original)

def test_single_sample_hop_is_smoothed(self):
"""One outlier sample fires bad-flags on both neighbors (both adjacent
diffs are large) → run of 3, plus pad on each side → 2*pad+3 patched."""
positions = self._smooth_chain(n=40)
bad_idx = 20
positions[bad_idx, 3] += 5.0 # 5 rad J4 jump — well past 10× median

n_patched = _smooth_singularity_outliers(positions)
assert n_patched == 2 * _IK_OUTLIER_PADDING + 3
# The outlier is gone; uniform-step chain interpolates exactly.
assert positions[bad_idx, 3] == pytest.approx(bad_idx * 0.01, abs=1e-9)
# Samples just outside the patched window are untouched.
outside_lo = bad_idx - 1 - _IK_OUTLIER_PADDING - 1
outside_hi = bad_idx + 1 + _IK_OUTLIER_PADDING
assert positions[outside_lo, 3] == pytest.approx(outside_lo * 0.01)
assert positions[outside_hi, 3] == pytest.approx(outside_hi * 0.01)

def test_multi_sample_run_is_smoothed(self):
"""Wide hop with monotonically rising outliers → each step is large,
so the whole shelf forms one contiguous bad run."""
positions = self._smooth_chain(n=40)
# Use increasing magnitudes so every adjacent diff exceeds threshold.
positions[20, 3] += 4.0
positions[21, 3] += 8.0
positions[22, 3] += 12.0

n_patched = _smooth_singularity_outliers(positions)
# Bad samples: {19, 20, 21, 22, 23} → run [19, 24).
# Patched: [max(1,19-pad), min(n-1,23+pad)+1) = [15, 28) → 13.
assert n_patched == 5 + 2 * _IK_OUTLIER_PADDING
# All run samples land back on the chain.
for k in (20, 21, 22):
assert positions[k, 3] == pytest.approx(k * 0.01, abs=1e-9)

def test_outlier_near_start_clamps_padding(self):
"""Padding clamps at the array boundary; bookend at index 0."""
positions = self._smooth_chain(n=40)
positions[2, 3] += 5.0
original_first = positions[0].copy()
original_last = positions[-1].copy()

_smooth_singularity_outliers(positions)
assert positions[2, 3] == pytest.approx(2 * 0.01, abs=1e-9)
# Endpoints must never be modified.
assert np.array_equal(positions[0], original_first)
assert np.array_equal(positions[-1], original_last)

def test_outlier_near_end_clamps_padding(self):
positions = self._smooth_chain(n=40)
positions[-3, 3] += 5.0
original_first = positions[0].copy()
original_last = positions[-1].copy()

_smooth_singularity_outliers(positions)
assert positions[-3, 3] == pytest.approx((40 - 3) * 0.01, abs=1e-9)
assert np.array_equal(positions[0], original_first)
assert np.array_equal(positions[-1], original_last)

def test_short_path_is_noop(self):
for n in (0, 1, 2):
positions = np.zeros((n, 6), dtype=np.float64)
assert _smooth_singularity_outliers(positions) == 0

def test_zero_motion_is_noop(self):
"""Median step = 0 → can't form a ratio threshold; bail."""
positions = np.zeros((20, 6), dtype=np.float64)
positions[10, 3] = 5.0 # would-be outlier but median is 0
n = _smooth_singularity_outliers(positions)
assert n == 0
assert positions[10, 3] == 5.0

def test_below_threshold_step_not_smoothed(self):
"""Steps within `_IK_OUTLIER_RATIO`× median are normal motion, not hops."""
positions = self._smooth_chain(n=40)
# Bump one sample by ~5× the median step — should NOT trigger.
positions[20, 3] += 5 * 0.01
original = positions.copy()
n = _smooth_singularity_outliers(positions)
assert n == 0
assert np.array_equal(positions, original)

def test_threshold_exactly_at_ratio_not_smoothed(self):
"""Strict > comparison: step == threshold is NOT an outlier."""
positions = self._smooth_chain(n=40)
# Each step ~ sqrt(6) * 0.01; bump just at the threshold boundary.
median_step = np.sqrt(6) * 0.01
# Add exactly ratio * median to one component so the step magnitude
# is just over the legitimate motion but well within tolerance.
positions[20, 3] += _IK_OUTLIER_RATIO * median_step - 0.5 * median_step
n = _smooth_singularity_outliers(positions)
assert n == 0

def test_repair_preserves_endpoints(self):
"""No matter where the hop is, positions[0] and positions[-1] are
always exactly preserved (they're the IK seed and final target)."""
rng = np.random.default_rng(0)
for _ in range(20):
positions = self._smooth_chain(n=50)
hop_idx = int(rng.integers(2, 48))
positions[hop_idx, rng.integers(0, 6)] += rng.uniform(2.0, 8.0)
first = positions[0].copy()
last = positions[-1].copy()
_smooth_singularity_outliers(positions)
assert np.array_equal(positions[0], first)
assert np.array_equal(positions[-1], last)
Loading