diff --git a/source/isaaclab/changelog.d/antoiner-feat-ovphysx_contactsensor.rst b/source/isaaclab/changelog.d/antoiner-feat-ovphysx_contactsensor.rst new file mode 100644 index 000000000000..7d92d9215b7c --- /dev/null +++ b/source/isaaclab/changelog.d/antoiner-feat-ovphysx_contactsensor.rst @@ -0,0 +1,32 @@ +Fixed +^^^^^ + +* Fixed three places where ``OvPhysxManager`` was misclassified as the + PhysX backend by a substring/schema match: + + - :meth:`~isaaclab.sensors.SensorBase._register_callbacks` matched + ``"physx" in physics_mgr_cls.__name__.lower()`` to gate the PhysX + ``IsaacEvents.PRIM_DELETION`` import — the substring also matches + ``"OvPhysxManager"``, so the ``isaaclab_physx`` import fired in + kitless OVPhysX mode and raised + :exc:`ModuleNotFoundError` because ``omni.physics.tensors`` is not + loaded. Switched to an exact ``physics_mgr_cls.__name__ == + "PhysxManager"`` match. + - :meth:`~isaaclab.assets.AssetBase.set_debug_vis` had the same + substring check guarding an ``import omni.kit.app`` call, which + would fire for OVPhysX-backed assets and break under + ``./scripts/run_ovphysx.sh``. Switched to an exact + ``"PhysxManager"`` match. + - :meth:`~isaaclab.physics.SceneDataProvider._get_backend` used + ``"physx" in manager_name`` to dispatch the backend factory; this + silently routed ``OvPhysxManager`` to the PhysX scene-data + provider. Switched to exact ``"PhysxManager"`` / + ``"NewtonManager"`` matches and an explicit ``ValueError`` for + unknown managers. +* Made + :attr:`~isaaclab.scene.InteractiveScene.physics_scene_path` accept a + bare :class:`pxr.UsdPhysics.Scene` prim as a fallback when no prim + with ``PhysxSceneAPI`` applied is on the stage. Kitless OVPhysX + does not load the ``omni.physx`` schema, so the auto-created scene + prim only carries the stock USD type. PhysX-backed flows continue + to prefer the ``PhysxSceneAPI`` prim. diff --git a/source/isaaclab/isaaclab/envs/mdp/events.py b/source/isaaclab/isaaclab/envs/mdp/events.py index 9483d2d2d0cd..e113c6f058eb 100644 --- a/source/isaaclab/isaaclab/envs/mdp/events.py +++ b/source/isaaclab/isaaclab/envs/mdp/events.py @@ -385,12 +385,37 @@ def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): f" with type: '{type(self.asset)}'." ) - # detect physics backend and instantiate the appropriate implementation + # detect physics backend and instantiate the appropriate implementation. + # Check ``ovphysxmanager`` first: it contains the substring ``physx`` so + # would otherwise be caught by the ``"physx" in ...`` branch below and + # routed to the PhysX impl, which assumes a ``root_view`` with + # ``.link_paths`` — OVPhysX's per-tensor-type bindings dict does not + # satisfy that contract. Newton's subclasses (``NewtonMJWarpManager``, + # ``NewtonKaminoManager``, ...) are caught by the substring branch. manager_name = env.sim.physics_manager.__name__.lower() - if "newton" in manager_name: + if manager_name == "ovphysxmanager": + # No OVPhysX implementation yet — wheel-side + # ``RIGID_BODY_MATERIAL`` tensor binding is missing; randomization + # would require per-body view creation that ovphysx does not yet + # expose. Run with material randomization disabled (warns once). + import logging # noqa: PLC0415 + + logging.getLogger(__name__).warning( + "randomize_rigid_body_material is a no-op on the OVPhysX backend " + "(wheel-side gap — see docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md)." + ) + + class _Noop: + def __call__(self, *args, **kwargs): + pass + + self._impl = _Noop() + elif "newton" in manager_name: self._impl = _RandomizeRigidBodyMaterialNewton(cfg, env, self.asset, self.asset_cfg) - else: + elif "physx" in manager_name: self._impl = _RandomizeRigidBodyMaterialPhysx(cfg, env, self.asset, self.asset_cfg) + else: + raise ValueError(f"Unsupported physics manager for randomize_rigid_body_material: {manager_name!r}") def __call__( self, diff --git a/source/isaaclab/isaaclab/physics/physics_manager.py b/source/isaaclab/isaaclab/physics/physics_manager.py index 6727d87174b6..b6e1a62f620b 100644 --- a/source/isaaclab/isaaclab/physics/physics_manager.py +++ b/source/isaaclab/isaaclab/physics/physics_manager.py @@ -157,11 +157,23 @@ def dispatch_event(cls, event: PhysicsEvent, payload: Any = None) -> None: @classmethod def clear_callbacks(cls) -> None: - """Remove all registered callbacks.""" + """Remove all registered callbacks. + + Do NOT reset ``_callback_id`` — handle IDs must remain monotonically + unique across the lifetime of the process. Resetting the counter + would let a future :meth:`register_callback` hand out an ID that an + old, still-alive :class:`CallbackHandle` (e.g. on a sensor that has + not been garbage-collected yet) holds, so when the old object + eventually finalizes its ``__del__`` would deregister the new + callback. This bit ovphysx's kitless multi-context tests where two + ``InteractiveScene``s are created in sequence: the first scene's + sensor would post-GC deregister the second scene's + ``_initialize_callback`` by ID collision, leaving the second sensor + forever uninitialized. + """ for cid in list(cls._callbacks.keys()): cls.deregister_callback(cid) cls._callbacks.clear() - cls._callback_id = 0 @classmethod def _wrap_weak_ref(cls, callback: Callable) -> Callable: diff --git a/source/isaaclab/isaaclab/scene/interactive_scene.py b/source/isaaclab/isaaclab/scene/interactive_scene.py index 543dfdfc9d57..70756007d6fb 100644 --- a/source/isaaclab/isaaclab/scene/interactive_scene.py +++ b/source/isaaclab/isaaclab/scene/interactive_scene.py @@ -454,11 +454,21 @@ def __str__(self) -> str: def physics_scene_path(self) -> str: """The path to the USD Physics Scene.""" if self._physics_scene_path is None: + # Prefer a prim with PhysxSceneAPI applied (Isaac Sim flow). Fall + # back to any UsdPhysics.Scene prim (kitless OvPhysX flow does not + # load the omni.physx schema, so the auto-created scene only + # carries the stock USD type without PhysxSceneAPI). + fallback_path: str | None = None for prim in self.stage.Traverse(): if "PhysxSceneAPI" in prim.GetAppliedSchemas(): self._physics_scene_path = prim.GetPrimPath().pathString logger.info(f"Physics scene prim path: {self._physics_scene_path}") break + if fallback_path is None and prim.GetTypeName() == "PhysicsScene": + fallback_path = prim.GetPrimPath().pathString + if self._physics_scene_path is None and fallback_path is not None: + self._physics_scene_path = fallback_path + logger.info(f"Physics scene prim path (no PhysxSceneAPI): {self._physics_scene_path}") if self._physics_scene_path is None: raise RuntimeError("No physics scene found! Please make sure one exists.") return self._physics_scene_path diff --git a/source/isaaclab/isaaclab/sensors/sensor_base.py b/source/isaaclab/isaaclab/sensors/sensor_base.py index d52c902f9d73..15fca5ee4ad4 100644 --- a/source/isaaclab/isaaclab/sensors/sensor_base.py +++ b/source/isaaclab/isaaclab/sensors/sensor_base.py @@ -294,7 +294,9 @@ def _invoke(callback_name, event): PhysicsEvent.STOP, order=10, ) - # Optional: prim deletion (only supported by PhysX backend) + # Optional: prim deletion (only supported by PhysX backend; the substring + # check would also match ``OvPhysxManager``, which does not expose + # ``IsaacEvents``, so use an exact class-name match). self._prim_deletion_handle = None if physics_mgr_cls.__name__ == "PhysxManager": from isaaclab_physx.physics import IsaacEvents # noqa: PLC0415 diff --git a/source/isaaclab_ovphysx/changelog.d/antoiner-feat-ovphysx_contactsensor.minor.rst b/source/isaaclab_ovphysx/changelog.d/antoiner-feat-ovphysx_contactsensor.minor.rst new file mode 100644 index 000000000000..1fa4e49623c7 --- /dev/null +++ b/source/isaaclab_ovphysx/changelog.d/antoiner-feat-ovphysx_contactsensor.minor.rst @@ -0,0 +1,46 @@ +Added +^^^^^ + +* Added :class:`~isaaclab_ovphysx.sensors.ContactSensor`, + :class:`~isaaclab_ovphysx.sensors.ContactSensorCfg`, and + :class:`~isaaclab_ovphysx.sensors.ContactSensorData` for the OVPhysX + backend, satisfying the + :class:`~isaaclab.sensors.contact_sensor.BaseContactSensor` and + :class:`~isaaclab.sensors.contact_sensor.BaseContactSensorData` + contracts. Wires net contact forces and the per-partner force matrix + through the OVPhysX :class:`ovphysx.api.ContactBinding` API + (``read_net_forces`` / ``read_force_matrix``); optional pose tracking + reads through a ``RIGID_BODY_POSE`` :class:`ovphysx.api.TensorBinding`. + Air/contact time tracking, + :meth:`~isaaclab_ovphysx.sensors.ContactSensor.compute_first_contact`, + :meth:`~isaaclab_ovphysx.sensors.ContactSensor.compute_first_air`, + history buffers, and reset semantics mirror the PhysX backend. +* Added the shared + :mod:`isaaclab_ovphysx.sensors.kernels` module with + :func:`~isaaclab_ovphysx.sensors.kernels.concat_pos_and_quat_to_pose_kernel` + and the 1D variant for reuse across future OVPhysX sensors. + +Changed +^^^^^^^ + +* Changed the existing + ``source/isaaclab_ovphysx/test/sensors/check_contact_sensor.py`` + stubs to real tests adapted from the PhysX + :mod:`isaaclab_physx.test.sensors.test_contact_sensor` suite. The + three tests that exercise ``track_contact_points`` or + ``track_friction_forces`` are decorated with + :func:`pytest.mark.skip` until the OVPhysX wheel ships + tensor-friendly per-sensor reads (see + ``docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md``); + the test bodies are preserved so the decorator can be removed in a + follow-up. + +Removed +^^^^^^^ + +* **Breaking:** Removed the five + ``source/isaaclab_ovphysx/test/sensors/check_contact_sensor.py`` + ``pytest.skip("Contact sensor not yet supported by ovphysx + backend.")`` placeholders in favour of the real test suite above. + No public migration is required; the placeholder names did not + appear in any external API. diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/physics/ovphysx_manager.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/physics/ovphysx_manager.py index 3b6f49e916f1..729b06befa95 100644 --- a/source/isaaclab_ovphysx/isaaclab_ovphysx/physics/ovphysx_manager.py +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/physics/ovphysx_manager.py @@ -245,6 +245,36 @@ def register_clone( """ cls._pending_clones.append((source, targets, parent_positions or [])) + _physx_schemas_registered: ClassVar[bool] = False + + @classmethod + def _ensure_physx_schemas_registered(cls) -> None: + """Register the ``PhysxSchema`` USD plugin shipped with the ovphysx wheel. + + In Kit-based runs ``omni.physx`` registers the schema; in kitless + runs it must be registered manually before the wheel can match + ``PhysxContactReportAPI`` and friends on the stage. The wheel + bundles the plugin under ``ovphysx/plugins/usd/PhysxSchema``. This + method is idempotent — :meth:`pxr.Plug.Registry.RegisterPlugins` + is a no-op once the plugin is registered. + """ + if cls._physx_schemas_registered: + return + try: + import os # noqa: PLC0415 + + import ovphysx # noqa: PLC0415 + + from pxr import Plug # noqa: PLC0415 + except Exception: + return + plugin_root = os.path.join(os.path.dirname(ovphysx.__file__), "plugins", "usd") + for sub in ("PhysxSchema/resources", "PhysxSchemaAddition/resources"): + path = os.path.join(plugin_root, sub) + if os.path.isdir(path): + Plug.Registry().RegisterPlugins(path) + cls._physx_schemas_registered = True + @classmethod def initialize(cls, sim_context: SimulationContext) -> None: """Initialize the physics manager with simulation context. @@ -262,6 +292,7 @@ def initialize(cls, sim_context: SimulationContext) -> None: instance is bound to. """ super().initialize(sim_context) + cls._ensure_physx_schemas_registered() cls._warmup_done = False cls._usd_handle = None cls._stage_path = None diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/__init__.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/__init__.py new file mode 100644 index 000000000000..f00f5832e6da --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Sub-package for ovphysx-backed sensors.""" + +from isaaclab.utils.module import lazy_export + +lazy_export() diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/__init__.pyi b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/__init__.pyi new file mode 100644 index 000000000000..e00fc97cbbc6 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/__init__.pyi @@ -0,0 +1,12 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +__all__ = [ + "ContactSensor", + "ContactSensorCfg", + "ContactSensorData", +] + +from .contact_sensor import ContactSensor, ContactSensorCfg, ContactSensorData diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/__init__.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/__init__.py new file mode 100644 index 000000000000..b07f8b8b1df0 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""OVPhysX-backed contact sensor.""" + +from isaaclab.utils.module import lazy_export + +lazy_export() diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/__init__.pyi b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/__init__.pyi new file mode 100644 index 000000000000..fd936d53b0c0 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/__init__.pyi @@ -0,0 +1,14 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +__all__ = [ + "ContactSensor", + "ContactSensorCfg", + "ContactSensorData", +] + +from .contact_sensor import ContactSensor +from .contact_sensor_cfg import ContactSensorCfg +from .contact_sensor_data import ContactSensorData diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor.py new file mode 100644 index 000000000000..898fbbeb4de6 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor.py @@ -0,0 +1,515 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +# Ignore optional memory usage warning globally +# pyright: reportOptionalSubscript=false + +from __future__ import annotations + +import contextlib +import logging +import re +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any + +import warp as wp + +import isaaclab.sim as sim_utils +from isaaclab.sensors.contact_sensor import BaseContactSensor +from isaaclab.utils.warp import ProxyArray + +import isaaclab_ovphysx.tensor_types as TT +from isaaclab_ovphysx.physics import OvPhysxManager + +from .contact_sensor_data import ContactSensorData +from .kernels import ( + compute_first_transition_kernel, + reset_contact_sensor_kernel, + split_flat_pose_to_pos_quat, + unpack_contact_buffer_data, # noqa: F401 -- reserved for v2 contact-points support + update_net_forces_ovphysx_kernel, +) + +if TYPE_CHECKING: + from .contact_sensor_cfg import ContactSensorCfg + +logger = logging.getLogger(__name__) + + +class ContactSensor(BaseContactSensor): + """An ovphysx contact reporting sensor. + + Reports normal contact forces in world frame using the ovphysx + :class:`ContactBinding` API. The `PhysxContactReportAPI` USD schema must + be applied to each sensor body (set + :attr:`isaaclab.sim.spawner.RigidObjectSpawnerCfg.activate_contact_sensors` + on the asset spawner). + + Optional features tracked by :attr:`ContactSensorCfg`: + + * ``track_pose`` — sensor body pose via a ``RIGID_BODY_POSE`` tensor binding. + * ``filter_prim_paths_expr`` — per-partner filtered forces via + :meth:`ContactBinding.read_force_matrix`. + * ``track_air_time`` — air/contact time tracking and + :meth:`compute_first_contact` / :meth:`compute_first_air`. + + The following config flags are not supported on the ovphysx backend yet + (the underlying ovphysx APIs do not expose tensor-friendly per-sensor + reads — see ``docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md``): + + * ``track_contact_points`` + * ``track_friction_forces`` + + Setting either flag raises :class:`NotImplementedError` at initialization. + """ + + cfg: ContactSensorCfg + """The configuration parameters.""" + + __backend_name__: str = "ovphysx" + """The name of the backend for the contact sensor.""" + + def __init__(self, cfg: ContactSensorCfg): + """Initializes the contact sensor object. + + Args: + cfg: The configuration parameters. + """ + super().__init__(cfg) + + # Reject the v1 unsupported optional features early, before USD discovery. + if cfg.track_contact_points or cfg.track_friction_forces: + raise NotImplementedError( + "ovphysx ContactSensor does not yet support 'track_contact_points' or 'track_friction_forces'." + " ovphysx 0.3.7 lacks tensor-friendly per-sensor read APIs for these features." + " See docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md for the maintainer asks." + ) + + self._data: ContactSensorData = ContactSensorData() + # Backend handles, populated in _initialize_impl. + self._physx_instance: Any = None + self._contact_binding: Any = None + self._pose_binding: Any = None + # Pre-allocated read buffers, populated in _create_buffers. + self._net_forces_flat_buf: wp.array | None = None + self._force_matrix_flat_buf: wp.array | None = None + self._poses_flat_buf: wp.array | None = None + # Body names (resolved during init). + self._body_names: list[str] = [] + # Default backend tunables matching the PhysX backend. + if self.cfg.max_contact_data_count_per_prim is None: + self.cfg.max_contact_data_count_per_prim = 4 + if self.cfg.force_threshold is None: + self.cfg.force_threshold = 1.0 + + def __str__(self) -> str: + """Returns: A string containing information about the instance.""" + return ( + f"Contact sensor @ '{self.cfg.prim_path}': \n" + f"\tbackend : ovphysx\n" + f"\tupdate period (s) : {self.cfg.update_period}\n" + f"\tnumber of bodies : {self.num_sensors}\n" + f"\tbody names : {self.body_names}\n" + ) + + """ + Properties + """ + + @property + def num_instances(self) -> int | None: + if self._contact_binding is None: + return None + return self._contact_binding.sensor_count + + @property + def data(self) -> ContactSensorData: + self._update_outdated_buffers() + return self._data + + @property + def num_sensors(self) -> int: + return self._num_sensors + + @property + def body_names(self) -> list[str]: + """The leaf-prim names of the sensor bodies. + + Raises: + RuntimeError: If accessed before the sensor has been initialized + (matches the eager non-``None`` contract PhysX provides). + """ + if not self._body_names: + raise RuntimeError( + "OvPhysxContactSensor.body_names accessed before initialization. " + "Step the simulation once (or wait for PhysicsEvent.PHYSICS_READY) so the " + "sensor can discover its bodies." + ) + return list(self._body_names) + + @property + def contact_view(self) -> Any: + """The underlying ovphysx :class:`ContactBinding` (or ``None`` before init). + + .. note:: + Use this view with caution. It owns native handles released at + simulation stop. + """ + return self._contact_binding + + @property + def pose_binding(self) -> Any: + """The underlying ovphysx ``RIGID_BODY_POSE`` :class:`TensorBinding`. + + ``None`` if ``cfg.track_pose`` is False or before initialization. + """ + return self._pose_binding + + """ + Implementation. + """ + + def _initialize_impl(self) -> None: + super()._initialize_impl() + + physx_instance = OvPhysxManager.get_physx_instance() + if physx_instance is None: + raise RuntimeError("OvPhysxManager has not been initialized yet.") + self._physx_instance = physx_instance + + # Discover sensor bodies. Mirror the PhysX discovery path but use + # ``GetPrimTypeInfo().GetAppliedAPISchemas()`` (raw apiSchemas listOp) + # rather than ``GetAppliedSchemas()`` (filtered by USD's plugin + # registry). Under the kitless ovphysx flow the ``PhysxSchema`` USD + # plugin is registered by :meth:`OvPhysxManager.initialize` so the + # wheel-side schema check passes, but the Python-side filtered API + # still hides ``PhysxContactReportAPI`` because the schema TYPE + # registration only happens when the C++ plugin library is loaded by + # ``omni.physx``. The unfiltered API matches what the underlying + # USD apiSchemas listOp actually carries (verified against + # :class:`pxr.Sdf.PrimSpec.GetInfo("apiSchemas")`). + leaf_pattern = self.cfg.prim_path.rsplit("/", 1)[-1] + template_prim_path = self._parent_prims[0].GetPath().pathString + body_names: list[str] = [] + for prim in sim_utils.find_matching_prims(template_prim_path + "/" + leaf_pattern): + if "PhysxContactReportAPI" in prim.GetPrimTypeInfo().GetAppliedAPISchemas(): + body_names.append(prim.GetPath().pathString.rsplit("/", 1)[-1]) + if not body_names: + raise RuntimeError( + f"Sensor at path '{self.cfg.prim_path}' could not find any bodies with contact reporter API." + "\nHINT: Make sure to enable 'activate_contact_sensors' in the corresponding asset spawn configuration." + ) + self._body_names = body_names + self._num_sensors = len(body_names) + + # Build glob patterns: one per (env, sensor body). + # IsaacLab path forms map to ovphysx fnmatch globs the same way Articulation does. + base_glob = self.cfg.prim_path.rsplit("/", 1)[0] + base_glob = re.sub(r"\{ENV_REGEX_NS\}", "*", base_glob) + base_glob = re.sub(r"\.\*", "*", base_glob) + sensor_patterns = [f"{base_glob}/{name}" for name in body_names] + + # Build filter patterns (flat: len = n_sensors * filters_per_sensor). + filter_globs = [ + re.sub(r"\.\*", "*", re.sub(r"\{ENV_REGEX_NS\}", "*", expr)) for expr in self.cfg.filter_prim_paths_expr + ] + filters_per_sensor = len(filter_globs) + if filters_per_sensor > 0: + filter_patterns: list[str] | None = filter_globs * self._num_sensors + else: + filter_patterns = None + + # Create the contact binding (must happen BEFORE the next step()). + # OVPhysX's ``InteractiveScene`` runs in ``clone_usd=False`` mode: + # env_1..N have no USD prim — they're physics-layer clones via + # ``physx.clone()``. The parent class's ``find_matching_prims`` walk + # therefore sees only env_0 and sets ``self._num_envs = 1`` even when + # the scene is configured for many envs. We size the + # ``max_contact_data_count`` for env_0 only here; the binding's + # ``sensor_count`` after creation gives us the real env count. + max_count = self.cfg.max_contact_data_count_per_prim * self._num_sensors * self._num_envs + self._contact_binding = physx_instance.create_contact_binding( + sensor_patterns=sensor_patterns, + filter_patterns=filter_patterns, + filters_per_sensor=filters_per_sensor, + max_contact_data_count=max_count, + ) + + # Validate: sensor_count must be a non-zero multiple of num_sensors. + if self._contact_binding.sensor_count == 0 or self._contact_binding.sensor_count % self._num_sensors != 0: + raise RuntimeError( + "Failed to initialize contact binding for specified bodies." + f"\n\tInput prim path : {self.cfg.prim_path}" + f"\n\tNum sensor bodies : {self._num_sensors}" + f"\n\tBound sensors : {self._contact_binding.sensor_count}" + ) + + # Override ``_num_envs`` with the binding's view if it differs (it does + # for any OVPhysX scene with ``num_envs > 1`` due to ``clone_usd=False``). + # Re-allocate the env-sized buffers from the parent class so they match + # the real env count. + binding_num_envs = self._contact_binding.sensor_count // self._num_sensors + if binding_num_envs != self._num_envs: + self._num_envs = binding_num_envs + self._ALL_ENV_MASK = wp.ones((self._num_envs,), dtype=wp.bool, device=self._device) + self._reset_mask = wp.zeros((self._num_envs,), dtype=wp.bool, device=self._device) + self._reset_mask_torch = wp.to_torch(self._reset_mask) + self._is_outdated = wp.ones(self._num_envs, dtype=wp.bool, device=self._device) + self._timestamp = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) + self._timestamp_last_update = wp.zeros_like(self._timestamp) + + # Optional: pose tracking via a RIGID_BODY_POSE tensor binding. + # ovphysx fnmatch does not brace-expand, so we cannot match multiple + # body names with a single glob. Single-body sensors (the common case + # — one prim path per sensor) use a tight per-body pattern. Multi-body + # sensors are rejected here; they need per-body bindings + an + # interleaved-read kernel that does not exist yet. + if self.cfg.track_pose: + if self._num_sensors != 1: + raise NotImplementedError( + "ovphysx ContactSensor.track_pose is not yet supported for sensors that " + f"resolve to more than one body per env (got {self._num_sensors} bodies " + f"under '{self.cfg.prim_path}'). Workaround: create one ContactSensor " + "per body." + ) + single_pose_pattern = f"{base_glob}/{body_names[0]}" + self._pose_binding = physx_instance.create_tensor_binding( + pattern=single_pose_pattern, + tensor_type=TT.RIGID_BODY_POSE, + ) + if self._pose_binding.count != self._contact_binding.sensor_count: + raise RuntimeError( + "RIGID_BODY_POSE binding count mismatch." + f"\n\tPattern: {single_pose_pattern}" + f"\n\tBound : {self._pose_binding.count}" + f"\n\tExpect : {self._contact_binding.sensor_count}" + ) + + self._create_buffers() + + def _create_buffers(self) -> None: + """Allocate Warp buffers, including the pre-allocated ovphysx read tensors.""" + self._num_filter_shapes = self._contact_binding.filter_count if self.cfg.filter_prim_paths_expr else 0 + self._history_length = max(self.cfg.history_length, 1) + + # Sensor data buffers (delegated to the data container). + self._data.create_buffers( + num_envs=self._num_envs, + num_sensors=self._num_sensors, + num_filter_shapes=self._num_filter_shapes, + history_length=self.cfg.history_length, + track_pose=self.cfg.track_pose, + track_air_time=self.cfg.track_air_time, + track_contact_points=self.cfg.track_contact_points, + track_friction_forces=self.cfg.track_friction_forces, + device=self._device, + ) + + # ovphysx ContactBinding writes into pre-allocated tensors. We allocate + # them once here and reuse every step. Shape: [S, 3] for net forces, + # [S, F, 3] for the force matrix (S = num_envs * num_sensors). + flat_count = self._num_envs * self._num_sensors + self._net_forces_flat_buf = wp.zeros((flat_count, 3), dtype=wp.float32, device=self._device) + if self._num_filter_shapes > 0: + self._force_matrix_flat_buf = wp.zeros( + (flat_count, self._num_filter_shapes, 3), + dtype=wp.float32, + device=self._device, + ) + else: + self._force_matrix_flat_buf = None + + # Pose buffer: [S, 7] for RIGID_BODY_POSE (px,py,pz,qx,qy,qz,qw). + if self.cfg.track_pose: + self._poses_flat_buf = wp.zeros((flat_count, 7), dtype=wp.float32, device=self._device) + else: + self._poses_flat_buf = None + + def _update_buffers_impl(self, env_mask: wp.array | None = None) -> None: + """Read contact data from ovphysx and update sensor buffers.""" + env_mask = self._resolve_indices_and_mask(None, env_mask) + + # Pull aggregate forces into the pre-allocated flat buffer: + # shape [num_envs * num_sensors, 3] float32 -> [num_envs * num_sensors] vec3f. + self._contact_binding.read_net_forces(self._net_forces_flat_buf) + net_forces_flat = self._net_forces_flat_buf.view(wp.vec3f) + + if self._force_matrix_flat_buf is not None: + self._contact_binding.read_force_matrix(self._force_matrix_flat_buf) + force_matrix_flat = self._force_matrix_flat_buf.view(wp.vec3f) + else: + force_matrix_flat = None + + wp.launch( + update_net_forces_ovphysx_kernel, + dim=(self._num_envs, self._num_sensors), + inputs=[ + net_forces_flat, + force_matrix_flat, + env_mask, + self._num_envs, + self._num_sensors, + self._num_filter_shapes, + self._history_length, + self.cfg.force_threshold, + self._timestamp, + self._timestamp_last_update, + ], + outputs=[ + self._data._net_forces_w, + self._data._net_forces_w_history, + self._data._force_matrix_w, + self._data._force_matrix_w_history, + self._data._current_air_time, + self._data._current_contact_time, + self._data._last_air_time, + self._data._last_contact_time, + ], + device=self._device, + ) + + if self.cfg.track_pose: + # Read pose into [num_envs * num_sensors, 7] float32 -> view as transformf. + self._pose_binding.read(self._poses_flat_buf) + poses_flat = self._poses_flat_buf.view(wp.transformf) + wp.launch( + split_flat_pose_to_pos_quat, + dim=(self._num_envs, self._num_sensors), + inputs=[poses_flat, env_mask, self._num_sensors], + outputs=[self._data._pos_w, self._data._quat_w], + device=self._device, + ) + + """ + Operations + """ + + def reset(self, env_ids: Sequence[int] | None = None, env_mask: wp.array | None = None) -> None: + env_mask = self._resolve_indices_and_mask(env_ids, env_mask) + super().reset(None, env_mask) + + wp.launch( + reset_contact_sensor_kernel, + dim=(self._num_envs, self._num_sensors), + inputs=[ + self._history_length, + self._num_filter_shapes, + env_mask, + self._data._net_forces_w, + self._data._net_forces_w_history, + self._data._force_matrix_w, + ], + outputs=[ + self._data._current_air_time, + self._data._last_air_time, + self._data._current_contact_time, + self._data._last_contact_time, + self._data._friction_forces_w, + self._data._contact_pos_w, + ], + device=self._device, + ) + + def compute_first_contact(self, dt: float, abs_tol: float = 1.0e-8) -> ProxyArray: + """Boolean mask (as float) of bodies that established contact within ``dt`` [s]. + + Args: + dt: Time window since contact establishment [s]. + abs_tol: Absolute tolerance for the comparison [s]. + + Returns: + Boolean tensor (1.0/0.0) of shape ``(num_envs, num_sensors)``. + + Raises: + RuntimeError: If :attr:`ContactSensorCfg.track_air_time` is False. + """ + if not self.cfg.track_air_time: + raise RuntimeError( + "The contact sensor is not configured to track contact time." + " Please enable 'track_air_time' in the sensor configuration." + ) + wp.launch( + compute_first_transition_kernel, + dim=(self._num_envs, self._num_sensors), + inputs=[float(dt + abs_tol), self._data._current_contact_time], + outputs=[self._data._first_transition], + device=self._device, + ) + return self._data._first_transition_ta + + def compute_first_air(self, dt: float, abs_tol: float = 1.0e-8) -> ProxyArray: + """Boolean mask (as float) of bodies that broke contact within ``dt`` [s]. + + Args: + dt: Time window since contact break [s]. + abs_tol: Absolute tolerance for the comparison [s]. + + Returns: + Boolean tensor (1.0/0.0) of shape ``(num_envs, num_sensors)``. + + Raises: + RuntimeError: If :attr:`ContactSensorCfg.track_air_time` is False. + """ + if not self.cfg.track_air_time: + raise RuntimeError( + "The contact sensor is not configured to track air time." + " Please enable 'track_air_time' in the sensor configuration." + ) + wp.launch( + compute_first_transition_kernel, + dim=(self._num_envs, self._num_sensors), + inputs=[float(dt + abs_tol), self._data._current_air_time], + outputs=[self._data._first_transition], + device=self._device, + ) + return self._data._first_transition_ta + + """ + Debug visualization + """ + + def _set_debug_vis_impl(self, debug_vis: bool) -> None: + """Toggle contact-marker visibility. + + The kitless OVPhysX flow has no Kit-based renderer, so visualization + markers are effectively invisible. The hook is still wired so that + callers setting ``cfg.debug_vis=True`` get an explicit warning rather + than silent no-op behaviour. + """ + if debug_vis and not getattr(self, "_warned_debug_vis_unavailable", False): + logger.warning( + "OVPhysX ContactSensor: debug visualization markers are not rendered under the " + "kitless OVPhysX flow (no Kit renderer present). The hook runs but marker " + "geometry will not appear." + ) + self._warned_debug_vis_unavailable = True + + def _debug_vis_callback(self, event) -> None: + """Per-frame visualization update. + + Under kitless OVPhysX this is a no-op -- there is no renderer driving + the per-frame marker positions. The method exists so the base + sensor's debug-vis lifecycle hooks have a callable target. + """ + return + + """ + Internal simulation callbacks. + """ + + def _invalidate_initialize_callback(self, event) -> None: + """Release native handles when the simulation stops.""" + super()._invalidate_initialize_callback(event) + # Drop strong references; ovphysx native handles are torn down on the + # next reset() of OvPhysxManager. + if self._contact_binding is not None: + with contextlib.suppress(Exception): + self._contact_binding.destroy() + self._contact_binding = None + if self._pose_binding is not None: + with contextlib.suppress(Exception): + self._pose_binding.destroy() + self._pose_binding = None + self._physx_instance = None diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor_cfg.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor_cfg.py new file mode 100644 index 000000000000..f874b80ddae0 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor_cfg.py @@ -0,0 +1,19 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from typing import TYPE_CHECKING + +from isaaclab.sensors.contact_sensor.contact_sensor_cfg import ContactSensorCfg as _BaseContactSensorCfg +from isaaclab.utils.configclass import configclass + +if TYPE_CHECKING: + from .contact_sensor import ContactSensor + + +@configclass +class ContactSensorCfg(_BaseContactSensorCfg): + """OVPhysX contact sensor configuration.""" + + class_type: type["ContactSensor"] | str = "{DIR}.contact_sensor:ContactSensor" diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor_data.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor_data.py new file mode 100644 index 000000000000..0a74760faae7 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/contact_sensor_data.py @@ -0,0 +1,318 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +import logging +import math + +import warp as wp + +from isaaclab.sensors.contact_sensor import BaseContactSensorData +from isaaclab.utils.warp import ProxyArray + +from isaaclab_ovphysx.sensors.kernels import concat_pos_and_quat_to_pose_kernel + +logger = logging.getLogger(__name__) + + +class ContactSensorData(BaseContactSensorData): + """Data container for the ovphysx contact reporting sensor.""" + + @property + def pose_w(self) -> ProxyArray | None: + """Pose of the sensor origin in world frame. + + None if :attr:`ContactSensorCfg.track_pose` is False. + """ + if self._pose_w is None: + return None + wp.launch( + concat_pos_and_quat_to_pose_kernel, + dim=(self._num_envs, self._num_sensors), + inputs=[self._pos_w, self._quat_w], + outputs=[self._pose_w], + device=self._device, + ) + if self._pose_w_ta is None: + self._pose_w_ta = ProxyArray(self._pose_w) + return self._pose_w_ta + + @property + def pos_w(self) -> ProxyArray | None: + """Position of the sensor origin in world frame. + + Shape is (num_instances, num_sensors), dtype = wp.vec3f. In torch this resolves to + (num_instances, num_sensors, 3). + + None if :attr:`ContactSensorCfg.track_pose` is False. + """ + if self._pos_w is None: + return None + if self._pos_w_ta is None: + self._pos_w_ta = ProxyArray(self._pos_w) + return self._pos_w_ta + + @property + def quat_w(self) -> ProxyArray | None: + """Orientation of the sensor origin in world frame. + + Shape is (num_instances, num_sensors), dtype = wp.quatf. In torch this resolves to + (num_instances, num_sensors, 4). The orientation is provided in (x, y, z, w) format. + + None if :attr:`ContactSensorCfg.track_pose` is False. + """ + if self._quat_w is None: + return None + if self._quat_w_ta is None: + self._quat_w_ta = ProxyArray(self._quat_w) + return self._quat_w_ta + + @property + def net_forces_w(self) -> ProxyArray | None: + """The net normal contact forces in world frame. + + Shape is (num_instances, num_sensors), dtype = wp.vec3f. In torch this resolves to + (num_instances, num_sensors, 3). + """ + if self._net_forces_w is None: + return None + if self._net_forces_w_ta is None: + self._net_forces_w_ta = ProxyArray(self._net_forces_w) + return self._net_forces_w_ta + + @property + def net_forces_w_history(self) -> ProxyArray | None: + """History of net normal contact forces. + + Shape is (num_instances, history_length, num_sensors), dtype = wp.vec3f. In torch this resolves to + (num_instances, history_length, num_sensors, 3). + """ + if self._net_forces_w_history is None: + return None + if self._net_forces_w_history_ta is None: + self._net_forces_w_history_ta = ProxyArray(self._net_forces_w_history) + return self._net_forces_w_history_ta + + @property + def force_matrix_w(self) -> ProxyArray | None: + """Normal contact forces filtered between sensor and filtered bodies. + + Shape is (num_instances, num_sensors, num_filter_shapes), dtype = wp.vec3f. In torch this resolves to + (num_instances, num_sensors, num_filter_shapes, 3). + + None if :attr:`ContactSensorCfg.filter_prim_paths_expr` is empty. + """ + if self._force_matrix_w is None: + return None + if self._force_matrix_w_ta is None: + self._force_matrix_w_ta = ProxyArray(self._force_matrix_w) + return self._force_matrix_w_ta + + @property + def force_matrix_w_history(self) -> ProxyArray | None: + """History of filtered contact forces. + + Shape is (num_instances, history_length, num_sensors, num_filter_shapes), dtype = wp.vec3f. + In torch this resolves to (num_instances, history_length, num_sensors, num_filter_shapes, 3). + + None if :attr:`ContactSensorCfg.filter_prim_paths_expr` is empty. + """ + if self._force_matrix_w_history is None: + return None + if self._force_matrix_w_history_ta is None: + self._force_matrix_w_history_ta = ProxyArray(self._force_matrix_w_history) + return self._force_matrix_w_history_ta + + @property + def contact_pos_w(self) -> ProxyArray | None: + """Average position of contact points. + + Shape is (num_instances, num_sensors, num_filter_shapes), dtype = wp.vec3f. In torch this resolves to + (num_instances, num_sensors, num_filter_shapes, 3). + + None if :attr:`ContactSensorCfg.track_contact_points` is False. + """ + if self._contact_pos_w is None: + return None + if self._contact_pos_w_ta is None: + self._contact_pos_w_ta = ProxyArray(self._contact_pos_w) + return self._contact_pos_w_ta + + @property + def friction_forces_w(self) -> ProxyArray | None: + """Sum of friction forces. + + Shape is (num_instances, num_sensors, num_filter_shapes), dtype = wp.vec3f. In torch this resolves to + (num_instances, num_sensors, num_filter_shapes, 3). + + None if :attr:`ContactSensorCfg.track_friction_forces` is False. + """ + if self._friction_forces_w is None: + return None + if self._friction_forces_w_ta is None: + self._friction_forces_w_ta = ProxyArray(self._friction_forces_w) + return self._friction_forces_w_ta + + @property + def last_air_time(self) -> ProxyArray | None: + """Time spent in air before last contact. + + Shape is (num_instances, num_sensors), dtype = wp.float32. + + None if :attr:`ContactSensorCfg.track_air_time` is False. + """ + if self._last_air_time is None: + return None + if self._last_air_time_ta is None: + self._last_air_time_ta = ProxyArray(self._last_air_time) + return self._last_air_time_ta + + @property + def current_air_time(self) -> ProxyArray | None: + """Time spent in air since last detach. + + Shape is (num_instances, num_sensors), dtype = wp.float32. + + None if :attr:`ContactSensorCfg.track_air_time` is False. + """ + if self._current_air_time is None: + return None + if self._current_air_time_ta is None: + self._current_air_time_ta = ProxyArray(self._current_air_time) + return self._current_air_time_ta + + @property + def last_contact_time(self) -> ProxyArray | None: + """Time spent in contact before last detach. + + Shape is (num_instances, num_sensors), dtype = wp.float32. + + None if :attr:`ContactSensorCfg.track_air_time` is False. + """ + if self._last_contact_time is None: + return None + if self._last_contact_time_ta is None: + self._last_contact_time_ta = ProxyArray(self._last_contact_time) + return self._last_contact_time_ta + + @property + def current_contact_time(self) -> ProxyArray | None: + """Time spent in contact since last contact. + + Shape is (num_instances, num_sensors), dtype = wp.float32. + + None if :attr:`ContactSensorCfg.track_air_time` is False. + """ + if self._current_contact_time is None: + return None + if self._current_contact_time_ta is None: + self._current_contact_time_ta = ProxyArray(self._current_contact_time) + return self._current_contact_time_ta + + def create_buffers( + self, + num_envs: int, + num_sensors: int, + num_filter_shapes: int, + history_length: int, + track_pose: bool, + track_air_time: bool, + track_contact_points: bool, + track_friction_forces: bool, + device: str, + ) -> None: + """Create internal buffers for sensor data. + + Args: + num_envs: Number of environments. + num_sensors: Number of sensors per environment. + num_filter_shapes: Number of filtered shapes for force matrix. + history_length: Length of force history buffer. + track_pose: Whether to track sensor pose. + track_air_time: Whether to track air/contact time. + track_contact_points: Whether to track contact points. + track_friction_forces: Whether to track friction forces. + device: Device for tensor storage. + """ + self._num_envs = num_envs + self._num_sensors = num_sensors + self._device = device + # Ensure history_length >= 1 for consistent buffer shapes + effective_history = max(history_length, 1) + + # Net forces (always tracked) + self._net_forces_w = wp.zeros((num_envs, num_sensors), dtype=wp.vec3f, device=device) + self._net_forces_w_history = wp.zeros((num_envs, effective_history, num_sensors), dtype=wp.vec3f, device=device) + + # Track force matrix if requested - only with filter + if num_filter_shapes > 0: + self._force_matrix_w = wp.zeros((num_envs, num_sensors, num_filter_shapes), dtype=wp.vec3f, device=device) + self._force_matrix_w_history = wp.zeros( + (num_envs, effective_history, num_sensors, num_filter_shapes), dtype=wp.vec3f, device=device + ) + else: + self._force_matrix_w = None + self._force_matrix_w_history = None + + # Track pose if requested + if track_pose: + self._pos_w = wp.zeros((num_envs, num_sensors), dtype=wp.vec3f, device=device) + self._quat_w = wp.zeros((num_envs, num_sensors), dtype=wp.quatf, device=device) + self._pose_w = wp.zeros((num_envs, num_sensors), dtype=wp.transformf, device=device) + else: + self._pos_w = None + self._quat_w = None + self._pose_w = None + + # Track air time if requested + if track_air_time: + self._last_air_time = wp.zeros((num_envs, num_sensors), dtype=wp.float32, device=device) + self._current_air_time = wp.zeros((num_envs, num_sensors), dtype=wp.float32, device=device) + self._last_contact_time = wp.zeros((num_envs, num_sensors), dtype=wp.float32, device=device) + self._current_contact_time = wp.zeros((num_envs, num_sensors), dtype=wp.float32, device=device) + self._first_transition = wp.zeros((num_envs, num_sensors), dtype=wp.float32, device=device) + self._first_transition_ta = ProxyArray(self._first_transition) + else: + self._last_air_time = None + self._current_air_time = None + self._last_contact_time = None + self._current_contact_time = None + self._first_transition = None + self._first_transition_ta = None + + # Track contact points if requested - filled with NaN + if track_contact_points: + self._contact_pos_w = wp.full( + (num_envs, num_sensors, num_filter_shapes), + dtype=wp.vec3f, + device=device, + value=wp.vec3f(math.nan, math.nan, math.nan), + ) + else: + self._contact_pos_w = None + + # Track friction forces if requested + if track_friction_forces: + self._friction_forces_w = wp.zeros( + (num_envs, num_sensors, num_filter_shapes), dtype=wp.vec3f, device=device + ) + else: + self._friction_forces_w = None + + # -- Pinned ProxyArray cache (one per read property, lazily created on first access) + self._pose_w_ta: ProxyArray | None = None + self._pos_w_ta: ProxyArray | None = None + self._quat_w_ta: ProxyArray | None = None + self._net_forces_w_ta: ProxyArray | None = None + self._net_forces_w_history_ta: ProxyArray | None = None + self._force_matrix_w_ta: ProxyArray | None = None + self._force_matrix_w_history_ta: ProxyArray | None = None + self._contact_pos_w_ta: ProxyArray | None = None + self._friction_forces_w_ta: ProxyArray | None = None + self._last_air_time_ta: ProxyArray | None = None + self._current_air_time_ta: ProxyArray | None = None + self._last_contact_time_ta: ProxyArray | None = None + self._current_contact_time_ta: ProxyArray | None = None diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/kernels.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/kernels.py new file mode 100644 index 000000000000..437355dc470d --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/contact_sensor/kernels.py @@ -0,0 +1,300 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Warp kernels for the ovphysx contact sensor.""" + +import warp as wp + +# ---- Copy kernels (flat PhysX view -> structured data buffers) ---- + + +@wp.kernel +def split_flat_pose_to_pos_quat( + src: wp.array(dtype=wp.transformf), + mask: wp.array(dtype=wp.bool), + num_bodies: wp.int32, + dst_pos: wp.array2d(dtype=wp.vec3f), + dst_quat: wp.array2d(dtype=wp.quatf), +): + """Split flat (N*B,) transformf into (N, B) vec3f pos and (N, B) quatf quat. + + Args: + src: Flat source array of transforms from PhysX view. Shape is (N*B,). + mask: Boolean mask for which environments to update. Shape is (N,). + num_bodies: Number of bodies per environment. + dst_pos: Destination position buffer. Shape is (N, B). + dst_quat: Destination quaternion buffer. Shape is (N, B). + """ + env, sensor = wp.tid() + if mask: + if not mask[env]: + return + + src_idx = env * num_bodies + sensor + dst_pos[env, sensor] = wp.transform_get_translation(src[src_idx]) + dst_quat[env, sensor] = wp.transform_get_rotation(src[src_idx]) + + +# ---- Unpack contact buffer data kernel ---- + + +@wp.kernel +def unpack_contact_buffer_data( + contact_data: wp.array(dtype=wp.vec3f), + buffer_count: wp.array2d(dtype=wp.uint32), + buffer_start_indices: wp.array2d(dtype=wp.uint32), + mask: wp.array(dtype=wp.bool), + num_bodies: wp.int32, + avg: bool, + default_val: wp.float32, + dst: wp.array3d(dtype=wp.vec3f), +): + """Unpack and aggregate contact buffer data for each (env, body, filter) group. + + Each thread handles one (body, filter) pair for one environment. It reads + `count` contact entries starting at `start_index` and either averages or + sums them. + + Args: + contact_data: Flat buffer of contact data. Shape is (total_contacts,) vec3f. + buffer_count: Count of contacts per (env*body, filter). Shape is (N*B, M) uint32. + buffer_start_indices: Start indices per (env*body, filter). Shape is (N*B, M) uint32. + mask: Boolean mask for which environments to update. Shape is (N,). + num_bodies: Number of bodies per environment. + avg: If True, average the data; if False, sum it. + default_val: Default value for groups with zero contacts (e.g. NaN or 0.0). + dst: Destination buffer. Shape is (N, B, M). + """ + env, sensor, contact = wp.tid() + if mask: + if not mask[env]: + return + + flat_idx = env * num_bodies + sensor + count = wp.int32(buffer_count[flat_idx, contact]) + start = wp.int32(buffer_start_indices[flat_idx, contact]) + + if count > 0: + accum = wp.vec3f(0.0, 0.0, 0.0) + for c in range(count): + accum = accum + contact_data[start + c] + if avg: + accum = accum / wp.float32(count) + dst[env, sensor, contact] = accum + else: + dst[env, sensor, contact] = wp.vec3f(default_val, default_val, default_val) + + +@wp.kernel +def reset_contact_sensor_kernel( + # in + history_length: int, + num_filter_objects: int, + env_mask: wp.array(dtype=wp.bool), + # in-out + net_forces_w: wp.array2d(dtype=wp.vec3f), + net_forces_w_history: wp.array3d(dtype=wp.vec3f), + force_matrix_w: wp.array3d(dtype=wp.vec3f), + # outputs + current_air_time: wp.array2d(dtype=wp.float32), + last_air_time: wp.array2d(dtype=wp.float32), + current_contact_time: wp.array2d(dtype=wp.float32), + last_contact_time: wp.array2d(dtype=wp.float32), + friction_forces_w: wp.array3d(dtype=wp.vec3f), + contact_pos_w: wp.array3d(dtype=wp.vec3f), +): + """Reset the contact sensor data for specified environments. + + Launch with dim=(num_envs, num_sensors). + + Args: + history_length: Length of history. + num_filter_objects: Number of filter objects. + env_mask: Mask array. Shape is (num_envs,). + net_forces_w: Net forces array. Shape is (num_envs, num_sensors). + net_forces_w_history: Net forces history array. Shape is (num_envs, history_length, num_sensors). + force_matrix_w: Force matrix array. Shape is (num_envs, num_sensors, num_filter_objects). + current_air_time: Current air time array. Shape is (num_envs, num_sensors). + last_air_time: Last air time array. Shape is (num_envs, num_sensors). + current_contact_time: Current contact time array. Shape is (num_envs, num_sensors). + last_contact_time: Last contact time array. Shape is (num_envs, num_sensors). + friction_forces_w: Friction forces array. Shape is (num_envs, num_sensors, num_filter_objects). + contact_pos_w: Contact pos array. Shape is (num_envs, num_sensors, num_filter_objects). + """ + env, sensor = wp.tid() + + if env_mask: + if not env_mask[env]: + return + + # Reset net forces + net_forces_w[env, sensor] = wp.vec3f(0.0) + + # Reset history + if net_forces_w_history: + for i in range(history_length): + net_forces_w_history[env, i, sensor] = wp.vec3f(0.0) + + # Reset force matrix (guard for None case) + if force_matrix_w: + for f in range(num_filter_objects): + force_matrix_w[env, sensor, f] = wp.vec3f(0.0) + + # Reset air/contact time tracking + if current_air_time: + current_air_time[env, sensor] = 0.0 + last_air_time[env, sensor] = 0.0 + current_contact_time[env, sensor] = 0.0 + last_contact_time[env, sensor] = 0.0 + + if friction_forces_w: + for f in range(num_filter_objects): + friction_forces_w[env, sensor, f] = wp.vec3f(0.0) + + if contact_pos_w: + for f in range(num_filter_objects): + contact_pos_w[env, sensor, f] = wp.vec3f(0.0) + + +@wp.kernel +def compute_first_transition_kernel( + # in + threshold: wp.float32, + time: wp.array2d(dtype=wp.float32), + # out + result: wp.array2d(dtype=wp.float32), +): + """Compute boolean mask (as float) for sensors whose time is in (0, threshold). + + Used by both compute_first_contact (with current_contact_time) and + compute_first_air (with current_air_time). + + Launch with dim=(num_envs, num_sensors). + + Args: + threshold: Threshold for the time. + time: Time array. Shape is (num_envs, num_sensors). + result: Result array. Shape is (num_envs, num_sensors). + """ + env, sensor = wp.tid() + t = time[env, sensor] + if t > 0.0 and t < threshold: + result[env, sensor] = 1.0 + else: + result[env, sensor] = 0.0 + + +@wp.kernel +def update_net_forces_ovphysx_kernel( + # in + net_forces_flat: wp.array(dtype=wp.vec3f), + net_forces_matrix_flat: wp.array2d(dtype=wp.vec3f), + mask: wp.array(dtype=wp.bool), + num_envs: int, + num_sensors: int, + num_filter_shapes: int, + history_length: int, + contact_force_threshold: wp.float32, + timestamp: wp.array(dtype=wp.float32), + timestamp_last_update: wp.array(dtype=wp.float32), + # out + net_forces_w: wp.array2d(dtype=wp.vec3f), + net_forces_w_history: wp.array3d(dtype=wp.vec3f), + force_matrix_w: wp.array3d(dtype=wp.vec3f), + force_matrix_w_history: wp.array4d(dtype=wp.vec3f), + current_air_time: wp.array2d(dtype=wp.float32), + current_contact_time: wp.array2d(dtype=wp.float32), + last_air_time: wp.array2d(dtype=wp.float32), + last_contact_time: wp.array2d(dtype=wp.float32), +): + """Update the net forces, force matrix and air/contact time for each (env, sensor) pair. + + Launch with dim=(num_envs, num_sensors). + + The OVPhysX :class:`ContactBinding` returns sensors in **pattern-major** + order — the flat buffer index for ``(env, sensor)`` is + ``sensor * num_envs + env``, not the PhysX env-major + ``env * num_sensors + sensor``. We pass ``num_envs`` so the kernel can + compute the right index. + + Args: + net_forces_flat: Flat net forces. Shape is (num_sensors*num_envs,) in pattern-major order. + net_forces_matrix_flat: Flat force matrix. Shape is (num_sensors*num_envs, num_filter_shapes). + mask: Mask array. Shape is (num_envs,). + num_envs: Number of environments. + num_sensors: Number of sensors per environment. + num_filter_shapes: Number of filter shapes. + history_length: Length of history. + contact_force_threshold: Threshold for the contact force. + timestamp: Timestamp array. Shape is (num_envs,). + timestamp_last_update: Timestamp last update array. Shape is (num_envs,). + net_forces_w: Net forces array. Shape is (num_envs, num_sensors). + net_forces_w_history: Net forces history array. Shape is (num_envs, history_length, num_sensors). + force_matrix_w: Force matrix array. Shape is (num_envs, num_sensors, num_filter_shapes). + force_matrix_w_history: Force matrix history array. Shape is + (num_envs, history_length, num_sensors, num_filter_shapes). + current_air_time: Current air time array. Shape is (num_envs, num_sensors). + current_contact_time: Current contact time array. Shape is (num_envs, num_sensors). + last_air_time: Last air time array. Shape is (num_envs, num_sensors). + last_contact_time: Last contact time array. Shape is (num_envs, num_sensors). + """ + env, sensor = wp.tid() + + if mask: + if not mask[env]: + return + + src_idx = sensor * num_envs + env + + # Update net forces + net_forces_w[env, sensor] = net_forces_flat[src_idx] + # Update history + if net_forces_w_history: + for i in range(history_length - 1, 0, -1): + net_forces_w_history[env, i, sensor] = net_forces_w_history[env, i - 1, sensor] + net_forces_w_history[env, 0, sensor] = net_forces_w[env, sensor] + + # update force matrix + if net_forces_matrix_flat: + for f in range(num_filter_shapes): + force_matrix_w[env, sensor, f] = net_forces_matrix_flat[src_idx, f] + for i in range(history_length - 1, 0, -1): + force_matrix_w_history[env, i, sensor, f] = force_matrix_w_history[env, i - 1, sensor, f] + force_matrix_w_history[env, 0, sensor, f] = force_matrix_w[env, sensor, f] + + # Update air/contact time tracking + if current_air_time: + elapsed_time = timestamp[env] - timestamp_last_update[env] + in_contact = wp.length_sq(net_forces_w[env, sensor]) > contact_force_threshold * contact_force_threshold + + cat = current_air_time[env, sensor] + cct = current_contact_time[env, sensor] + is_first_contact = in_contact and (cat > 0.0) + is_first_detached = not in_contact and (cct > 0.0) + + if is_first_contact: + last_air_time[env, sensor] = cat + elapsed_time + elif is_first_detached: + last_contact_time[env, sensor] = cct + elapsed_time + + current_contact_time[env, sensor] = wp.where(in_contact, cct + elapsed_time, 0.0) + current_air_time[env, sensor] = wp.where(in_contact, 0.0, cat + elapsed_time) + + +@wp.kernel +def concat_pos_and_quat_to_pose_kernel( + pos: wp.array2d(dtype=wp.vec3f), + quat: wp.array2d(dtype=wp.quatf), + pose: wp.array2d(dtype=wp.transformf), +): + """Concatenate position and quaternion to pose. + + Args: + pos: Position array. Shape is (N, B). + quat: Quaternion array. Shape is (N, B). + pose: Pose array. Shape is (N, B). + """ + env, sensor = wp.tid() + pose[env, sensor] = wp.transform(pos[env, sensor], quat[env, sensor]) diff --git a/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/kernels.py b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/kernels.py new file mode 100644 index 000000000000..197cf2462601 --- /dev/null +++ b/source/isaaclab_ovphysx/isaaclab_ovphysx/sensors/kernels.py @@ -0,0 +1,42 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Shared warp kernels for ovphysx sensors.""" + +import warp as wp + + +@wp.kernel +def concat_pos_and_quat_to_pose_kernel( + pos: wp.array2d(dtype=wp.vec3f), + quat: wp.array2d(dtype=wp.quatf), + pose: wp.array2d(dtype=wp.transformf), +): + """Concatenate 2D position and quaternion arrays to pose. + + Args: + pos: Position array. Shape is (N, B). + quat: Quaternion array. Shape is (N, B). + pose: Pose array. Shape is (N, B). + """ + env, sensor = wp.tid() + pose[env, sensor] = wp.transform(pos[env, sensor], quat[env, sensor]) + + +@wp.kernel +def concat_pos_and_quat_to_pose_1d_kernel( + pos: wp.array(dtype=wp.vec3f), + quat: wp.array(dtype=wp.quatf), + pose: wp.array(dtype=wp.transformf), +): + """Concatenate 1D position and quaternion arrays to pose. + + Args: + pos: Position array. Shape is (N,). + quat: Quaternion array. Shape is (N,). + pose: Pose array. Shape is (N,). + """ + env = wp.tid() + pose[env] = wp.transform(pos[env], quat[env]) diff --git a/source/isaaclab_ovphysx/test/sensors/check_contact_sensor.py b/source/isaaclab_ovphysx/test/sensors/check_contact_sensor.py deleted file mode 100644 index 8bcd9f0941f6..000000000000 --- a/source/isaaclab_ovphysx/test/sensors/check_contact_sensor.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). -# All rights reserved. -# -# SPDX-License-Identifier: BSD-3-Clause - -"""Contact sensor parity tests for the ovphysx backend. - -Mirrors the structure of isaaclab_physx/test/sensors/check_contact_sensor.py. -Contact sensors are not yet supported by the ovphysx backend, so all tests -are skipped with an explanatory message. -""" - -import pytest - - -def test_contact_sensor_creation(): - """Verify contact sensor can be created on the ovphysx backend.""" - pytest.skip("Contact sensor not yet supported by ovphysx backend.") - - -def test_contact_sensor_data_reading(): - """Verify contact sensor data can be read after a simulation step.""" - pytest.skip("Contact sensor not yet supported by ovphysx backend.") - - -def test_contact_sensor_reset(): - """Verify contact sensor state resets correctly.""" - pytest.skip("Contact sensor not yet supported by ovphysx backend.") - - -def test_contact_sensor_air_time_tracking(): - """Verify contact sensor air time tracking.""" - pytest.skip("Contact sensor not yet supported by ovphysx backend.") - - -def test_contact_sensor_friction_forces(): - """Verify contact sensor friction force reporting.""" - pytest.skip("Contact sensor not yet supported by ovphysx backend.") diff --git a/source/isaaclab_ovphysx/test/sensors/test_contact_sensor.py b/source/isaaclab_ovphysx/test/sensors/test_contact_sensor.py new file mode 100644 index 000000000000..38b3bd9e579d --- /dev/null +++ b/source/isaaclab_ovphysx/test/sensors/test_contact_sensor.py @@ -0,0 +1,1004 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +# ignore private usage of variables warning +# pyright: reportPrivateUsage=none + +"""Real-backend tests for the OVPhysX ContactSensor. + +Run via ``./isaaclab.sh -p -m pytest``; the ovphysx wheel is now invocable +through the standard Kit Python entrypoint, so the older kitless +``./scripts/run_ovphysx.sh`` wrapper is no longer required. + +``ovphysx<=0.3.7`` binds device mode (CPU vs GPU) at the C++ layer on the +first ``ovphysx.PhysX(device=...)`` construction and cannot swap it without a +process restart. Full coverage therefore requires two separate pytest +invocations -- once with ``-k 'cpu'`` and once with ``-k 'cuda:0'``. The +``_ovphysx_skip_other_device`` autouse fixture below preempts the manager's +:exc:`RuntimeError` by ``pytest.skip``-ing on the unlocked device so +single-device runs finish cleanly. + +Two v1-unsupported feature tests are kept but marked ``@pytest.mark.skip``: + +* :func:`test_friction_reporting` — requires ``track_friction_forces``; see + issue #5325 and ``docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md``. +* :func:`test_invalid_prim_paths_config` — requires ``track_friction_forces`` + (used to configure the scene); same issue. +* :func:`test_invalid_max_contact_points_config` — requires + ``track_friction_forces``; same issue. + +The ``disable_contact_processing`` PhysX/Kit setting is not available in the +kitless OVPhysX flow; :func:`test_cube_contact_time` and +:func:`test_sphere_contact_time` therefore drop that parametrize axis and run +once per device. +""" + +from __future__ import annotations + +from dataclasses import MISSING +from enum import Enum + +import pytest +import torch +import warp as wp +from flaky import flaky + +# The CI isaaclab_ov* pattern unintentionally collects isaaclab_ovphysx tests, +# but the ovphysx wheel is not installed in that environment. Skip gracefully +# so the isaaclab_ov CI pipeline is not blocked by an unrelated dependency. +pytest.importorskip("ovphysx.types", reason="ovphysx wheel not installed") + +from isaaclab_ovphysx.assets import RigidObject # noqa: E402 +from isaaclab_ovphysx.physics import OvPhysxCfg # noqa: E402 +from isaaclab_ovphysx.sensors import ContactSensor, ContactSensorCfg # noqa: E402 + +import isaaclab.sim as sim_utils # noqa: E402 +from isaaclab.assets import RigidObjectCfg # noqa: E402 +from isaaclab.scene import InteractiveScene, InteractiveSceneCfg # noqa: E402 +from isaaclab.sim import SimulationCfg, SimulationContext, build_simulation_context # noqa: E402 +from isaaclab.sim.utils.stage import get_current_stage # noqa: E402 +from isaaclab.terrains import HfRandomUniformTerrainCfg, TerrainGeneratorCfg, TerrainImporterCfg # noqa: E402 +from isaaclab.utils.configclass import configclass # noqa: E402 + +wp.init() + +# --------------------------------------------------------------------------- +# Device-lock autouse fixture +# --------------------------------------------------------------------------- + +_LOCKED_DEVICE: list[str | None] = [None] +"""Device the session pins to on the first parametrized test that runs.""" + + +@pytest.fixture(autouse=True) +def _ovphysx_skip_other_device(request): + """Skip parametrized tests on the device the session is not pinned to. + + See the module docstring for the wheel's process-global device-mode lock. + """ + callspec = getattr(request.node, "callspec", None) + device = callspec.params.get("device") if callspec is not None else None + if device is None: + # Test does not parametrize on device. + return + locked = _LOCKED_DEVICE[0] + if locked is None: + _LOCKED_DEVICE[0] = device + return + if device != locked: + pytest.skip( + f"ovphysx process-global device lock is held by '{locked}'; cannot run '{device}' " + "tests in the same session. Run pytest twice (once per device) for full coverage." + ) + + +# --------------------------------------------------------------------------- +# Simulation context helper +# --------------------------------------------------------------------------- + + +def _ovphysx_sim_context(device: str, **kwargs): + """Wrapper around :func:`build_simulation_context` that injects OVPhysX cfg. + + PhysX tests pass ``device=device`` directly and let + :func:`build_simulation_context` build a default :class:`SimulationCfg`. + OVPhysX needs ``physics=OvPhysxCfg()`` set on the cfg so the manager + dispatches to OVPhysX rather than PhysX, so we build the cfg here and + pass it through. ``gravity_enabled`` is consumed locally (it is ignored + by ``build_simulation_context`` once a ``sim_cfg`` is provided). + ``add_ground_plane``, ``auto_add_lighting``, and other kwargs continue + to flow through ``build_simulation_context`` as before. + """ + dt = kwargs.pop("dt", 1.0 / 60.0) + gravity_enabled = kwargs.pop("gravity_enabled", True) + gravity = (0.0, 0.0, -9.81) if gravity_enabled else (0.0, 0.0, 0.0) + sim_cfg = SimulationCfg(physics=OvPhysxCfg(), device=device, dt=dt, gravity=gravity) + return build_simulation_context(device=device, sim_cfg=sim_cfg, **kwargs) + + +## +# Custom helper classes. +## + + +class ContactTestMode(Enum): + """Enum to declare the type of contact sensor test to execute.""" + + IN_CONTACT = 0 + """Enum to test the condition where the test object is in contact with the ground plane.""" + NON_CONTACT = 1 + """Enum to test the condition where the test object is not in contact with the ground plane (air time).""" + + +@configclass +class ContactSensorRigidObjectCfg(RigidObjectCfg): + """Configuration for rigid objects used for the contact sensor test. + + This contains the expected values in the configuration to simplify test fixtures. + """ + + contact_pose: torch.Tensor = MISSING + """6D pose of the rigid object under test when it is in contact with the ground surface.""" + non_contact_pose: torch.Tensor = MISSING + """6D pose of the rigid object under test when it is not in contact.""" + + +@configclass +class ContactSensorSceneCfg(InteractiveSceneCfg): + """Configuration of the scene used by the contact sensor test.""" + + terrain: TerrainImporterCfg = MISSING + """Terrain configuration within the scene.""" + + shape: ContactSensorRigidObjectCfg = MISSING + """RigidObject contact prim configuration.""" + + contact_sensor: ContactSensorCfg = MISSING + """Contact sensor configuration.""" + + shape_2: ContactSensorRigidObjectCfg = None + """RigidObject contact prim configuration. Defaults to None, i.e. not included in the scene. + + This is a second prim used for testing contact filtering. + """ + + contact_sensor_2: ContactSensorCfg = None + """Contact sensor configuration. Defaults to None, i.e. not included in the scene. + + This is a second contact sensor used for testing contact filtering. + """ + + +## +# Scene entity configurations. +## + + +CUBE_CFG = ContactSensorRigidObjectCfg( + prim_path="/World/Objects/Cube", + spawn=sim_utils.CuboidCfg( + size=(0.5, 0.5, 0.5), + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + ), + collision_props=sim_utils.CollisionPropertiesCfg( + collision_enabled=True, + ), + activate_contact_sensors=True, + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.4, 0.6, 0.4)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(0, -1.0, 1.0)), + contact_pose=torch.tensor([0, -1.0, 0, 1, 0, 0, 0]), + non_contact_pose=torch.tensor([0, -1.0, 1.0, 1, 0, 0, 0]), +) +"""Configuration of the cube prim.""" + +SPHERE_CFG = ContactSensorRigidObjectCfg( + prim_path="/World/Objects/Sphere", + spawn=sim_utils.SphereCfg( + radius=0.25, + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + ), + collision_props=sim_utils.CollisionPropertiesCfg( + collision_enabled=True, + ), + activate_contact_sensors=True, + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.4, 0.4, 0.6)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(0, 1.0, 1.0)), + contact_pose=torch.tensor([0, 1.0, 0.0, 1, 0, 0, 0]), + non_contact_pose=torch.tensor([0, 1.0, 1.0, 1, 0, 0, 0]), +) +"""Configuration of the sphere prim.""" + +CYLINDER_CFG = ContactSensorRigidObjectCfg( + prim_path="/World/Objects/Cylinder", + spawn=sim_utils.CylinderCfg( + radius=0.5, + height=0.01, + axis="Y", + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + ), + collision_props=sim_utils.CollisionPropertiesCfg( + collision_enabled=True, + ), + activate_contact_sensors=True, + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.6, 0.4, 0.4)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(0, 0.0, 1.0)), + contact_pose=torch.tensor([0, 0, 0.0, 1, 0, 0, 0]), + non_contact_pose=torch.tensor([0, 0, 1.0, 1, 0, 0, 0]), +) +"""Configuration of the cylinder prim.""" + +CAPSULE_CFG = ContactSensorRigidObjectCfg( + prim_path="/World/Objects/Capsule", + spawn=sim_utils.CapsuleCfg( + radius=0.25, + height=0.5, + axis="Z", + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + ), + collision_props=sim_utils.CollisionPropertiesCfg( + collision_enabled=True, + ), + activate_contact_sensors=True, + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.2, 0.4, 0.4)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(1.0, 0.0, 1.5)), + contact_pose=torch.tensor([1.0, 0.0, 0.0, 1, 0, 0, 0]), + non_contact_pose=torch.tensor([1.0, 0.0, 1.5, 1, 0, 0, 0]), +) +"""Configuration of the capsule prim.""" + +CONE_CFG = ContactSensorRigidObjectCfg( + prim_path="/World/Objects/Cone", + spawn=sim_utils.ConeCfg( + radius=0.5, + height=0.5, + axis="Z", + rigid_props=sim_utils.RigidBodyPropertiesCfg( + disable_gravity=False, + ), + collision_props=sim_utils.CollisionPropertiesCfg( + collision_enabled=True, + ), + activate_contact_sensors=True, + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.4, 0.2, 0.4)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(-1.0, 0.0, 1.0)), + contact_pose=torch.tensor([-1.0, 0.0, 0.0, 1, 0, 0, 0]), + non_contact_pose=torch.tensor([-1.0, 0.0, 1.0, 1, 0, 0, 0]), +) +"""Configuration of the cone prim.""" + +FLAT_TERRAIN_CFG = TerrainImporterCfg(prim_path="/World/ground", terrain_type="plane") +"""Configuration of the flat ground plane.""" + +COBBLESTONE_TERRAIN_CFG = TerrainImporterCfg( + prim_path="/World/ground", + terrain_type="generator", + terrain_generator=TerrainGeneratorCfg( + seed=0, + size=(3.0, 3.0), + border_width=0.0, + num_rows=1, + num_cols=1, + sub_terrains={ + "random_rough": HfRandomUniformTerrainCfg( + proportion=1.0, noise_range=(0.0, 0.05), noise_step=0.01, border_width=0.25 + ), + }, + ), +) +"""Configuration of the generated mesh terrain.""" + +## +# Shared test constants. +## + +_SIM_DT = 0.0025 +"""Simulation time-step [s] used across all contact sensor tests.""" + +_DURATIONS = [_SIM_DT, _SIM_DT * 2, _SIM_DT * 32, _SIM_DT * 128] +"""Contact/air durations [s] exercised by the timing tests.""" + +_TERRAINS = [FLAT_TERRAIN_CFG, COBBLESTONE_TERRAIN_CFG] +"""Terrain configurations exercised by the timing tests.""" + +## +# Tests. +## + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@flaky(max_runs=5, min_passes=1) +@pytest.mark.isaacsim_ci +def test_cube_contact_time(device): + """Checks contact sensor values for contact time and air time for a cube collision primitive.""" + _run_contact_sensor_test(CUBE_CFG, _SIM_DT, device, _TERRAINS, _DURATIONS) + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@flaky(max_runs=5, min_passes=1) +@pytest.mark.isaacsim_ci +def test_sphere_contact_time(device): + """Checks contact sensor values for contact time and air time for a sphere collision primitive.""" + _run_contact_sensor_test(SPHERE_CFG, _SIM_DT, device, _TERRAINS, _DURATIONS) + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.parametrize("num_envs", [1, 6, 24]) +@pytest.mark.isaacsim_ci +def test_cube_stack_contact_filtering(device, num_envs): + """Checks contact sensor reporting for filtering stacked cube prims.""" + with _ovphysx_sim_context(device=device, dt=_SIM_DT, add_lighting=True) as sim: + # Instance new scene for the current terrain and contact prim. + # OVPhysX uses fnmatch globs (not regex), so ``Env_*`` rather than ``Env_.*``. + scene_cfg = ContactSensorSceneCfg(num_envs=num_envs, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG.replace(prim_path="/World/ground") + # -- cube 1 + scene_cfg.shape = CUBE_CFG.replace(prim_path="{ENV_REGEX_NS}/Cube_1") + scene_cfg.shape.init_state.pos = (0, -1.0, 1.0) + # -- cube 2 (on top of cube 1) + scene_cfg.shape_2 = CUBE_CFG.replace(prim_path="{ENV_REGEX_NS}/Cube_2") + scene_cfg.shape_2.init_state.pos = (0, -1.0, 1.525) + # -- contact sensor 1 + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path="{ENV_REGEX_NS}/Cube_1", + track_pose=True, + debug_vis=False, + update_period=0.0, + filter_prim_paths_expr=["{ENV_REGEX_NS}/Cube_2"], + ) + # -- contact sensor 2 + scene_cfg.contact_sensor_2 = ContactSensorCfg( + prim_path="{ENV_REGEX_NS}/Cube_2", + track_pose=True, + debug_vis=False, + update_period=0.0, + filter_prim_paths_expr=["{ENV_REGEX_NS}/Cube_1"], + ) + scene = InteractiveScene(scene_cfg) + + # Play the simulation + sim.reset() + + contact_sensor: ContactSensor = scene["contact_sensor"] + contact_sensor_2: ContactSensor = scene["contact_sensor_2"] + + # Check that the filter binding was created for each sensor + assert contact_sensor.contact_view.filter_count == 1 + assert contact_sensor_2.contact_view.filter_count == 1 + + # Let the scene settle and accumulate contacts + scene.reset() + for _ in range(500): + _perform_sim_step(sim, scene, _SIM_DT) + + # Check values for cube 2 — cube 1 is the only collision for cube 2 + torch.testing.assert_close( + contact_sensor_2.data.force_matrix_w.torch[:, :, 0], + contact_sensor_2.data.net_forces_w.torch, + ) + # Check that forces are opposite and equal + torch.testing.assert_close( + contact_sensor_2.data.force_matrix_w.torch[:, :, 0], + -contact_sensor.data.force_matrix_w.torch[:, :, 0], + ) + # Check values are non-zero (contacts are happening and are getting reported) + assert contact_sensor_2.data.net_forces_w.torch.sum().item() > 0.0 + assert contact_sensor.data.net_forces_w.torch.sum().item() > 0.0 + + +@pytest.mark.isaacsim_ci +def test_no_contact_reporting(): + """Test that OVPhysX contact sensor returns zero forces when no filter is configured. + + Without ``filter_prim_paths_expr``, the ``force_matrix_w`` buffer is not + populated (no per-partner breakdown is available), and ``net_forces_w`` + should still reflect the aggregate contact force. This test verifies the + simpler "unfiltered, CPU-only" path by using CPU and letting the scene + settle: with no filter the ``force_matrix_w`` sum is expected to be zero + (the buffer is not allocated). + + Note: + The PhysX variant of this test forcibly disables contact processing via + a Carbonite setting (``/physics/disableContactProcessing``). That + setting is not available in the kitless OVPhysX flow; instead we test + that a sensor with no filter has a zero ``force_matrix_w``. + """ + with _ovphysx_sim_context(device="cpu", dt=_SIM_DT, add_lighting=True) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=2, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG + # -- cube 1 + scene_cfg.shape = CUBE_CFG.replace(prim_path="{ENV_REGEX_NS}/Cube_1") + scene_cfg.shape.init_state.pos = (0, -1.0, 1.0) + # -- cube 2 (on top of cube 1) + scene_cfg.shape_2 = CUBE_CFG.replace(prim_path="{ENV_REGEX_NS}/Cube_2") + scene_cfg.shape_2.init_state.pos = (0, -1.0, 1.525) + # No filter paths — force_matrix_w will not be allocated. + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path="{ENV_REGEX_NS}/Cube_1", + track_pose=True, + debug_vis=False, + update_period=0.0, + filter_prim_paths_expr=[], + ) + scene_cfg.contact_sensor_2 = ContactSensorCfg( + prim_path="{ENV_REGEX_NS}/Cube_2", + track_pose=True, + debug_vis=False, + update_period=0.0, + filter_prim_paths_expr=[], + ) + scene = InteractiveScene(scene_cfg) + + # Play the simulation + sim.reset() + + contact_sensor: ContactSensor = scene["contact_sensor"] + contact_sensor_2: ContactSensor = scene["contact_sensor_2"] + + # Let the scene settle + scene.reset() + for _ in range(500): + _perform_sim_step(sim, scene, _SIM_DT) + + # Without filter_prim_paths_expr the force_matrix_w buffer is not allocated; + # its sum should be zero (or the tensor is None). + fm1 = contact_sensor.data.force_matrix_w + fm2 = contact_sensor_2.data.force_matrix_w + if fm1 is not None: + assert fm1.torch.sum().item() == 0.0 + if fm2 is not None: + assert fm2.torch.sum().item() == 0.0 + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.parametrize("num_envs", [1, 3]) +@pytest.mark.isaacsim_ci +def test_multi_body_per_sensor_indexing(device, num_envs): + """Ground-truth body-index check for a single sensor that resolves to two bodies. + + OVPhysX :class:`ContactBinding` returns sensors in **pattern-major** order + (``[env_0/body_0, env_1/body_0, …, env_0/body_1, env_1/body_1, …]``), + whereas the inherited PhysX kernel formula assumes env-major + (``[env_0/body_0, env_0/body_1, …, env_1/body_0, …]``). Single-body + sensors don't disambiguate the two layouts, so this test exercises the + multi-body discovery path with one cube on the ground and one floating + above it. After the scene settles, only the bottom cube should report a + non-zero net force. An env-major bug would attribute that force to the + wrong (env, body) slot — caught here. + """ + with _ovphysx_sim_context(device=device, dt=_SIM_DT, add_lighting=True) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=num_envs, env_spacing=2.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG.replace(prim_path="/World/ground") + # -- Cube_low: on the ground, will report contact forces + scene_cfg.shape = CUBE_CFG.replace(prim_path="{ENV_REGEX_NS}/Cube_low") + scene_cfg.shape.init_state.pos = (0.0, 0.0, 0.25) + # -- Cube_high: floating well above the ground, should remain in air + scene_cfg.shape_2 = CUBE_CFG.replace(prim_path="{ENV_REGEX_NS}/Cube_high") + scene_cfg.shape_2.init_state.pos = (0.0, 1.5, 3.0) + # Single ContactSensor that matches BOTH cubes via a regex glob. + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path="{ENV_REGEX_NS}/Cube_.*", + track_pose=False, + debug_vis=False, + update_period=0.0, + filter_prim_paths_expr=[], + ) + scene = InteractiveScene(scene_cfg) + sim.reset() + contact_sensor: ContactSensor = scene["contact_sensor"] + + # Sanity: the sensor discovered exactly two bodies, one per cube. + assert contact_sensor.body_names is not None + assert sorted(contact_sensor.body_names) == ["Cube_high", "Cube_low"] + low_idx = contact_sensor.body_names.index("Cube_low") + high_idx = contact_sensor.body_names.index("Cube_high") + + # Let physics settle and accumulate stable contacts on Cube_low. + scene.reset() + for _ in range(200): + _perform_sim_step(sim, scene, _SIM_DT) + + # Net force readout: shape (num_envs, num_sensors=2, 3) after .torch. + net_forces = contact_sensor.data.net_forces_w.torch + assert net_forces.shape == (num_envs, 2, 3) + low_force_mag = net_forces[:, low_idx, :].abs().sum().item() + high_force_mag = net_forces[:, high_idx, :].abs().sum().item() + # Cube_low rests on the ground: non-zero contact force per env. + assert low_force_mag > 0.0, "Cube_low (on ground) should report contact force" + # Cube_high floats: net force is zero (no contact). + assert high_force_mag == 0.0, ( + f"Cube_high (in air) should report zero contact force, got sum-abs={high_force_mag:.6f}." + " A non-zero value here usually means body indices are scrambled —" + " e.g. a Cube_low contact was attributed to Cube_high because the kernel" + " assumed env-major instead of pattern-major flat-buffer layout." + ) + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.isaacsim_ci +def test_sensor_print(device): + """Test sensor print is working correctly.""" + with _ovphysx_sim_context(device=device, dt=_SIM_DT, add_lighting=False) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=1, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG.replace(prim_path="/World/ground") + scene_cfg.shape = CUBE_CFG + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path=scene_cfg.shape.prim_path, + track_pose=True, + debug_vis=False, + update_period=0.0, + track_air_time=True, + history_length=3, + ) + scene = InteractiveScene(scene_cfg) + # Play the simulator + sim.reset() + # print info + print(scene.sensors["contact_sensor"]) + + +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.isaacsim_ci +def test_contact_sensor_threshold(device): + """Test that the contact sensor USD threshold attribute is set to 0.0.""" + with _ovphysx_sim_context(device=device, dt=_SIM_DT, add_lighting=False) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=1, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG.replace(prim_path="/World/ground") + scene_cfg.shape = CUBE_CFG + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path=scene_cfg.shape.prim_path, + track_pose=True, + debug_vis=False, + update_period=0.0, + track_air_time=True, + history_length=3, + ) + scene = InteractiveScene(scene_cfg) + # Play the simulator + sim.reset() + + stage = get_current_stage() + prim_path = scene_cfg.shape.prim_path + prim = stage.GetPrimAtPath(prim_path) + + # Ensure the contact sensor was created properly + contact_sensor = scene["contact_sensor"] + assert contact_sensor is not None, "Contact sensor was not created" + + # Check if the prim has contact report API and verify threshold is close to 0.0 + if "PhysxContactReportAPI" in prim.GetAppliedSchemas(): + threshold_attr = prim.GetAttribute("physxContactReport:threshold") + if threshold_attr.IsValid(): + threshold_value = threshold_attr.Get() + assert pytest.approx(threshold_value, abs=1e-6) == 0.0, ( + f"Expected USD threshold to be close to 0.0, but got {threshold_value}" + ) + + +@pytest.mark.skip( + reason=( + "ovphysx ContactSensor v1 does not support track_friction_forces; " + "see issue #5325 and docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md" + ) +) +@pytest.mark.parametrize("grav_dir", [(-10.0, 0.0, -0.1), (0.0, -10.0, -0.1)]) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.isaacsim_ci +def test_friction_reporting(device, grav_dir): + """Test friction force reporting for contact sensors. + + This test places a contact sensor enabled cube onto a ground plane under different gravity directions. + It then compares the normalized friction force dir with the direction of gravity to ensure they are aligned. + """ + sim_cfg = SimulationCfg(physics=OvPhysxCfg(), dt=_SIM_DT, device=device, gravity=grav_dir) + with build_simulation_context(device=device, sim_cfg=sim_cfg, add_lighting=False) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=1, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG + scene_cfg.shape = CUBE_CFG + + filter_prim_paths_expr = [scene_cfg.terrain.prim_path + "/terrain/GroundPlane/CollisionPlane"] + + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path=scene_cfg.shape.prim_path, + track_pose=True, + debug_vis=False, + update_period=0.0, + track_air_time=True, + history_length=3, + track_friction_forces=True, + filter_prim_paths_expr=filter_prim_paths_expr, + ) + + scene = InteractiveScene(scene_cfg) + sim.reset() + + scene["contact_sensor"].reset() + shape: RigidObject = scene["shape"] + shape.write_root_pose_to_sim_index( + root_pose=torch.tensor([0, 0.0, CUBE_CFG.spawn.size[2] / 2.0, 1, 0, 0, 0], device=device).unsqueeze(0) + ) + + # step sim once to compute friction forces + _perform_sim_step(sim, scene, _SIM_DT) + + # check that forces are being reported match expected friction forces + expected_friction, _, _, _ = scene["contact_sensor"].contact_view.get_friction_data(dt=_SIM_DT) + expected_friction_torch = wp.to_torch(expected_friction) + reported_friction = scene["contact_sensor"].data.friction_forces_w.torch[0, 0, :] + + torch.testing.assert_close(expected_friction_torch.sum(dim=0), reported_friction[0], atol=1e-6, rtol=1e-5) + + # check that friction force direction opposes gravity direction + grav = torch.tensor(grav_dir, device=device) + norm_reported_friction = reported_friction / reported_friction.norm() + norm_gravity = grav / grav.norm() + dot = torch.dot(norm_reported_friction[0], norm_gravity) + + torch.testing.assert_close(torch.abs(dot), torch.tensor(1.0, device=device), atol=1e-4, rtol=1e-3) + + +@pytest.mark.skip( + reason=( + "ovphysx ContactSensor v1 does not support track_friction_forces; " + "see issue #5325 and docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md" + ) +) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.isaacsim_ci +def test_invalid_prim_paths_config(device): + """Test that a ValueError is raised when track_friction_forces=True and filter_prim_paths_expr is empty.""" + sim_cfg = SimulationCfg(physics=OvPhysxCfg(), dt=_SIM_DT, device=device) + with build_simulation_context(device=device, sim_cfg=sim_cfg, add_lighting=False) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=1, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG + scene_cfg.shape = CUBE_CFG + + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path=scene_cfg.shape.prim_path, + track_pose=True, + debug_vis=False, + update_period=0.0, + track_air_time=True, + history_length=3, + track_friction_forces=True, + filter_prim_paths_expr=[], + ) + + try: + _ = InteractiveScene(scene_cfg) + sim.reset() + assert False, "Expected ValueError due to invalid contact sensor configuration." + except ValueError: + pass + + +@pytest.mark.skip( + reason=( + "ovphysx ContactSensor v1 does not support track_friction_forces; " + "see issue #5325 and docs/superpowers/specs/2026-04-27-ovphysx-contact-api-gaps.md" + ) +) +@pytest.mark.parametrize("device", ["cuda:0", "cpu"]) +@pytest.mark.isaacsim_ci +def test_invalid_max_contact_points_config(device): + """Test that a ValueError is raised when track_friction_forces=True and max_contact_data_count_per_prim=0.""" + sim_cfg = SimulationCfg(physics=OvPhysxCfg(), dt=_SIM_DT, device=device) + with build_simulation_context(device=device, sim_cfg=sim_cfg, add_lighting=False) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=1, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = FLAT_TERRAIN_CFG + scene_cfg.shape = CUBE_CFG + filter_prim_paths_expr = [scene_cfg.terrain.prim_path + "/terrain/GroundPlane/CollisionPlane"] + + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path=scene_cfg.shape.prim_path, + track_pose=True, + debug_vis=False, + update_period=0.0, + track_air_time=True, + history_length=3, + track_friction_forces=True, + filter_prim_paths_expr=filter_prim_paths_expr, + max_contact_data_count_per_prim=0, + ) + + try: + _ = InteractiveScene(scene_cfg) + sim.reset() + assert False, "Expected ValueError due to invalid contact sensor configuration." + except ValueError: + pass + + +## +# Internal helpers. +## + + +def _run_contact_sensor_test( + shape_cfg: ContactSensorRigidObjectCfg, + sim_dt: float, + device: str, + terrains: list[TerrainImporterCfg], + durations: list[float], +): + """Run contact sensor timing tests for a single device across all terrain combinations. + + Args: + shape_cfg: Configuration of the rigid body used as contact primitive. + sim_dt: Simulation time-step [s]. + device: Compute device (e.g. ``"cuda:0"`` or ``"cpu"``). + terrains: List of terrain configurations to iterate over. + durations: Contact / air durations [s] to exercise. + + Note: + Unlike the PhysX variant, this helper never enables + ``track_contact_points`` or ``track_friction_forces`` because those + APIs are not yet available in the ovphysx v1 contact sensor (see + issue #5325). The ``test_contact_data`` path is therefore always + ``False``. The ``disable_contact_processing`` PhysX/Kit setting is + also not available in the kitless flow and is omitted. + """ + for terrain in terrains: + with _ovphysx_sim_context(device=device, dt=sim_dt, add_lighting=True) as sim: + scene_cfg = ContactSensorSceneCfg(num_envs=1, env_spacing=1.0, lazy_sensor_update=False) + scene_cfg.terrain = terrain + scene_cfg.shape = shape_cfg + + scene_cfg.contact_sensor = ContactSensorCfg( + prim_path=shape_cfg.prim_path, + track_pose=True, + debug_vis=False, + update_period=0.0, + track_air_time=True, + history_length=3, + track_contact_points=False, + track_friction_forces=False, + filter_prim_paths_expr=[], + ) + scene = InteractiveScene(scene_cfg) + + # Play the simulation + sim.reset() + + # Run contact time and air time tests + _test_sensor_contact( + shape=scene["shape"], + sensor=scene["contact_sensor"], + mode=ContactTestMode.IN_CONTACT, + sim=sim, + scene=scene, + sim_dt=sim_dt, + durations=durations, + ) + _test_sensor_contact( + shape=scene["shape"], + sensor=scene["contact_sensor"], + mode=ContactTestMode.NON_CONTACT, + sim=sim, + scene=scene, + sim_dt=sim_dt, + durations=durations, + ) + + +def _test_sensor_contact( + shape: RigidObject, + sensor: ContactSensor, + mode: ContactTestMode, + sim: SimulationContext, + scene: InteractiveScene, + sim_dt: float, + durations: list[float], +): + """Test for the contact sensor. + + This test sets the contact prim to a pose either in contact or out of contact with the ground plane for + a known duration. Once the contact duration has elapsed, the data stored inside the contact sensor + associated with the contact prim is checked against the expected values. + + This process is repeated for all elements in ``durations``, where each successive contact timing test + is punctuated by setting the contact prim to the complement of the desired contact mode for 1 sim time-step. + + Args: + shape: The contact prim used for the contact sensor test. + sensor: The sensor reporting data to be verified by the contact sensor test. + mode: The contact test mode: either contact with ground plane or air time. + sim: The active simulation context. + scene: The interactive scene. + sim_dt: Simulation time-step [s]. + durations: Contact / air durations [s] to exercise. + """ + # reset the test state + sensor.reset() + expected_last_test_contact_time = 0 + expected_last_reset_contact_time = 0 + + # set poses for shape for a given contact sensor test mode. + # desired contact mode to set for a given duration. + test_pose = None + # complement of the desired contact mode used to reset the contact sensor. + reset_pose = None + if mode == ContactTestMode.IN_CONTACT: + test_pose = shape.cfg.contact_pose + reset_pose = shape.cfg.non_contact_pose + elif mode == ContactTestMode.NON_CONTACT: + test_pose = shape.cfg.non_contact_pose + reset_pose = shape.cfg.contact_pose + else: + raise ValueError("Received incompatible contact sensor test mode") + + for idx in range(len(durations)): + current_test_time = 0 + duration = durations[idx] + while current_test_time < duration: + # set object states to contact the ground plane + shape.write_root_pose_to_sim_index(root_pose=torch.tensor(test_pose, device=shape.device).unsqueeze(0)) + # perform simulation step + _perform_sim_step(sim, scene, sim_dt) + # increment contact time + current_test_time += sim_dt + # set last contact time to the previous desired contact duration plus the extra dt allowance. + expected_last_test_contact_time = durations[idx - 1] + sim_dt if idx > 0 else 0 + # Check the data inside the contact sensor + if mode == ContactTestMode.IN_CONTACT: + _check_prim_contact_state_times( + sensor=sensor, + expected_air_time=0.0, + expected_contact_time=durations[idx], + expected_last_contact_time=expected_last_test_contact_time, + expected_last_air_time=expected_last_reset_contact_time, + dt=duration + sim_dt, + ) + elif mode == ContactTestMode.NON_CONTACT: + _check_prim_contact_state_times( + sensor=sensor, + expected_air_time=durations[idx], + expected_contact_time=0.0, + expected_last_contact_time=expected_last_reset_contact_time, + expected_last_air_time=expected_last_test_contact_time, + dt=duration + sim_dt, + ) + + # switch the contact mode for 1 dt step before the next contact test begins. + shape.write_root_pose_to_sim_index(root_pose=torch.tensor(reset_pose, device=shape.device).unsqueeze(0)) + # perform simulation step + _perform_sim_step(sim, scene, sim_dt) + # set the last air time to 2 sim_dt steps, because last_air_time and last_contact_time + # adds an additional sim_dt to the total time spent in the previous contact mode for uncertainty in + # when the contact switch happened in between a dt step. + expected_last_reset_contact_time = 2 * sim_dt + + +def _test_friction_forces(shape: RigidObject, sensor: ContactSensor, mode: ContactTestMode) -> None: + """Verify friction force values reported by the contact sensor. + + This helper is only called from skipped tests (requires ``track_friction_forces`` + which is not supported in ovphysx v1). + + Args: + shape: The contact prim used for the contact sensor test. + sensor: The sensor reporting data to be verified. + mode: The contact test mode. + """ + if not sensor.cfg.track_friction_forces: + assert sensor._data.friction_forces_w is None + return + + # check shape of the friction_forces_w tensor (wp.to_torch expands vec3f -> float32 trailing dim) + num_bodies = sensor.num_bodies + friction_torch = sensor._data.friction_forces_w.torch + assert friction_torch.shape == (sensor.num_instances // num_bodies, num_bodies, 1, 3) + # compare friction forces + if mode == ContactTestMode.IN_CONTACT: + assert torch.any(torch.abs(friction_torch) > 1e-5).item() + friction_forces, _, buffer_count, buffer_start_indices = sensor.contact_view.get_friction_data( + dt=sensor._sim_physics_dt + ) + friction_forces_t = wp.to_torch(friction_forces) + buffer_count_t = wp.to_torch(buffer_count).to(torch.int32) + buffer_start_t = wp.to_torch(buffer_start_indices).to(torch.int32) + for i in range(sensor.num_instances * num_bodies): + for j in range(sensor.contact_view.filter_count): + start_index_ij = buffer_start_t[i, j] + count_ij = buffer_count_t[i, j] + force = torch.sum(friction_forces_t[start_index_ij : (start_index_ij + count_ij), :], dim=0) + env_idx = i // num_bodies + body_idx = i % num_bodies + assert torch.allclose(force, friction_torch[env_idx, body_idx, j, :], atol=1e-5) + elif mode == ContactTestMode.NON_CONTACT: + assert torch.all(friction_torch == 0.0).item() + + +def _test_contact_position(shape: RigidObject, sensor: ContactSensor, mode: ContactTestMode) -> None: + """Test for the contact positions (only implemented for sphere and flat terrain). + + Checks that the contact position is radius distance away from the root of the object. + + This helper is only called from skipped tests (requires ``track_contact_points`` + which is not supported in ovphysx v1). + + Args: + shape: The contact prim used for the contact sensor test. + sensor: The sensor reporting data to be verified. + mode: The contact test mode. + """ + if not sensor.cfg.track_contact_points: + assert sensor._data.contact_pos_w is None + return + + # check shape of the contact_pos_w tensor (wp.to_torch expands vec3f -> float32 trailing dim) + num_bodies = sensor.num_bodies + contact_pos_torch = sensor._data.contact_pos_w.torch + assert contact_pos_torch.shape == (sensor.num_instances // num_bodies, num_bodies, 1, 3) + # check contact positions + if mode == ContactTestMode.IN_CONTACT: + pos_w_torch = sensor._data.pos_w.torch + contact_position = pos_w_torch + torch.tensor([[0.0, 0.0, -shape.cfg.spawn.radius]], device=pos_w_torch.device) + assert torch.all( + torch.abs(torch.linalg.norm(contact_pos_torch - contact_position.unsqueeze(1), ord=2, dim=-1)) < 1e-2 + ).item() + elif mode == ContactTestMode.NON_CONTACT: + assert torch.all(torch.isnan(contact_pos_torch)).item() + + +def _check_prim_contact_state_times( + sensor: ContactSensor, + expected_air_time: float, + expected_contact_time: float, + expected_last_air_time: float, + expected_last_contact_time: float, + dt: float, +): + """Check contact sensor data matches expected values. + + Args: + sensor: Instance of ContactSensor containing data to be tested. + expected_air_time: Air time ground truth [s]. + expected_contact_time: Contact time ground truth [s]. + expected_last_air_time: Last air time ground truth [s]. + expected_last_contact_time: Last contact time ground truth [s]. + dt: Time since previous contact mode switch [s]. If the contact prim left contact 0.1 seconds ago, + dt should be 0.1 + simulation dt seconds. + """ + # store current state of the contact prim + in_air = expected_air_time > 0.0 + in_contact = expected_contact_time > 0.0 + measured_contact_time = sensor.data.current_contact_time.torch + measured_air_time = sensor.data.current_air_time.torch + measured_last_contact_time = sensor.data.last_contact_time.torch + measured_last_air_time = sensor.data.last_air_time.torch + # check current contact state + assert pytest.approx(measured_contact_time.item(), 0.01) == expected_contact_time + assert pytest.approx(measured_air_time.item(), 0.01) == expected_air_time + # check last contact state + assert pytest.approx(measured_last_contact_time.item(), 0.01) == expected_last_contact_time + assert pytest.approx(measured_last_air_time.item(), 0.01) == expected_last_air_time + # check current contact mode + assert sensor.compute_first_contact(dt=dt).torch.item() == in_contact + assert sensor.compute_first_air(dt=dt).torch.item() == in_air + + +def _perform_sim_step(sim: SimulationContext, scene: InteractiveScene, sim_dt: float) -> None: + """Update sensors and step the contact sensor test scene. + + Args: + sim: The active simulation context. + scene: The interactive scene. + sim_dt: Simulation time-step [s]. + """ + # write data to simulation + scene.write_data_to_sim() + # simulate + sim.step(render=False) + # update buffers at sim dt + scene.update(dt=sim_dt) diff --git a/source/isaaclab_tasks/changelog.d/antoiner-feat-ovphysx_contactsensor.rst b/source/isaaclab_tasks/changelog.d/antoiner-feat-ovphysx_contactsensor.rst new file mode 100644 index 000000000000..ab7247b1bea5 --- /dev/null +++ b/source/isaaclab_tasks/changelog.d/antoiner-feat-ovphysx_contactsensor.rst @@ -0,0 +1,10 @@ +Added +^^^^^ + +* Added ``ovphysx`` preset to ``isaaclab_tasks.manager_based.locomotion.velocity`` + for use under the OVPhysX backend. ``AnymalDFlatPhysicsCfg`` now exposes + an ``ovphysx`` member, and the shared ``LocomotionVelocityRoughEnvCfg`` + injects the OVPhysX :class:`~isaaclab_ovphysx.sensors.ContactSensorCfg` + alongside the existing PhysX and Newton entries so the velocity task + selects the right contact sensor backend when run with + ``presets=ovphysx``. diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/config/anymal_d/flat_env_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/config/anymal_d/flat_env_cfg.py index 5a97b75f7936..abf4ffae0f9f 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/config/anymal_d/flat_env_cfg.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/config/anymal_d/flat_env_cfg.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD-3-Clause from isaaclab_newton.physics import MJWarpSolverCfg, NewtonCfg +from isaaclab_ovphysx.physics import OvPhysxCfg from isaaclab_physx.physics import PhysxCfg from isaaclab.sim import SimulationCfg @@ -28,6 +29,7 @@ class PhysicsCfg(PresetCfg): debug_mode=False, ) physx = default + ovphysx = OvPhysxCfg() @configclass diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py index 1b936e9fc0b9..b1b7e0043bed 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py @@ -8,6 +8,7 @@ from isaaclab_newton.physics import MJWarpSolverCfg, NewtonCfg, NewtonCollisionPipelineCfg, NewtonShapeCfg from isaaclab_newton.sensors import ContactSensorCfg as NewtonContactSensorCfg +from isaaclab_ovphysx.sensors import ContactSensorCfg as OvPhysXContactSensorCfg from isaaclab_physx.physics import PhysxCfg from isaaclab_physx.sensors import ContactSensorCfg as PhysXContactSensorCfg @@ -78,6 +79,7 @@ class VelocityEnvContactSensorCfg(PresetCfg): default = PhysXContactSensorCfg(prim_path="{ENV_REGEX_NS}/Robot/.*", history_length=3, track_air_time=True) newton_mjwarp = NewtonContactSensorCfg(prim_path="{ENV_REGEX_NS}/Robot/.*", history_length=3, track_air_time=True) physx = default + ovphysx = OvPhysXContactSensorCfg(prim_path="{ENV_REGEX_NS}/Robot/.*", history_length=3, track_air_time=True) @configclass