From 4bc1b969077b8b47f44972b152887311511e00bb Mon Sep 17 00:00:00 2001 From: Yanzi Zhu Date: Sat, 11 Apr 2026 12:09:53 -0700 Subject: [PATCH] feat(oob): Adds initial oob implementation with metrics --- .../webxr_client/helpers/controlChannel.ts | 220 +++++++++++++ deps/cloudxr/webxr_client/src/App.tsx | 149 ++++++--- deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx | 24 +- docs/source/index.rst | 1 + docs/source/references/oob_teleop_control.rst | 295 ++++++++++++++++++ src/core/cloudxr/python/CMakeLists.txt | 1 + src/core/cloudxr/python/__main__.py | 14 + src/core/cloudxr/python/launcher.py | 12 +- src/core/cloudxr/python/oob_teleop_hub.py | 295 ++++++++++++++++++ src/core/cloudxr/python/wss.py | 282 +++++++++++++++-- src/core/cloudxr_tests/python/conftest.py | 13 + src/core/cloudxr_tests/python/pyproject.toml | 3 + .../python/test_oob_teleop_hub.py | 238 ++++++++++++++ 13 files changed, 1480 insertions(+), 67 deletions(-) create mode 100644 deps/cloudxr/webxr_client/helpers/controlChannel.ts create mode 100644 docs/source/references/oob_teleop_control.rst create mode 100644 src/core/cloudxr/python/oob_teleop_hub.py create mode 100644 src/core/cloudxr_tests/python/conftest.py create mode 100644 src/core/cloudxr_tests/python/test_oob_teleop_hub.py diff --git a/deps/cloudxr/webxr_client/helpers/controlChannel.ts b/deps/cloudxr/webxr_client/helpers/controlChannel.ts new file mode 100644 index 000000000..7062ebb52 --- /dev/null +++ b/deps/cloudxr/webxr_client/helpers/controlChannel.ts @@ -0,0 +1,220 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * HeadsetControlChannel — WebSocket client that connects the XR headset to the + * teleop control hub running in the WSS proxy. + * + * Protocol: docs/source/references/oob_teleop_control.rst (Sphinx build) + * Hub WS URL: ``wss://:/oob/v1/ws`` when the page URL includes ``oobEnable=1`` and + * valid ``serverIP`` / ``port`` query parameters (see App.tsx). No connection is made without them. + * + * Usage (in App.tsx): + * + * const channel = new HeadsetControlChannel({ + * url: 'wss://host:48322/oob/v1/ws', + * onConfig: (config, version) => { ... }, + * getMetricsSnapshot: () => [ { cadence: 'frame', metrics: { ... } } ], + * }); + * channel.connect(); + * // on cleanup: + * channel.dispose(); + */ + +/** + * Fields the hub merges into ``config`` on ``hello`` / ``config`` pushes. + * Keys match the supported URL query parameter overrides (see ``CloudXR2DUI.applyUrlSeeds``). + */ +export interface StreamConfig { + serverIP?: string; + port?: number; + panelHiddenAtStart?: boolean; + codec?: string; +} + +export interface MetricsSnapshot { + cadence: string; + metrics: Record; +} + +export interface ControlChannelOptions { + /** Full WSS URL of the hub, e.g. wss://host:48322/oob/v1/ws */ + url: string; + /** Sent in the register message. Must match CONTROL_TOKEN env var if set. */ + token?: string; + /** Human-readable label in hub snapshots (optional). */ + deviceLabel?: string; + /** + * Called on hello (initial config) and on config push. + * Apply the config to the CloudXR connection settings before connect. + */ + onConfig: (config: StreamConfig, configVersion: number) => void; + /** Called when the WebSocket connection state changes. */ + onConnectionChange?: (connected: boolean) => void; + /** + * Optional: called periodically to get the latest metrics to report. + * Return an empty array or null/undefined to skip a tick. + */ + getMetricsSnapshot?: () => MetricsSnapshot[] | null | undefined; + /** How often to report metrics (ms). Default: 500. */ + metricsIntervalMs?: number; +} + +const RECONNECT_DELAY_MS = 3000; +const DEFAULT_METRICS_INTERVAL_MS = 500; + +export class HeadsetControlChannel { + private ws: WebSocket | null = null; + private disposed = false; + private metricsTimer: ReturnType | null = null; + private reconnectTimer: ReturnType | null = null; + + constructor(private readonly opts: ControlChannelOptions) {} + + /** Open the WebSocket and start the reconnection loop. */ + connect(): void { + if (this.disposed) return; + this._openWebSocket(); + } + + /** Close the channel permanently. Safe to call multiple times. */ + dispose(): void { + this.disposed = true; + this._clearTimers(); + if (this.ws) { + this.ws.onclose = null; // prevent reconnect on this close + this.ws.close(); + this.ws = null; + } + } + + // --------------------------------------------------------------------------- + // Private + // --------------------------------------------------------------------------- + + private _openWebSocket(): void { + if (this.disposed) return; + + let ws: WebSocket; + try { + ws = new WebSocket(this.opts.url); + } catch (err) { + if (this.disposed) return; + console.warn( + '[ControlChannel] WebSocket constructor failed for', + this.opts.url, + err + ); + this.ws = null; + this._afterSocketClosed(); + return; + } + + this.ws = ws; + + ws.onopen = () => { + ws.send( + JSON.stringify({ + type: 'register', + payload: { + role: 'headset', + ...(this.opts.token ? { token: this.opts.token } : {}), + ...(this.opts.deviceLabel ? { deviceLabel: this.opts.deviceLabel } : {}), + }, + }) + ); + this.opts.onConnectionChange?.(true); + this._startMetricsTimer(); + }; + + ws.onmessage = (ev) => { + if (typeof ev.data !== 'string') return; + let msg: { type?: string; payload?: unknown }; + try { + msg = JSON.parse(ev.data); + } catch { + return; + } + this._handleMessage(msg); + }; + + ws.onclose = (ev: CloseEvent) => { + this.ws = null; + this._afterSocketClosed(ev.code); + }; + + ws.onerror = () => { + // onclose fires next; reconnect logic lives there + }; + } + + /** Clear timers, notify disconnected, schedule reconnect unless terminal. */ + private _afterSocketClosed(closeCode?: number): void { + this._clearTimers(); + this.opts.onConnectionChange?.(false); + if (closeCode === 1008) { + console.warn('[ControlChannel] Terminal close (1008 policy/auth); will not reconnect.'); + return; + } + if (!this.disposed) { + this.reconnectTimer = setTimeout(() => this._openWebSocket(), RECONNECT_DELAY_MS); + } + } + + private _handleMessage(msg: { type?: string; payload?: unknown }): void { + const type = msg.type; + const payload = (msg.payload ?? {}) as Record; + + if (type === 'hello') { + // hello to headset includes initial config + if ( + payload.config != null && + typeof payload.configVersion === 'number' + ) { + this.opts.onConfig(payload.config as StreamConfig, payload.configVersion as number); + } + } else if (type === 'config') { + if ( + payload.config != null && + typeof payload.configVersion === 'number' + ) { + this.opts.onConfig(payload.config as StreamConfig, payload.configVersion as number); + } + } else if (type === 'error') { + console.warn('[ControlChannel] Hub error:', payload); + } + } + + private _startMetricsTimer(): void { + if (!this.opts.getMetricsSnapshot) return; + const interval = this.opts.metricsIntervalMs ?? DEFAULT_METRICS_INTERVAL_MS; + this.metricsTimer = setInterval(() => { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + const snapshots = this.opts.getMetricsSnapshot?.(); + if (!snapshots || snapshots.length === 0) return; + const t = Date.now(); + for (const { cadence, metrics } of snapshots) { + if (Object.keys(metrics).length === 0) continue; + this.ws.send( + JSON.stringify({ + type: 'clientMetrics', + payload: { t, cadence, metrics }, + }) + ); + } + }, interval); + } + + private _clearTimers(): void { + if (this.metricsTimer !== null) { + clearInterval(this.metricsTimer); + this.metricsTimer = null; + } + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + } +} diff --git a/deps/cloudxr/webxr_client/src/App.tsx b/deps/cloudxr/webxr_client/src/App.tsx index 98d121b42..6118e3e7e 100644 --- a/deps/cloudxr/webxr_client/src/App.tsx +++ b/deps/cloudxr/webxr_client/src/App.tsx @@ -48,6 +48,7 @@ import { useState, useMemo, useEffect, useRef } from 'react'; import { CloudXR2DUI } from './CloudXR2DUI'; import CloudXR3DUI from './CloudXRUI'; +import { HeadsetControlChannel } from '@helpers/controlChannel'; // Performance metrics signals - raw numeric data, one per callback cadence. // Signals update their value without triggering React re-renders. @@ -87,6 +88,23 @@ const START_TELEOP_COMMAND = { }, } as const; +/** When set with ``serverIP`` + ``port``, WebXR builds ``wss://{serverIP}:{port}/oob/v1/ws``. */ +function isOobEnabled(searchParams: URLSearchParams): boolean { + const v = searchParams.get('oobEnable'); + return v === '1' || v?.toLowerCase() === 'true'; +} + +function buildOobHubWsUrlFromQuery(searchParams: URLSearchParams): string | null { + if (!isOobEnabled(searchParams)) return null; + const serverIP = searchParams.get('serverIP')?.trim(); + const portStr = searchParams.get('port')?.trim(); + if (!serverIP || portStr === undefined || portStr === '') return null; + if (!/^\d{1,5}$/.test(portStr)) return null; + const host = + serverIP.includes(':') && !serverIP.startsWith('[') ? `[${serverIP}]` : serverIP; + return `wss://${host}:${portStr}/oob/v1/ws`; +} + function App() { const COUNTDOWN_MAX_SECONDS = 9; const COUNTDOWN_STORAGE_KEY = 'cxr.react.countdownSeconds'; @@ -287,53 +305,58 @@ function App() { // Initialize CloudXR2DUI useEffect(() => { - // Create and initialize the 2D UI manager + // Create and initialize the 2D UI manager. const ui = new CloudXR2DUI(() => { - // Callback when configuration changes setConfigVersion(v => v + 1); }); - ui.initialize(); - ui.setupConnectButtonHandler( - async () => { - const config = ui.getConfiguration(); - const resolutionError = getResolutionValidationError( - config.perEyeWidth, - config.perEyeHeight - ); - if (resolutionError) { - ui.updateConnectButtonState(); - return; + // URL query params override localStorage so bookmarked links always win. + const urlSeeds: Record = {}; + const p = new URLSearchParams(window.location.search); + for (const key of ['serverIP', 'port', 'codec', 'panelHiddenAtStart']) { + const v = p.get(key); + if (v !== null) urlSeeds[key] = v; + } + ui.initialize(Object.keys(urlSeeds).length > 0 ? urlSeeds : undefined); + const doConnect = async () => { + const config = ui.getConfiguration(); + const resolutionError = getResolutionValidationError( + config.perEyeWidth, + config.perEyeHeight + ); + if (resolutionError) { + ui.updateConnectButtonState(); + return; + } + // Start XR session + if (config.immersiveMode === 'ar') { + await store.enterAR(); + } else if (config.immersiveMode === 'vr') { + await store.enterVR(); + } else { + setErrorMessage('Unrecognized immersive mode'); + } + store.setFrameRate((supportedFrameRates: ArrayLike): number | false => { + let frameRate = ui.getConfiguration().deviceFrameRate; + let found = false; + for (let i = 0; i < supportedFrameRates.length; ++i) { + if (supportedFrameRates[i] === frameRate) { + found = true; + break; + } } - // Start XR session - if (config.immersiveMode === 'ar') { - await store.enterAR(); - } else if (config.immersiveMode === 'vr') { - await store.enterVR(); + if (found) { + console.info('Changed frame rate to', frameRate); + return frameRate; } else { - setErrorMessage('Unrecognized immersive mode'); + console.error('Failed to change frame rate to', frameRate); + return false; } - store.setFrameRate((supportedFrameRates: ArrayLike): number | false => { - let frameRate = ui.getConfiguration().deviceFrameRate; - let found = false; - for (let i = 0; i < supportedFrameRates.length; ++i) { - if (supportedFrameRates[i] === frameRate) { - found = true; - break; - } - } - if (found) { - console.info('Changed frame rate to', frameRate); - return frameRate; - } else { - console.error('Failed to change frame rate to', frameRate); - return false; - } - }); - }, - (error: Error) => { - setErrorMessage(`Failed to start XR session: ${error}`); - } - ); + }); + }; + + ui.setupConnectButtonHandler(doConnect, (error: Error) => { + setErrorMessage(`Failed to start XR session: ${error}`); + }); setCloudXR2DUI(ui); @@ -616,6 +639,50 @@ function App() { } }; + // OOB WebSocket: only when oobEnable=1 and query has valid serverIP + port → wss://{serverIP}:{port}/oob/v1/ws. + useEffect(() => { + if (!cloudXR2DUI) return; + const p = new URLSearchParams(window.location.search); + const hubWsUrl = buildOobHubWsUrlFromQuery(p); + if (!hubWsUrl) { + return; + } + + console.info('[Teleop] OOB control WebSocket:', hubWsUrl); + + const channel = new HeadsetControlChannel({ + url: hubWsUrl, + token: p.get('controlToken') ?? undefined, + onConfig: () => { + // Config push handling deferred to phase 2. + }, + getMetricsSnapshot: () => { + const snapshots: Array<{ cadence: string; metrics: Record }> = []; + const rm = renderMetrics.value; + if (rm) { + snapshots.push({ + cadence: 'render', + metrics: { [CloudXR.MetricsName.RenderFramerate]: rm.fps }, + }); + } + const sm = streamingMetrics.value; + if (sm) { + snapshots.push({ + cadence: 'frame', + metrics: { + [CloudXR.MetricsName.StreamingFramerate]: sm.fps, + [CloudXR.MetricsName.PoseToRenderTime]: sm.latencyMs, + }, + }); + } + return snapshots; + }, + }); + channel.connect(); + + return () => { channel.dispose(); }; + }, [cloudXR2DUI]); + // Countdown configuration handlers (0-5 seconds) const handleIncreaseCountdown = () => { if (isCountingDown) return; diff --git a/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx b/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx index 283f3a049..367fe6942 100644 --- a/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx +++ b/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx @@ -175,7 +175,7 @@ export class CloudXR2DUI { /** * Initializes the CloudXR2DUI with all necessary components and event handlers */ - public initialize(): void { + public initialize(urlSeeds?: Record): void { if (this.initialized) { return; } @@ -183,6 +183,9 @@ export class CloudXR2DUI { try { this.initializeElements(); this.setupLocalStorage(); + if (urlSeeds) { + this.applyUrlSeeds(urlSeeds); + } this.setupProxyConfiguration(); this.setupEventListeners(); // Set initial display value @@ -197,6 +200,25 @@ export class CloudXR2DUI { } } + /** + * Override form fields and localStorage from URL query params. + * Called after setupLocalStorage() so URL params win over stored values. + */ + private applyUrlSeeds(seeds: Record): void { + const mapping: Array<[string, HTMLInputElement | HTMLSelectElement, string]> = [ + ['serverIP', this.serverIpInput, 'serverIp'], + ['port', this.portInput, 'port'], + ['codec', this.codecSelect, 'codec'], + ['panelHiddenAtStart', this.panelHiddenAtStartSelect, 'panelHiddenAtStart'], + ]; + for (const [paramKey, element, storageKey] of mapping) { + const v = seeds[paramKey]; + if (v === undefined) continue; + element.value = v; + try { localStorage.setItem(storageKey, v); } catch (_) { /* quota */ } + } + } + /** * Initializes all DOM element references by their IDs * Throws an error if any required element is not found diff --git a/docs/source/index.rst b/docs/source/index.rst index 390fcb9e7..49bffce56 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -60,6 +60,7 @@ Table of Contents references/build device/index references/retargeting + references/oob_teleop_control references/license Indices and tables diff --git a/docs/source/references/oob_teleop_control.rst b/docs/source/references/oob_teleop_control.rst new file mode 100644 index 000000000..44db6110f --- /dev/null +++ b/docs/source/references/oob_teleop_control.rst @@ -0,0 +1,295 @@ +.. SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +.. SPDX-License-Identifier: Apache-2.0 + +Out-of-band teleop control +========================== + +The **OOB (out-of-band) teleop control hub** lets you coordinate Isaac Teleop +from outside the headset — read streaming metrics, inspect connected clients, +and push configuration changes — over the **same TLS port** as the CloudXR +proxy. + +The hub shares the proxy TLS port (default **48322**, override with +``PROXY_PORT``). + +Quick start +----------- + +**Step 1 — Start the streaming host with OOB enabled** + +Launch the CloudXR runtime with the ``--setup-oob`` flag (add ``--accept-eula`` +on first run): + +.. code-block:: bash + + python -m isaacteleop.cloudxr --accept-eula --setup-oob + +You should see output confirming the hub is running: + +.. code-block:: text + + CloudXR WSS proxy: running, log file: /home//.cloudxr/logs/wss.2026-04-13T202133Z.log + oob: enabled (hub running in WSS proxy) + +**Step 2 — Open the web client on the headset** + +On the XR headset browser, navigate to the client URL with **all three** +required query parameters — ``oobEnable``, ``serverIP``, and ``port``: + +.. code-block:: text + + https://nvidia.github.io/IsaacTeleop/client/?oobEnable=1&serverIP=&port=48322 + +Replace ```` with the streaming host's LAN IP. The ``port`` must +match the proxy port (default 48322). + +.. note:: + + All three parameters are required. If ``serverIP`` or ``port`` is missing, + the OOB control channel is silently skipped — the client will still work for + streaming but will not register with the hub or report metrics. + +**Step 3 — Verify the headset registered with the hub** + +From a PC on the same network, query the hub state API (``-k`` skips the +self-signed certificate check): + +.. code-block:: bash + + curl -k https://:48322/api/oob/v1/state + +You should see the headset listed under ``"headsets"`` with +``"connected": true``: + +.. code-block:: json + + { + "updatedAt": 1776112022900, + "configVersion": 0, + "config": {"serverIP": "", "port": 48322}, + "headsets": [ + { + "clientId": "193f3758-281e-4292-8c36-6541b58963ef", + "connected": true, + "deviceLabel": null, + "registeredAt": 1776112022805, + "metricsByCadence": {} + } + ] + } + +If ``"headsets"`` is empty, double-check that the URL on the headset includes +both ``serverIP`` and ``port`` and that the headset can reach the host over the +network. + +**Step 4 — (Optional) Push config to the headset** + +Before or after the headset connects to the CloudXR stream, you can push +configuration overrides via the HTTP config API: + +.. code-block:: bash + + curl -k "https://:48322/api/oob/v1/config?serverIP=&port=48322&codec=av1" + +See ``GET /api/oob/v1/config`` below for all supported keys. + +**Step 5 — Connect and stream; poll for metrics** + +Press **CONNECT** on the headset to start the CloudXR streaming session. Once +streaming begins, the headset reports metrics to the hub every 500 ms. Poll the +state endpoint from a PC to collect them: + +.. code-block:: bash + + # Poll every 2 seconds (adjust to taste) + watch -n 2 'curl -sk https://:48322/api/oob/v1/state | python3 -m json.tool' + +The ``metricsByCadence`` field on each headset entry will now contain live streaming metrics. + +Architecture +------------ + +.. list-table:: + :header-rows: 1 + :widths: 22 38 40 + + * - Role + - Software + - What it does + * - **XR headset** + - Isaac Teleop WebXR client in the device browser + - Registers with the hub via WebSocket, reports streaming metrics + periodically (default every 500 ms), receives config pushes. + * - **Streaming host** + - ``python -m isaacteleop.cloudxr --setup-oob`` + - Runs CloudXR runtime + WSS proxy + OOB hub on a single TLS port. + * - **Operator / scripts** + - ``curl``, browser, or custom tooling + - Reads state via HTTP, optionally pushes config via HTTP. + +WebSocket protocol +------------------ + +Endpoint: ``wss://:/oob/v1/ws`` + +All messages are JSON text frames with ``{"type": ..., "payload": ...}``. + +Registration (first message) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: json + + { + "type": "register", + "payload": { + "role": "headset", + "deviceLabel": "Quest 3", + "token": "" + } + } + +``role`` must be ``"headset"``. The hub replies with: + +.. code-block:: json + + { + "type": "hello", + "payload": { + "clientId": "", + "configVersion": 0, + "config": {"serverIP": "...", "port": 48322} + } + } + +Headset → hub: ``clientMetrics`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: json + + { + "type": "clientMetrics", + "payload": { + "t": 1712800000000, + "cadence": "frame", + "metrics": { + "streaming.framerate": 72.0, + "render.pose_to_render_time": 18.5 + } + } + } + +HTTP API +-------- + +All endpoints use **GET** with query parameters on the proxy TLS port. + +``GET /api/oob/v1/state`` +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Returns the current hub state: connected headsets, latest metrics, and config +version. + +.. code-block:: bash + + curl -k https://localhost:48322/api/oob/v1/state + +Example response: + +.. code-block:: json + + { + "updatedAt": 1712800000000, + "configVersion": 0, + "config": {"serverIP": "10.0.0.1", "port": 48322}, + "headsets": [ + { + "clientId": "abc-123", + "connected": true, + "deviceLabel": "Quest 3", + "registeredAt": 1712799990000, + "metricsByCadence": { + "frame": { + "at": 1712800000000, + "metrics": {"streaming.framerate": 72.0} + } + } + } + ] + } + +``GET /api/oob/v1/config`` +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Push config to connected headsets via query parameters: + +.. code-block:: bash + + curl -k "https://localhost:48322/api/oob/v1/config?serverIP=10.0.0.5&port=48322" + +Example response: + +.. code-block:: json + + { + "ok": true, + "changed": true, + "configVersion": 1, + "targetCount": 1 + } + +Supported query keys: ``serverIP``, ``port``, ``panelHiddenAtStart``, ``codec``. +Optional ``targetClientId`` restricts the push to a single headset (returns 404 +if not connected). + +Authentication +-------------- + +Set ``CONTROL_TOKEN=`` to require a token on all hub operations. +Pass it as: + +- WebSocket: ``"token"`` field in the ``register`` payload +- HTTP: ``?token=`` query parameter or ``X-Control-Token`` header + +Web client integration +---------------------- + +The WebXR client connects to the hub when the page URL contains +``oobEnable=1`` plus ``serverIP`` and ``port``: + +.. code-block:: text + + https://nvidia.github.io/IsaacTeleop/client/?oobEnable=1&serverIP=10.0.0.1&port=48322 + +The client builds ``wss://{serverIP}:{port}/oob/v1/ws`` and: + +1. Registers as role ``"headset"`` +2. Reports ``clientMetrics`` periodically (default every 500 ms) +3. Receives ``config`` pushes (phase 2) + +URL query parameter overrides +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The following URL parameters override their corresponding form fields (and +``localStorage`` values) so that bookmarked links always take priority over +previously saved settings: + +- ``serverIP`` — CloudXR server IP address +- ``port`` — CloudXR server port +- ``codec`` — video codec +- ``panelHiddenAtStart`` — hide the control panel on load + +Environment variables +--------------------- + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Variable + - Description + * - ``PROXY_PORT`` + - WSS proxy port (default ``48322``) + * - ``CONTROL_TOKEN`` + - Optional auth token for hub access + * - ``TELEOP_STREAM_SERVER_IP`` + - Override the auto-detected LAN IP in hub initial config diff --git a/src/core/cloudxr/python/CMakeLists.txt b/src/core/cloudxr/python/CMakeLists.txt index 5e88cf747..cd65a7f12 100644 --- a/src/core/cloudxr/python/CMakeLists.txt +++ b/src/core/cloudxr/python/CMakeLists.txt @@ -111,6 +111,7 @@ add_custom_target(cloudxr_python ALL "${CMAKE_CURRENT_SOURCE_DIR}/launcher.py" "${CMAKE_CURRENT_SOURCE_DIR}/runtime.py" "${CMAKE_CURRENT_SOURCE_DIR}/wss.py" + "${CMAKE_CURRENT_SOURCE_DIR}/oob_teleop_hub.py" "${CLOUDXR_PYTHON_DIR}/" COMMAND ${CMAKE_COMMAND} -E rm -rf "${CLOUDXR_PYTHON_DIR}/__pycache__" COMMENT "Copying cloudxr Python files to package structure" diff --git a/src/core/cloudxr/python/__main__.py b/src/core/cloudxr/python/__main__.py index b8637b90a..304a18f6f 100644 --- a/src/core/cloudxr/python/__main__.py +++ b/src/core/cloudxr/python/__main__.py @@ -36,6 +36,15 @@ def _parse_args() -> argparse.Namespace: action="store_true", help="Accept the NVIDIA CloudXR EULA non-interactively (e.g. for CI or containers).", ) + parser.add_argument( + "--setup-oob", + action="store_true", + default=False, + help=( + "Enable OOB teleop control hub in the WSS proxy. " + "Exposes WebSocket and HTTP API for headset metrics and remote config." + ), + ) return parser.parse_args() @@ -47,6 +56,7 @@ def main() -> None: install_dir=args.cloudxr_install_dir, env_config=args.cloudxr_env_config, accept_eula=args.accept_eula, + setup_oob=args.setup_oob, ) as launcher: cxr_ver = runtime_version() print( @@ -63,6 +73,10 @@ def main() -> None: print( f"CloudXR WSS proxy: \033[36mrunning\033[0m, log file: \033[90m{wss_log}\033[0m" ) + if args.setup_oob: + print( + " oob: \033[32menabled\033[0m (hub running in WSS proxy)" + ) print( f"Activate CloudXR environment in another terminal: \033[1;32msource {env_cfg.env_filepath()}\033[0m" ) diff --git a/src/core/cloudxr/python/launcher.py b/src/core/cloudxr/python/launcher.py index c3cfdbed7..33df37e43 100644 --- a/src/core/cloudxr/python/launcher.py +++ b/src/core/cloudxr/python/launcher.py @@ -73,6 +73,7 @@ def __init__( install_dir: str = "~/.cloudxr", env_config: str | Path | None = None, accept_eula: bool = False, + setup_oob: bool = False, ) -> None: """Launch the CloudXR runtime and WSS proxy. @@ -89,6 +90,8 @@ def __init__( accept_eula: Accept the NVIDIA CloudXR EULA non-interactively. When ``False`` and the EULA marker does not exist, the user is prompted on stdin. + setup_oob: Enable the OOB teleop control hub in the WSS + proxy. Raises: RuntimeError: If the EULA is not accepted or the runtime @@ -97,6 +100,7 @@ def __init__( self._install_dir = install_dir self._env_config = str(env_config) if env_config is not None else None self._accept_eula = accept_eula + self._setup_oob = setup_oob self._runtime_proc: subprocess.Popen | None = None self._wss_thread: threading.Thread | None = None @@ -365,11 +369,17 @@ def _start_wss_proxy(self, log_path: Path) -> None: stop_future = loop.create_future() self._wss_stop_future = stop_future + setup_oob = self._setup_oob + def _run_wss() -> None: asyncio.set_event_loop(loop) try: loop.run_until_complete( - wss_run(log_file_path=log_path, stop_future=stop_future) + wss_run( + log_file_path=log_path, + stop_future=stop_future, + setup_oob=setup_oob, + ) ) except Exception: logger.exception("WSS proxy thread exited with error") diff --git a/src/core/cloudxr/python/oob_teleop_hub.py b/src/core/cloudxr/python/oob_teleop_hub.py new file mode 100644 index 000000000..f5a595900 --- /dev/null +++ b/src/core/cloudxr/python/oob_teleop_hub.py @@ -0,0 +1,295 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Out-of-band (OOB) teleop hub — WebSocket server for headset metrics and config. + +Headsets register via WebSocket and report streaming metrics. Operators read +state and push config via the HTTP API on the same TLS port. + +WebSocket: ``wss://:/oob/v1/ws`` +HTTP API: ``GET /api/oob/v1/state``, ``GET /api/oob/v1/config`` +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, Literal + +log = logging.getLogger("oob-teleop-hub") + +OOB_WS_PATH = "/oob/v1/ws" + + +@dataclass +class _HeadsetState: + client_id: str + ws: Any + registered_at: float + device_label: str | None = None + metrics_by_cadence: dict = field(default_factory=dict) + + +class OOBControlHub: + """Collects headset metrics and exposes state via HTTP. + + One instance per proxy process; WebSocket connections on ``OOB_WS_PATH`` + are dispatched via :meth:`handle_connection`. + """ + + def __init__( + self, + control_token: str | None = None, + initial_config: dict | None = None, + ) -> None: + self._token = control_token + self._headsets: dict[Any, _HeadsetState] = {} + self._stream_config: dict = dict(initial_config or {}) + self._config_version: int = 0 + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + async def handle_connection(self, ws: Any) -> None: + """Entry point for each new WebSocket client on ``OOB_WS_PATH``.""" + client_id = str(uuid.uuid4()) + registered = False + + try: + async for raw in ws: + if not isinstance(raw, str): + await self._send_error(ws, "BAD_REQUEST", "Expected text frame") + continue + + try: + msg = json.loads(raw) + except json.JSONDecodeError: + await self._send_error(ws, "BAD_REQUEST", "Invalid JSON") + continue + + msg_type: str = msg.get("type", "") + payload: dict = msg.get("payload") or {} + + if not registered: + if msg_type != "register": + await self._send_error( + ws, "BAD_REQUEST", "First message must be register" + ) + return + ok = await self._handle_register(ws, client_id, payload) + if not ok: + return + registered = True + continue + + await self._dispatch_headset(ws, msg_type, payload) + + except Exception: + log.debug("Teleop WS closed", exc_info=True) + finally: + async with self._lock: + self._headsets.pop(ws, None) + log.info("Teleop client disconnected (clientId=%s)", client_id) + + async def get_snapshot(self) -> dict: + """Build the JSON snapshot for ``GET /api/oob/v1/state``.""" + async with self._lock: + headsets = [ + { + "clientId": s.client_id, + "connected": True, + "deviceLabel": s.device_label, + "registeredAt": int(s.registered_at * 1000), + "metricsByCadence": s.metrics_by_cadence, + } + for s in self._headsets.values() + ] + return { + "updatedAt": int(time.time() * 1000), + "configVersion": self._config_version, + "config": dict(self._stream_config), + "headsets": headsets, + } + + def check_token(self, token: str | None) -> bool: + """Return True if token satisfies the hub's auth requirement.""" + if not self._token: + return True + return token == self._token + + async def http_oob_set_config(self, payload: dict) -> tuple[int, dict]: + """Merge stream config; for OOB HTTP ``GET /api/oob/v1/config``.""" + if not self.check_token(payload.get("token")): + return 401, {"error": "Unauthorized"} + + new_config = payload.get("config") + if not isinstance(new_config, dict): + return 400, {"error": "config must be an object"} + + target_raw = payload.get("targetClientId") + target_id: str | None = ( + None if target_raw is None or target_raw == "" else str(target_raw) + ) + + outcome = await self._merge_stream_config(new_config, target_id) + if outcome[0] == "noop": + return 200, { + "ok": True, + "changed": False, + "configVersion": outcome[1], + } + if outcome[0] == "missing": + return 404, {"error": f"Headset '{outcome[1]}' not connected"} + + _tag, version, config_snapshot, targets = outcome + log.info( + "OOB setConfig configVersion=%d → %d headset(s)", version, len(targets) + ) + await self._push_config_to_headsets(version, config_snapshot, targets) + return 200, { + "ok": True, + "changed": True, + "configVersion": version, + "targetCount": len(targets), + } + + # ------------------------------------------------------------------ + # Private: registration + # ------------------------------------------------------------------ + + async def _handle_register(self, ws: Any, client_id: str, payload: dict) -> bool: + """Validate and register a headset. Returns True on success.""" + if self._token and payload.get("token") != self._token: + await self._send_error(ws, "UNAUTHORIZED", "Invalid or missing token") + try: + await ws.close(1008, "Unauthorized") + except Exception: + pass + return False + + role = payload.get("role") + if role != "headset": + await self._send_error(ws, "BAD_REQUEST", "role must be 'headset'") + return False + + async with self._lock: + state = _HeadsetState( + client_id=client_id, + ws=ws, + registered_at=time.time(), + device_label=payload.get("deviceLabel"), + ) + self._headsets[ws] = state + log.info( + "Headset registered: clientId=%s label=%s", + client_id, + state.device_label, + ) + hello_payload = { + "clientId": client_id, + "configVersion": self._config_version, + "config": dict(self._stream_config), + } + + await self._send(ws, "hello", hello_payload) + return True + + # ------------------------------------------------------------------ + # Private: message dispatch + # ------------------------------------------------------------------ + + async def _dispatch_headset(self, ws: Any, msg_type: str, payload: dict) -> None: + if msg_type == "clientMetrics": + await self._handle_client_metrics(ws, payload) + else: + await self._send_error( + ws, "BAD_REQUEST", f"Unknown message type: {msg_type}" + ) + + # ------------------------------------------------------------------ + # Private: message handlers + # ------------------------------------------------------------------ + + async def _merge_stream_config( + self, new_config: dict, target_id: str | None + ) -> ( + tuple[Literal["noop"], int] + | tuple[Literal["missing"], str] + | tuple[Literal["push"], int, dict, list[_HeadsetState]] + ): + async with self._lock: + merged = {**self._stream_config, **new_config} + if merged == self._stream_config: + return ("noop", self._config_version) + + if target_id is not None: + targets = [ + s for s in self._headsets.values() if s.client_id == target_id + ] + if not targets: + return ("missing", target_id) + # Targeted push: send merged snapshot without mutating global state. + return ("push", self._config_version, merged, targets) + + # Global push: update shared config and version. + self._stream_config = merged + self._config_version += 1 + return ( + "push", + self._config_version, + dict(self._stream_config), + list(self._headsets.values()), + ) + + async def _push_config_to_headsets( + self, version: int, config_snapshot: dict, targets: list[_HeadsetState] + ) -> None: + push_payload = {"configVersion": version, "config": config_snapshot} + for headset in targets: + await self._send(headset.ws, "config", push_payload) + + async def _handle_client_metrics(self, ws: Any, payload: dict) -> None: + async with self._lock: + state = self._headsets.get(ws) + if state is None: + return + cadence = str(payload.get("cadence", "unknown")) + raw_metrics = payload.get("metrics", {}) + if not isinstance(raw_metrics, dict): + raw_metrics = {} + state.metrics_by_cadence[cadence] = { + "at": int(payload.get("t", time.time() * 1000)), + "metrics": { + str(k): float(v) + for k, v in raw_metrics.items() + if isinstance(v, (int, float)) + }, + } + + # ------------------------------------------------------------------ + # Private: send helpers + # ------------------------------------------------------------------ + + async def _send(self, ws: Any, msg_type: str, payload: dict) -> None: + try: + await ws.send(json.dumps({"type": msg_type, "payload": payload})) + except Exception: + log.debug("Failed to send '%s' message", msg_type, exc_info=True) + + async def _send_error( + self, + ws: Any, + code: str, + message: str, + request_id: str | None = None, + ) -> None: + p: dict = {"code": code, "message": message} + if request_id is not None: + p["requestId"] = request_id + await self._send(ws, "error", p) diff --git a/src/core/cloudxr/python/wss.py b/src/core/cloudxr/python/wss.py index a4bcfc511..683423b7e 100755 --- a/src/core/cloudxr/python/wss.py +++ b/src/core/cloudxr/python/wss.py @@ -5,7 +5,10 @@ import asyncio import errno +import json import logging +import os +from urllib.parse import unquote, urlparse import shutil import ssl import subprocess @@ -13,7 +16,38 @@ from dataclasses import dataclass from pathlib import Path +import socket + from .env_config import get_env_config +from .oob_teleop_hub import OOB_WS_PATH + +WSS_PROXY_DEFAULT_PORT = 48322 + + +def _wss_proxy_port() -> int: + raw = os.environ.get("PROXY_PORT", "").strip() + return int(raw) if raw else WSS_PROXY_DEFAULT_PORT + + +def _guess_lan_ipv4() -> str | None: + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.settimeout(0.25) + s.connect(("192.0.2.1", 1)) + addr, _ = s.getsockname() + except OSError: + return None + return None if not addr or addr == "127.0.0.1" else addr + + +def _default_initial_config(proxy_port: int) -> dict: + server_ip = ( + os.environ.get("TELEOP_STREAM_SERVER_IP", "").strip() + or _guess_lan_ipv4() + or "127.0.0.1" + ) + return {"serverIP": server_ip, "port": proxy_port} + try: import websockets @@ -111,10 +145,113 @@ def build_ssl_context(cert_paths: CertPaths) -> ssl.SSLContext: } -def _make_http_handler(backend_host, backend_port): +def _cert_html() -> bytes: + return ( + b"" + b"" + b"

Certificate Accepted

" + b"

You can close this tab and return to the web client.

" + b"
" + ) + + +def _normalize_request_path(raw_path: str) -> str: + """Normalize HTTP request-target: query stripped, absolute-URL form, ``%``-decoding, ``//``, ``.`` / ``..``.""" + path = (raw_path or "/").split("?")[0] or "/" + if path.startswith(("http://", "https://")): + path = urlparse(path).path or "/" + path = unquote(path, errors="replace") + segments = [p for p in path.split("/") if p and p != "."] + stack: list[str] = [] + for seg in segments: + if seg == "..": + if stack: + stack.pop() + else: + stack.append(seg) + if not stack: + return "/" + return "/" + "/".join(stack) + + +def _is_oob_hub_http_path(path: str) -> bool: + """True for OOB HTTP API paths on the WSS proxy.""" + return path in ( + "/api/oob/v1/state", + "/api/oob/v1/config", + ) + + +def _parse_query_params(raw_path: str) -> dict[str, str]: + """First occurrence wins; keys and values are URL-decoded.""" + if "?" not in raw_path: + return {} + qs = raw_path.split("?", 1)[1] + out: dict[str, str] = {} + for part in qs.split("&"): + if not part: + continue + if "=" in part: + k, v = part.split("=", 1) + k = unquote(k) + if k not in out: + out[k] = unquote(v, errors="replace") + else: + k = unquote(part) + if k not in out: + out[k] = "" + return out + + +def _stream_config_from_query(q: dict[str, str]) -> tuple[dict | None, str | None]: + """Build ``StreamConfig`` patch from query string (``serverIP=`` / ``port=`` / …).""" + cfg: dict[str, object] = {} + if "serverIP" in q: + cfg["serverIP"] = q["serverIP"] + if "port" in q and q["port"] != "": + try: + cfg["port"] = int(q["port"], 10) + except ValueError: + return None, "port must be an integer" + if "panelHiddenAtStart" in q and q["panelHiddenAtStart"] != "": + s = q["panelHiddenAtStart"].strip().lower() + if s in ("1", "true", "yes", "on"): + cfg["panelHiddenAtStart"] = True + elif s in ("0", "false", "no", "off"): + cfg["panelHiddenAtStart"] = False + else: + return None, "panelHiddenAtStart must be true or false" + if "codec" in q and q["codec"] != "": + cfg["codec"] = q["codec"] + return cfg, None + + +def _oob_token(request, q: dict[str, str]) -> str | None: + h = request.headers.get("X-Control-Token") + if h: + return h + t = q.get("token") + return t if t else None + + +def _json_response(status: int, phrase: str, body: dict) -> Response: + return Response( + status, + phrase, + Headers({"Content-Type": "application/json", **CORS_HEADERS}), + json.dumps(body).encode(), + ) + + +def _make_http_handler(backend_host, backend_port, hub=None): async def handle_http_request(connection, request): if request.headers.get("Upgrade", "").lower() == "websocket": return None + if request.headers.get("Access-Control-Request-Method"): return Response( 200, @@ -122,19 +259,61 @@ async def handle_http_request(connection, request): Headers({"Content-Type": "text/plain", **CORS_HEADERS}), b"OK", ) + + path = _normalize_request_path(request.path or "/") + raw_path = request.path or "/" + q = _parse_query_params(raw_path) + + if hub is not None and _is_oob_hub_http_path(path): + token = _oob_token(request, q) + if path == "/api/oob/v1/state": + if not hub.check_token(token): + return _json_response( + 401, "Unauthorized", {"error": "Unauthorized"} + ) + snapshot = await hub.get_snapshot() + return Response( + 200, + "OK", + Headers({"Content-Type": "application/json", **CORS_HEADERS}), + json.dumps(snapshot).encode(), + ) + + if path == "/api/oob/v1/config": + if not hub.check_token(token): + return _json_response( + 401, "Unauthorized", {"error": "Unauthorized"} + ) + cfg, err = _stream_config_from_query(q) + if err: + return _json_response(400, "Bad Request", {"error": err}) + payload = { + "config": cfg, + "targetClientId": q.get("targetClientId"), + "token": token, + } + status, body = await hub.http_oob_set_config(payload) + phrase = { + 200: "OK", + 400: "Bad Request", + 401: "Unauthorized", + 404: "Not Found", + }.get(status, "Error") + return _json_response(status, phrase, body) + + if hub is None and _is_oob_hub_http_path(path): + return Response( + 404, + "Not Found", + Headers({"Content-Type": "text/plain", **CORS_HEADERS}), + b"Not found", + ) + return Response( 200, "OK", Headers({"Content-Type": "text/html; charset=utf-8", **CORS_HEADERS}), - b"" - b"" - b"

Certificate Accepted

" - b"

You can close this tab and return to the web client.

" - b"
", + _cert_html(), ) return handle_http_request @@ -156,6 +335,24 @@ def add_cors_headers(connection, request, response): } +def _is_backend_connection_refused(exc: BaseException) -> bool: + """True when ``ws_connect`` failed because nothing is listening (runtime not running).""" + if isinstance(exc, ConnectionRefusedError): + return True + if isinstance(exc, OSError) and exc.errno in ( + errno.ECONNREFUSED, + getattr(errno, "WSAECONNREFUSED", -1), + ): + return True + if isinstance(exc, OSError): + msg = str(exc).lower() + if "errno 61" in msg or "errno 111" in msg: + return True + if "connection refused" in msg: + return True + return False + + async def _pipe(src, dst, label: str): try: async for msg in src: @@ -204,6 +401,19 @@ async def proxy_handler(client, backend_host: str, backend_port: int): ping_timeout=None, close_timeout=10, ) + except OSError as exc: + if _is_backend_connection_refused(exc): + log.warning( + "No CloudXR runtime at ws://%s:%s (connection refused) for path %s — " + "expected when running WSS+hub without the runtime; teleop signaling uses %s.", + backend_host, + backend_port, + path, + OOB_WS_PATH, + ) + return + log.exception("Failed to connect to backend %s", backend_uri) + return except Exception: log.exception("Failed to connect to backend %s", backend_uri) return @@ -238,37 +448,61 @@ def default_cert_paths() -> CertPaths: async def run( - log_file_path: str | Path, + log_file_path: str | Path | None, stop_future: asyncio.Future, backend_host: str = "localhost", backend_port: int = 49100, - proxy_port: int = 48322, + proxy_port: int | None = None, + setup_oob: bool = False, ) -> None: logger = log logger.setLevel(logging.INFO) logger.propagate = False - file_handler = logging.FileHandler(log_file_path, mode="a", encoding="utf-8") - file_handler.setFormatter( - logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") - ) - logger.addHandler(file_handler) + _log_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") + if log_file_path is not None: + _handler: logging.Handler = logging.FileHandler( + log_file_path, mode="a", encoding="utf-8" + ) + else: + _handler = logging.StreamHandler(sys.stderr) + _handler.setFormatter(_log_fmt) + logger.addHandler(_handler) try: + resolved_port = _wss_proxy_port() if proxy_port is None else proxy_port logging.getLogger("websockets").setLevel(logging.WARNING) cert_paths = default_cert_paths() ensure_certificate(cert_paths) ssl_ctx = build_ssl_context(cert_paths) + hub = None + if setup_oob: + from .oob_teleop_hub import OOBControlHub # noqa: PLC0415 + + control_token = os.environ.get("CONTROL_TOKEN") or None + initial = _default_initial_config(resolved_port) + hub = OOBControlHub(control_token=control_token, initial_config=initial) + log.info( + "Teleop control hub enabled (token=%s) OOB_WS=%s initial_stream=%s", + "set" if control_token else "none", + OOB_WS_PATH, + initial, + ) + def handler(ws): + if hub is not None: + path = _normalize_request_path(ws.request.path or "/") + if path == OOB_WS_PATH: + return hub.handle_connection(ws) return proxy_handler(ws, backend_host, backend_port) - http_handler = _make_http_handler(backend_host, backend_port) + http_handler = _make_http_handler(backend_host, backend_port, hub=hub) async with ws_serve( handler, host="", - port=proxy_port, + port=resolved_port, ssl=ssl_ctx, process_request=http_handler, process_response=add_cors_headers, @@ -278,16 +512,16 @@ def handler(ws): ping_timeout=None, close_timeout=10, ): - log.info("WSS proxy listening on port %d", proxy_port) + log.info("WSS proxy listening on port %d", resolved_port) await stop_future log.info("Shutting down ...") except OSError as e: if e.errno == errno.EADDRINUSE: raise RuntimeError( - f"WSS proxy port {proxy_port} is already in use. " - f"Set PROXY_PORT to a different port or stop the process using {proxy_port}." + f"WSS proxy port {resolved_port} is already in use. " + f"Set PROXY_PORT to a different port or stop the process using {resolved_port}." ) from e raise finally: - logger.removeHandler(file_handler) - file_handler.close() + logger.removeHandler(_handler) + _handler.close() diff --git a/src/core/cloudxr_tests/python/conftest.py b/src/core/cloudxr_tests/python/conftest.py new file mode 100644 index 000000000..43f6fe574 --- /dev/null +++ b/src/core/cloudxr_tests/python/conftest.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Make CloudXR python sources importable without installing ``isaacteleop``.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +_CLOUDXR_PY = Path(__file__).resolve().parents[2] / "cloudxr" / "python" +if _CLOUDXR_PY.is_dir() and str(_CLOUDXR_PY) not in sys.path: + sys.path.insert(0, str(_CLOUDXR_PY)) diff --git a/src/core/cloudxr_tests/python/pyproject.toml b/src/core/cloudxr_tests/python/pyproject.toml index 3850a5360..006c5e8c2 100644 --- a/src/core/cloudxr_tests/python/pyproject.toml +++ b/src/core/cloudxr_tests/python/pyproject.toml @@ -10,10 +10,12 @@ requires-python = ">=3.10,<3.14" [project.optional-dependencies] dev = [ "pytest", + "pytest-asyncio>=1.3.0", "numpy", ] test = [ "pytest", + "pytest-asyncio>=1.3.0", "numpy", ] @@ -22,5 +24,6 @@ pythonpath = ["."] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] +asyncio_mode = "auto" # Prevent pytest from recursing into parent directories norecursedirs = [".git", ".venv", "build", "dist", "*.egg", "__pycache__"] diff --git a/src/core/cloudxr_tests/python/test_oob_teleop_hub.py b/src/core/cloudxr_tests/python/test_oob_teleop_hub.py new file mode 100644 index 000000000..fa989b559 --- /dev/null +++ b/src/core/cloudxr_tests/python/test_oob_teleop_hub.py @@ -0,0 +1,238 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Async unit tests for :mod:`oob_teleop_hub.OOBControlHub`. + +Run from this directory (after ``pip install pytest``):: + + pytest -q + +No CloudXR runtime, TLS, or ``isaacteleop`` install required — ``conftest.py`` adds +``src/core/cloudxr/python`` to ``sys.path``. +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any + +import pytest + +from oob_teleop_hub import OOBControlHub + + +class QueueWS: + """Minimal async-iterable WebSocket stand-in for :meth:`OOBControlHub.handle_connection`.""" + + def __init__(self) -> None: + self._q: asyncio.Queue[str | None] = asyncio.Queue() + self.sent: list[str] = [] + self.close_calls: list[tuple[Any, ...]] = [] + + async def inject(self, message: str) -> None: + await self._q.put(message) + + async def end_stream(self) -> None: + await self._q.put(None) + + def __aiter__(self) -> QueueWS: + return self + + async def __anext__(self) -> str: + item = await self._q.get() + if item is None: + raise StopAsyncIteration + return item + + async def send(self, data: str) -> None: + self.sent.append(data) + + async def close(self, *_args: Any, **_kwargs: Any) -> None: + self.close_calls.append(_args) + + +def _loads_sent(ws: QueueWS) -> list[dict]: + return [json.loads(s) for s in ws.sent] + + +def test_check_token_no_requirement() -> None: + hub = OOBControlHub(control_token=None) + assert hub.check_token(None) is True + assert hub.check_token("anything") is True + + +def test_check_token_required() -> None: + hub = OOBControlHub(control_token="secret") + assert hub.check_token(None) is False + assert hub.check_token("wrong") is False + assert hub.check_token("secret") is True + + +@pytest.mark.asyncio +async def test_get_snapshot_empty() -> None: + hub = OOBControlHub() + snap = await hub.get_snapshot() + assert snap["configVersion"] == 0 + assert snap["config"] == {} + assert snap["headsets"] == [] + assert "updatedAt" in snap + + +@pytest.mark.asyncio +async def test_headset_register_hello_and_snapshot() -> None: + hub = OOBControlHub(initial_config={"serverIP": "1.2.3.4", "port": 1111}) + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject( + json.dumps( + {"type": "register", "payload": {"role": "headset", "deviceLabel": "Q3"}} + ) + ) + await asyncio.sleep(0) + hello = json.loads(ws.sent[0]) + assert hello["type"] == "hello" + assert hello["payload"]["config"]["serverIP"] == "1.2.3.4" + assert hello["payload"]["config"]["port"] == 1111 + hid = hello["payload"]["clientId"] + + snap = await hub.get_snapshot() + assert len(snap["headsets"]) == 1 + assert snap["headsets"][0]["clientId"] == hid + assert snap["headsets"][0]["deviceLabel"] == "Q3" + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_register_rejects_bad_token() -> None: + hub = OOBControlHub(control_token="ok") + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject( + json.dumps({"type": "register", "payload": {"role": "headset", "token": "bad"}}) + ) + await asyncio.sleep(0) + err = json.loads(ws.sent[0]) + assert err["type"] == "error" + assert err["payload"]["code"] == "UNAUTHORIZED" + assert ws.close_calls + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_register_rejects_non_headset_role() -> None: + hub = OOBControlHub() + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject(json.dumps({"type": "register", "payload": {"role": "dashboard"}})) + await asyncio.sleep(0) + err = json.loads(ws.sent[0]) + assert err["type"] == "error" + assert err["payload"]["code"] == "BAD_REQUEST" + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_first_message_must_be_register() -> None: + hub = OOBControlHub() + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject(json.dumps({"type": "clientMetrics", "payload": {}})) + await asyncio.sleep(0) + err = json.loads(ws.sent[0]) + assert err["payload"]["code"] == "BAD_REQUEST" + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_client_metrics_stored_in_snapshot() -> None: + hub = OOBControlHub() + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await asyncio.sleep(0) + await ws.inject( + json.dumps( + { + "type": "clientMetrics", + "payload": { + "t": 12345000, + "cadence": "frame", + "metrics": {"StreamingFramerate": 72.5}, + }, + } + ) + ) + await asyncio.sleep(0) + + snap = await hub.get_snapshot() + m = snap["headsets"][0]["metricsByCadence"]["frame"] + assert m["at"] == 12345000 + assert m["metrics"]["StreamingFramerate"] == 72.5 + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_unknown_message_type_returns_error() -> None: + hub = OOBControlHub() + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await asyncio.sleep(0) + ws.sent.clear() + + await ws.inject(json.dumps({"type": "bogus", "payload": {}})) + await asyncio.sleep(0) + errs = [m for m in _loads_sent(ws) if m["type"] == "error"] + assert errs and errs[0]["payload"]["code"] == "BAD_REQUEST" + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_http_oob_set_config_noop_returns_changed_false() -> None: + hub = OOBControlHub(initial_config={"serverIP": "10.0.0.1", "port": 9000}) + status, body = await hub.http_oob_set_config( + {"config": {"serverIP": "10.0.0.1", "port": 9000}, "token": None} + ) + assert status == 200 + assert body.get("changed") is False + + +@pytest.mark.asyncio +async def test_http_oob_set_config_updates_and_pushes() -> None: + hub = OOBControlHub(initial_config={"serverIP": "127.0.0.1", "port": 49100}) + hw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + await hw.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await asyncio.sleep(0) + hw.sent.clear() + + status, body = await hub.http_oob_set_config( + {"config": {"serverIP": "10.0.0.2"}, "token": None} + ) + assert status == 200 + assert body.get("changed") is True + cfg_msgs = [m for m in _loads_sent(hw) if m["type"] == "config"] + assert len(cfg_msgs) == 1 + assert cfg_msgs[0]["payload"]["config"]["serverIP"] == "10.0.0.2" + + await hw.end_stream() + await th