diff --git a/docs/source/features/isaac_teleop.rst b/docs/source/features/isaac_teleop.rst index ac910d6ed4ed..733150259586 100644 --- a/docs/source/features/isaac_teleop.rst +++ b/docs/source/features/isaac_teleop.rst @@ -115,8 +115,10 @@ and Isaac Lab. It composes three collaborators: Isaac Sim's XR bridge, creates the ``TeleopSession``, and steps it each frame to produce an action tensor. -* **CommandHandler** -- registers and dispatches START / STOP / RESET callbacks triggered by XR UI - buttons or the message bus. +* **CommandHandler** -- lightweight callback registry for START / STOP / RESET commands. Scripts + can register callbacks via :meth:`~isaaclab_teleop.IsaacTeleopDevice.add_callback`, but the + primary control path uses :func:`~isaaclab_teleop.poll_control_events` (see + :ref:`isaac-teleop-control-states`). .. dropdown:: Session lifecycle details @@ -127,6 +129,104 @@ and Isaac Lab. It composes three collaborators: the session is not yet ready or has been torn down. +.. _isaac-teleop-control-states: + +Teleop Control States (Start / Stop / Reset) +--------------------------------------------- + +Isaac Lab supports remote teleop control commands -- **start**, **stop**, and **reset** -- sent +from the XR headset to the simulation. These are used to begin and end demonstration recording, +pause the robot, or reset the environment without touching the simulation host. + +How it works +~~~~~~~~~~~~ + +By default, every :class:`~isaaclab_teleop.IsaacTeleopCfg` enables a control message channel +using the well-known UUID ``uuid5(NAMESPACE_DNS, "teleop_command")``. The channel is created as +a ``teleop_control_pipeline`` inside TeleopCore's :class:`TeleopSession`, which means: + +1. A :class:`~isaacteleop.retargeting_engine.deviceio_source_nodes.MessageChannelSource` opens an + OpenXR opaque data channel (``XR_NV_opaque_data_channel``) with the agreed-upon UUID. +2. The CloudXR JS client (or any other client) discovers the channel by UUID and sends UTF-8 + JSON commands:: + + {"type": "teleop_command", "message": {"command": "start teleop"}} + {"type": "teleop_command", "message": {"command": "stop teleop"}} + {"type": "teleop_command", "message": {"command": "reset teleop"}} + +3. A :class:`~isaaclab_teleop.teleop_message_processor.TeleopMessageProcessor` parses these + payloads and produces boolean pulse signals (``run_toggle``, ``kill``, ``reset``). +4. :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager` consumes the + boolean signals, runs its state machine (edge detection, fail-safe), and produces + ``teleop_state`` (one-hot) and ``reset_event`` (bool pulse) outputs. +5. TeleopCore decodes these outputs into ``ExecutionEvents`` and injects them into every + retargeter's ``ComputeContext``, so stateful retargeters can react to state changes + (e.g. reinitializing cross-step state on reset). + +Polling control events in your script +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use :func:`~isaaclab_teleop.poll_control_events` to read the latest control state each frame: + +.. code-block:: python + + from isaaclab_teleop import poll_control_events + + with IsaacTeleopDevice(cfg) as device: + running = False + while sim_app.is_running(): + action = device.advance() + + ctrl = poll_control_events(device) + if ctrl.is_active is not None: + running = ctrl.is_active # True after "start", False after "stop" + if ctrl.should_reset: + env.reset() # "reset" command received this frame + + if action is not None and running: + env.step(action.repeat(num_envs, 1)) + else: + env.sim.render() + +:class:`~isaaclab_teleop.ControlEvents` has two fields: + +* ``is_active`` -- ``True`` after a "start" command, ``False`` after "stop", ``None`` when no + command has been received yet (callers should leave their own flag unchanged). +* ``should_reset`` -- ``True`` for exactly one frame after a "reset" command. + +Disabling the control channel +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you do not need headset-driven start/stop/reset (e.g. keyboard-only workflows), set +``control_channel_uuid=None`` in your config: + +.. code-block:: python + + IsaacTeleopCfg( + pipeline_builder=_build_my_pipeline, + control_channel_uuid=None, # no opaque data channel created + ) + +Using a custom channel UUID +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To use a different channel UUID (e.g. for a separate control protocol), pass any 16-byte +``bytes`` value: + +.. code-block:: python + + import uuid + + MY_UUID = uuid.uuid5(uuid.NAMESPACE_DNS, "my_custom_control").bytes + + IsaacTeleopCfg( + pipeline_builder=_build_my_pipeline, + control_channel_uuid=MY_UUID, + ) + +The CloudXR JS client must be updated to discover this UUID when sending commands. + + .. _isaac-teleop-retargeting: Retargeting Framework @@ -908,6 +1008,10 @@ See the :ref:`isaaclab_teleop-api` for full class and function documentation: * :class:`~isaaclab_teleop.IsaacTeleopCfg` * :class:`~isaaclab_teleop.IsaacTeleopDevice` * :func:`~isaaclab_teleop.create_isaac_teleop_device` +* :class:`~isaaclab_teleop.ControlEvents` +* :class:`~isaaclab_teleop.SupportsControlEvents` +* :func:`~isaaclab_teleop.poll_control_events` +* :data:`~isaaclab_teleop.TELEOP_CONTROL_CHANNEL_UUID` * :class:`~isaaclab_teleop.XrCfg` * :class:`~isaaclab_teleop.XrAnchorRotationMode` diff --git a/scripts/environments/teleoperation/teleop_se3_agent.py b/scripts/environments/teleoperation/teleop_se3_agent.py index 897eb159e86e..cdd5c104c44f 100644 --- a/scripts/environments/teleoperation/teleop_se3_agent.py +++ b/scripts/environments/teleoperation/teleop_se3_agent.py @@ -218,7 +218,7 @@ def stop_teleoperation() -> None: try: if use_isaac_teleop: - from isaaclab_teleop import create_isaac_teleop_device + from isaaclab_teleop import create_isaac_teleop_device, poll_control_events teleop_interface = create_isaac_teleop_device( env_cfg.isaac_teleop, @@ -297,6 +297,13 @@ def run_loop(): # get device command action = teleop_interface.advance() + if use_isaac_teleop: + ctrl = poll_control_events(teleop_interface) + if ctrl.is_active is not None: + teleoperation_active = ctrl.is_active + if ctrl.should_reset: + should_reset_recording_instance = True + # action is None when IsaacTeleop session hasn't started yet # (e.g. waiting for user to click "Start AR") if action is None: diff --git a/scripts/tools/record_demos.py b/scripts/tools/record_demos.py index bd318b7a2625..75df9e0ee92a 100644 --- a/scripts/tools/record_demos.py +++ b/scripts/tools/record_demos.py @@ -406,26 +406,34 @@ def process_success_condition(env: gym.Env, success_term: object | None, success def handle_reset( - env: gym.Env, success_step_count: int, instruction_display: InstructionDisplay, label_text: str + env: gym.Env, + success_step_count: int, + instruction_display: InstructionDisplay, + label_text: str, + teleop_interface: object | None = None, ) -> int: """Handle resetting the environment. - Resets the environment, recorder manager, and related state variables. - Updates the instruction display with current status. + Resets the environment, recorder manager, teleop device, and related + state variables. Updates the instruction display with current status. Args: - env: The environment instance to reset - success_step_count: Current count of consecutive successful steps - instruction_display: The display object to update - label_text: Text to display showing current recording status + env: The environment instance to reset. + success_step_count: Current count of consecutive successful steps. + instruction_display: The display object to update. + label_text: Text to display showing current recording status. + teleop_interface: Optional teleop device to reset (resets XR anchor + and retargeter cross-step state). Returns: - int: Reset success step count (0) + Reset success step count (0). """ print("Resetting environment...") env.sim.reset() env.recorder_manager.reset() env.reset() + if teleop_interface is not None and hasattr(teleop_interface, "reset"): + teleop_interface.reset() success_step_count = 0 instruction_display.show_demo(label_text) return success_step_count @@ -476,7 +484,9 @@ def stop_recording_instance(): running_recording_instance = False print("Recording paused") - # Set up teleoperation callbacks + # Set up teleoperation callbacks. For IsaacTeleop the primary control + # path is poll_control_events(); these callbacks are bridged automatically + # and also serve native (keyboard / spacemouse) devices. teleoperation_callbacks = { "R": reset_recording_instance, "START": start_recording_instance, @@ -485,7 +495,6 @@ def stop_recording_instance(): } teleop_interface = setup_teleop_device(teleoperation_callbacks, use_isaac_teleop) - teleop_interface.add_callback("R", reset_recording_instance) label_text = f"Recorded {current_recorded_demo_count} successful demonstrations." instruction_display = setup_ui(label_text, env) @@ -504,10 +513,21 @@ def inner_loop(): stack_name = "IsaacTeleop" if use_isaac_teleop else "native" print(f"{stack_name} recording started.") + if use_isaac_teleop: + from isaaclab_teleop import poll_control_events + with contextlib.suppress(KeyboardInterrupt), torch.inference_mode(): while simulation_app.is_running(): # Get teleop command (may be None while waiting for session start) action = teleop_interface.advance() + + if use_isaac_teleop: + ctrl = poll_control_events(teleop_interface) + if ctrl.is_active is not None: + running_recording_instance = ctrl.is_active + if ctrl.should_reset: + should_reset_recording_instance = True + if action is None: env.sim.render() continue @@ -558,7 +578,9 @@ def inner_loop(): # Handle reset if requested if should_reset_recording_instance: - success_step_count = handle_reset(env, success_step_count, instruction_display, label_text) + success_step_count = handle_reset( + env, success_step_count, instruction_display, label_text, teleop_interface + ) should_reset_recording_instance = False # Check if simulation is stopped diff --git a/source/isaaclab_teleop/config/extension.toml b/source/isaaclab_teleop/config/extension.toml index 881c57a52727..13c63e04ab99 100644 --- a/source/isaaclab_teleop/config/extension.toml +++ b/source/isaaclab_teleop/config/extension.toml @@ -1,6 +1,6 @@ [package] # Semantic Versioning is used: https://semver.org/ -version = "0.3.5" +version = "0.3.6" # Description title = "Isaac Lab Teleop" diff --git a/source/isaaclab_teleop/docs/CHANGELOG.rst b/source/isaaclab_teleop/docs/CHANGELOG.rst index d526500022c1..9ad0bf77ecd2 100644 --- a/source/isaaclab_teleop/docs/CHANGELOG.rst +++ b/source/isaaclab_teleop/docs/CHANGELOG.rst @@ -1,6 +1,51 @@ Changelog --------- +0.3.6 (2026-04-21) +~~~~~~~~~~~~~~~~~~~ + +Added +^^^^^ + +* Added :attr:`~isaaclab_teleop.IsaacTeleopCfg.control_channel_uuid` for + receiving teleop control commands (start/stop/reset) from the headset via + an OpenXR message channel. The channel is managed by TeleopCore's native + ``teleop_control_pipeline`` mechanism. + +* Added :class:`~isaaclab_teleop.teleop_message_processor.TeleopMessageProcessor` + retargeter that converts raw message-channel payloads into boolean control + signals for :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. + +* Added :func:`~isaaclab_teleop.poll_control_events` helper, + :class:`~isaaclab_teleop.ControlEvents` dataclass, and + :class:`~isaaclab_teleop.SupportsControlEvents` protocol for polling + start/stop/reset signals from any teleop device in a single call. + +* Added :attr:`~isaaclab_teleop.IsaacTeleopDevice.last_control_events` + property exposing the most recent control events from the message channel. + Control events are automatically bridged to legacy + :meth:`~isaaclab_teleop.IsaacTeleopDevice.add_callback` callbacks. + +Changed +^^^^^^^ + +* :meth:`~isaaclab_teleop.IsaacTeleopDevice.reset` now injects a + ``reset`` :class:`ExecutionEvents` into TeleopCore's ``ComputeContext`` + on the next pipeline step, resetting retargeter cross-step state. + Previously only the XR anchor was reset. + +Fixed +^^^^^ + +* Fixed ``record_demos.py`` not resetting the teleop device when a + success condition triggers an environment reset. Retargeters now + reinitialize their state on success-triggered resets. + +* Fixed shutdown hang caused by Kit's pre-shutdown callback calling + ``stop()`` while the simulation loop was still running. The callback + now uses the same graceful teardown path as the XR-disabled handler. + + 0.3.5 (2026-04-06) ~~~~~~~~~~~~~~~~~~~ diff --git a/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi b/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi index 655c7025cb0f..045f16f0c690 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi +++ b/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi @@ -6,15 +6,20 @@ __all__ = [ "CLOUDXR_AVP_ENV", "CLOUDXR_JS_ENV", + "ControlEvents", "IsaacTeleopCfg", "IsaacTeleopDevice", - "create_isaac_teleop_device", - "XrAnchorSynchronizer", + "SupportsControlEvents", + "TELEOP_CONTROL_CHANNEL_UUID", "XrAnchorRotationMode", + "XrAnchorSynchronizer", "XrCfg", + "create_isaac_teleop_device", + "poll_control_events", "remove_camera_configs", ] +from .control_events import TELEOP_CONTROL_CHANNEL_UUID, ControlEvents, SupportsControlEvents, poll_control_events from .isaac_teleop_cfg import CLOUDXR_AVP_ENV, CLOUDXR_JS_ENV, IsaacTeleopCfg from .isaac_teleop_device import IsaacTeleopDevice, create_isaac_teleop_device from .xr_anchor_utils import XrAnchorSynchronizer diff --git a/source/isaaclab_teleop/isaaclab_teleop/command_handler.py b/source/isaaclab_teleop/isaaclab_teleop/command_handler.py index eb5fb38aeb44..7e999e638f5e 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/command_handler.py +++ b/source/isaaclab_teleop/isaaclab_teleop/command_handler.py @@ -3,57 +3,30 @@ # # SPDX-License-Identifier: BSD-3-Clause -"""Teleop command handling for IsaacTeleop-based teleoperation.""" +"""Teleop command callback registry for IsaacTeleop-based teleoperation.""" from __future__ import annotations -import logging from collections.abc import Callable -from typing import Any - -import carb - -logger = logging.getLogger(__name__) class CommandHandler: - """Handles teleop command callbacks and XR message bus events. - - This class is responsible for: - - 1. Registering callbacks for teleop commands (START, STOP, RESET) - 2. Subscribing to the XR message bus for command events - 3. Dispatching callbacks when commands are received - - Teleop commands can be triggered via XR controller buttons or the - message bus. The handler normalizes command names (e.g. mapping - ``"R"`` to ``"RESET"``) and dispatches to registered callbacks. + """Lightweight callback registry for teleop commands. + + Scripts can register callbacks for ``START``, ``STOP``, and ``RESET`` + commands via :meth:`add_callback`. The callbacks are dispatched by + :meth:`fire` when the corresponding command is received. + + Note: + In the current architecture control signals arrive through + TeleopCore's ``teleop_control_pipeline`` and are consumed via + :func:`~isaaclab_teleop.poll_control_events`. This registry is + retained for backward compatibility with scripts that register + callbacks before the pipeline-based path was introduced. """ - TELEOP_COMMAND_EVENT_TYPE = "teleop_command" - - def __init__(self, xr_core: Any | None = None, on_reset: Callable[[], None] | None = None): - """Initialize the command handler. - - Args: - xr_core: The XRCore singleton, or ``None`` if XR is not available. - When provided, the handler subscribes to the message bus for - teleop command events. - on_reset: Optional hook called whenever a ``"reset"`` message-bus - event is received, *in addition to* the user's RESET callback. - This allows the device to perform internal reset actions (e.g. - resetting the XR anchor) without coupling the handler to the - anchor manager. - """ + def __init__(self) -> None: self._callbacks: dict[str, Callable] = {} - self._on_reset = on_reset - self._xr_core = xr_core - self._vc_subscription = None - - if self._xr_core is not None: - self._vc_subscription = self._xr_core.get_message_bus().create_subscription_to_pop_by_type( - carb.events.type_from_string(self.TELEOP_COMMAND_EVENT_TYPE), self._on_teleop_command - ) @property def callbacks(self) -> dict[str, Callable]: @@ -70,7 +43,6 @@ def add_callback(self, key: str, func: Callable) -> None: func: The function to call when the command is received. Should take no arguments. """ - # Map "R" to "RESET" for compatibility with existing scripts if key == "R": key = "RESET" self._callbacks[key] = func @@ -84,19 +56,5 @@ def fire(self, command: str) -> None: if command in self._callbacks: self._callbacks[command]() - def _on_teleop_command(self, event: carb.events.IEvent) -> None: - """Handle teleop command events from the message bus.""" - msg = event.payload.get("message", "") - - if "start" in msg: - self.fire("START") - elif "stop" in msg: - self.fire("STOP") - elif "reset" in msg: - self.fire("RESET") - if self._on_reset is not None: - self._on_reset() - def cleanup(self) -> None: - """Release event subscriptions.""" - self._vc_subscription = None + """Release resources (no-op; retained for API compatibility).""" diff --git a/source/isaaclab_teleop/isaaclab_teleop/control_events.py b/source/isaaclab_teleop/isaaclab_teleop/control_events.py new file mode 100644 index 000000000000..69ddd7c1ed21 --- /dev/null +++ b/source/isaaclab_teleop/isaaclab_teleop/control_events.py @@ -0,0 +1,78 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Teleop control events dataclass, polling helper, and well-known channel UUID.""" + +from __future__ import annotations + +import dataclasses +import uuid +from typing import Protocol, runtime_checkable + +TELEOP_CONTROL_CHANNEL_UUID: bytes = uuid.uuid5(uuid.NAMESPACE_DNS, "teleop_command").bytes +"""Well-known 16-byte UUID for the teleop control message channel. + +Derived deterministically as ``uuid5(NAMESPACE_DNS, "teleop_command")`` +so that both the Isaac Lab server and the Quest client can independently +compute the same channel identifier from the string ``"teleop_command"``. + +Pass this value as :attr:`~isaaclab_teleop.IsaacTeleopCfg.control_channel_uuid` +when configuring a teleop session with message-channel-based control. +""" + + +@dataclasses.dataclass(frozen=True) +class ControlEvents: + """Result of :func:`poll_control_events`. + + Attributes: + is_active: ``True`` when the teleop state machine is in RUNNING, + ``False`` when PAUSED or STOPPED, or ``None`` when no control + channel is configured (callers should leave their own active + flag unchanged). + should_reset: ``True`` when a reset was triggered this frame. + """ + + is_active: bool | None = None + should_reset: bool = False + + +_NO_OP_EVENTS = ControlEvents() +"""Shared immutable sentinel returned when no control channel is active.""" + + +@runtime_checkable +class SupportsControlEvents(Protocol): + """Duck type for teleop devices that expose control events.""" + + @property + def last_control_events(self) -> ControlEvents: ... + + +def poll_control_events(teleop_interface: SupportsControlEvents | object) -> ControlEvents: + """Poll control events from any teleop interface. + + Safe to call with any device type (keyboard, spacemouse, etc.). + Devices that do not expose the message-channel protocol return + a no-op :class:`ControlEvents`. + + Args: + teleop_interface: The teleop device to poll. Devices implementing + :class:`SupportsControlEvents` provide full type safety; other + devices are handled gracefully via duck typing. + + Returns: + A :class:`ControlEvents` with the latest start/stop and reset + signals. + """ + events = getattr(teleop_interface, "last_control_events", None) + if events is None: + return _NO_OP_EVENTS + if isinstance(events, ControlEvents): + return events + return ControlEvents( + is_active=getattr(events, "is_active", None), + should_reset=getattr(events, "should_reset", False), + ) diff --git a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py index 6539fa67f346..f94a63d57589 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py +++ b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py @@ -14,6 +14,7 @@ from isaaclab.utils import configclass +from .control_events import TELEOP_CONTROL_CHANNEL_UUID from .xr_cfg import XrCfg _CLOUDXR_ENV_DIR = Path(__file__).resolve().parent @@ -117,6 +118,24 @@ def build_pipeline(): If ``None``, the tuning UI will not be opened. """ + control_channel_uuid: bytes | None = TELEOP_CONTROL_CHANNEL_UUID + """16-byte UUID for the teleop control message channel. + + Defaults to :data:`~isaaclab_teleop.TELEOP_CONTROL_CHANNEL_UUID` + (``uuid5(NAMESPACE_DNS, "teleop_command")``), which is the well-known + channel both the Isaac Lab server and CloudXR JS client use to + exchange start/stop/reset commands. + + When set, a ``teleop_control_pipeline`` is created automatically + using :class:`~isaaclab_teleop.teleop_message_processor.TeleopMessageProcessor` + and :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. + The remote client sends UTF-8 control commands over the OpenXR opaque + data channel identified by this UUID, and the results are exposed via + :func:`~isaaclab_teleop.poll_control_events`. + + Set to ``None`` to disable the control channel entirely. + """ + target_frame_prim_path: str | None = None """Optional USD prim path whose world frame becomes the target coordinate frame for all output poses. diff --git a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py index 2e7c2c7a406b..3f8c565a7e21 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py +++ b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py @@ -15,6 +15,7 @@ import torch from .command_handler import CommandHandler +from .control_events import ControlEvents from .isaac_teleop_cfg import IsaacTeleopCfg from .session_lifecycle import TeleopSessionLifecycle from .xr_anchor_manager import XrAnchorManager @@ -35,8 +36,8 @@ class IsaacTeleopDevice: and coordinate-frame transform computation. * :class:`TeleopSessionLifecycle` -- pipeline building, OpenXR handle acquisition, session creation/destruction, and action-tensor extraction. - * :class:`CommandHandler` -- callback registration and XR message-bus - command dispatch. + * :class:`CommandHandler` -- callback registration for START / STOP / RESET + commands, bridged from the pipeline-based control events. Together they manage: @@ -67,7 +68,8 @@ class IsaacTeleopDevice: Teleop commands: The device supports callbacks for START, STOP, and RESET commands - that can be triggered via XR controller buttons or the message bus. + that can be triggered via the message-channel control pipeline or + registered directly via :meth:`add_callback`. Example: .. code-block:: python @@ -118,20 +120,16 @@ def __init__( """ self._cfg = cfg - # Compose the three collaborators self._anchor_manager = XrAnchorManager(cfg.xr_cfg) + self._command_handler = CommandHandler() self._session_lifecycle = TeleopSessionLifecycle( cfg, cloudxr_env_file=cloudxr_env_file, auto_launch_cloudxr=auto_launch_cloudxr, ) - self._command_handler = CommandHandler( - xr_core=self._anchor_manager.xr_core, - on_reset=self._anchor_manager.reset, - ) - # Controller button polling state (edge detection for right 'A') self._prev_right_a_pressed = False + self._prev_control_is_active: bool | None = None def __del__(self): """Clean up resources when the object is destroyed.""" @@ -188,9 +186,23 @@ def __exit__(self, exc_type, exc_val, exc_tb): def reset(self) -> None: """Reset the device state. - Resets the XR anchor synchronizer if present. + Resets the XR anchor synchronizer and schedules a + ``reset`` :class:`~isaacteleop.retargeting_engine.interface.execution_events.ExecutionEvents` + for the next pipeline step so that all retargeters reinitialize + their cross-step state. """ self._anchor_manager.reset() + self._session_lifecycle.request_reset() + + @property + def last_control_events(self) -> ControlEvents: + """Control events from the most recent :meth:`advance`. + + Returns a :class:`ControlEvents` derived from the teleop control + pipeline. When no control channel is configured, returns a + default (no-op) :class:`ControlEvents`. + """ + return self._session_lifecycle.last_control_events def add_callback(self, key: str, func: Callable) -> None: """Add a callback function for teleop commands. @@ -252,8 +264,39 @@ def advance(self, target_T_world: np.ndarray | torch.Tensor | SupportsDLPack | N # Poll controller buttons (e.g. toggle anchor rotation on right 'A' press) self._poll_buttons() + self._dispatch_control_callbacks() + return action + # ------------------------------------------------------------------ + # Control event -> callback bridge + # ------------------------------------------------------------------ + + def _dispatch_control_callbacks(self) -> None: + """Fire legacy callbacks when control events indicate a state change. + + This bridges the pipeline-based :class:`ControlEvents` with the + callback-based :class:`CommandHandler` so that scripts which registered + callbacks via :meth:`add_callback` still receive dispatches. + + Only fires START/STOP when ``is_active`` transitions between ``True`` + and ``False``; initial transitions from ``None`` are ignored to avoid + spurious callbacks during ``DefaultTeleopStateManager``'s + STOPPED -> PAUSED progression. + """ + from .control_events import _NO_OP_EVENTS + + events = self._session_lifecycle.last_control_events + if events is _NO_OP_EVENTS: + return + if events.should_reset: + self._command_handler.fire("RESET") + self._anchor_manager.reset() + if events.is_active is not None: + if self._prev_control_is_active is not None and events.is_active != self._prev_control_is_active: + self._command_handler.fire("START" if events.is_active else "STOP") + self._prev_control_is_active = events.is_active + # ------------------------------------------------------------------ # Target frame transform (config-driven rebase) # ------------------------------------------------------------------ diff --git a/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py b/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py index d395fab31a30..fa5f36f658e0 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py +++ b/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py @@ -18,10 +18,13 @@ if TYPE_CHECKING: from isaacteleop.cloudxr import CloudXRLauncher from isaacteleop.oxr import OpenXRSessionHandles + from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents from isaacteleop.retargeting_engine_ui import MultiRetargeterTuningUIImGui from isaacteleop.teleop_session_manager import TeleopSession +from .control_events import _NO_OP_EVENTS, ControlEvents from .isaac_teleop_cfg import IsaacTeleopCfg +from .teleop_message_processor import TeleopMessageProcessor class SupportsDLPack(Protocol): @@ -71,6 +74,19 @@ def _to_numpy_4x4(mat: np.ndarray | torch.Tensor | SupportsDLPack) -> np.ndarray return np.asarray(mat, dtype=np.float32) +def _execution_events_to_control(ee: ExecutionEvents) -> ControlEvents: + """Map TeleopCore :class:`ExecutionEvents` to the script-facing :class:`ControlEvents`.""" + from isaacteleop.retargeting_engine.interface.execution_events import ExecutionState + + if ee.execution_state == ExecutionState.RUNNING: + is_active: bool | None = True + elif ee.execution_state in (ExecutionState.PAUSED, ExecutionState.STOPPED): + is_active = False + else: + is_active = None + return ControlEvents(is_active=is_active, should_reset=ee.reset) + + class TeleopSessionLifecycle: """Manages the IsaacTeleop session lifecycle. @@ -78,11 +94,13 @@ class TeleopSessionLifecycle: 1. Building the retargeting pipeline from configuration 2. Adding a parallel ``ControllersSource`` for button-state access - 3. Acquiring OpenXR handles from Kit's XR bridge extension - 4. Creating, entering, and exiting the ``TeleopSession`` - 5. Building external inputs for pipeline leaf nodes (e.g. world-to-anchor transform) - 6. Stepping the session and extracting the flattened action tensor - 7. Managing the optional retargeting tuning UI + 3. Building the optional ``teleop_control_pipeline`` for headset-driven + start/stop/reset via a message channel + 4. Acquiring OpenXR handles from Kit's XR bridge extension + 5. Creating, entering, and exiting the ``TeleopSession`` + 6. Building external inputs for pipeline leaf nodes (e.g. world-to-anchor transform) + 7. Stepping the session and extracting the flattened action tensor + 8. Managing the optional retargeting tuning UI """ WORLD_T_ANCHOR_INPUT_NAME = "world_T_anchor" @@ -118,8 +136,12 @@ def __init__( # Session state (populated during start) self._session: TeleopSession | None = None self._pipeline = None + self._teleop_control_pipeline = None + self._message_processor: TeleopMessageProcessor | None = None self._last_right_controller = None self._session_start_deferred_logged = False + # Fallback for host-initiated resets when no control pipeline is configured + self._pending_reset = False # CloudXR runtime launcher (created in start if configured, stopped in stop) self._cloudxr_launcher: CloudXRLauncher | None = None @@ -192,6 +214,48 @@ def last_right_controller(self): """ return self._last_right_controller + @property + def has_control_channel(self) -> bool: + """Whether a message-channel-based control pipeline is configured.""" + return self._message_processor is not None + + @property + def last_control_events(self) -> ControlEvents: + """Control events from the most recent :meth:`step`. + + When a ``teleop_control_pipeline`` is configured, derives + :class:`ControlEvents` from + ``session.last_context.execution_events``. Otherwise returns a + default (no-op) :class:`ControlEvents`. + """ + if self._message_processor is None: + return _NO_OP_EVENTS + if self._session is None: + return _NO_OP_EVENTS + ctx = self._session.last_context + if ctx is None: + return _NO_OP_EVENTS + return _execution_events_to_control(ctx.execution_events) + + def request_reset(self) -> None: + """Schedule a reset for the next pipeline step. + + When a control pipeline is configured, the reset flows through + :meth:`TeleopMessageProcessor.inject_reset` so + :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager` + processes it normally. Otherwise falls back to an + ``execution_events`` override on the next :meth:`step` call. + + If the control channel already processed a reset this frame, + this method is a no-op to avoid a redundant second reset pulse. + """ + if self.last_control_events.should_reset: + return + if self._message_processor is not None: + self._message_processor.inject_reset() + else: + self._pending_reset = True + # ------------------------------------------------------------------ # Lifecycle: start / stop # ------------------------------------------------------------------ @@ -203,9 +267,10 @@ def start(self) -> None: the CloudXR runtime and WSS proxy are launched first. Builds the retargeting pipeline, wraps it with a parallel - ``ControllersSource`` for button-state access, attempts to acquire - OpenXR handles, and opens the retargeting tuning UI if retargeters - are configured. + ``ControllersSource`` for button-state access, builds the optional + ``teleop_control_pipeline`` for message-channel control, attempts + to acquire OpenXR handles, and opens the retargeting tuning UI if + retargeters are configured. If the OpenXR handles are not yet available (e.g. user hasn't clicked "Start AR"), session creation is deferred and will be retried on each @@ -222,12 +287,19 @@ def start(self) -> None: self._last_right_controller = None button_controllers = ControllersSource("_button_controllers") - self._pipeline = OutputCombiner( - { - "action": user_pipeline.output("action"), - self._CONTROLLER_RIGHT_KEY: button_controllers.output(ControllersSource.RIGHT), - } - ) + pipeline_outputs: dict[str, Any] = { + "action": user_pipeline.output("action"), + self._CONTROLLER_RIGHT_KEY: button_controllers.output(ControllersSource.RIGHT), + } + self._pipeline = OutputCombiner(pipeline_outputs) + + # Build optional teleop_control_pipeline for message-channel control + self._teleop_control_pipeline = None + self._message_processor = None + if self._cfg.control_channel_uuid is not None: + self._teleop_control_pipeline, self._message_processor = self._build_control_pipeline( + self._cfg.control_channel_uuid + ) # Try to start the session now; it may be deferred self._try_start_session() @@ -269,7 +341,12 @@ def stop(self, exc_type=None, exc_val=None, exc_tb=None) -> None: # expected and safe to suppress. logger.debug(f"Suppressed error during IsaacTeleop session cleanup: {e}") self._session = None - self._pipeline = None + + # Always clear pipeline state (session may never have been created if + # OpenXR handles were never available). + self._pipeline = None + self._teleop_control_pipeline = None + self._message_processor = None if self._cloudxr_launcher is not None: try: @@ -282,18 +359,68 @@ def stop(self, exc_type=None, exc_val=None, exc_tb=None) -> None: logger.info("IsaacTeleop session ended") + # ------------------------------------------------------------------ + # Control pipeline construction + # ------------------------------------------------------------------ + + @staticmethod + def _build_control_pipeline(channel_uuid: bytes) -> tuple[Any, TeleopMessageProcessor]: + """Build a ``teleop_control_pipeline`` from a message channel UUID. + + Wires ``MessageChannelSource`` -> :class:`TeleopMessageProcessor` + -> :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. + + Args: + channel_uuid: 16-byte UUID for the OpenXR opaque data channel. + + Returns: + A ``(teleop_control_pipeline, message_processor)`` tuple. + """ + from isaacteleop.retargeting_engine.deviceio_source_nodes import message_channel_config + from isaacteleop.teleop_session_manager import DefaultTeleopStateManager + + source, _sink = message_channel_config( + name="_teleop_control", + channel_uuid=channel_uuid, + ) + + processor = TeleopMessageProcessor(name="_teleop_msg_processor") + processor_graph = processor.connect({processor.INPUT_MESSAGES: source.output("messages_tracked")}) + + state_manager = DefaultTeleopStateManager(name="_teleop_state") + teleop_control_pipeline = state_manager.connect( + { + state_manager.INPUT_KILL: processor_graph.output("kill"), + state_manager.INPUT_RUN_TOGGLE: processor_graph.output("run_toggle"), + state_manager.INPUT_RESET: processor_graph.output("reset"), + } + ) + + return teleop_control_pipeline, processor + + # ------------------------------------------------------------------ + # Extension / XR lifecycle callbacks + # ------------------------------------------------------------------ + def _on_request_required_extensions(self) -> list[str]: """Callback for required extensions subscription. + Inspects both the main pipeline and the ``teleop_control_pipeline`` + (if configured) so that extensions required by the control channel + (e.g. ``XR_NV_opaque_data_channel``) are included. + Returns: A list of required extensions. """ from isaacteleop.teleop_session_manager.helpers import get_required_oxr_extensions_from_pipeline - required_extensions = ( - get_required_oxr_extensions_from_pipeline(self._pipeline) if self._pipeline is not None else [] - ) + required_extensions: list[str] = [] + if self._pipeline is not None: + required_extensions.extend(get_required_oxr_extensions_from_pipeline(self._pipeline)) + if self._teleop_control_pipeline is not None: + required_extensions.extend(get_required_oxr_extensions_from_pipeline(self._teleop_control_pipeline)) + required_extensions = sorted(set(required_extensions)) logger.info(f"Required extensions: {required_extensions}") return required_extensions @@ -307,10 +434,16 @@ def _on_xr_enabled_changed(self, item, event_type): self._teardown_dead_session() def _on_pre_shutdown(self, _event): - """Called when Kit is closing; run full cleanup since the app is exiting.""" + """Called when Kit is closing; tear down the session but leave the + pipeline intact so the main loop can exit via its own control flow + (``simulation_app.is_running()`` will go ``False``). + + Full resource cleanup happens later when the context manager's + ``__exit__`` calls :meth:`stop`. + """ logger.info("Shutting down IsaacTeleop session due to Kit close") self._pre_shutdown_subscription = None - self.stop() + self._teardown_dead_session() # ------------------------------------------------------------------ # Deferred session creation @@ -341,11 +474,6 @@ def _try_start_session(self) -> bool: if self._session is not None: return True - # In headless mode the AR profile setting is deliberately omitted - # from the .kit file so that all extensions (including the teleop - # bridge and its BridgeComponent) can load and register before Kit - # creates the OpenXR instance. We enable it here, after extensions - # are loaded; Kit will process the change on the next event-loop tick. self._ensure_xr_ar_profile_enabled() from isaacteleop.oxr import OpenXRSessionHandles @@ -371,6 +499,7 @@ def _try_start_session(self) -> bool: app_name=self._cfg.app_name, trackers=[], pipeline=self._pipeline, + teleop_control_pipeline=self._teleop_control_pipeline, plugins=self._cfg.plugins, oxr_handles=oxr_handles, ) @@ -436,6 +565,15 @@ def step( # pipeline contains ValueInput leaf nodes. external_inputs = self._build_external_inputs(anchor_world_matrix_fn, target_T_world) + # When no control pipeline is configured, host-initiated resets use + # the execution_events override as a fallback path. + execution_events = None + if self._pending_reset: + from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents, ExecutionState + + execution_events = ExecutionEvents(reset=True, execution_state=ExecutionState.RUNNING) + self._pending_reset = False + # Execute one step of the teleop session. # If the underlying OpenXR session was destroyed externally (e.g. # user clicked "Stop AR"), the step call will fail. We catch the @@ -443,7 +581,10 @@ def step( # can continue rendering (or wait for the session to restart). assert self._session is not None # guaranteed by _try_start_session above try: - result = self._session.step(external_inputs=external_inputs) + result = self._session.step( + external_inputs=external_inputs, + execution_events=execution_events, + ) except Exception as e: logger.warning(f"IsaacTeleop session step failed (XR session likely torn down): {e}") self._teardown_dead_session() diff --git a/source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py b/source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py new file mode 100644 index 000000000000..1844925c4d0c --- /dev/null +++ b/source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py @@ -0,0 +1,232 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Message-channel payload parser for TeleopCore's teleop_control_pipeline. + +Provides :class:`TeleopMessageProcessor`, a lightweight +:class:`~isaacteleop.retargeting_engine.interface.BaseRetargeter` that +converts message-channel payloads into boolean pulse signals suitable for +:class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. +""" + +from __future__ import annotations + +import json +import re +from typing import TYPE_CHECKING + +from isaacteleop.retargeting_engine.interface import BaseRetargeter, RetargeterIOType + +if TYPE_CHECKING: + from isaacteleop.retargeting_engine.interface.retargeter_core_types import ComputeContext, RetargeterIO + +_COMMAND_PATTERNS: list[tuple[re.Pattern[str], str]] = [ + (re.compile(r"\breset\b", re.IGNORECASE), "reset"), + (re.compile(r"\bstop\b", re.IGNORECASE), "stop"), + (re.compile(r"\bstart\b", re.IGNORECASE), "start"), +] +"""Ordered patterns for classifying a command string. + +``reset`` is checked first so that a hypothetical payload containing +both "reset" and "start" is treated as a reset (the more destructive +operation wins). ``stop`` precedes ``start`` for the same reason. +""" + +# Shadow states mirroring DefaultTeleopStateManager's ExecutionState. +_STOPPED = "stopped" +_PAUSED = "paused" +_RUNNING = "running" + +# DefaultTeleopStateManager cycles states on run_toggle rising edges: +# STOPPED -> PAUSED -> RUNNING -> PAUSED -> RUNNING -> ... +# To map imperative "start" (= go to RUNNING) and "stop" (= go to PAUSED) +# we emit the right number of toggle edges based on predicted state. +_START_TOGGLE_SEQUENCES: dict[str, list[bool]] = { + _STOPPED: [True, False, True], # 2 edges: STOPPED -> PAUSED -> RUNNING + _PAUSED: [True], # 1 edge: PAUSED -> RUNNING + _RUNNING: [], # already running +} +_STOP_TOGGLE_SEQUENCES: dict[str, list[bool]] = { + _RUNNING: [True], # 1 edge: RUNNING -> PAUSED + _PAUSED: [], # already paused + _STOPPED: [], # already stopped +} +# Shadow state advances on each rising edge (True after False). +_TOGGLE_TRANSITIONS: dict[str, str] = { + _STOPPED: _PAUSED, + _PAUSED: _RUNNING, + _RUNNING: _PAUSED, +} + + +class TeleopMessageProcessor(BaseRetargeter): + """Parse message-channel payloads into boolean control signals. + + Consumes the ``messages_tracked`` output of a + :class:`~isaacteleop.retargeting_engine.deviceio_source_nodes.MessageChannelSource` + and produces three boolean pulse outputs that drive + :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`: + + * ``run_toggle`` -- pulsed ``True`` on rising edges; the number of + edges depends on the target state (e.g. ``"start"`` from STOPPED + emits two edges over three frames: STOPPED -> PAUSED -> RUNNING). + * ``kill`` -- always ``False`` (reserved for fail-safe; ``"stop"`` + uses ``run_toggle`` to reach PAUSED instead of STOPPED). + * ``reset`` -- pulsed ``True`` for one frame on ``"reset"``. + + The processor maintains a *shadow state* that mirrors + ``DefaultTeleopStateManager``'s internal state so it can emit the + correct toggle sequence for imperative commands. + + Payload formats supported: + + 1. **JSON (Quest client format)**:: + + {"type": "teleop_command", "message": {"command": "start teleop"}} + + 2. **Plain text (fallback)**: raw UTF-8 string matched by word boundary + (``"start"``, ``"stop"``, ``"reset"``). + + Host-initiated resets (e.g. environment success) are injected via + :meth:`inject_reset`, which sets the ``reset`` output ``True`` on the + next compute call without requiring a message-channel payload. + """ + + INPUT_MESSAGES = "messages_tracked" + + def __init__(self, name: str) -> None: + self._inject_reset_pending = False + self._shadow_state = _STOPPED + self._run_toggle_queue: list[bool] = [] + self._prev_toggle_output = False + super().__init__(name=name) + + def inject_reset(self) -> None: + """Schedule a reset pulse on the next pipeline step. + + The ``reset`` output will be ``True`` for exactly one frame, then + automatically cleared. + """ + self._inject_reset_pending = True + + def _make_toggle_sequence(self, base_sequence: list[bool]) -> list[bool]: + """Prepend a ``False`` frame if needed to guarantee a clean rising edge. + + ``DefaultTeleopStateManager`` uses edge detection + (``pressed and not prev_pressed``), so emitting ``True`` when the + previous output was already ``True`` would not trigger a state + transition. This method prepends ``False`` when necessary. + """ + if not base_sequence: + return [] + seq = list(base_sequence) + if self._prev_toggle_output: + seq.insert(0, False) + return seq + + def input_spec(self) -> RetargeterIOType: + from isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types import ( + MessageChannelMessagesTrackedGroup, + ) + + return {self.INPUT_MESSAGES: MessageChannelMessagesTrackedGroup()} + + def output_spec(self) -> RetargeterIOType: + from isaacteleop.teleop_session_manager.teleop_state_manager_types import bool_signal + + return { + "run_toggle": bool_signal("run_toggle"), + "kill": bool_signal("kill"), + "reset": bool_signal("reset"), + } + + def _compute_fn( + self, + inputs: RetargeterIO, + outputs: RetargeterIO, + context: ComputeContext, + ) -> None: + del context + + reset = self._inject_reset_pending + self._inject_reset_pending = False + + # Parse incoming messages and enqueue toggle sequences. + messages_tracked = inputs[self.INPUT_MESSAGES][0] + data = getattr(messages_tracked, "data", None) + if data: + for message in data: + payload = getattr(message, "payload", None) + if payload is None: + continue + try: + text = bytes(payload).decode("utf-8") + except (UnicodeDecodeError, TypeError): + continue + + command = _extract_command(text) + if command is None: + continue + + kind = _classify_command(command) + if kind == "start" and not self._run_toggle_queue: + self._run_toggle_queue = self._make_toggle_sequence(_START_TOGGLE_SEQUENCES[self._shadow_state]) + elif kind == "stop" and not self._run_toggle_queue: + self._run_toggle_queue = self._make_toggle_sequence(_STOP_TOGGLE_SEQUENCES[self._shadow_state]) + elif kind == "reset": + reset = True + + # Drain the toggle queue (one value per frame). + if self._run_toggle_queue: + run_toggle = self._run_toggle_queue.pop(0) + else: + run_toggle = False + + # Advance shadow state on rising edges (matches DefaultTeleopStateManager's + # edge detection: ``pressed and not prev_pressed``). + if run_toggle and not self._prev_toggle_output: + self._shadow_state = _TOGGLE_TRANSITIONS[self._shadow_state] + self._prev_toggle_output = run_toggle + + outputs["run_toggle"][0] = run_toggle + outputs["kill"][0] = False + outputs["reset"][0] = reset + + +def _classify_command(text: str) -> str | None: + """Return ``"start"``, ``"stop"``, ``"reset"``, or ``None``. + + Uses word-boundary matching so that e.g. ``"stop_and_restart"`` + matches ``"stop"`` (not ``"start"``). + """ + for pattern, label in _COMMAND_PATTERNS: + if pattern.search(text): + return label + return None + + +def _extract_command(text: str) -> str | None: + """Extract the command string from a JSON or plain-text payload. + + Tries JSON parsing first (Quest client format) and falls back to the + raw text for plain-string payloads. Non-string JSON scalars (numbers, + arrays, booleans) are discarded. + """ + try: + obj = json.loads(text) + except (json.JSONDecodeError, TypeError): + return text + + if not isinstance(obj, dict): + return None + if obj.get("type") != "teleop_command": + return None + + msg = obj.get("message") + if isinstance(msg, dict): + return msg.get("command", "") + if isinstance(msg, str): + return msg + return None diff --git a/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py b/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py index e9565a7d3e41..43131f70cfc3 100644 --- a/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py +++ b/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py @@ -38,8 +38,15 @@ "isaacteleop.oxr", "isaacteleop.retargeting_engine", "isaacteleop.retargeting_engine.interface", + "isaacteleop.retargeting_engine.interface.execution_events", + "isaacteleop.retargeting_engine.interface.retargeter_core_types", + "isaacteleop.retargeting_engine.interface.tensor_group_type", + "isaacteleop.retargeting_engine.deviceio_source_nodes", + "isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types", "isaacteleop.retargeting_engine_ui", "isaacteleop.teleop_session_manager", + "isaacteleop.teleop_session_manager.teleop_state_manager_retargeter", + "isaacteleop.teleop_session_manager.teleop_state_manager_types", "isaacsim", "isaacsim.kit", "isaacsim.kit.xr", @@ -85,6 +92,7 @@ def _make_cfg() -> IsaacTeleopCfg: """Build a minimal IsaacTeleopCfg with a dummy pipeline_builder.""" return IsaacTeleopCfg( pipeline_builder=lambda: MagicMock(), + control_channel_uuid=None, ) diff --git a/source/isaaclab_teleop/test/test_control_events.py b/source/isaaclab_teleop/test/test_control_events.py new file mode 100644 index 000000000000..8bc05d3f957c --- /dev/null +++ b/source/isaaclab_teleop/test/test_control_events.py @@ -0,0 +1,586 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +# pyright: reportPrivateUsage=none + +"""Tests for TeleopMessageProcessor, _classify_command, _extract_command, +and poll_control_events. + +These tests exercise pure logic (no Omniverse/Isaac Sim stack required). +The message processor is tested by calling its ``_compute_fn`` method +directly with fake pipeline I/O, mirroring how TeleopCore's +``teleop_control_pipeline`` mechanism invokes it. +""" + +from __future__ import annotations + +import dataclasses +import json +import sys +from types import ModuleType +from unittest.mock import MagicMock + +import pytest + +# --------------------------------------------------------------------------- +# Stub out isaacteleop modules before any isaaclab_teleop imports so the +# tests can run in a plain Python environment without Omniverse. +# --------------------------------------------------------------------------- + +_MODULES_TO_STUB = [ + "isaacteleop", + "isaacteleop.deviceio", + "isaacteleop.deviceio_trackers", + "isaacteleop.retargeting_engine", + "isaacteleop.retargeting_engine.deviceio_source_nodes", + "isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types", + "isaacteleop.retargeting_engine.interface", + "isaacteleop.retargeting_engine.interface.retargeter_core_types", + "isaacteleop.retargeting_engine.interface.tensor_group_type", + "isaacteleop.retargeting_engine_ui", + "isaacteleop.schema", + "isaacteleop.teleop_session_manager", + "isaacteleop.teleop_session_manager.teleop_state_manager_retargeter", + "isaacteleop.teleop_session_manager.teleop_state_manager_types", +] + +_stubs: dict[str, ModuleType | MagicMock] = {} + + +def _install_stubs(): + for name in _MODULES_TO_STUB: + if name not in sys.modules: + _stubs[name] = MagicMock() + sys.modules[name] = _stubs[name] + + from enum import Enum + + class ExecutionState(str, Enum): + UNKNOWN = "unknown" + STOPPED = "stopped" + PAUSED = "paused" + RUNNING = "running" + + @dataclasses.dataclass + class ExecutionEvents: + reset: bool = False + execution_state: ExecutionState = ExecutionState.UNKNOWN + + ee_mod = sys.modules["isaacteleop.retargeting_engine.interface.execution_events"] = ModuleType( + "isaacteleop.retargeting_engine.interface.execution_events" + ) + ee_mod.ExecutionState = ExecutionState # type: ignore[attr-defined] + ee_mod.ExecutionEvents = ExecutionEvents # type: ignore[attr-defined] + + iface = sys.modules["isaacteleop.retargeting_engine.interface"] + iface.ExecutionState = ExecutionState # type: ignore[attr-defined] + iface.ExecutionEvents = ExecutionEvents # type: ignore[attr-defined] + iface.RetargeterIOType = dict # type: ignore[attr-defined] + + class FakeBaseRetargeter: + def __init__(self, name: str) -> None: + self.name = name + + iface.BaseRetargeter = FakeBaseRetargeter # type: ignore[attr-defined] + + tsm_types = sys.modules["isaacteleop.teleop_session_manager.teleop_state_manager_types"] + tsm_types.bool_signal = MagicMock # type: ignore[attr-defined] + + dt_mod = sys.modules["isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types"] + dt_mod.MessageChannelMessagesTrackedGroup = MagicMock # type: ignore[attr-defined] + + +_install_stubs() + +from isaaclab_teleop.control_events import ControlEvents, poll_control_events # noqa: E402 +from isaaclab_teleop.teleop_message_processor import ( # noqa: E402 + TeleopMessageProcessor, + _classify_command, + _extract_command, +) + +# --------------------------------------------------------------------------- +# Test doubles for MessageChannelMessagesTrackedT +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class _FakePayload: + payload: bytes + + +@dataclasses.dataclass +class _FakeTracked: + data: list[_FakePayload] | None = None + + +def _tracked(*payloads: bytes) -> _FakeTracked: + """Build a lightweight stand-in for ``MessageChannelMessagesTrackedT``.""" + return _FakeTracked(data=[_FakePayload(p) for p in payloads]) + + +def _empty_tracked() -> _FakeTracked: + return _FakeTracked(data=[]) + + +def _null_tracked() -> _FakeTracked: + return _FakeTracked(data=None) + + +def _make_inputs(messages_tracked): + """Build a fake RetargeterIO dict for the processor.""" + tg = MagicMock() + tg.__getitem__ = MagicMock(return_value=messages_tracked) + return {TeleopMessageProcessor.INPUT_MESSAGES: tg} + + +class _FakeOutputSlot: + """Captures ``outputs["key"][0] = value`` assignments.""" + + def __init__(self): + self.value = None + + def __setitem__(self, idx, val): + self.value = val + + def __getitem__(self, idx): + return self.value + + +def _make_outputs(): + """Build a fake outputs dict with capturable slots.""" + return {"run_toggle": _FakeOutputSlot(), "kill": _FakeOutputSlot(), "reset": _FakeOutputSlot()} + + +def _step(proc, messages_tracked) -> dict: + """Run the processor's _compute_fn and return captured outputs.""" + inputs = _make_inputs(messages_tracked) + outputs = _make_outputs() + proc._compute_fn(inputs, outputs, context=None) + return {k: v.value for k, v in outputs.items()} + + +# =========================================================================== +# TeleopMessageProcessor: basic command parsing +# =========================================================================== + + +class TestStartCommand: + def test_start_sets_run_toggle(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"start")) + assert result["run_toggle"] is True + assert result["kill"] is False + assert result["reset"] is False + + def test_start_does_not_set_reset(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"start")) + assert result["reset"] is False + + +class TestStopCommand: + def test_stop_from_stopped_is_noop(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"stop")) + assert result["run_toggle"] is False + assert result["kill"] is False + + +class TestResetCommand: + def test_reset_sets_reset_flag(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"reset")) + assert result["reset"] is True + assert result["run_toggle"] is False + assert result["kill"] is False + + +class TestResetPulseBehaviour: + def test_reset_clears_on_next_step(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"reset")) + assert result["reset"] is True + + result = _step(proc, _empty_tracked()) + assert result["reset"] is False + + +class TestKillAlwaysFalse: + def test_kill_is_always_false(self): + proc = TeleopMessageProcessor(name="test") + for payload in [b"start", b"stop", b"reset", b"hello"]: + result = _step(proc, _tracked(payload)) + assert result["kill"] is False + + +# =========================================================================== +# Shadow state and toggle sequences +# =========================================================================== + + +class TestStartFromStopped: + """``start`` from STOPPED needs 2 toggle edges over 3 frames.""" + + def test_full_sequence_reaches_running(self): + proc = TeleopMessageProcessor(name="test") + # Frame 0: "start" received, first toggle edge queued + r0 = _step(proc, _tracked(b"start")) + assert r0["run_toggle"] is True # edge 1: STOPPED -> PAUSED + + # Frame 1: queue drains False (prev resets) + r1 = _step(proc, _empty_tracked()) + assert r1["run_toggle"] is False + + # Frame 2: queue drains True (second edge) + r2 = _step(proc, _empty_tracked()) + assert r2["run_toggle"] is True # edge 2: PAUSED -> RUNNING + + # Frame 3: queue empty, back to idle + r3 = _step(proc, _empty_tracked()) + assert r3["run_toggle"] is False + + def test_shadow_state_is_running_after_sequence(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + +class TestStartFromPaused: + """``start`` from PAUSED needs 1 toggle edge.""" + + def test_single_edge_reaches_running(self): + proc = TeleopMessageProcessor(name="test") + # Drive to RUNNING: start sequence plays 3 frames + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + # Stop to reach PAUSED (prev_toggle is True from start sequence, + # so a False is prepended before the toggle edge) + _step(proc, _tracked(b"stop")) # drains False (prepended) + r_stop_edge = _step(proc, _empty_tracked()) # drains True (edge) + assert r_stop_edge["run_toggle"] is True + assert proc._shadow_state == "paused" + + # Start from PAUSED: prev_toggle is True, so False prepended + _step(proc, _tracked(b"start")) # drains False (prepended) + r_start_edge = _step(proc, _empty_tracked()) # drains True (edge) + assert r_start_edge["run_toggle"] is True + assert proc._shadow_state == "running" + + +class TestStartFromRunning: + """``start`` when already RUNNING is a no-op.""" + + def test_start_from_running_noop(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + result = _step(proc, _tracked(b"start")) + assert result["run_toggle"] is False + + +class TestStopFromRunning: + """``stop`` from RUNNING uses one toggle edge to reach PAUSED.""" + + def test_stop_pauses(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + # prev_toggle is True, so stop prepends False before the edge + r0 = _step(proc, _tracked(b"stop")) + assert r0["run_toggle"] is False # prepended False + r1 = _step(proc, _empty_tracked()) + assert r1["run_toggle"] is True # edge: RUNNING -> PAUSED + assert proc._shadow_state == "paused" + + +class TestStopFromPaused: + """``stop`` when already PAUSED is a no-op.""" + + def test_stop_from_paused_noop(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + # Stop to PAUSED + _step(proc, _tracked(b"stop")) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "paused" + + result = _step(proc, _tracked(b"stop")) + assert result["run_toggle"] is False + + +class TestCommandDuringToggleSequence: + """Commands received while a toggle sequence is in progress are ignored.""" + + def test_second_start_during_sequence_ignored(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) # starts the 3-frame sequence + # Second start during the sequence should not restart it + r1 = _step(proc, _tracked(b"start")) + assert r1["run_toggle"] is False # draining the False from queue + + r2 = _step(proc, _empty_tracked()) + assert r2["run_toggle"] is True # second edge fires normally + + +# =========================================================================== +# inject_reset +# =========================================================================== + + +class TestInjectReset: + def test_inject_reset_produces_pulse(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + result = _step(proc, _empty_tracked()) + assert result["reset"] is True + + def test_inject_reset_clears_after_one_step(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + _step(proc, _empty_tracked()) + result = _step(proc, _empty_tracked()) + assert result["reset"] is False + + def test_inject_reset_combines_with_message_reset(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + result = _step(proc, _tracked(b"reset")) + assert result["reset"] is True + + def test_inject_reset_independent_of_toggle(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + result = _step(proc, _tracked(b"start")) + assert result["run_toggle"] is True + assert result["reset"] is True + + +# =========================================================================== +# Word boundary matching +# =========================================================================== + + +class TestWordBoundaryMatching: + @pytest.mark.parametrize("payload", [b"teleop start", b"xr start session", b"start now"]) + def test_start_word(self, payload: bytes): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["run_toggle"] is True + + @pytest.mark.parametrize("payload", [b"teleop reset", b"env reset"]) + def test_reset_word(self, payload: bytes): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["reset"] is True + + +class TestAmbiguousPayloads: + def test_reset_wins_over_start(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"reset and start")) + assert result["reset"] is True + assert result["run_toggle"] is False + + +# =========================================================================== +# Empty, null, and malformed batches +# =========================================================================== + + +class TestEmptyAndNullBatches: + def test_empty_data_list(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _empty_tracked()) + assert result["run_toggle"] is False + assert result["kill"] is False + assert result["reset"] is False + + def test_null_data(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _null_tracked()) + assert result["run_toggle"] is False + + def test_none_input(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, None) + assert result["run_toggle"] is False + + +class TestMultipleMessagesInBatch: + def test_start_then_reset_in_one_batch(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"start", b"reset")) + assert result["run_toggle"] is True + assert result["reset"] is True + + +class TestMalformedPayloads: + def test_invalid_utf8(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"\xff\xfe")) + assert result["run_toggle"] is False + assert result["kill"] is False + assert result["reset"] is False + + def test_none_payload(self): + proc = TeleopMessageProcessor(name="test") + tracked = _FakeTracked(data=[_FakePayload(payload=None)]) # type: ignore[arg-type] + result = _step(proc, tracked) + assert result["run_toggle"] is False + + +# =========================================================================== +# JSON format tests (Quest client sends JSON teleop_command messages) +# =========================================================================== + + +def _json_command(command: str) -> bytes: + """Build a Quest-style JSON teleop_command payload.""" + return json.dumps({"type": "teleop_command", "message": {"command": command}}).encode("utf-8") + + +class TestJsonFormat: + def test_json_start_teleop(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(_json_command("start teleop"))) + assert result["run_toggle"] is True + + def test_json_stop_teleop_from_stopped_noop(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(_json_command("stop teleop"))) + assert result["run_toggle"] is False + + def test_json_reset_teleop(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(_json_command("reset teleop"))) + assert result["reset"] is True + + def test_json_wrong_type_ignored(self): + payload = json.dumps({"type": "other_event", "message": {"command": "start"}}).encode("utf-8") + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["run_toggle"] is False + + def test_json_message_as_string(self): + payload = json.dumps({"type": "teleop_command", "message": "start teleop"}).encode("utf-8") + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["run_toggle"] is True + + +# =========================================================================== +# _extract_command unit tests +# =========================================================================== + + +class TestExtractCommand: + def test_plain_text(self): + assert _extract_command("start teleop") == "start teleop" + + def test_json_teleop_command(self): + text = json.dumps({"type": "teleop_command", "message": {"command": "stop"}}) + assert _extract_command(text) == "stop" + + def test_json_wrong_type(self): + text = json.dumps({"type": "other", "message": {"command": "start"}}) + assert _extract_command(text) is None + + def test_json_no_message_key(self): + text = json.dumps({"type": "teleop_command"}) + assert _extract_command(text) is None + + def test_json_non_dict_value_returns_none(self): + assert _extract_command("42") is None + assert _extract_command("[1, 2, 3]") is None + assert _extract_command("true") is None + + +# =========================================================================== +# _classify_command unit tests +# =========================================================================== + + +class TestClassifyCommand: + def test_exact_words(self): + assert _classify_command("start") == "start" + assert _classify_command("stop") == "stop" + assert _classify_command("reset") == "reset" + + def test_word_boundary_prevents_false_match(self): + assert _classify_command("upstart") is None + assert _classify_command("nonstop") is None + assert _classify_command("unreset") is None + + def test_reset_beats_start(self): + assert _classify_command("reset and start") == "reset" + + def test_stop_beats_start(self): + assert _classify_command("stop and start") == "stop" + + def test_unrecognized_text(self): + assert _classify_command("hello world") is None + + def test_case_insensitive(self): + assert _classify_command("START") == "start" + assert _classify_command("Stop Teleop") == "stop" + assert _classify_command("RESET NOW") == "reset" + + +# =========================================================================== +# poll_control_events tests +# =========================================================================== + + +class TestPollControlEvents: + def test_plain_object_returns_default(self): + result = poll_control_events(object()) + assert result.is_active is None + assert result.should_reset is False + + def test_device_with_control_events(self): + class FakeDevice: + @property + def last_control_events(self): + return ControlEvents(is_active=True, should_reset=True) + + result = poll_control_events(FakeDevice()) + assert result.is_active is True + assert result.should_reset is True + + def test_device_with_none_events(self): + class FakeDevice: + last_control_events = None + + result = poll_control_events(FakeDevice()) + assert result.is_active is None + assert result.should_reset is False + + def test_duck_typed_snapshot(self): + class FakeSnapshot: + is_active = False + should_reset = True + + class FakeDevice: + @property + def last_control_events(self): + return FakeSnapshot() + + result = poll_control_events(FakeDevice()) + assert result.is_active is False + assert result.should_reset is True