Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,78 @@ jobs:
pip install -e ".[dev]"
- name: Run pre-commit
uses: pre-commit/action@v3.0.1

test:
name: ${{ matrix.os }} / Python ${{ matrix.python-version }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30
timeout-minutes: 45
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ['3.11', '3.12', '3.13', '3.14']

defaults:
run:
shell: bash -l {0}

steps:
- name: Checkout repository (with submodules)
uses: actions/checkout@v4

- name: Setup Python
- name: Detect matching pinokin branch
id: pinokin
shell: bash
run: |
BRANCH="${GITHUB_HEAD_REF:-${GITHUB_REF_NAME}}"
if git ls-remote --heads https://github.com/Jepson2k/pinokin.git "$BRANCH" 2>/dev/null | grep -q .; then
echo "matching=true" >> "$GITHUB_OUTPUT"
echo "branch=$BRANCH" >> "$GITHUB_OUTPUT"
else
echo "matching=false" >> "$GITHUB_OUTPUT"
fi

# ----------------------------------------------------------------------
# Path A: matching pinokin branch -> conda env (provides libpinocchio +
# libcoal headers/libs) so pinokin can be built from source. We install
# PAROL6 first (which pulls in the pinned pinokin v0.1.6 wheel) and
# then override pinokin with the source-built wheel from the matching
# branch. This is the only way to exercise unreleased pinokin features
# before a release lands.
# ----------------------------------------------------------------------
- name: Setup Miniforge (matching pinokin branch)
if: steps.pinokin.outputs.matching == 'true'
uses: conda-incubator/setup-miniconda@v3
with:
miniforge-version: latest
python-version: ${{ matrix.python-version }}
conda-remove-defaults: true
activate-environment: parol6-test

- name: Clone matching pinokin branch
if: steps.pinokin.outputs.matching == 'true'
run: |
git clone --depth=1 --branch="${{ steps.pinokin.outputs.branch }}" https://github.com/Jepson2k/pinokin.git pinokin-src
conda env update -n parol6-test -f pinokin-src/environment.yml

# ----------------------------------------------------------------------
# Path B: no matching pinokin branch -> plain pip with the pinned
# release wheel. Anything in PAROL6 that requires a newer pinokin
# feature will fail loudly here (intended).
# ----------------------------------------------------------------------
- name: Setup Python (pinned pinokin)
if: steps.pinokin.outputs.matching != 'true'
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
cache-dependency-path: pyproject.toml

# ---- Common: waldoctl override + PAROL6 install ----
- name: Install cross-repo dependencies
shell: bash
run: |
python -m pip install --upgrade pip
BRANCH="${GITHUB_HEAD_REF:-${GITHUB_REF_NAME}}"
# waldoctl: try matching branch, fall back to tagged version
if git ls-remote --heads https://github.com/Jepson2k/waldoctl.git "$BRANCH" 2>/dev/null | grep -q .; then
pip install "waldoctl @ git+https://github.com/Jepson2k/waldoctl.git@${BRANCH}"
fi
Expand All @@ -66,6 +111,16 @@ jobs:
run: |
pip install -e ".[dev]" pytest-timeout

# Override the pinned pinokin v0.1.6 wheel with the matching-branch
# source build. --force-reinstall + --no-deps swaps just pinokin
# without disturbing other resolved dependencies.
- name: Override pinokin with matching-branch source build
if: steps.pinokin.outputs.matching == 'true'
run: |
cd pinokin-src
pip install . --no-build-isolation --force-reinstall --no-deps
python -c "from pinokin import CollisionChecker; print('CollisionChecker available')"

- name: Show environment
run: |
python -V
Expand Down
126 changes: 125 additions & 1 deletion parol6/PAROL6_ROBOT.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import atexit
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Final

import numpy as np
from numpy.typing import NDArray
from pinokin import Robot
from pinokin import CollisionChecker, Robot

from parol6.tools import get_tool_transform

Expand Down Expand Up @@ -56,10 +57,131 @@
_urdf_path = str(
Path(__file__).resolve().parent / "urdf_model" / "urdf" / "PAROL6.urdf"
)
_mesh_dir = str(Path(_urdf_path).resolve().parent.parent)

# Current robot instance (tool transform applied in-place)
robot: Robot = Robot(_urdf_path)

# Self-collision checker bound to the same pinokin Robot. Lazy: only
# constructed when collision checking is enabled and geometry loads
# successfully. Treat None as "checks disabled" everywhere.
collision: CollisionChecker | None = None


def _resolved_urdf_for_collision() -> str:
"""Return a path to a URDF with `package://parol6/...` rewritten to
absolute `file://` paths so pinokin's mesh loader can resolve them.

The PAROL6 URDF was authored for a ROS package layout (meshes at
`parol6/meshes/`) but the Python package places them at
`parol6/urdf_model/meshes/`. Rewriting at runtime keeps the source
URDF unchanged and avoids fragile symlink farms.

The rewritten file is created in the system temp dir on first call and
cleaned up at interpreter exit.
"""
import tempfile

src = Path(_urdf_path)
text = src.read_text()
mesh_root = Path(_urdf_path).resolve().parent.parent / "meshes"
# `package://parol6/meshes/foo.STL` -> `file:///abs/path/to/foo.STL`
rewritten = text.replace("package://parol6/meshes/", f"file://{mesh_root}/")
fd, tmp_path = tempfile.mkstemp(prefix="parol6_collision_", suffix=".urdf")
with os.fdopen(fd, "w") as f:
f.write(rewritten)

@atexit.register
def _cleanup_tmp_urdf() -> None:
try:
os.unlink(tmp_path)
except OSError:
pass

return tmp_path


def _init_collision_checker() -> None:
"""Build the singleton CollisionChecker if enabled in config."""
global collision
# Late import so importing this module never crashes on a circular import
# with parol6.config.
from parol6.config import (
COLLISION_CHECK_ENABLED,
COLLISION_SRDF_PATH,
)

if not COLLISION_CHECK_ENABLED:
collision = None
return

try:
urdf_for_collision = _resolved_urdf_for_collision()
c = CollisionChecker(robot, urdf_for_collision, package_dirs=[_mesh_dir])
if COLLISION_SRDF_PATH and os.path.exists(COLLISION_SRDF_PATH):
c.load_srdf(COLLISION_SRDF_PATH)
collision = c
logger.info(
"Collision checker loaded: %d pairs, %d geometry objects",
c.num_collision_pairs,
c.num_geometry_objects,
)
except Exception as e: # noqa: BLE001
logger.warning(
"Failed to initialize collision checker (continuing without): %s", e
)
collision = None


# Geometry-object names for meshes attached to the collision checker on
# behalf of the currently-active tool. Mutated by
# `_refresh_collision_tool_geometry` on every `apply_tool` call.
_active_tool_geom_names: list[str] = []


def _refresh_collision_tool_geometry(
tool_key: str,
variant_key: str | None = None,
) -> None:
"""Sync the global collision checker's tool geometry with the active
tool. No-op if the checker isn't built yet (so this is safe to call
during early module init, before `_init_collision_checker` runs)."""
if collision is None:
return
for name in _active_tool_geom_names:
collision.remove_geometry_by_name(name)
_active_tool_geom_names.clear()
if tool_key == "NONE":
return
from parol6.tools import get_registry

cfg = get_registry().get(tool_key)
if cfg is None:
return
# ToolVariant.meshes (when non-empty) wholesale replaces cfg.meshes;
# mirrors WC's swap_tool_mesh semantics in urdf_scene.py.
meshes = cfg.meshes
if variant_key:
for v in cfg.variants:
if v.key == variant_key and v.meshes:
meshes = v.meshes
break
mesh_root = Path(_mesh_dir) / "meshes"
for spec in meshes:
path = mesh_root / spec.file
# All current MeshSpecs use rpy=(0,0,0); rotation is baked into
# the STL geometry (see _MESH_RPY comment in tools.py). Add a
# rotation branch here when a non-identity rpy appears.
T = np.eye(4, dtype=np.float64)
T[:3, 3] = spec.origin
collision.attach_mesh_to_frame(
spec.file,
str(path),
parent_frame="L6",
placement=T,
)
_active_tool_geom_names.append(spec.file)


def apply_tool(
tool_name: str,
Expand Down Expand Up @@ -95,6 +217,8 @@ def apply_tool(
robot.clear_tool_transform()
logger.info(f"Applied tool {label} (identity)")

_refresh_collision_tool_geometry(tool_name, variant_key=variant_key or None)


# Initialize with no tool
apply_tool("NONE")
Expand Down
96 changes: 96 additions & 0 deletions parol6/commands/_collision_guard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Shared self-collision pre-flight check used by motion commands.

`guard_joint_path(positions)` raises `MotionError(SYS_SELF_COLLISION)` if any
sampled configuration along the interpolated joint path would self-collide
(or world-collide given runtime obstacles attached to the singleton checker).
Disabled-by-config or unloaded-checker scenarios are no-ops.
"""

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray

import parol6.PAROL6_ROBOT as PAROL6_ROBOT
from parol6.config import COLLISION_PATH_SAMPLES
from parol6.utils.error_catalog import make_error
from parol6.utils.error_codes import ErrorCode
from parol6.utils.errors import MotionError


def _format_pairs(pairs: list[tuple[str, str]]) -> str:
"""Render colliding (name, name) pairs as a human-readable string.

Caps at 4 to keep error messages tractable when many pairs collide
simultaneously (rare in practice; usually the first one is the
actionable one anyway).
"""
if not pairs:
return "?"
head = pairs[:4]
rendered = ", ".join(f"{a} vs {b}" for a, b in head)
if len(pairs) > 4:
rendered += f" (+{len(pairs) - 4} more)"
return rendered


def guard_config(q: NDArray[np.float64]) -> None:
"""Raise MotionError if q is in collision. No-op if checker disabled."""
checker = PAROL6_ROBOT.collision
if checker is None:
return
q_arr = np.ascontiguousarray(q, dtype=np.float64)
if checker.in_collision(q_arr):
pairs = checker.colliding_pairs(q_arr)
raise MotionError(
make_error(
ErrorCode.SYS_SELF_COLLISION,
sample="target",
total="1",
pairs=_format_pairs(pairs),
)
)


def guard_joint_path(positions: NDArray[np.float64]) -> None:
"""Raise MotionError if any sample in the path is in collision.

`positions` is (N, nq) joint positions in radians. Endpoints are always
included; up to COLLISION_PATH_SAMPLES interior samples are checked.
"""
checker = PAROL6_ROBOT.collision
if checker is None:
return

pos = np.ascontiguousarray(positions, dtype=np.float64)
n = pos.shape[0]
if n == 0:
return

# Subsample at COLLISION_PATH_SAMPLES interior points, always including
# first and last rows.
target = max(2, COLLISION_PATH_SAMPLES + 2)
if n <= target:
idx = np.arange(n)
else:
idx = np.unique(
np.concatenate(
[
np.linspace(0, n - 1, target).round().astype(int),
np.array([0, n - 1]),
]
)
)
sub = np.ascontiguousarray(pos[idx], dtype=np.float64)
hit = checker.check_path(sub)
if hit >= 0:
sample = int(idx[hit])
pairs = checker.colliding_pairs(pos[sample])
raise MotionError(
make_error(
ErrorCode.SYS_SELF_COLLISION,
sample=str(sample),
total=str(n),
pairs=_format_pairs(pairs),
)
)
Loading
Loading