From 1a87916729a1342f410de88ef8c9f7d17c6b3f21 Mon Sep 17 00:00:00 2001 From: Rafael Wiltz Date: Wed, 15 Apr 2026 16:40:15 -0400 Subject: [PATCH 1/2] Wire up teleop control states via message channel Add message-channel-based control (start/stop/reset) from the headset, with TeleopMessageProcessor for parsing payloads, ControlEvents dataclass for polling, and pipeline reset integration via ExecutionEvents. --- docs/source/features/isaac_teleop.rst | 106 ++++- .../teleoperation/teleop_se3_agent.py | 9 +- scripts/tools/record_demos.py | 45 +- source/isaaclab_teleop/config/extension.toml | 2 +- source/isaaclab_teleop/docs/CHANGELOG.rst | 45 ++ .../isaaclab_teleop/__init__.pyi | 9 +- .../isaaclab_teleop/command_handler.py | 72 +-- .../isaaclab_teleop/control_events.py | 80 ++++ .../isaaclab_teleop/isaac_teleop_cfg.py | 19 + .../isaaclab_teleop/isaac_teleop_device.py | 51 +- .../message_channel_state_manager.py | 162 +++++++ .../isaaclab_teleop/session_lifecycle.py | 118 ++++- .../test/test_cloudxr_lifecycle.py | 8 + .../test/test_control_events.py | 443 ++++++++++++++++++ 14 files changed, 1074 insertions(+), 95 deletions(-) create mode 100644 source/isaaclab_teleop/isaaclab_teleop/control_events.py create mode 100644 source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py create mode 100644 source/isaaclab_teleop/test/test_control_events.py diff --git a/docs/source/features/isaac_teleop.rst b/docs/source/features/isaac_teleop.rst index ac910d6ed4ed..325fba4d51c5 100644 --- a/docs/source/features/isaac_teleop.rst +++ b/docs/source/features/isaac_teleop.rst @@ -115,8 +115,10 @@ and Isaac Lab. It composes three collaborators: Isaac Sim's XR bridge, creates the ``TeleopSession``, and steps it each frame to produce an action tensor. -* **CommandHandler** -- registers and dispatches START / STOP / RESET callbacks triggered by XR UI - buttons or the message bus. +* **CommandHandler** -- lightweight callback registry for START / STOP / RESET commands. Scripts + can register callbacks via :meth:`~isaaclab_teleop.IsaacTeleopDevice.add_callback`, but the + primary control path uses :func:`~isaaclab_teleop.poll_control_events` (see + :ref:`isaac-teleop-control-states`). .. dropdown:: Session lifecycle details @@ -127,6 +129,102 @@ and Isaac Lab. It composes three collaborators: the session is not yet ready or has been torn down. +.. _isaac-teleop-control-states: + +Teleop Control States (Start / Stop / Reset) +--------------------------------------------- + +Isaac Lab supports remote teleop control commands -- **start**, **stop**, and **reset** -- sent +from the XR headset to the simulation. These are used to begin and end demonstration recording, +pause the robot, or reset the environment without touching the simulation host. + +How it works +~~~~~~~~~~~~ + +By default, every :class:`~isaaclab_teleop.IsaacTeleopCfg` enables a control message channel +using the well-known UUID ``uuid5(NAMESPACE_DNS, "teleop_command")``. The channel is created as +a ``teleop_control_pipeline`` inside TeleopCore's :class:`TeleopSession`, which means: + +1. A :class:`~isaacteleop.retargeting_engine.deviceio_source_nodes.MessageChannelSource` opens an + OpenXR opaque data channel (``XR_NV_opaque_data_channel``) with the agreed-upon UUID. +2. The CloudXR JS client (or any other client) discovers the channel by UUID and sends UTF-8 + JSON commands:: + + {"type": "teleop_command", "message": {"command": "start teleop"}} + {"type": "teleop_command", "message": {"command": "stop teleop"}} + {"type": "teleop_command", "message": {"command": "reset teleop"}} + +3. A :class:`~isaaclab_teleop.message_channel_state_manager.MessageChannelTeleopStateManager` + parses these payloads and produces ``teleop_state`` (one-hot: stopped / paused / running) and + ``reset_event`` (bool pulse) outputs. +4. TeleopCore decodes these outputs into ``ExecutionEvents`` and injects them into every + retargeter's ``ComputeContext``, so stateful retargeters can react to state changes + (e.g. reinitializing cross-step state on reset). + +Polling control events in your script +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use :func:`~isaaclab_teleop.poll_control_events` to read the latest control state each frame: + +.. code-block:: python + + from isaaclab_teleop import poll_control_events + + with IsaacTeleopDevice(cfg) as device: + running = False + while sim_app.is_running(): + action = device.advance() + + ctrl = poll_control_events(device) + if ctrl.is_active is not None: + running = ctrl.is_active # True after "start", False after "stop" + if ctrl.should_reset: + env.reset() # "reset" command received this frame + + if action is not None and running: + env.step(action.repeat(num_envs, 1)) + else: + env.sim.render() + +:class:`~isaaclab_teleop.ControlEvents` has two fields: + +* ``is_active`` -- ``True`` after a "start" command, ``False`` after "stop", ``None`` when no + command has been received yet (callers should leave their own flag unchanged). +* ``should_reset`` -- ``True`` for exactly one frame after a "reset" command. + +Disabling the control channel +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you do not need headset-driven start/stop/reset (e.g. keyboard-only workflows), set +``control_channel_uuid=None`` in your config: + +.. code-block:: python + + IsaacTeleopCfg( + pipeline_builder=_build_my_pipeline, + control_channel_uuid=None, # no opaque data channel created + ) + +Using a custom channel UUID +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To use a different channel UUID (e.g. for a separate control protocol), pass any 16-byte +``bytes`` value: + +.. code-block:: python + + import uuid + + MY_UUID = uuid.uuid5(uuid.NAMESPACE_DNS, "my_custom_control").bytes + + IsaacTeleopCfg( + pipeline_builder=_build_my_pipeline, + control_channel_uuid=MY_UUID, + ) + +The CloudXR JS client must be updated to discover this UUID when sending commands. + + .. _isaac-teleop-retargeting: Retargeting Framework @@ -908,6 +1006,10 @@ See the :ref:`isaaclab_teleop-api` for full class and function documentation: * :class:`~isaaclab_teleop.IsaacTeleopCfg` * :class:`~isaaclab_teleop.IsaacTeleopDevice` * :func:`~isaaclab_teleop.create_isaac_teleop_device` +* :class:`~isaaclab_teleop.ControlEvents` +* :class:`~isaaclab_teleop.SupportsControlEvents` +* :func:`~isaaclab_teleop.poll_control_events` +* :data:`~isaaclab_teleop.TELEOP_CONTROL_CHANNEL_UUID` * :class:`~isaaclab_teleop.XrCfg` * :class:`~isaaclab_teleop.XrAnchorRotationMode` diff --git a/scripts/environments/teleoperation/teleop_se3_agent.py b/scripts/environments/teleoperation/teleop_se3_agent.py index 897eb159e86e..cdd5c104c44f 100644 --- a/scripts/environments/teleoperation/teleop_se3_agent.py +++ b/scripts/environments/teleoperation/teleop_se3_agent.py @@ -218,7 +218,7 @@ def stop_teleoperation() -> None: try: if use_isaac_teleop: - from isaaclab_teleop import create_isaac_teleop_device + from isaaclab_teleop import create_isaac_teleop_device, poll_control_events teleop_interface = create_isaac_teleop_device( env_cfg.isaac_teleop, @@ -297,6 +297,13 @@ def run_loop(): # get device command action = teleop_interface.advance() + if use_isaac_teleop: + ctrl = poll_control_events(teleop_interface) + if ctrl.is_active is not None: + teleoperation_active = ctrl.is_active + if ctrl.should_reset: + should_reset_recording_instance = True + # action is None when IsaacTeleop session hasn't started yet # (e.g. waiting for user to click "Start AR") if action is None: diff --git a/scripts/tools/record_demos.py b/scripts/tools/record_demos.py index bd318b7a2625..c8484dc3b750 100644 --- a/scripts/tools/record_demos.py +++ b/scripts/tools/record_demos.py @@ -406,26 +406,34 @@ def process_success_condition(env: gym.Env, success_term: object | None, success def handle_reset( - env: gym.Env, success_step_count: int, instruction_display: InstructionDisplay, label_text: str + env: gym.Env, + success_step_count: int, + instruction_display: InstructionDisplay, + label_text: str, + teleop_interface: object | None = None, ) -> int: """Handle resetting the environment. - Resets the environment, recorder manager, and related state variables. - Updates the instruction display with current status. + Resets the environment, recorder manager, teleop device, and related + state variables. Updates the instruction display with current status. Args: - env: The environment instance to reset - success_step_count: Current count of consecutive successful steps - instruction_display: The display object to update - label_text: Text to display showing current recording status + env: The environment instance to reset. + success_step_count: Current count of consecutive successful steps. + instruction_display: The display object to update. + label_text: Text to display showing current recording status. + teleop_interface: Optional teleop device to reset (resets XR anchor + and retargeter cross-step state). Returns: - int: Reset success step count (0) + Reset success step count (0). """ print("Resetting environment...") env.sim.reset() env.recorder_manager.reset() env.reset() + if teleop_interface is not None and hasattr(teleop_interface, "reset"): + teleop_interface.reset() success_step_count = 0 instruction_display.show_demo(label_text) return success_step_count @@ -476,16 +484,16 @@ def stop_recording_instance(): running_recording_instance = False print("Recording paused") - # Set up teleoperation callbacks + # Set up teleoperation callbacks. For IsaacTeleop the primary control + # path is poll_control_events(); these callbacks are bridged automatically + # and also serve native (keyboard / spacemouse) devices. teleoperation_callbacks = { - "R": reset_recording_instance, "START": start_recording_instance, "STOP": stop_recording_instance, "RESET": reset_recording_instance, } teleop_interface = setup_teleop_device(teleoperation_callbacks, use_isaac_teleop) - teleop_interface.add_callback("R", reset_recording_instance) label_text = f"Recorded {current_recorded_demo_count} successful demonstrations." instruction_display = setup_ui(label_text, env) @@ -504,10 +512,21 @@ def inner_loop(): stack_name = "IsaacTeleop" if use_isaac_teleop else "native" print(f"{stack_name} recording started.") + if use_isaac_teleop: + from isaaclab_teleop import poll_control_events + with contextlib.suppress(KeyboardInterrupt), torch.inference_mode(): while simulation_app.is_running(): # Get teleop command (may be None while waiting for session start) action = teleop_interface.advance() + + if use_isaac_teleop: + ctrl = poll_control_events(teleop_interface) + if ctrl.is_active is not None: + running_recording_instance = ctrl.is_active + if ctrl.should_reset: + should_reset_recording_instance = True + if action is None: env.sim.render() continue @@ -558,7 +577,9 @@ def inner_loop(): # Handle reset if requested if should_reset_recording_instance: - success_step_count = handle_reset(env, success_step_count, instruction_display, label_text) + success_step_count = handle_reset( + env, success_step_count, instruction_display, label_text, teleop_interface + ) should_reset_recording_instance = False # Check if simulation is stopped diff --git a/source/isaaclab_teleop/config/extension.toml b/source/isaaclab_teleop/config/extension.toml index 881c57a52727..13c63e04ab99 100644 --- a/source/isaaclab_teleop/config/extension.toml +++ b/source/isaaclab_teleop/config/extension.toml @@ -1,6 +1,6 @@ [package] # Semantic Versioning is used: https://semver.org/ -version = "0.3.5" +version = "0.3.6" # Description title = "Isaac Lab Teleop" diff --git a/source/isaaclab_teleop/docs/CHANGELOG.rst b/source/isaaclab_teleop/docs/CHANGELOG.rst index d526500022c1..1092aef5a933 100644 --- a/source/isaaclab_teleop/docs/CHANGELOG.rst +++ b/source/isaaclab_teleop/docs/CHANGELOG.rst @@ -1,6 +1,51 @@ Changelog --------- +0.3.6 (2026-04-21) +~~~~~~~~~~~~~~~~~~~ + +Added +^^^^^ + +* Added :attr:`~isaaclab_teleop.IsaacTeleopCfg.control_channel_uuid` for + receiving teleop control commands (start/stop/reset) from the headset via + an OpenXR message channel. The channel is managed by TeleopCore's native + ``teleop_control_pipeline`` mechanism. + +* Added ``MessageChannelTeleopStateManager`` retargeter that converts raw + message-channel payloads into ``teleop_state`` and ``reset_event`` outputs + for the control pipeline. + +* Added :func:`~isaaclab_teleop.poll_control_events` helper, + :class:`~isaaclab_teleop.ControlEvents` dataclass, and + :class:`~isaaclab_teleop.SupportsControlEvents` protocol for polling + start/stop/reset signals from any teleop device in a single call. + +* Added :attr:`~isaaclab_teleop.IsaacTeleopDevice.last_control_events` + property exposing the most recent control events from the message channel. + Control events are automatically bridged to legacy + :meth:`~isaaclab_teleop.IsaacTeleopDevice.add_callback` callbacks. + +Changed +^^^^^^^ + +* :meth:`~isaaclab_teleop.IsaacTeleopDevice.reset` now injects a + ``reset`` :class:`ExecutionEvents` into TeleopCore's ``ComputeContext`` + on the next pipeline step, resetting retargeter cross-step state. + Previously only the XR anchor was reset. + +Fixed +^^^^^ + +* Fixed ``record_demos.py`` not resetting the teleop device when a + success condition triggers an environment reset. Retargeters now + reinitialize their state on success-triggered resets. + +* Fixed shutdown hang caused by Kit's pre-shutdown callback calling + ``stop()`` while the simulation loop was still running. The callback + now uses the same graceful teardown path as the XR-disabled handler. + + 0.3.5 (2026-04-06) ~~~~~~~~~~~~~~~~~~~ diff --git a/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi b/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi index 655c7025cb0f..045f16f0c690 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi +++ b/source/isaaclab_teleop/isaaclab_teleop/__init__.pyi @@ -6,15 +6,20 @@ __all__ = [ "CLOUDXR_AVP_ENV", "CLOUDXR_JS_ENV", + "ControlEvents", "IsaacTeleopCfg", "IsaacTeleopDevice", - "create_isaac_teleop_device", - "XrAnchorSynchronizer", + "SupportsControlEvents", + "TELEOP_CONTROL_CHANNEL_UUID", "XrAnchorRotationMode", + "XrAnchorSynchronizer", "XrCfg", + "create_isaac_teleop_device", + "poll_control_events", "remove_camera_configs", ] +from .control_events import TELEOP_CONTROL_CHANNEL_UUID, ControlEvents, SupportsControlEvents, poll_control_events from .isaac_teleop_cfg import CLOUDXR_AVP_ENV, CLOUDXR_JS_ENV, IsaacTeleopCfg from .isaac_teleop_device import IsaacTeleopDevice, create_isaac_teleop_device from .xr_anchor_utils import XrAnchorSynchronizer diff --git a/source/isaaclab_teleop/isaaclab_teleop/command_handler.py b/source/isaaclab_teleop/isaaclab_teleop/command_handler.py index eb5fb38aeb44..7e999e638f5e 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/command_handler.py +++ b/source/isaaclab_teleop/isaaclab_teleop/command_handler.py @@ -3,57 +3,30 @@ # # SPDX-License-Identifier: BSD-3-Clause -"""Teleop command handling for IsaacTeleop-based teleoperation.""" +"""Teleop command callback registry for IsaacTeleop-based teleoperation.""" from __future__ import annotations -import logging from collections.abc import Callable -from typing import Any - -import carb - -logger = logging.getLogger(__name__) class CommandHandler: - """Handles teleop command callbacks and XR message bus events. - - This class is responsible for: - - 1. Registering callbacks for teleop commands (START, STOP, RESET) - 2. Subscribing to the XR message bus for command events - 3. Dispatching callbacks when commands are received - - Teleop commands can be triggered via XR controller buttons or the - message bus. The handler normalizes command names (e.g. mapping - ``"R"`` to ``"RESET"``) and dispatches to registered callbacks. + """Lightweight callback registry for teleop commands. + + Scripts can register callbacks for ``START``, ``STOP``, and ``RESET`` + commands via :meth:`add_callback`. The callbacks are dispatched by + :meth:`fire` when the corresponding command is received. + + Note: + In the current architecture control signals arrive through + TeleopCore's ``teleop_control_pipeline`` and are consumed via + :func:`~isaaclab_teleop.poll_control_events`. This registry is + retained for backward compatibility with scripts that register + callbacks before the pipeline-based path was introduced. """ - TELEOP_COMMAND_EVENT_TYPE = "teleop_command" - - def __init__(self, xr_core: Any | None = None, on_reset: Callable[[], None] | None = None): - """Initialize the command handler. - - Args: - xr_core: The XRCore singleton, or ``None`` if XR is not available. - When provided, the handler subscribes to the message bus for - teleop command events. - on_reset: Optional hook called whenever a ``"reset"`` message-bus - event is received, *in addition to* the user's RESET callback. - This allows the device to perform internal reset actions (e.g. - resetting the XR anchor) without coupling the handler to the - anchor manager. - """ + def __init__(self) -> None: self._callbacks: dict[str, Callable] = {} - self._on_reset = on_reset - self._xr_core = xr_core - self._vc_subscription = None - - if self._xr_core is not None: - self._vc_subscription = self._xr_core.get_message_bus().create_subscription_to_pop_by_type( - carb.events.type_from_string(self.TELEOP_COMMAND_EVENT_TYPE), self._on_teleop_command - ) @property def callbacks(self) -> dict[str, Callable]: @@ -70,7 +43,6 @@ def add_callback(self, key: str, func: Callable) -> None: func: The function to call when the command is received. Should take no arguments. """ - # Map "R" to "RESET" for compatibility with existing scripts if key == "R": key = "RESET" self._callbacks[key] = func @@ -84,19 +56,5 @@ def fire(self, command: str) -> None: if command in self._callbacks: self._callbacks[command]() - def _on_teleop_command(self, event: carb.events.IEvent) -> None: - """Handle teleop command events from the message bus.""" - msg = event.payload.get("message", "") - - if "start" in msg: - self.fire("START") - elif "stop" in msg: - self.fire("STOP") - elif "reset" in msg: - self.fire("RESET") - if self._on_reset is not None: - self._on_reset() - def cleanup(self) -> None: - """Release event subscriptions.""" - self._vc_subscription = None + """Release resources (no-op; retained for API compatibility).""" diff --git a/source/isaaclab_teleop/isaaclab_teleop/control_events.py b/source/isaaclab_teleop/isaaclab_teleop/control_events.py new file mode 100644 index 000000000000..790ccf63ebe1 --- /dev/null +++ b/source/isaaclab_teleop/isaaclab_teleop/control_events.py @@ -0,0 +1,80 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Teleop control events dataclass, polling helper, and well-known channel UUID.""" + +from __future__ import annotations + +import dataclasses +import uuid +from typing import Protocol, runtime_checkable + +TELEOP_CONTROL_CHANNEL_UUID: bytes = uuid.uuid5(uuid.NAMESPACE_DNS, "teleop_command").bytes +"""Well-known 16-byte UUID for the teleop control message channel. + +Derived deterministically as ``uuid5(NAMESPACE_DNS, "teleop_command")`` +so that both the Isaac Lab server and the Quest client can independently +compute the same channel identifier from the string ``"teleop_command"`` +(matching the carb event type name used by the legacy XRCore message bus). + +Pass this value as :attr:`~isaaclab_teleop.IsaacTeleopCfg.control_channel_uuid` +when configuring a teleop session with message-channel-based control. +""" + + +@dataclasses.dataclass(frozen=True) +class ControlEvents: + """Result of :func:`poll_control_events`. + + Attributes: + is_active: ``True`` if the control channel received a ``"start"`` + command, ``False`` after ``"stop"``, or ``None`` when no control + channel is configured (callers should leave their own active + flag unchanged). + should_reset: ``True`` when the control channel received a + ``"reset"`` command this frame. + """ + + is_active: bool | None = None + should_reset: bool = False + + +_NO_OP_EVENTS = ControlEvents() +"""Shared immutable sentinel returned when no control channel is active.""" + + +@runtime_checkable +class SupportsControlEvents(Protocol): + """Duck type for teleop devices that expose control events.""" + + @property + def last_control_events(self) -> ControlEvents: ... + + +def poll_control_events(teleop_interface: SupportsControlEvents | object) -> ControlEvents: + """Poll control events from any teleop interface. + + Safe to call with any device type (keyboard, spacemouse, etc.). + Devices that do not expose the message-channel protocol return + a no-op :class:`ControlEvents`. + + Args: + teleop_interface: The teleop device to poll. Devices implementing + :class:`SupportsControlEvents` provide full type safety; other + devices are handled gracefully via duck typing. + + Returns: + A :class:`ControlEvents` with the latest start/stop and reset + signals. + """ + events = getattr(teleop_interface, "last_control_events", None) + if events is None: + return _NO_OP_EVENTS + if isinstance(events, ControlEvents): + return events + return ControlEvents( + is_active=getattr(events, "is_active", None), + should_reset=getattr(events, "should_reset", False), + ) diff --git a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py index 6539fa67f346..45911f7cd5bc 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py +++ b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py @@ -14,6 +14,7 @@ from isaaclab.utils import configclass +from .control_events import TELEOP_CONTROL_CHANNEL_UUID from .xr_cfg import XrCfg _CLOUDXR_ENV_DIR = Path(__file__).resolve().parent @@ -117,6 +118,24 @@ def build_pipeline(): If ``None``, the tuning UI will not be opened. """ + control_channel_uuid: bytes | None = TELEOP_CONTROL_CHANNEL_UUID + """16-byte UUID for the teleop control message channel. + + Defaults to :data:`~isaaclab_teleop.TELEOP_CONTROL_CHANNEL_UUID` + (``uuid5(NAMESPACE_DNS, "teleop_command")``), which is the well-known + channel both the Isaac Lab server and CloudXR JS client use to + exchange start/stop/reset commands. + + When set, a ``teleop_control_pipeline`` with a + :class:`~isaaclab_teleop.message_channel_state_manager.MessageChannelTeleopStateManager` + is created automatically. The remote client sends UTF-8 control + commands over the OpenXR opaque data channel identified by this UUID, + and the results are exposed via + :func:`~isaaclab_teleop.poll_control_events`. + + Set to ``None`` to disable the control channel entirely. + """ + target_frame_prim_path: str | None = None """Optional USD prim path whose world frame becomes the target coordinate frame for all output poses. diff --git a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py index 2e7c2c7a406b..cd1e320e2cc9 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py +++ b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py @@ -15,6 +15,7 @@ import torch from .command_handler import CommandHandler +from .control_events import ControlEvents from .isaac_teleop_cfg import IsaacTeleopCfg from .session_lifecycle import TeleopSessionLifecycle from .xr_anchor_manager import XrAnchorManager @@ -35,8 +36,8 @@ class IsaacTeleopDevice: and coordinate-frame transform computation. * :class:`TeleopSessionLifecycle` -- pipeline building, OpenXR handle acquisition, session creation/destruction, and action-tensor extraction. - * :class:`CommandHandler` -- callback registration and XR message-bus - command dispatch. + * :class:`CommandHandler` -- callback registration for START / STOP / RESET + commands, bridged from the pipeline-based control events. Together they manage: @@ -120,18 +121,17 @@ def __init__( # Compose the three collaborators self._anchor_manager = XrAnchorManager(cfg.xr_cfg) + self._command_handler = CommandHandler() self._session_lifecycle = TeleopSessionLifecycle( cfg, cloudxr_env_file=cloudxr_env_file, auto_launch_cloudxr=auto_launch_cloudxr, ) - self._command_handler = CommandHandler( - xr_core=self._anchor_manager.xr_core, - on_reset=self._anchor_manager.reset, - ) # Controller button polling state (edge detection for right 'A') self._prev_right_a_pressed = False + # Track the last is_active value so callbacks only fire on edges + self._prev_control_is_active: bool | None = None def __del__(self): """Clean up resources when the object is destroyed.""" @@ -188,9 +188,23 @@ def __exit__(self, exc_type, exc_val, exc_tb): def reset(self) -> None: """Reset the device state. - Resets the XR anchor synchronizer if present. + Resets the XR anchor synchronizer and schedules a + ``reset`` :class:`~isaacteleop.retargeting_engine.interface.execution_events.ExecutionEvents` + for the next pipeline step so that all retargeters reinitialize + their cross-step state. """ self._anchor_manager.reset() + self._session_lifecycle.request_reset() + + @property + def last_control_events(self) -> ControlEvents: + """Control events from the most recent :meth:`advance`. + + Returns a :class:`ControlEvents` derived from messages received over + the control message channel. When no control channel is configured, + returns a default (no-op) :class:`ControlEvents`. + """ + return self._session_lifecycle.last_control_events def add_callback(self, key: str, func: Callable) -> None: """Add a callback function for teleop commands. @@ -252,8 +266,31 @@ def advance(self, target_T_world: np.ndarray | torch.Tensor | SupportsDLPack | N # Poll controller buttons (e.g. toggle anchor rotation on right 'A' press) self._poll_buttons() + # Bridge control events to legacy callbacks so scripts that registered + # via add_callback() still receive START / STOP / RESET dispatches. + self._dispatch_control_callbacks() + return action + # ------------------------------------------------------------------ + # Control event -> callback bridge + # ------------------------------------------------------------------ + + def _dispatch_control_callbacks(self) -> None: + """Fire legacy callbacks when control events indicate a state change. + + This bridges the pipeline-based :class:`ControlEvents` with the + callback-based :class:`CommandHandler` so that scripts which registered + callbacks via :meth:`add_callback` still receive dispatches. + """ + events = self._session_lifecycle.last_control_events + if events.should_reset: + self._command_handler.fire("RESET") + self._anchor_manager.reset() + if events.is_active is not None and events.is_active != self._prev_control_is_active: + self._command_handler.fire("START" if events.is_active else "STOP") + self._prev_control_is_active = events.is_active + # ------------------------------------------------------------------ # Target frame transform (config-driven rebase) # ------------------------------------------------------------------ diff --git a/source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py b/source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py new file mode 100644 index 000000000000..7ccf988e6b56 --- /dev/null +++ b/source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py @@ -0,0 +1,162 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Message-channel-based teleop state manager for TeleopCore's teleop_control_pipeline.""" + +from __future__ import annotations + +import json +import re +from typing import TYPE_CHECKING + +from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents, ExecutionState +from isaacteleop.teleop_session_manager.teleop_state_manager_retargeter import TeleopStateManager + +from .control_events import ControlEvents + +if TYPE_CHECKING: + from isaacteleop.retargeting_engine.interface import RetargeterIOType + from isaacteleop.retargeting_engine.interface.retargeter_core_types import ComputeContext, RetargeterIO + +_COMMAND_PATTERNS: list[tuple[re.Pattern[str], str]] = [ + (re.compile(r"\breset\b", re.IGNORECASE), "reset"), + (re.compile(r"\bstop\b", re.IGNORECASE), "stop"), + (re.compile(r"\bstart\b", re.IGNORECASE), "start"), +] +"""Ordered patterns for classifying a command string. + +``reset`` is checked first so that a hypothetical payload containing +both "reset" and "start" is treated as a reset (the more destructive +operation wins). ``stop`` precedes ``start`` for the same reason. +""" + + +class MessageChannelTeleopStateManager(TeleopStateManager): + """Teleop state manager driven by message channel payloads. + + Consumes the ``messages_tracked`` output of a + :class:`~isaacteleop.retargeting_engine.deviceio_source_nodes.MessageChannelSource`, + parses the JSON/text payloads (``"start"``, ``"stop"``, ``"reset"``), and + produces the ``teleop_state`` (one-hot) and ``reset_event`` (bool pulse) + outputs required by TeleopCore's ``teleop_control_pipeline`` contract. + + Payload formats supported (same as the legacy carb message bus): + + 1. **JSON (Quest client format)**:: + + {"type": "teleop_command", "message": {"command": "start teleop"}} + + 2. **Plain text (fallback)**: raw UTF-8 string matched by word boundary + (``"start"``, ``"stop"``, ``"reset"``). + + The state machine maps commands to :class:`ExecutionState` as follows: + + * ``"start"`` -> :attr:`ExecutionState.RUNNING` + * ``"stop"`` -> :attr:`ExecutionState.PAUSED` + * ``"reset"`` -> no state change, emits ``reset=True`` pulse + """ + + INPUT_MESSAGES = "messages_tracked" + + def __init__(self, name: str) -> None: + self._state = ExecutionState.STOPPED + self._control_events = ControlEvents() + super().__init__(name=name) + + @property + def last_control_events(self) -> ControlEvents: + """The most recent :class:`ControlEvents` derived from message payloads.""" + return self._control_events + + def input_spec(self) -> RetargeterIOType: + from isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types import ( + MessageChannelMessagesTrackedGroup, + ) + + return { + self.INPUT_MESSAGES: MessageChannelMessagesTrackedGroup(), + } + + def _compute_execution_events( + self, + inputs: RetargeterIO, + context: ComputeContext, + ) -> ExecutionEvents: + del context + + reset = False + messages_tracked = inputs[self.INPUT_MESSAGES][0] + + data = getattr(messages_tracked, "data", None) + if data: + for message in data: + payload = getattr(message, "payload", None) + if payload is None: + continue + try: + text = bytes(payload).decode("utf-8") + except (UnicodeDecodeError, TypeError): + continue + + command = _extract_command(text) + if command is None: + continue + + kind = _classify_command(command) + if kind == "start": + self._state = ExecutionState.RUNNING + elif kind == "stop": + self._state = ExecutionState.PAUSED + elif kind == "reset": + reset = True + + is_active: bool | None + if self._state == ExecutionState.RUNNING: + is_active = True + elif self._state == ExecutionState.PAUSED: + is_active = False + else: + is_active = None + + self._control_events = ControlEvents(is_active=is_active, should_reset=reset) + + return ExecutionEvents(reset=reset, execution_state=self._state) + + +def _classify_command(text: str) -> str | None: + """Return ``"start"``, ``"stop"``, ``"reset"``, or ``None``. + + Uses word-boundary matching so that e.g. ``"stop_and_restart"`` + matches ``"stop"`` (not ``"start"``). + """ + for pattern, label in _COMMAND_PATTERNS: + if pattern.search(text): + return label + return None + + +def _extract_command(text: str) -> str | None: + """Extract the command string from a JSON or plain-text payload. + + Tries JSON parsing first (Quest client format) and falls back to the + raw text for plain-string payloads. Non-string JSON scalars (numbers, + arrays, booleans) are discarded. + """ + try: + obj = json.loads(text) + except (json.JSONDecodeError, TypeError): + return text + + if not isinstance(obj, dict): + return None + if obj.get("type") != "teleop_command": + return None + + msg = obj.get("message") + if isinstance(msg, dict): + return msg.get("command", "") + if isinstance(msg, str): + return msg + return None diff --git a/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py b/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py index d395fab31a30..d4d3e65aa2b0 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py +++ b/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py @@ -21,7 +21,9 @@ from isaacteleop.retargeting_engine_ui import MultiRetargeterTuningUIImGui from isaacteleop.teleop_session_manager import TeleopSession +from .control_events import _NO_OP_EVENTS, ControlEvents from .isaac_teleop_cfg import IsaacTeleopCfg +from .message_channel_state_manager import MessageChannelTeleopStateManager class SupportsDLPack(Protocol): @@ -118,8 +120,11 @@ def __init__( # Session state (populated during start) self._session: TeleopSession | None = None self._pipeline = None + self._teleop_control_pipeline = None + self._control_state_manager: MessageChannelTeleopStateManager | None = None self._last_right_controller = None self._session_start_deferred_logged = False + self._pending_reset = False # CloudXR runtime launcher (created in start if configured, stopped in stop) self._cloudxr_launcher: CloudXRLauncher | None = None @@ -192,6 +197,40 @@ def last_right_controller(self): """ return self._last_right_controller + @property + def has_control_channel(self) -> bool: + """Whether a message-channel-based control channel is configured.""" + return self._control_state_manager is not None + + @property + def last_control_events(self) -> ControlEvents: + """Control events from the most recent :meth:`step`. + + Returns a :class:`ControlEvents` derived from messages received over + the control channel. When no control channel is configured, returns + a default (no-op) :class:`ControlEvents`. + """ + if self._control_state_manager is not None: + return self._control_state_manager.last_control_events + return _NO_OP_EVENTS + + def request_reset(self) -> None: + """Schedule a reset ``ExecutionEvents`` for the next :meth:`step`. + + The reset is consumed exactly once: on the next ``step()`` call, + ``ExecutionEvents(reset=True)`` is injected into the + ``TeleopSession``, which propagates the reset to all retargeters + via ``ComputeContext``. Subsequent steps resume normal control + pipeline operation. + + If the control channel already processed a reset this frame + (headset-initiated), this method is a no-op to avoid injecting a + redundant second reset pulse on the following frame. + """ + if self._control_state_manager is not None and self._control_state_manager.last_control_events.should_reset: + return + self._pending_reset = True + # ------------------------------------------------------------------ # Lifecycle: start / stop # ------------------------------------------------------------------ @@ -222,12 +261,32 @@ def start(self) -> None: self._last_right_controller = None button_controllers = ControllersSource("_button_controllers") - self._pipeline = OutputCombiner( - { - "action": user_pipeline.output("action"), - self._CONTROLLER_RIGHT_KEY: button_controllers.output(ControllersSource.RIGHT), - } - ) + pipeline_outputs: dict = { + "action": user_pipeline.output("action"), + self._CONTROLLER_RIGHT_KEY: button_controllers.output(ControllersSource.RIGHT), + } + self._pipeline = OutputCombiner(pipeline_outputs) + + # Build optional teleop_control_pipeline for message-channel control + self._teleop_control_pipeline = None + self._control_state_manager = None + if self._cfg.control_channel_uuid is not None: + from isaacteleop.retargeting_engine.deviceio_source_nodes import message_channel_config + + # The sink (write-side) is unused; only the source is needed. + # message_channel_config wires source and sink internally, so + # discarding the sink reference does not affect the channel lifetime. + source, _sink = message_channel_config( + name="_teleop_control", + channel_uuid=self._cfg.control_channel_uuid, + ) + state_manager = MessageChannelTeleopStateManager(name="_teleop_control_state") + self._teleop_control_pipeline = state_manager.connect( + { + state_manager.INPUT_MESSAGES: source.output("messages_tracked"), + } + ) + self._control_state_manager = state_manager # Try to start the session now; it may be deferred self._try_start_session() @@ -270,6 +329,8 @@ def stop(self, exc_type=None, exc_val=None, exc_tb=None) -> None: logger.debug(f"Suppressed error during IsaacTeleop session cleanup: {e}") self._session = None self._pipeline = None + self._teleop_control_pipeline = None + self._control_state_manager = None if self._cloudxr_launcher is not None: try: @@ -285,15 +346,22 @@ def stop(self, exc_type=None, exc_val=None, exc_tb=None) -> None: def _on_request_required_extensions(self) -> list[str]: """Callback for required extensions subscription. + Inspects both the main pipeline and the ``teleop_control_pipeline`` + (if configured) so that extensions required by the control channel + (e.g. ``XR_NV_opaque_data_channel``) are included. + Returns: A list of required extensions. """ from isaacteleop.teleop_session_manager.helpers import get_required_oxr_extensions_from_pipeline - required_extensions = ( - get_required_oxr_extensions_from_pipeline(self._pipeline) if self._pipeline is not None else [] - ) + required_extensions: list[str] = [] + if self._pipeline is not None: + required_extensions.extend(get_required_oxr_extensions_from_pipeline(self._pipeline)) + if self._teleop_control_pipeline is not None: + required_extensions.extend(get_required_oxr_extensions_from_pipeline(self._teleop_control_pipeline)) + required_extensions = sorted(set(required_extensions)) logger.info(f"Required extensions: {required_extensions}") return required_extensions @@ -307,10 +375,16 @@ def _on_xr_enabled_changed(self, item, event_type): self._teardown_dead_session() def _on_pre_shutdown(self, _event): - """Called when Kit is closing; run full cleanup since the app is exiting.""" + """Called when Kit is closing; tear down the session but leave the + pipeline intact so the main loop can exit via its own control flow + (``simulation_app.is_running()`` will go ``False``). + + Full resource cleanup happens later when the context manager's + ``__exit__`` calls :meth:`stop`. + """ logger.info("Shutting down IsaacTeleop session due to Kit close") self._pre_shutdown_subscription = None - self.stop() + self._teardown_dead_session() # ------------------------------------------------------------------ # Deferred session creation @@ -371,6 +445,7 @@ def _try_start_session(self) -> bool: app_name=self._cfg.app_name, trackers=[], pipeline=self._pipeline, + teleop_control_pipeline=self._teleop_control_pipeline, plugins=self._cfg.plugins, oxr_handles=oxr_handles, ) @@ -436,14 +511,31 @@ def step( # pipeline contains ValueInput leaf nodes. external_inputs = self._build_external_inputs(anchor_world_matrix_fn, target_T_world) - # Execute one step of the teleop session. + # Execute one step of the teleop session. TeleopCore natively runs + # the teleop_control_pipeline (if configured), decodes the control + # outputs into ExecutionEvents, and injects them into ComputeContext. # If the underlying OpenXR session was destroyed externally (e.g. # user clicked "Stop AR"), the step call will fail. We catch the # error, tear down the dead session, and return None so the caller # can continue rendering (or wait for the session to restart). + # Consume pending host-initiated reset (e.g. env reset on success). + # Preserve the control-channel execution state so the injected reset + # does not accidentally override a headset-initiated PAUSED state. + execution_events = None + if self._pending_reset: + from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents, ExecutionState + + ctrl = self.last_control_events + prev_state = ExecutionState.PAUSED if ctrl.is_active is False else ExecutionState.RUNNING + execution_events = ExecutionEvents(reset=True, execution_state=prev_state) + self._pending_reset = False + assert self._session is not None # guaranteed by _try_start_session above try: - result = self._session.step(external_inputs=external_inputs) + result = self._session.step( + external_inputs=external_inputs, + execution_events=execution_events, + ) except Exception as e: logger.warning(f"IsaacTeleop session step failed (XR session likely torn down): {e}") self._teardown_dead_session() diff --git a/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py b/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py index e9565a7d3e41..43131f70cfc3 100644 --- a/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py +++ b/source/isaaclab_teleop/test/test_cloudxr_lifecycle.py @@ -38,8 +38,15 @@ "isaacteleop.oxr", "isaacteleop.retargeting_engine", "isaacteleop.retargeting_engine.interface", + "isaacteleop.retargeting_engine.interface.execution_events", + "isaacteleop.retargeting_engine.interface.retargeter_core_types", + "isaacteleop.retargeting_engine.interface.tensor_group_type", + "isaacteleop.retargeting_engine.deviceio_source_nodes", + "isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types", "isaacteleop.retargeting_engine_ui", "isaacteleop.teleop_session_manager", + "isaacteleop.teleop_session_manager.teleop_state_manager_retargeter", + "isaacteleop.teleop_session_manager.teleop_state_manager_types", "isaacsim", "isaacsim.kit", "isaacsim.kit.xr", @@ -85,6 +92,7 @@ def _make_cfg() -> IsaacTeleopCfg: """Build a minimal IsaacTeleopCfg with a dummy pipeline_builder.""" return IsaacTeleopCfg( pipeline_builder=lambda: MagicMock(), + control_channel_uuid=None, ) diff --git a/source/isaaclab_teleop/test/test_control_events.py b/source/isaaclab_teleop/test/test_control_events.py new file mode 100644 index 000000000000..0dfb6bfef036 --- /dev/null +++ b/source/isaaclab_teleop/test/test_control_events.py @@ -0,0 +1,443 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +# pyright: reportPrivateUsage=none + +"""Tests for MessageChannelTeleopStateManager, _classify_command, _extract_command, +and poll_control_events. + +These tests exercise pure logic (no Omniverse/Isaac Sim stack required). +The state manager is tested by calling its ``_compute_execution_events`` +method directly with fake pipeline I/O, mirroring how TeleopCore's +``teleop_control_pipeline`` mechanism invokes it. +""" + +from __future__ import annotations + +import dataclasses +import sys +from types import ModuleType +from unittest.mock import MagicMock + +import pytest + +# --------------------------------------------------------------------------- +# Stub out isaacteleop modules before any isaaclab_teleop imports so the +# tests can run in a plain Python environment without Omniverse. +# --------------------------------------------------------------------------- + +_MODULES_TO_STUB = [ + "isaacteleop", + "isaacteleop.deviceio", + "isaacteleop.deviceio_trackers", + "isaacteleop.retargeting_engine", + "isaacteleop.retargeting_engine.deviceio_source_nodes", + "isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types", + "isaacteleop.retargeting_engine.interface", + "isaacteleop.retargeting_engine.interface.retargeter_core_types", + "isaacteleop.retargeting_engine.interface.tensor_group_type", + "isaacteleop.retargeting_engine_ui", + "isaacteleop.schema", + "isaacteleop.teleop_session_manager", + "isaacteleop.teleop_session_manager.teleop_state_manager_retargeter", + "isaacteleop.teleop_session_manager.teleop_state_manager_types", +] + +_stubs: dict[str, ModuleType | MagicMock] = {} + + +def _install_stubs(): + for name in _MODULES_TO_STUB: + if name not in sys.modules: + _stubs[name] = MagicMock() + sys.modules[name] = _stubs[name] + + # Provide real ExecutionState and ExecutionEvents so state logic works. + from enum import Enum + + class ExecutionState(str, Enum): + UNKNOWN = "unknown" + STOPPED = "stopped" + PAUSED = "paused" + RUNNING = "running" + + @dataclasses.dataclass + class ExecutionEvents: + reset: bool = False + execution_state: ExecutionState = ExecutionState.UNKNOWN + + ee_mod = sys.modules["isaacteleop.retargeting_engine.interface.execution_events"] = ModuleType( + "isaacteleop.retargeting_engine.interface.execution_events" + ) + ee_mod.ExecutionState = ExecutionState # type: ignore[attr-defined] + ee_mod.ExecutionEvents = ExecutionEvents # type: ignore[attr-defined] + + # Make them available from the interface module too + iface = sys.modules["isaacteleop.retargeting_engine.interface"] + iface.ExecutionState = ExecutionState # type: ignore[attr-defined] + iface.ExecutionEvents = ExecutionEvents # type: ignore[attr-defined] + iface.RetargeterIOType = dict # type: ignore[attr-defined] + + # Provide a minimal TeleopStateManager base so the subclass can instantiate + class FakeTeleopStateManager: + def __init__(self, name: str) -> None: + self.name = name + + tsm_mod = sys.modules["isaacteleop.teleop_session_manager.teleop_state_manager_retargeter"] + tsm_mod.TeleopStateManager = FakeTeleopStateManager # type: ignore[attr-defined] + + # MessageChannelMessagesTrackedGroup stub + dt_mod = sys.modules["isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types"] + dt_mod.MessageChannelMessagesTrackedGroup = MagicMock # type: ignore[attr-defined] + + +_install_stubs() + +from isaaclab_teleop.control_events import ControlEvents, poll_control_events # noqa: E402 +from isaaclab_teleop.message_channel_state_manager import ( # noqa: E402 + MessageChannelTeleopStateManager, + _classify_command, + _extract_command, +) + +# Re-import after stubs so we can reference them in assertions. +from isaacteleop.retargeting_engine.interface.execution_events import ( # noqa: E402 + ExecutionEvents, + ExecutionState, +) + +# --------------------------------------------------------------------------- +# Test doubles for MessageChannelMessagesTrackedT +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class _FakePayload: + payload: bytes + + +@dataclasses.dataclass +class _FakeTracked: + data: list[_FakePayload] | None = None + + +def _tracked(*payloads: bytes) -> _FakeTracked: + """Build a lightweight stand-in for ``MessageChannelMessagesTrackedT``.""" + return _FakeTracked(data=[_FakePayload(p) for p in payloads]) + + +def _empty_tracked() -> _FakeTracked: + return _FakeTracked(data=[]) + + +def _null_tracked() -> _FakeTracked: + return _FakeTracked(data=None) + + +def _make_inputs(messages_tracked): + """Build a fake RetargeterIO dict for the state manager.""" + tg = MagicMock() + tg.__getitem__ = MagicMock(return_value=messages_tracked) + return {MessageChannelTeleopStateManager.INPUT_MESSAGES: tg} + + +def _step(mgr, messages_tracked) -> ExecutionEvents: + """Invoke the state manager's compute with fake inputs and return the events.""" + inputs = _make_inputs(messages_tracked) + return mgr._compute_execution_events(inputs, context=None) + + +# =========================================================================== +# MessageChannelTeleopStateManager tests +# =========================================================================== + + +class TestInitialState: + def test_defaults(self): + mgr = MessageChannelTeleopStateManager(name="test") + events = mgr.last_control_events + assert events.is_active is None + assert events.should_reset is False + + +class TestStartMessage: + def test_start_sets_running(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"start")) + assert result.execution_state == ExecutionState.RUNNING + assert mgr.last_control_events.is_active is True + + def test_start_does_not_set_reset(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"start")) + assert result.reset is False + assert mgr.last_control_events.should_reset is False + + +class TestStopMessage: + def test_stop_sets_paused(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"stop")) + assert result.execution_state == ExecutionState.PAUSED + assert mgr.last_control_events.is_active is False + + +class TestResetMessage: + def test_reset_sets_flags(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"reset")) + assert result.reset is True + assert mgr.last_control_events.should_reset is True + + def test_reset_does_not_change_active_state(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _tracked(b"start")) + _step(mgr, _tracked(b"reset")) + assert mgr.last_control_events.is_active is True + + +class TestResetPulseBehaviour: + def test_should_reset_clears_on_next_step(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _tracked(b"reset")) + assert mgr.last_control_events.should_reset is True + + _step(mgr, _empty_tracked()) + assert mgr.last_control_events.should_reset is False + + +class TestWordBoundaryMatching: + @pytest.mark.parametrize("payload", [b"teleop start", b"xr start session", b"start now"]) + def test_start_word(self, payload: bytes): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(payload)) + assert result.execution_state == ExecutionState.RUNNING + + @pytest.mark.parametrize("payload", [b"teleop stop", b"stop teleop"]) + def test_stop_word(self, payload: bytes): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(payload)) + assert result.execution_state == ExecutionState.PAUSED + + @pytest.mark.parametrize("payload", [b"teleop reset", b"env reset"]) + def test_reset_word(self, payload: bytes): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(payload)) + assert result.reset is True + + +class TestAmbiguousPayloads: + def test_stop_wins_over_start_when_both_present(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"stop and start")) + assert result.execution_state == ExecutionState.PAUSED + + def test_reset_wins_over_start_when_both_present(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"reset and start")) + assert result.reset is True + + +class TestEmptyAndNullBatches: + def test_empty_data_list(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _tracked(b"start")) + _step(mgr, _empty_tracked()) + assert mgr.last_control_events.is_active is True + + def test_null_data(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _null_tracked()) + assert mgr.last_control_events.is_active is None + + def test_none_input(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, None) + assert mgr.last_control_events.is_active is None + + +class TestMultipleMessagesInBatch: + def test_start_then_reset_in_one_batch(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(b"start", b"reset")) + assert mgr.last_control_events.is_active is True + assert result.reset is True + + +class TestSequentialStartStop: + def test_start_then_stop(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _tracked(b"start")) + assert mgr.last_control_events.is_active is True + result = _step(mgr, _tracked(b"stop")) + assert mgr.last_control_events.is_active is False + assert result.execution_state == ExecutionState.PAUSED + + +class TestMalformedPayloads: + def test_invalid_utf8(self): + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _tracked(b"\xff\xfe")) + assert mgr.last_control_events.is_active is None + assert mgr.last_control_events.should_reset is False + + def test_none_payload(self): + mgr = MessageChannelTeleopStateManager(name="test") + tracked = _FakeTracked(data=[_FakePayload(payload=None)]) # type: ignore[arg-type] + _step(mgr, tracked) + assert mgr.last_control_events.is_active is None + + +# =========================================================================== +# JSON format tests (Quest client sends JSON teleop_command messages) +# =========================================================================== + + +def _json_command(command: str) -> bytes: + """Build a Quest-style JSON teleop_command payload.""" + import json + + return json.dumps({"type": "teleop_command", "message": {"command": command}}).encode("utf-8") + + +class TestJsonFormat: + def test_json_start_teleop(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(_json_command("start teleop"))) + assert result.execution_state == ExecutionState.RUNNING + + def test_json_stop_teleop(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(_json_command("stop teleop"))) + assert result.execution_state == ExecutionState.PAUSED + + def test_json_reset_teleop(self): + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(_json_command("reset teleop"))) + assert result.reset is True + + def test_json_wrong_type_ignored(self): + import json + + payload = json.dumps({"type": "other_event", "message": {"command": "start"}}).encode("utf-8") + mgr = MessageChannelTeleopStateManager(name="test") + _step(mgr, _tracked(payload)) + assert mgr.last_control_events.is_active is None + + def test_json_message_as_string(self): + import json + + payload = json.dumps({"type": "teleop_command", "message": "start teleop"}).encode("utf-8") + mgr = MessageChannelTeleopStateManager(name="test") + result = _step(mgr, _tracked(payload)) + assert result.execution_state == ExecutionState.RUNNING + + +# =========================================================================== +# _extract_command unit tests +# =========================================================================== + + +class TestExtractCommand: + def test_plain_text(self): + assert _extract_command("start teleop") == "start teleop" + + def test_json_teleop_command(self): + import json + + text = json.dumps({"type": "teleop_command", "message": {"command": "stop"}}) + assert _extract_command(text) == "stop" + + def test_json_wrong_type(self): + import json + + text = json.dumps({"type": "other", "message": {"command": "start"}}) + assert _extract_command(text) is None + + def test_json_no_message_key(self): + import json + + text = json.dumps({"type": "teleop_command"}) + assert _extract_command(text) is None + + def test_json_non_dict_value_returns_none(self): + assert _extract_command("42") is None + assert _extract_command("[1, 2, 3]") is None + assert _extract_command("true") is None + + +# =========================================================================== +# _classify_command unit tests +# =========================================================================== + + +class TestClassifyCommand: + def test_exact_words(self): + assert _classify_command("start") == "start" + assert _classify_command("stop") == "stop" + assert _classify_command("reset") == "reset" + + def test_word_boundary_prevents_false_match(self): + assert _classify_command("upstart") is None + assert _classify_command("nonstop") is None + assert _classify_command("unreset") is None + + def test_reset_beats_start(self): + assert _classify_command("reset and start") == "reset" + + def test_stop_beats_start(self): + assert _classify_command("stop and start") == "stop" + + def test_unrecognized_text(self): + assert _classify_command("hello world") is None + + def test_case_insensitive(self): + assert _classify_command("START") == "start" + assert _classify_command("Stop Teleop") == "stop" + assert _classify_command("RESET NOW") == "reset" + + +# =========================================================================== +# poll_control_events tests +# =========================================================================== + + +class TestPollControlEvents: + def test_plain_object_returns_default(self): + result = poll_control_events(object()) + assert result.is_active is None + assert result.should_reset is False + + def test_device_with_control_events(self): + class FakeDevice: + @property + def last_control_events(self): + return ControlEvents(is_active=True, should_reset=True) + + result = poll_control_events(FakeDevice()) + assert result.is_active is True + assert result.should_reset is True + + def test_device_with_none_events(self): + class FakeDevice: + last_control_events = None + + result = poll_control_events(FakeDevice()) + assert result.is_active is None + assert result.should_reset is False + + def test_duck_typed_snapshot(self): + class FakeSnapshot: + is_active = False + should_reset = True + + class FakeDevice: + @property + def last_control_events(self): + return FakeSnapshot() + + result = poll_control_events(FakeDevice()) + assert result.is_active is False + assert result.should_reset is True From 2ba0cdc4a1d7114b29121d71b4cad6e9fe8c960d Mon Sep 17 00:00:00 2001 From: Rafael Wiltz Date: Tue, 21 Apr 2026 21:26:22 -0400 Subject: [PATCH 2/2] Use the isaac teleop default state manager --- docs/source/features/isaac_teleop.rst | 10 +- scripts/tools/record_demos.py | 1 + source/isaaclab_teleop/docs/CHANGELOG.rst | 6 +- .../isaaclab_teleop/control_events.py | 10 +- .../isaaclab_teleop/isaac_teleop_cfg.py | 10 +- .../isaaclab_teleop/isaac_teleop_device.py | 28 +- .../message_channel_state_manager.py | 162 ------- .../isaaclab_teleop/session_lifecycle.py | 183 +++++--- .../teleop_message_processor.py | 232 ++++++++++ .../test/test_control_events.py | 431 ++++++++++++------ 10 files changed, 671 insertions(+), 402 deletions(-) delete mode 100644 source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py create mode 100644 source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py diff --git a/docs/source/features/isaac_teleop.rst b/docs/source/features/isaac_teleop.rst index 325fba4d51c5..733150259586 100644 --- a/docs/source/features/isaac_teleop.rst +++ b/docs/source/features/isaac_teleop.rst @@ -154,10 +154,12 @@ a ``teleop_control_pipeline`` inside TeleopCore's :class:`TeleopSession`, which {"type": "teleop_command", "message": {"command": "stop teleop"}} {"type": "teleop_command", "message": {"command": "reset teleop"}} -3. A :class:`~isaaclab_teleop.message_channel_state_manager.MessageChannelTeleopStateManager` - parses these payloads and produces ``teleop_state`` (one-hot: stopped / paused / running) and - ``reset_event`` (bool pulse) outputs. -4. TeleopCore decodes these outputs into ``ExecutionEvents`` and injects them into every +3. A :class:`~isaaclab_teleop.teleop_message_processor.TeleopMessageProcessor` parses these + payloads and produces boolean pulse signals (``run_toggle``, ``kill``, ``reset``). +4. :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager` consumes the + boolean signals, runs its state machine (edge detection, fail-safe), and produces + ``teleop_state`` (one-hot) and ``reset_event`` (bool pulse) outputs. +5. TeleopCore decodes these outputs into ``ExecutionEvents`` and injects them into every retargeter's ``ComputeContext``, so stateful retargeters can react to state changes (e.g. reinitializing cross-step state on reset). diff --git a/scripts/tools/record_demos.py b/scripts/tools/record_demos.py index c8484dc3b750..75df9e0ee92a 100644 --- a/scripts/tools/record_demos.py +++ b/scripts/tools/record_demos.py @@ -488,6 +488,7 @@ def stop_recording_instance(): # path is poll_control_events(); these callbacks are bridged automatically # and also serve native (keyboard / spacemouse) devices. teleoperation_callbacks = { + "R": reset_recording_instance, "START": start_recording_instance, "STOP": stop_recording_instance, "RESET": reset_recording_instance, diff --git a/source/isaaclab_teleop/docs/CHANGELOG.rst b/source/isaaclab_teleop/docs/CHANGELOG.rst index 1092aef5a933..9ad0bf77ecd2 100644 --- a/source/isaaclab_teleop/docs/CHANGELOG.rst +++ b/source/isaaclab_teleop/docs/CHANGELOG.rst @@ -12,9 +12,9 @@ Added an OpenXR message channel. The channel is managed by TeleopCore's native ``teleop_control_pipeline`` mechanism. -* Added ``MessageChannelTeleopStateManager`` retargeter that converts raw - message-channel payloads into ``teleop_state`` and ``reset_event`` outputs - for the control pipeline. +* Added :class:`~isaaclab_teleop.teleop_message_processor.TeleopMessageProcessor` + retargeter that converts raw message-channel payloads into boolean control + signals for :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. * Added :func:`~isaaclab_teleop.poll_control_events` helper, :class:`~isaaclab_teleop.ControlEvents` dataclass, and diff --git a/source/isaaclab_teleop/isaaclab_teleop/control_events.py b/source/isaaclab_teleop/isaaclab_teleop/control_events.py index 790ccf63ebe1..69ddd7c1ed21 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/control_events.py +++ b/source/isaaclab_teleop/isaaclab_teleop/control_events.py @@ -16,8 +16,7 @@ Derived deterministically as ``uuid5(NAMESPACE_DNS, "teleop_command")`` so that both the Isaac Lab server and the Quest client can independently -compute the same channel identifier from the string ``"teleop_command"`` -(matching the carb event type name used by the legacy XRCore message bus). +compute the same channel identifier from the string ``"teleop_command"``. Pass this value as :attr:`~isaaclab_teleop.IsaacTeleopCfg.control_channel_uuid` when configuring a teleop session with message-channel-based control. @@ -29,12 +28,11 @@ class ControlEvents: """Result of :func:`poll_control_events`. Attributes: - is_active: ``True`` if the control channel received a ``"start"`` - command, ``False`` after ``"stop"``, or ``None`` when no control + is_active: ``True`` when the teleop state machine is in RUNNING, + ``False`` when PAUSED or STOPPED, or ``None`` when no control channel is configured (callers should leave their own active flag unchanged). - should_reset: ``True`` when the control channel received a - ``"reset"`` command this frame. + should_reset: ``True`` when a reset was triggered this frame. """ is_active: bool | None = None diff --git a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py index 45911f7cd5bc..f94a63d57589 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py +++ b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_cfg.py @@ -126,11 +126,11 @@ def build_pipeline(): channel both the Isaac Lab server and CloudXR JS client use to exchange start/stop/reset commands. - When set, a ``teleop_control_pipeline`` with a - :class:`~isaaclab_teleop.message_channel_state_manager.MessageChannelTeleopStateManager` - is created automatically. The remote client sends UTF-8 control - commands over the OpenXR opaque data channel identified by this UUID, - and the results are exposed via + When set, a ``teleop_control_pipeline`` is created automatically + using :class:`~isaaclab_teleop.teleop_message_processor.TeleopMessageProcessor` + and :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. + The remote client sends UTF-8 control commands over the OpenXR opaque + data channel identified by this UUID, and the results are exposed via :func:`~isaaclab_teleop.poll_control_events`. Set to ``None`` to disable the control channel entirely. diff --git a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py index cd1e320e2cc9..3f8c565a7e21 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py +++ b/source/isaaclab_teleop/isaaclab_teleop/isaac_teleop_device.py @@ -68,7 +68,8 @@ class IsaacTeleopDevice: Teleop commands: The device supports callbacks for START, STOP, and RESET commands - that can be triggered via XR controller buttons or the message bus. + that can be triggered via the message-channel control pipeline or + registered directly via :meth:`add_callback`. Example: .. code-block:: python @@ -119,7 +120,6 @@ def __init__( """ self._cfg = cfg - # Compose the three collaborators self._anchor_manager = XrAnchorManager(cfg.xr_cfg) self._command_handler = CommandHandler() self._session_lifecycle = TeleopSessionLifecycle( @@ -128,9 +128,7 @@ def __init__( auto_launch_cloudxr=auto_launch_cloudxr, ) - # Controller button polling state (edge detection for right 'A') self._prev_right_a_pressed = False - # Track the last is_active value so callbacks only fire on edges self._prev_control_is_active: bool | None = None def __del__(self): @@ -200,9 +198,9 @@ def reset(self) -> None: def last_control_events(self) -> ControlEvents: """Control events from the most recent :meth:`advance`. - Returns a :class:`ControlEvents` derived from messages received over - the control message channel. When no control channel is configured, - returns a default (no-op) :class:`ControlEvents`. + Returns a :class:`ControlEvents` derived from the teleop control + pipeline. When no control channel is configured, returns a + default (no-op) :class:`ControlEvents`. """ return self._session_lifecycle.last_control_events @@ -266,8 +264,6 @@ def advance(self, target_T_world: np.ndarray | torch.Tensor | SupportsDLPack | N # Poll controller buttons (e.g. toggle anchor rotation on right 'A' press) self._poll_buttons() - # Bridge control events to legacy callbacks so scripts that registered - # via add_callback() still receive START / STOP / RESET dispatches. self._dispatch_control_callbacks() return action @@ -282,13 +278,23 @@ def _dispatch_control_callbacks(self) -> None: This bridges the pipeline-based :class:`ControlEvents` with the callback-based :class:`CommandHandler` so that scripts which registered callbacks via :meth:`add_callback` still receive dispatches. + + Only fires START/STOP when ``is_active`` transitions between ``True`` + and ``False``; initial transitions from ``None`` are ignored to avoid + spurious callbacks during ``DefaultTeleopStateManager``'s + STOPPED -> PAUSED progression. """ + from .control_events import _NO_OP_EVENTS + events = self._session_lifecycle.last_control_events + if events is _NO_OP_EVENTS: + return if events.should_reset: self._command_handler.fire("RESET") self._anchor_manager.reset() - if events.is_active is not None and events.is_active != self._prev_control_is_active: - self._command_handler.fire("START" if events.is_active else "STOP") + if events.is_active is not None: + if self._prev_control_is_active is not None and events.is_active != self._prev_control_is_active: + self._command_handler.fire("START" if events.is_active else "STOP") self._prev_control_is_active = events.is_active # ------------------------------------------------------------------ diff --git a/source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py b/source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py deleted file mode 100644 index 7ccf988e6b56..000000000000 --- a/source/isaaclab_teleop/isaaclab_teleop/message_channel_state_manager.py +++ /dev/null @@ -1,162 +0,0 @@ -# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). -# All rights reserved. -# -# SPDX-License-Identifier: BSD-3-Clause - -"""Message-channel-based teleop state manager for TeleopCore's teleop_control_pipeline.""" - -from __future__ import annotations - -import json -import re -from typing import TYPE_CHECKING - -from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents, ExecutionState -from isaacteleop.teleop_session_manager.teleop_state_manager_retargeter import TeleopStateManager - -from .control_events import ControlEvents - -if TYPE_CHECKING: - from isaacteleop.retargeting_engine.interface import RetargeterIOType - from isaacteleop.retargeting_engine.interface.retargeter_core_types import ComputeContext, RetargeterIO - -_COMMAND_PATTERNS: list[tuple[re.Pattern[str], str]] = [ - (re.compile(r"\breset\b", re.IGNORECASE), "reset"), - (re.compile(r"\bstop\b", re.IGNORECASE), "stop"), - (re.compile(r"\bstart\b", re.IGNORECASE), "start"), -] -"""Ordered patterns for classifying a command string. - -``reset`` is checked first so that a hypothetical payload containing -both "reset" and "start" is treated as a reset (the more destructive -operation wins). ``stop`` precedes ``start`` for the same reason. -""" - - -class MessageChannelTeleopStateManager(TeleopStateManager): - """Teleop state manager driven by message channel payloads. - - Consumes the ``messages_tracked`` output of a - :class:`~isaacteleop.retargeting_engine.deviceio_source_nodes.MessageChannelSource`, - parses the JSON/text payloads (``"start"``, ``"stop"``, ``"reset"``), and - produces the ``teleop_state`` (one-hot) and ``reset_event`` (bool pulse) - outputs required by TeleopCore's ``teleop_control_pipeline`` contract. - - Payload formats supported (same as the legacy carb message bus): - - 1. **JSON (Quest client format)**:: - - {"type": "teleop_command", "message": {"command": "start teleop"}} - - 2. **Plain text (fallback)**: raw UTF-8 string matched by word boundary - (``"start"``, ``"stop"``, ``"reset"``). - - The state machine maps commands to :class:`ExecutionState` as follows: - - * ``"start"`` -> :attr:`ExecutionState.RUNNING` - * ``"stop"`` -> :attr:`ExecutionState.PAUSED` - * ``"reset"`` -> no state change, emits ``reset=True`` pulse - """ - - INPUT_MESSAGES = "messages_tracked" - - def __init__(self, name: str) -> None: - self._state = ExecutionState.STOPPED - self._control_events = ControlEvents() - super().__init__(name=name) - - @property - def last_control_events(self) -> ControlEvents: - """The most recent :class:`ControlEvents` derived from message payloads.""" - return self._control_events - - def input_spec(self) -> RetargeterIOType: - from isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types import ( - MessageChannelMessagesTrackedGroup, - ) - - return { - self.INPUT_MESSAGES: MessageChannelMessagesTrackedGroup(), - } - - def _compute_execution_events( - self, - inputs: RetargeterIO, - context: ComputeContext, - ) -> ExecutionEvents: - del context - - reset = False - messages_tracked = inputs[self.INPUT_MESSAGES][0] - - data = getattr(messages_tracked, "data", None) - if data: - for message in data: - payload = getattr(message, "payload", None) - if payload is None: - continue - try: - text = bytes(payload).decode("utf-8") - except (UnicodeDecodeError, TypeError): - continue - - command = _extract_command(text) - if command is None: - continue - - kind = _classify_command(command) - if kind == "start": - self._state = ExecutionState.RUNNING - elif kind == "stop": - self._state = ExecutionState.PAUSED - elif kind == "reset": - reset = True - - is_active: bool | None - if self._state == ExecutionState.RUNNING: - is_active = True - elif self._state == ExecutionState.PAUSED: - is_active = False - else: - is_active = None - - self._control_events = ControlEvents(is_active=is_active, should_reset=reset) - - return ExecutionEvents(reset=reset, execution_state=self._state) - - -def _classify_command(text: str) -> str | None: - """Return ``"start"``, ``"stop"``, ``"reset"``, or ``None``. - - Uses word-boundary matching so that e.g. ``"stop_and_restart"`` - matches ``"stop"`` (not ``"start"``). - """ - for pattern, label in _COMMAND_PATTERNS: - if pattern.search(text): - return label - return None - - -def _extract_command(text: str) -> str | None: - """Extract the command string from a JSON or plain-text payload. - - Tries JSON parsing first (Quest client format) and falls back to the - raw text for plain-string payloads. Non-string JSON scalars (numbers, - arrays, booleans) are discarded. - """ - try: - obj = json.loads(text) - except (json.JSONDecodeError, TypeError): - return text - - if not isinstance(obj, dict): - return None - if obj.get("type") != "teleop_command": - return None - - msg = obj.get("message") - if isinstance(msg, dict): - return msg.get("command", "") - if isinstance(msg, str): - return msg - return None diff --git a/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py b/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py index d4d3e65aa2b0..fa5f36f658e0 100644 --- a/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py +++ b/source/isaaclab_teleop/isaaclab_teleop/session_lifecycle.py @@ -18,12 +18,13 @@ if TYPE_CHECKING: from isaacteleop.cloudxr import CloudXRLauncher from isaacteleop.oxr import OpenXRSessionHandles + from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents from isaacteleop.retargeting_engine_ui import MultiRetargeterTuningUIImGui from isaacteleop.teleop_session_manager import TeleopSession from .control_events import _NO_OP_EVENTS, ControlEvents from .isaac_teleop_cfg import IsaacTeleopCfg -from .message_channel_state_manager import MessageChannelTeleopStateManager +from .teleop_message_processor import TeleopMessageProcessor class SupportsDLPack(Protocol): @@ -73,6 +74,19 @@ def _to_numpy_4x4(mat: np.ndarray | torch.Tensor | SupportsDLPack) -> np.ndarray return np.asarray(mat, dtype=np.float32) +def _execution_events_to_control(ee: ExecutionEvents) -> ControlEvents: + """Map TeleopCore :class:`ExecutionEvents` to the script-facing :class:`ControlEvents`.""" + from isaacteleop.retargeting_engine.interface.execution_events import ExecutionState + + if ee.execution_state == ExecutionState.RUNNING: + is_active: bool | None = True + elif ee.execution_state in (ExecutionState.PAUSED, ExecutionState.STOPPED): + is_active = False + else: + is_active = None + return ControlEvents(is_active=is_active, should_reset=ee.reset) + + class TeleopSessionLifecycle: """Manages the IsaacTeleop session lifecycle. @@ -80,11 +94,13 @@ class TeleopSessionLifecycle: 1. Building the retargeting pipeline from configuration 2. Adding a parallel ``ControllersSource`` for button-state access - 3. Acquiring OpenXR handles from Kit's XR bridge extension - 4. Creating, entering, and exiting the ``TeleopSession`` - 5. Building external inputs for pipeline leaf nodes (e.g. world-to-anchor transform) - 6. Stepping the session and extracting the flattened action tensor - 7. Managing the optional retargeting tuning UI + 3. Building the optional ``teleop_control_pipeline`` for headset-driven + start/stop/reset via a message channel + 4. Acquiring OpenXR handles from Kit's XR bridge extension + 5. Creating, entering, and exiting the ``TeleopSession`` + 6. Building external inputs for pipeline leaf nodes (e.g. world-to-anchor transform) + 7. Stepping the session and extracting the flattened action tensor + 8. Managing the optional retargeting tuning UI """ WORLD_T_ANCHOR_INPUT_NAME = "world_T_anchor" @@ -121,9 +137,10 @@ def __init__( self._session: TeleopSession | None = None self._pipeline = None self._teleop_control_pipeline = None - self._control_state_manager: MessageChannelTeleopStateManager | None = None + self._message_processor: TeleopMessageProcessor | None = None self._last_right_controller = None self._session_start_deferred_logged = False + # Fallback for host-initiated resets when no control pipeline is configured self._pending_reset = False # CloudXR runtime launcher (created in start if configured, stopped in stop) @@ -199,37 +216,45 @@ def last_right_controller(self): @property def has_control_channel(self) -> bool: - """Whether a message-channel-based control channel is configured.""" - return self._control_state_manager is not None + """Whether a message-channel-based control pipeline is configured.""" + return self._message_processor is not None @property def last_control_events(self) -> ControlEvents: """Control events from the most recent :meth:`step`. - Returns a :class:`ControlEvents` derived from messages received over - the control channel. When no control channel is configured, returns - a default (no-op) :class:`ControlEvents`. + When a ``teleop_control_pipeline`` is configured, derives + :class:`ControlEvents` from + ``session.last_context.execution_events``. Otherwise returns a + default (no-op) :class:`ControlEvents`. """ - if self._control_state_manager is not None: - return self._control_state_manager.last_control_events - return _NO_OP_EVENTS + if self._message_processor is None: + return _NO_OP_EVENTS + if self._session is None: + return _NO_OP_EVENTS + ctx = self._session.last_context + if ctx is None: + return _NO_OP_EVENTS + return _execution_events_to_control(ctx.execution_events) def request_reset(self) -> None: - """Schedule a reset ``ExecutionEvents`` for the next :meth:`step`. + """Schedule a reset for the next pipeline step. - The reset is consumed exactly once: on the next ``step()`` call, - ``ExecutionEvents(reset=True)`` is injected into the - ``TeleopSession``, which propagates the reset to all retargeters - via ``ComputeContext``. Subsequent steps resume normal control - pipeline operation. + When a control pipeline is configured, the reset flows through + :meth:`TeleopMessageProcessor.inject_reset` so + :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager` + processes it normally. Otherwise falls back to an + ``execution_events`` override on the next :meth:`step` call. - If the control channel already processed a reset this frame - (headset-initiated), this method is a no-op to avoid injecting a - redundant second reset pulse on the following frame. + If the control channel already processed a reset this frame, + this method is a no-op to avoid a redundant second reset pulse. """ - if self._control_state_manager is not None and self._control_state_manager.last_control_events.should_reset: + if self.last_control_events.should_reset: return - self._pending_reset = True + if self._message_processor is not None: + self._message_processor.inject_reset() + else: + self._pending_reset = True # ------------------------------------------------------------------ # Lifecycle: start / stop @@ -242,9 +267,10 @@ def start(self) -> None: the CloudXR runtime and WSS proxy are launched first. Builds the retargeting pipeline, wraps it with a parallel - ``ControllersSource`` for button-state access, attempts to acquire - OpenXR handles, and opens the retargeting tuning UI if retargeters - are configured. + ``ControllersSource`` for button-state access, builds the optional + ``teleop_control_pipeline`` for message-channel control, attempts + to acquire OpenXR handles, and opens the retargeting tuning UI if + retargeters are configured. If the OpenXR handles are not yet available (e.g. user hasn't clicked "Start AR"), session creation is deferred and will be retried on each @@ -261,7 +287,7 @@ def start(self) -> None: self._last_right_controller = None button_controllers = ControllersSource("_button_controllers") - pipeline_outputs: dict = { + pipeline_outputs: dict[str, Any] = { "action": user_pipeline.output("action"), self._CONTROLLER_RIGHT_KEY: button_controllers.output(ControllersSource.RIGHT), } @@ -269,24 +295,11 @@ def start(self) -> None: # Build optional teleop_control_pipeline for message-channel control self._teleop_control_pipeline = None - self._control_state_manager = None + self._message_processor = None if self._cfg.control_channel_uuid is not None: - from isaacteleop.retargeting_engine.deviceio_source_nodes import message_channel_config - - # The sink (write-side) is unused; only the source is needed. - # message_channel_config wires source and sink internally, so - # discarding the sink reference does not affect the channel lifetime. - source, _sink = message_channel_config( - name="_teleop_control", - channel_uuid=self._cfg.control_channel_uuid, - ) - state_manager = MessageChannelTeleopStateManager(name="_teleop_control_state") - self._teleop_control_pipeline = state_manager.connect( - { - state_manager.INPUT_MESSAGES: source.output("messages_tracked"), - } + self._teleop_control_pipeline, self._message_processor = self._build_control_pipeline( + self._cfg.control_channel_uuid ) - self._control_state_manager = state_manager # Try to start the session now; it may be deferred self._try_start_session() @@ -328,9 +341,12 @@ def stop(self, exc_type=None, exc_val=None, exc_tb=None) -> None: # expected and safe to suppress. logger.debug(f"Suppressed error during IsaacTeleop session cleanup: {e}") self._session = None - self._pipeline = None - self._teleop_control_pipeline = None - self._control_state_manager = None + + # Always clear pipeline state (session may never have been created if + # OpenXR handles were never available). + self._pipeline = None + self._teleop_control_pipeline = None + self._message_processor = None if self._cloudxr_launcher is not None: try: @@ -343,6 +359,49 @@ def stop(self, exc_type=None, exc_val=None, exc_tb=None) -> None: logger.info("IsaacTeleop session ended") + # ------------------------------------------------------------------ + # Control pipeline construction + # ------------------------------------------------------------------ + + @staticmethod + def _build_control_pipeline(channel_uuid: bytes) -> tuple[Any, TeleopMessageProcessor]: + """Build a ``teleop_control_pipeline`` from a message channel UUID. + + Wires ``MessageChannelSource`` -> :class:`TeleopMessageProcessor` + -> :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. + + Args: + channel_uuid: 16-byte UUID for the OpenXR opaque data channel. + + Returns: + A ``(teleop_control_pipeline, message_processor)`` tuple. + """ + from isaacteleop.retargeting_engine.deviceio_source_nodes import message_channel_config + from isaacteleop.teleop_session_manager import DefaultTeleopStateManager + + source, _sink = message_channel_config( + name="_teleop_control", + channel_uuid=channel_uuid, + ) + + processor = TeleopMessageProcessor(name="_teleop_msg_processor") + processor_graph = processor.connect({processor.INPUT_MESSAGES: source.output("messages_tracked")}) + + state_manager = DefaultTeleopStateManager(name="_teleop_state") + teleop_control_pipeline = state_manager.connect( + { + state_manager.INPUT_KILL: processor_graph.output("kill"), + state_manager.INPUT_RUN_TOGGLE: processor_graph.output("run_toggle"), + state_manager.INPUT_RESET: processor_graph.output("reset"), + } + ) + + return teleop_control_pipeline, processor + + # ------------------------------------------------------------------ + # Extension / XR lifecycle callbacks + # ------------------------------------------------------------------ + def _on_request_required_extensions(self) -> list[str]: """Callback for required extensions subscription. @@ -415,11 +474,6 @@ def _try_start_session(self) -> bool: if self._session is not None: return True - # In headless mode the AR profile setting is deliberately omitted - # from the .kit file so that all extensions (including the teleop - # bridge and its BridgeComponent) can load and register before Kit - # creates the OpenXR instance. We enable it here, after extensions - # are loaded; Kit will process the change on the next event-loop tick. self._ensure_xr_ar_profile_enabled() from isaacteleop.oxr import OpenXRSessionHandles @@ -511,25 +565,20 @@ def step( # pipeline contains ValueInput leaf nodes. external_inputs = self._build_external_inputs(anchor_world_matrix_fn, target_T_world) - # Execute one step of the teleop session. TeleopCore natively runs - # the teleop_control_pipeline (if configured), decodes the control - # outputs into ExecutionEvents, and injects them into ComputeContext. - # If the underlying OpenXR session was destroyed externally (e.g. - # user clicked "Stop AR"), the step call will fail. We catch the - # error, tear down the dead session, and return None so the caller - # can continue rendering (or wait for the session to restart). - # Consume pending host-initiated reset (e.g. env reset on success). - # Preserve the control-channel execution state so the injected reset - # does not accidentally override a headset-initiated PAUSED state. + # When no control pipeline is configured, host-initiated resets use + # the execution_events override as a fallback path. execution_events = None if self._pending_reset: from isaacteleop.retargeting_engine.interface.execution_events import ExecutionEvents, ExecutionState - ctrl = self.last_control_events - prev_state = ExecutionState.PAUSED if ctrl.is_active is False else ExecutionState.RUNNING - execution_events = ExecutionEvents(reset=True, execution_state=prev_state) + execution_events = ExecutionEvents(reset=True, execution_state=ExecutionState.RUNNING) self._pending_reset = False + # Execute one step of the teleop session. + # If the underlying OpenXR session was destroyed externally (e.g. + # user clicked "Stop AR"), the step call will fail. We catch the + # error, tear down the dead session, and return None so the caller + # can continue rendering (or wait for the session to restart). assert self._session is not None # guaranteed by _try_start_session above try: result = self._session.step( diff --git a/source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py b/source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py new file mode 100644 index 000000000000..1844925c4d0c --- /dev/null +++ b/source/isaaclab_teleop/isaaclab_teleop/teleop_message_processor.py @@ -0,0 +1,232 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Message-channel payload parser for TeleopCore's teleop_control_pipeline. + +Provides :class:`TeleopMessageProcessor`, a lightweight +:class:`~isaacteleop.retargeting_engine.interface.BaseRetargeter` that +converts message-channel payloads into boolean pulse signals suitable for +:class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`. +""" + +from __future__ import annotations + +import json +import re +from typing import TYPE_CHECKING + +from isaacteleop.retargeting_engine.interface import BaseRetargeter, RetargeterIOType + +if TYPE_CHECKING: + from isaacteleop.retargeting_engine.interface.retargeter_core_types import ComputeContext, RetargeterIO + +_COMMAND_PATTERNS: list[tuple[re.Pattern[str], str]] = [ + (re.compile(r"\breset\b", re.IGNORECASE), "reset"), + (re.compile(r"\bstop\b", re.IGNORECASE), "stop"), + (re.compile(r"\bstart\b", re.IGNORECASE), "start"), +] +"""Ordered patterns for classifying a command string. + +``reset`` is checked first so that a hypothetical payload containing +both "reset" and "start" is treated as a reset (the more destructive +operation wins). ``stop`` precedes ``start`` for the same reason. +""" + +# Shadow states mirroring DefaultTeleopStateManager's ExecutionState. +_STOPPED = "stopped" +_PAUSED = "paused" +_RUNNING = "running" + +# DefaultTeleopStateManager cycles states on run_toggle rising edges: +# STOPPED -> PAUSED -> RUNNING -> PAUSED -> RUNNING -> ... +# To map imperative "start" (= go to RUNNING) and "stop" (= go to PAUSED) +# we emit the right number of toggle edges based on predicted state. +_START_TOGGLE_SEQUENCES: dict[str, list[bool]] = { + _STOPPED: [True, False, True], # 2 edges: STOPPED -> PAUSED -> RUNNING + _PAUSED: [True], # 1 edge: PAUSED -> RUNNING + _RUNNING: [], # already running +} +_STOP_TOGGLE_SEQUENCES: dict[str, list[bool]] = { + _RUNNING: [True], # 1 edge: RUNNING -> PAUSED + _PAUSED: [], # already paused + _STOPPED: [], # already stopped +} +# Shadow state advances on each rising edge (True after False). +_TOGGLE_TRANSITIONS: dict[str, str] = { + _STOPPED: _PAUSED, + _PAUSED: _RUNNING, + _RUNNING: _PAUSED, +} + + +class TeleopMessageProcessor(BaseRetargeter): + """Parse message-channel payloads into boolean control signals. + + Consumes the ``messages_tracked`` output of a + :class:`~isaacteleop.retargeting_engine.deviceio_source_nodes.MessageChannelSource` + and produces three boolean pulse outputs that drive + :class:`~isaacteleop.teleop_session_manager.DefaultTeleopStateManager`: + + * ``run_toggle`` -- pulsed ``True`` on rising edges; the number of + edges depends on the target state (e.g. ``"start"`` from STOPPED + emits two edges over three frames: STOPPED -> PAUSED -> RUNNING). + * ``kill`` -- always ``False`` (reserved for fail-safe; ``"stop"`` + uses ``run_toggle`` to reach PAUSED instead of STOPPED). + * ``reset`` -- pulsed ``True`` for one frame on ``"reset"``. + + The processor maintains a *shadow state* that mirrors + ``DefaultTeleopStateManager``'s internal state so it can emit the + correct toggle sequence for imperative commands. + + Payload formats supported: + + 1. **JSON (Quest client format)**:: + + {"type": "teleop_command", "message": {"command": "start teleop"}} + + 2. **Plain text (fallback)**: raw UTF-8 string matched by word boundary + (``"start"``, ``"stop"``, ``"reset"``). + + Host-initiated resets (e.g. environment success) are injected via + :meth:`inject_reset`, which sets the ``reset`` output ``True`` on the + next compute call without requiring a message-channel payload. + """ + + INPUT_MESSAGES = "messages_tracked" + + def __init__(self, name: str) -> None: + self._inject_reset_pending = False + self._shadow_state = _STOPPED + self._run_toggle_queue: list[bool] = [] + self._prev_toggle_output = False + super().__init__(name=name) + + def inject_reset(self) -> None: + """Schedule a reset pulse on the next pipeline step. + + The ``reset`` output will be ``True`` for exactly one frame, then + automatically cleared. + """ + self._inject_reset_pending = True + + def _make_toggle_sequence(self, base_sequence: list[bool]) -> list[bool]: + """Prepend a ``False`` frame if needed to guarantee a clean rising edge. + + ``DefaultTeleopStateManager`` uses edge detection + (``pressed and not prev_pressed``), so emitting ``True`` when the + previous output was already ``True`` would not trigger a state + transition. This method prepends ``False`` when necessary. + """ + if not base_sequence: + return [] + seq = list(base_sequence) + if self._prev_toggle_output: + seq.insert(0, False) + return seq + + def input_spec(self) -> RetargeterIOType: + from isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types import ( + MessageChannelMessagesTrackedGroup, + ) + + return {self.INPUT_MESSAGES: MessageChannelMessagesTrackedGroup()} + + def output_spec(self) -> RetargeterIOType: + from isaacteleop.teleop_session_manager.teleop_state_manager_types import bool_signal + + return { + "run_toggle": bool_signal("run_toggle"), + "kill": bool_signal("kill"), + "reset": bool_signal("reset"), + } + + def _compute_fn( + self, + inputs: RetargeterIO, + outputs: RetargeterIO, + context: ComputeContext, + ) -> None: + del context + + reset = self._inject_reset_pending + self._inject_reset_pending = False + + # Parse incoming messages and enqueue toggle sequences. + messages_tracked = inputs[self.INPUT_MESSAGES][0] + data = getattr(messages_tracked, "data", None) + if data: + for message in data: + payload = getattr(message, "payload", None) + if payload is None: + continue + try: + text = bytes(payload).decode("utf-8") + except (UnicodeDecodeError, TypeError): + continue + + command = _extract_command(text) + if command is None: + continue + + kind = _classify_command(command) + if kind == "start" and not self._run_toggle_queue: + self._run_toggle_queue = self._make_toggle_sequence(_START_TOGGLE_SEQUENCES[self._shadow_state]) + elif kind == "stop" and not self._run_toggle_queue: + self._run_toggle_queue = self._make_toggle_sequence(_STOP_TOGGLE_SEQUENCES[self._shadow_state]) + elif kind == "reset": + reset = True + + # Drain the toggle queue (one value per frame). + if self._run_toggle_queue: + run_toggle = self._run_toggle_queue.pop(0) + else: + run_toggle = False + + # Advance shadow state on rising edges (matches DefaultTeleopStateManager's + # edge detection: ``pressed and not prev_pressed``). + if run_toggle and not self._prev_toggle_output: + self._shadow_state = _TOGGLE_TRANSITIONS[self._shadow_state] + self._prev_toggle_output = run_toggle + + outputs["run_toggle"][0] = run_toggle + outputs["kill"][0] = False + outputs["reset"][0] = reset + + +def _classify_command(text: str) -> str | None: + """Return ``"start"``, ``"stop"``, ``"reset"``, or ``None``. + + Uses word-boundary matching so that e.g. ``"stop_and_restart"`` + matches ``"stop"`` (not ``"start"``). + """ + for pattern, label in _COMMAND_PATTERNS: + if pattern.search(text): + return label + return None + + +def _extract_command(text: str) -> str | None: + """Extract the command string from a JSON or plain-text payload. + + Tries JSON parsing first (Quest client format) and falls back to the + raw text for plain-string payloads. Non-string JSON scalars (numbers, + arrays, booleans) are discarded. + """ + try: + obj = json.loads(text) + except (json.JSONDecodeError, TypeError): + return text + + if not isinstance(obj, dict): + return None + if obj.get("type") != "teleop_command": + return None + + msg = obj.get("message") + if isinstance(msg, dict): + return msg.get("command", "") + if isinstance(msg, str): + return msg + return None diff --git a/source/isaaclab_teleop/test/test_control_events.py b/source/isaaclab_teleop/test/test_control_events.py index 0dfb6bfef036..8bc05d3f957c 100644 --- a/source/isaaclab_teleop/test/test_control_events.py +++ b/source/isaaclab_teleop/test/test_control_events.py @@ -5,18 +5,19 @@ # pyright: reportPrivateUsage=none -"""Tests for MessageChannelTeleopStateManager, _classify_command, _extract_command, +"""Tests for TeleopMessageProcessor, _classify_command, _extract_command, and poll_control_events. These tests exercise pure logic (no Omniverse/Isaac Sim stack required). -The state manager is tested by calling its ``_compute_execution_events`` -method directly with fake pipeline I/O, mirroring how TeleopCore's +The message processor is tested by calling its ``_compute_fn`` method +directly with fake pipeline I/O, mirroring how TeleopCore's ``teleop_control_pipeline`` mechanism invokes it. """ from __future__ import annotations import dataclasses +import json import sys from types import ModuleType from unittest.mock import MagicMock @@ -54,7 +55,6 @@ def _install_stubs(): _stubs[name] = MagicMock() sys.modules[name] = _stubs[name] - # Provide real ExecutionState and ExecutionEvents so state logic works. from enum import Enum class ExecutionState(str, Enum): @@ -74,21 +74,20 @@ class ExecutionEvents: ee_mod.ExecutionState = ExecutionState # type: ignore[attr-defined] ee_mod.ExecutionEvents = ExecutionEvents # type: ignore[attr-defined] - # Make them available from the interface module too iface = sys.modules["isaacteleop.retargeting_engine.interface"] iface.ExecutionState = ExecutionState # type: ignore[attr-defined] iface.ExecutionEvents = ExecutionEvents # type: ignore[attr-defined] iface.RetargeterIOType = dict # type: ignore[attr-defined] - # Provide a minimal TeleopStateManager base so the subclass can instantiate - class FakeTeleopStateManager: + class FakeBaseRetargeter: def __init__(self, name: str) -> None: self.name = name - tsm_mod = sys.modules["isaacteleop.teleop_session_manager.teleop_state_manager_retargeter"] - tsm_mod.TeleopStateManager = FakeTeleopStateManager # type: ignore[attr-defined] + iface.BaseRetargeter = FakeBaseRetargeter # type: ignore[attr-defined] + + tsm_types = sys.modules["isaacteleop.teleop_session_manager.teleop_state_manager_types"] + tsm_types.bool_signal = MagicMock # type: ignore[attr-defined] - # MessageChannelMessagesTrackedGroup stub dt_mod = sys.modules["isaacteleop.retargeting_engine.deviceio_source_nodes.deviceio_tensor_types"] dt_mod.MessageChannelMessagesTrackedGroup = MagicMock # type: ignore[attr-defined] @@ -96,18 +95,12 @@ def __init__(self, name: str) -> None: _install_stubs() from isaaclab_teleop.control_events import ControlEvents, poll_control_events # noqa: E402 -from isaaclab_teleop.message_channel_state_manager import ( # noqa: E402 - MessageChannelTeleopStateManager, +from isaaclab_teleop.teleop_message_processor import ( # noqa: E402 + TeleopMessageProcessor, _classify_command, _extract_command, ) -# Re-import after stubs so we can reference them in assertions. -from isaacteleop.retargeting_engine.interface.execution_events import ( # noqa: E402 - ExecutionEvents, - ExecutionState, -) - # --------------------------------------------------------------------------- # Test doubles for MessageChannelMessagesTrackedT # --------------------------------------------------------------------------- @@ -137,157 +130,319 @@ def _null_tracked() -> _FakeTracked: def _make_inputs(messages_tracked): - """Build a fake RetargeterIO dict for the state manager.""" + """Build a fake RetargeterIO dict for the processor.""" tg = MagicMock() tg.__getitem__ = MagicMock(return_value=messages_tracked) - return {MessageChannelTeleopStateManager.INPUT_MESSAGES: tg} + return {TeleopMessageProcessor.INPUT_MESSAGES: tg} + + +class _FakeOutputSlot: + """Captures ``outputs["key"][0] = value`` assignments.""" + + def __init__(self): + self.value = None + + def __setitem__(self, idx, val): + self.value = val + + def __getitem__(self, idx): + return self.value + +def _make_outputs(): + """Build a fake outputs dict with capturable slots.""" + return {"run_toggle": _FakeOutputSlot(), "kill": _FakeOutputSlot(), "reset": _FakeOutputSlot()} -def _step(mgr, messages_tracked) -> ExecutionEvents: - """Invoke the state manager's compute with fake inputs and return the events.""" + +def _step(proc, messages_tracked) -> dict: + """Run the processor's _compute_fn and return captured outputs.""" inputs = _make_inputs(messages_tracked) - return mgr._compute_execution_events(inputs, context=None) + outputs = _make_outputs() + proc._compute_fn(inputs, outputs, context=None) + return {k: v.value for k, v in outputs.items()} # =========================================================================== -# MessageChannelTeleopStateManager tests +# TeleopMessageProcessor: basic command parsing # =========================================================================== -class TestInitialState: - def test_defaults(self): - mgr = MessageChannelTeleopStateManager(name="test") - events = mgr.last_control_events - assert events.is_active is None - assert events.should_reset is False +class TestStartCommand: + def test_start_sets_run_toggle(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"start")) + assert result["run_toggle"] is True + assert result["kill"] is False + assert result["reset"] is False + + def test_start_does_not_set_reset(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"start")) + assert result["reset"] is False -class TestStartMessage: - def test_start_sets_running(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"start")) - assert result.execution_state == ExecutionState.RUNNING - assert mgr.last_control_events.is_active is True +class TestStopCommand: + def test_stop_from_stopped_is_noop(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"stop")) + assert result["run_toggle"] is False + assert result["kill"] is False - def test_start_does_not_set_reset(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"start")) - assert result.reset is False - assert mgr.last_control_events.should_reset is False +class TestResetCommand: + def test_reset_sets_reset_flag(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"reset")) + assert result["reset"] is True + assert result["run_toggle"] is False + assert result["kill"] is False -class TestStopMessage: - def test_stop_sets_paused(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"stop")) - assert result.execution_state == ExecutionState.PAUSED - assert mgr.last_control_events.is_active is False +class TestResetPulseBehaviour: + def test_reset_clears_on_next_step(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"reset")) + assert result["reset"] is True -class TestResetMessage: - def test_reset_sets_flags(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"reset")) - assert result.reset is True - assert mgr.last_control_events.should_reset is True + result = _step(proc, _empty_tracked()) + assert result["reset"] is False - def test_reset_does_not_change_active_state(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _tracked(b"start")) - _step(mgr, _tracked(b"reset")) - assert mgr.last_control_events.is_active is True +class TestKillAlwaysFalse: + def test_kill_is_always_false(self): + proc = TeleopMessageProcessor(name="test") + for payload in [b"start", b"stop", b"reset", b"hello"]: + result = _step(proc, _tracked(payload)) + assert result["kill"] is False -class TestResetPulseBehaviour: - def test_should_reset_clears_on_next_step(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _tracked(b"reset")) - assert mgr.last_control_events.should_reset is True - _step(mgr, _empty_tracked()) - assert mgr.last_control_events.should_reset is False +# =========================================================================== +# Shadow state and toggle sequences +# =========================================================================== + + +class TestStartFromStopped: + """``start`` from STOPPED needs 2 toggle edges over 3 frames.""" + + def test_full_sequence_reaches_running(self): + proc = TeleopMessageProcessor(name="test") + # Frame 0: "start" received, first toggle edge queued + r0 = _step(proc, _tracked(b"start")) + assert r0["run_toggle"] is True # edge 1: STOPPED -> PAUSED + + # Frame 1: queue drains False (prev resets) + r1 = _step(proc, _empty_tracked()) + assert r1["run_toggle"] is False + + # Frame 2: queue drains True (second edge) + r2 = _step(proc, _empty_tracked()) + assert r2["run_toggle"] is True # edge 2: PAUSED -> RUNNING + + # Frame 3: queue empty, back to idle + r3 = _step(proc, _empty_tracked()) + assert r3["run_toggle"] is False + + def test_shadow_state_is_running_after_sequence(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + +class TestStartFromPaused: + """``start`` from PAUSED needs 1 toggle edge.""" + + def test_single_edge_reaches_running(self): + proc = TeleopMessageProcessor(name="test") + # Drive to RUNNING: start sequence plays 3 frames + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + # Stop to reach PAUSED (prev_toggle is True from start sequence, + # so a False is prepended before the toggle edge) + _step(proc, _tracked(b"stop")) # drains False (prepended) + r_stop_edge = _step(proc, _empty_tracked()) # drains True (edge) + assert r_stop_edge["run_toggle"] is True + assert proc._shadow_state == "paused" + + # Start from PAUSED: prev_toggle is True, so False prepended + _step(proc, _tracked(b"start")) # drains False (prepended) + r_start_edge = _step(proc, _empty_tracked()) # drains True (edge) + assert r_start_edge["run_toggle"] is True + assert proc._shadow_state == "running" + + +class TestStartFromRunning: + """``start`` when already RUNNING is a no-op.""" + + def test_start_from_running_noop(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + result = _step(proc, _tracked(b"start")) + assert result["run_toggle"] is False + + +class TestStopFromRunning: + """``stop`` from RUNNING uses one toggle edge to reach PAUSED.""" + + def test_stop_pauses(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "running" + + # prev_toggle is True, so stop prepends False before the edge + r0 = _step(proc, _tracked(b"stop")) + assert r0["run_toggle"] is False # prepended False + r1 = _step(proc, _empty_tracked()) + assert r1["run_toggle"] is True # edge: RUNNING -> PAUSED + assert proc._shadow_state == "paused" + + +class TestStopFromPaused: + """``stop`` when already PAUSED is a no-op.""" + + def test_stop_from_paused_noop(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) + _step(proc, _empty_tracked()) + _step(proc, _empty_tracked()) + # Stop to PAUSED + _step(proc, _tracked(b"stop")) + _step(proc, _empty_tracked()) + assert proc._shadow_state == "paused" + + result = _step(proc, _tracked(b"stop")) + assert result["run_toggle"] is False + + +class TestCommandDuringToggleSequence: + """Commands received while a toggle sequence is in progress are ignored.""" + + def test_second_start_during_sequence_ignored(self): + proc = TeleopMessageProcessor(name="test") + _step(proc, _tracked(b"start")) # starts the 3-frame sequence + # Second start during the sequence should not restart it + r1 = _step(proc, _tracked(b"start")) + assert r1["run_toggle"] is False # draining the False from queue + + r2 = _step(proc, _empty_tracked()) + assert r2["run_toggle"] is True # second edge fires normally + + +# =========================================================================== +# inject_reset +# =========================================================================== + + +class TestInjectReset: + def test_inject_reset_produces_pulse(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + result = _step(proc, _empty_tracked()) + assert result["reset"] is True + + def test_inject_reset_clears_after_one_step(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + _step(proc, _empty_tracked()) + result = _step(proc, _empty_tracked()) + assert result["reset"] is False + + def test_inject_reset_combines_with_message_reset(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + result = _step(proc, _tracked(b"reset")) + assert result["reset"] is True + + def test_inject_reset_independent_of_toggle(self): + proc = TeleopMessageProcessor(name="test") + proc.inject_reset() + result = _step(proc, _tracked(b"start")) + assert result["run_toggle"] is True + assert result["reset"] is True + + +# =========================================================================== +# Word boundary matching +# =========================================================================== class TestWordBoundaryMatching: @pytest.mark.parametrize("payload", [b"teleop start", b"xr start session", b"start now"]) def test_start_word(self, payload: bytes): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(payload)) - assert result.execution_state == ExecutionState.RUNNING - - @pytest.mark.parametrize("payload", [b"teleop stop", b"stop teleop"]) - def test_stop_word(self, payload: bytes): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(payload)) - assert result.execution_state == ExecutionState.PAUSED + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["run_toggle"] is True @pytest.mark.parametrize("payload", [b"teleop reset", b"env reset"]) def test_reset_word(self, payload: bytes): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(payload)) - assert result.reset is True + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["reset"] is True class TestAmbiguousPayloads: - def test_stop_wins_over_start_when_both_present(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"stop and start")) - assert result.execution_state == ExecutionState.PAUSED + def test_reset_wins_over_start(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"reset and start")) + assert result["reset"] is True + assert result["run_toggle"] is False + - def test_reset_wins_over_start_when_both_present(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"reset and start")) - assert result.reset is True +# =========================================================================== +# Empty, null, and malformed batches +# =========================================================================== class TestEmptyAndNullBatches: def test_empty_data_list(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _tracked(b"start")) - _step(mgr, _empty_tracked()) - assert mgr.last_control_events.is_active is True + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _empty_tracked()) + assert result["run_toggle"] is False + assert result["kill"] is False + assert result["reset"] is False def test_null_data(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _null_tracked()) - assert mgr.last_control_events.is_active is None + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _null_tracked()) + assert result["run_toggle"] is False def test_none_input(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, None) - assert mgr.last_control_events.is_active is None + proc = TeleopMessageProcessor(name="test") + result = _step(proc, None) + assert result["run_toggle"] is False class TestMultipleMessagesInBatch: def test_start_then_reset_in_one_batch(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(b"start", b"reset")) - assert mgr.last_control_events.is_active is True - assert result.reset is True - - -class TestSequentialStartStop: - def test_start_then_stop(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _tracked(b"start")) - assert mgr.last_control_events.is_active is True - result = _step(mgr, _tracked(b"stop")) - assert mgr.last_control_events.is_active is False - assert result.execution_state == ExecutionState.PAUSED + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"start", b"reset")) + assert result["run_toggle"] is True + assert result["reset"] is True class TestMalformedPayloads: def test_invalid_utf8(self): - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _tracked(b"\xff\xfe")) - assert mgr.last_control_events.is_active is None - assert mgr.last_control_events.should_reset is False + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(b"\xff\xfe")) + assert result["run_toggle"] is False + assert result["kill"] is False + assert result["reset"] is False def test_none_payload(self): - mgr = MessageChannelTeleopStateManager(name="test") + proc = TeleopMessageProcessor(name="test") tracked = _FakeTracked(data=[_FakePayload(payload=None)]) # type: ignore[arg-type] - _step(mgr, tracked) - assert mgr.last_control_events.is_active is None + result = _step(proc, tracked) + assert result["run_toggle"] is False # =========================================================================== @@ -297,42 +452,36 @@ def test_none_payload(self): def _json_command(command: str) -> bytes: """Build a Quest-style JSON teleop_command payload.""" - import json - return json.dumps({"type": "teleop_command", "message": {"command": command}}).encode("utf-8") class TestJsonFormat: def test_json_start_teleop(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(_json_command("start teleop"))) - assert result.execution_state == ExecutionState.RUNNING + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(_json_command("start teleop"))) + assert result["run_toggle"] is True - def test_json_stop_teleop(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(_json_command("stop teleop"))) - assert result.execution_state == ExecutionState.PAUSED + def test_json_stop_teleop_from_stopped_noop(self): + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(_json_command("stop teleop"))) + assert result["run_toggle"] is False def test_json_reset_teleop(self): - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(_json_command("reset teleop"))) - assert result.reset is True + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(_json_command("reset teleop"))) + assert result["reset"] is True def test_json_wrong_type_ignored(self): - import json - payload = json.dumps({"type": "other_event", "message": {"command": "start"}}).encode("utf-8") - mgr = MessageChannelTeleopStateManager(name="test") - _step(mgr, _tracked(payload)) - assert mgr.last_control_events.is_active is None + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["run_toggle"] is False def test_json_message_as_string(self): - import json - payload = json.dumps({"type": "teleop_command", "message": "start teleop"}).encode("utf-8") - mgr = MessageChannelTeleopStateManager(name="test") - result = _step(mgr, _tracked(payload)) - assert result.execution_state == ExecutionState.RUNNING + proc = TeleopMessageProcessor(name="test") + result = _step(proc, _tracked(payload)) + assert result["run_toggle"] is True # =========================================================================== @@ -345,20 +494,14 @@ def test_plain_text(self): assert _extract_command("start teleop") == "start teleop" def test_json_teleop_command(self): - import json - text = json.dumps({"type": "teleop_command", "message": {"command": "stop"}}) assert _extract_command(text) == "stop" def test_json_wrong_type(self): - import json - text = json.dumps({"type": "other", "message": {"command": "start"}}) assert _extract_command(text) is None def test_json_no_message_key(self): - import json - text = json.dumps({"type": "teleop_command"}) assert _extract_command(text) is None