From dff8f494d39b8cc06a9852c5895476bf1caed72c Mon Sep 17 00:00:00 2001 From: Yanzi Zhu Date: Tue, 24 Mar 2026 13:48:13 -0700 Subject: [PATCH] Adds a hub for operator to run on desktop --- .../webxr_client/helpers/controlChannel.ts | 230 ++++++ deps/cloudxr/webxr_client/src/App.tsx | 197 ++++- deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx | 188 ++++- docs/source/index.rst | 1 + docs/source/references/oob_teleop_control.rst | 744 ++++++++++++++++++ src/core/cloudxr/python/CMakeLists.txt | 16 + src/core/cloudxr/python/__main__.py | 172 +++- src/core/cloudxr/python/copy_if_exists.cmake | 12 + src/core/cloudxr/python/oob_teleop_adb.py | 367 +++++++++ src/core/cloudxr/python/oob_teleop_env.py | 273 +++++++ src/core/cloudxr/python/oob_teleop_hub.py | 372 +++++++++ src/core/cloudxr/python/udp_tcp_relay.py | 183 +++++ src/core/cloudxr/python/wss.py | 406 +++++++++- src/core/cloudxr/udprelay/go.mod | 3 + src/core/cloudxr/udprelay/main.go | 122 +++ src/core/cloudxr_tests/python/conftest.py | 48 ++ src/core/cloudxr_tests/python/pyproject.toml | 3 + .../python/test_oob_teleop_adb.py | 385 +++++++++ .../python/test_oob_teleop_env.py | 264 +++++++ .../python/test_oob_teleop_hub.py | 373 +++++++++ 20 files changed, 4259 insertions(+), 100 deletions(-) create mode 100644 deps/cloudxr/webxr_client/helpers/controlChannel.ts create mode 100644 docs/source/references/oob_teleop_control.rst create mode 100644 src/core/cloudxr/python/copy_if_exists.cmake create mode 100644 src/core/cloudxr/python/oob_teleop_adb.py create mode 100644 src/core/cloudxr/python/oob_teleop_env.py create mode 100644 src/core/cloudxr/python/oob_teleop_hub.py create mode 100644 src/core/cloudxr/python/udp_tcp_relay.py create mode 100644 src/core/cloudxr/udprelay/go.mod create mode 100644 src/core/cloudxr/udprelay/main.go create mode 100644 src/core/cloudxr_tests/python/conftest.py create mode 100644 src/core/cloudxr_tests/python/test_oob_teleop_adb.py create mode 100644 src/core/cloudxr_tests/python/test_oob_teleop_env.py create mode 100644 src/core/cloudxr_tests/python/test_oob_teleop_hub.py diff --git a/deps/cloudxr/webxr_client/helpers/controlChannel.ts b/deps/cloudxr/webxr_client/helpers/controlChannel.ts new file mode 100644 index 000000000..311a632a8 --- /dev/null +++ b/deps/cloudxr/webxr_client/helpers/controlChannel.ts @@ -0,0 +1,230 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * HeadsetControlChannel — WebSocket client that connects the XR headset to the + * teleop control hub running in the WSS proxy. + * + * Protocol: docs/source/references/oob_teleop_control.rst (Sphinx build) + * Hub WS URL: ``wss://:/oob/v1/ws`` when the page URL includes ``oobEnable=1`` and + * valid ``serverIP`` / ``port`` query parameters (see App.tsx). No connection is made without them. + * + * Usage (in App.tsx): + * + * const channel = new HeadsetControlChannel({ + * url: 'wss://host:48322/oob/v1/ws', + * onConfig: (config, version) => { ... }, + * getMetricsSnapshot: () => [ { cadence: 'frame', metrics: { ... } } ], + * }); + * channel.connect(); + * // on cleanup: + * channel.dispose(); + */ + +/** + * Fields the hub merges into ``config`` on ``hello`` / ``config`` pushes. + * + * **Streaming target:** ``serverIP``, ``port``, ``proxyUrl``, ``mediaAddress``, ``mediaPort``. + * + * **Client UI (allowlist):** keys match HTML form element **ids** on the Teleop page—only these may be + * set remotely today (see ``CloudXR2DUI``). The hub stores arbitrary top-level keys, but the web + * client only applies this known set so coercion and validation stay explicit. + */ +export interface StreamConfig { + serverIP?: string; + port?: number; + proxyUrl?: string | null; + mediaAddress?: string; + mediaPort?: number; + /** Form id ``panelHiddenAtStart``: hide the in-XR control panel when the session starts. */ + panelHiddenAtStart?: boolean; + /** Form id ``codec``: ``h264`` | ``h265`` | ``av1`` when supported. */ + codec?: string; + /** Form id ``perEyeWidth``. */ + perEyeWidth?: number; + /** Form id ``perEyeHeight``. */ + perEyeHeight?: number; +} + +export interface MetricsSnapshot { + cadence: string; + metrics: Record; +} + +export interface ControlChannelOptions { + /** Full WSS URL of the hub, e.g. wss://host:48322/oob/v1/ws */ + url: string; + /** Sent in the register message. Must match CONTROL_TOKEN env var if set. */ + token?: string; + /** Human-readable label in hub snapshots (optional). */ + deviceLabel?: string; + /** + * Called on hello (initial config) and on config push. + * Apply the config to the CloudXR connection settings before connect. + */ + onConfig: (config: StreamConfig, configVersion: number) => void; + /** Called when the WebSocket connection state changes. */ + onConnectionChange?: (connected: boolean) => void; + /** + * Optional: called periodically to get the latest metrics to report. + * Return an empty array or null/undefined to skip a tick. + */ + getMetricsSnapshot?: () => MetricsSnapshot[] | null | undefined; + /** How often to report metrics (ms). Default: 500. */ + metricsIntervalMs?: number; +} + +const RECONNECT_DELAY_MS = 3000; +const DEFAULT_METRICS_INTERVAL_MS = 500; + +export class HeadsetControlChannel { + private ws: WebSocket | null = null; + private disposed = false; + private metricsTimer: ReturnType | null = null; + private reconnectTimer: ReturnType | null = null; + + constructor(private readonly opts: ControlChannelOptions) {} + + /** Open the WebSocket and start the reconnection loop. */ + connect(): void { + if (this.disposed) return; + this._openWebSocket(); + } + + /** Close the channel permanently. Safe to call multiple times. */ + dispose(): void { + this.disposed = true; + this._clearTimers(); + if (this.ws) { + this.ws.onclose = null; // prevent reconnect on this close + this.ws.close(); + this.ws = null; + } + } + + // --------------------------------------------------------------------------- + // Private + // --------------------------------------------------------------------------- + + private _openWebSocket(): void { + if (this.disposed) return; + + let ws: WebSocket; + try { + ws = new WebSocket(this.opts.url); + } catch (err) { + if (this.disposed) return; + console.warn( + '[ControlChannel] WebSocket constructor failed for', + this.opts.url, + err + ); + this.ws = null; + this._afterSocketClosed(); + return; + } + + this.ws = ws; + + ws.onopen = () => { + ws.send( + JSON.stringify({ + type: 'register', + payload: { + role: 'headset', + ...(this.opts.token ? { token: this.opts.token } : {}), + ...(this.opts.deviceLabel ? { deviceLabel: this.opts.deviceLabel } : {}), + }, + }) + ); + this.opts.onConnectionChange?.(true); + this._startMetricsTimer(); + }; + + ws.onmessage = (ev) => { + if (typeof ev.data !== 'string') return; + let msg: { type?: string; payload?: unknown }; + try { + msg = JSON.parse(ev.data); + } catch { + return; + } + this._handleMessage(msg); + }; + + ws.onclose = () => { + this.ws = null; + this._afterSocketClosed(); + }; + + ws.onerror = () => { + // onclose fires next; reconnect logic lives there + }; + } + + /** Clear timers, notify disconnected, schedule reconnect (same path as WebSocket onclose). */ + private _afterSocketClosed(): void { + this._clearTimers(); + this.opts.onConnectionChange?.(false); + if (!this.disposed) { + this.reconnectTimer = setTimeout(() => this._openWebSocket(), RECONNECT_DELAY_MS); + } + } + + private _handleMessage(msg: { type?: string; payload?: unknown }): void { + const type = msg.type; + const payload = (msg.payload ?? {}) as Record; + + if (type === 'hello') { + // hello to headset includes initial config + if ( + payload.config != null && + typeof payload.configVersion === 'number' + ) { + this.opts.onConfig(payload.config as StreamConfig, payload.configVersion as number); + } + } else if (type === 'config') { + if ( + payload.config != null && + typeof payload.configVersion === 'number' + ) { + this.opts.onConfig(payload.config as StreamConfig, payload.configVersion as number); + } + } else if (type === 'error') { + console.warn('[ControlChannel] Hub error:', payload); + } + } + + private _startMetricsTimer(): void { + if (!this.opts.getMetricsSnapshot) return; + const interval = this.opts.metricsIntervalMs ?? DEFAULT_METRICS_INTERVAL_MS; + this.metricsTimer = setInterval(() => { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + const snapshots = this.opts.getMetricsSnapshot?.(); + if (!snapshots || snapshots.length === 0) return; + const t = Date.now(); + for (const { cadence, metrics } of snapshots) { + if (Object.keys(metrics).length === 0) continue; + this.ws.send( + JSON.stringify({ + type: 'clientMetrics', + payload: { t, cadence, metrics }, + }) + ); + } + }, interval); + } + + private _clearTimers(): void { + if (this.metricsTimer !== null) { + clearInterval(this.metricsTimer); + this.metricsTimer = null; + } + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + } +} diff --git a/deps/cloudxr/webxr_client/src/App.tsx b/deps/cloudxr/webxr_client/src/App.tsx index 98d121b42..2d620bd59 100644 --- a/deps/cloudxr/webxr_client/src/App.tsx +++ b/deps/cloudxr/webxr_client/src/App.tsx @@ -48,6 +48,7 @@ import { useState, useMemo, useEffect, useRef } from 'react'; import { CloudXR2DUI } from './CloudXR2DUI'; import CloudXR3DUI from './CloudXRUI'; +import { HeadsetControlChannel, type StreamConfig } from '@helpers/controlChannel'; // Performance metrics signals - raw numeric data, one per callback cadence. // Signals update their value without triggering React re-renders. @@ -87,6 +88,65 @@ const START_TELEOP_COMMAND = { }, } as const; +/** Teleop ``StreamConfig`` fields from the page URL; hub ``hello`` / ``config`` overrides when pushed. */ +function streamConfigFromUrlSearchParams(searchParams: URLSearchParams): StreamConfig { + const out: StreamConfig = {}; + const sip = searchParams.get('serverIP'); + if (sip) out.serverIP = sip; + const portStr = searchParams.get('port'); + if (portStr !== null && portStr !== '') { + const n = Number(portStr); + if (!Number.isNaN(n)) out.port = n; + } + if (searchParams.has('proxyUrl')) { + const u = searchParams.get('proxyUrl'); + out.proxyUrl = u === '' || u === null ? '' : u; + } + const ma = searchParams.get('mediaAddress'); + if (ma) out.mediaAddress = ma; + const mp = searchParams.get('mediaPort'); + if (mp !== null && mp !== '') { + const n = Number(mp); + if (!Number.isNaN(n)) out.mediaPort = n; + } + const ph = searchParams.get('panelHiddenAtStart'); + if (ph !== null && ph !== '') { + const s = ph.trim().toLowerCase(); + if (s === '1' || s === 'true') out.panelHiddenAtStart = true; + else if (s === '0' || s === 'false') out.panelHiddenAtStart = false; + } + const codec = searchParams.get('codec')?.trim(); + if (codec) out.codec = codec; + const pew = searchParams.get('perEyeWidth'); + if (pew !== null && pew !== '') { + const n = Number(pew); + if (!Number.isNaN(n)) out.perEyeWidth = n; + } + const peh = searchParams.get('perEyeHeight'); + if (peh !== null && peh !== '') { + const n = Number(peh); + if (!Number.isNaN(n)) out.perEyeHeight = n; + } + return out; +} + +/** When set with ``serverIP`` + ``port``, WebXR builds ``wss://{serverIP}:{port}/oob/v1/ws``. */ +function isOobEnabled(searchParams: URLSearchParams): boolean { + const v = searchParams.get('oobEnable'); + return v === '1' || v?.toLowerCase() === 'true'; +} + +function buildOobHubWsUrlFromQuery(searchParams: URLSearchParams): string | null { + if (!isOobEnabled(searchParams)) return null; + const serverIP = searchParams.get('serverIP')?.trim(); + const portStr = searchParams.get('port')?.trim(); + if (!serverIP || portStr === undefined || portStr === '') return null; + if (!/^\d{1,5}$/.test(portStr)) return null; + const host = + serverIP.includes(':') && !serverIP.startsWith('[') ? `[${serverIP}]` : serverIP; + return `wss://${host}:${portStr}/oob/v1/ws`; +} + function App() { const COUNTDOWN_MAX_SECONDS = 9; const COUNTDOWN_STORAGE_KEY = 'cxr.react.countdownSeconds'; @@ -287,53 +347,54 @@ function App() { // Initialize CloudXR2DUI useEffect(() => { - // Create and initialize the 2D UI manager + // Create and initialize the 2D UI manager. + // URL query seeds are applied synchronously during initialize() so they + // take priority over any previously-stored localStorage values. const ui = new CloudXR2DUI(() => { - // Callback when configuration changes setConfigVersion(v => v + 1); }); - ui.initialize(); - ui.setupConnectButtonHandler( - async () => { - const config = ui.getConfiguration(); - const resolutionError = getResolutionValidationError( - config.perEyeWidth, - config.perEyeHeight - ); - if (resolutionError) { - ui.updateConnectButtonState(); - return; + const urlSeeds = streamConfigFromUrlSearchParams(new URLSearchParams(window.location.search)); + ui.initialize(urlSeeds); + const doConnect = async () => { + const config = ui.getConfiguration(); + const resolutionError = getResolutionValidationError( + config.perEyeWidth, + config.perEyeHeight + ); + if (resolutionError) { + ui.updateConnectButtonState(); + return; + } + // Start XR session + if (config.immersiveMode === 'ar') { + await store.enterAR(); + } else if (config.immersiveMode === 'vr') { + await store.enterVR(); + } else { + setErrorMessage('Unrecognized immersive mode'); + } + store.setFrameRate((supportedFrameRates: ArrayLike): number | false => { + let frameRate = ui.getConfiguration().deviceFrameRate; + let found = false; + for (let i = 0; i < supportedFrameRates.length; ++i) { + if (supportedFrameRates[i] === frameRate) { + found = true; + break; + } } - // Start XR session - if (config.immersiveMode === 'ar') { - await store.enterAR(); - } else if (config.immersiveMode === 'vr') { - await store.enterVR(); + if (found) { + console.info('Changed frame rate to', frameRate); + return frameRate; } else { - setErrorMessage('Unrecognized immersive mode'); + console.error('Failed to change frame rate to', frameRate); + return false; } - store.setFrameRate((supportedFrameRates: ArrayLike): number | false => { - let frameRate = ui.getConfiguration().deviceFrameRate; - let found = false; - for (let i = 0; i < supportedFrameRates.length; ++i) { - if (supportedFrameRates[i] === frameRate) { - found = true; - break; - } - } - if (found) { - console.info('Changed frame rate to', frameRate); - return frameRate; - } else { - console.error('Failed to change frame rate to', frameRate); - return false; - } - }); - }, - (error: Error) => { - setErrorMessage(`Failed to start XR session: ${error}`); - } - ); + }); + }; + + ui.setupConnectButtonHandler(doConnect, (error: Error) => { + setErrorMessage(`Failed to start XR session: ${error}`); + }); setCloudXR2DUI(ui); @@ -616,6 +677,60 @@ function App() { } }; + // URL query seeds are applied synchronously in initialize() above. + // This effect is a safety fallback in case the UI is re-created (e.g. store change). + useEffect(() => { + if (!cloudXR2DUI) return; + const seeds = streamConfigFromUrlSearchParams(new URLSearchParams(window.location.search)); + if (Object.keys(seeds).length > 0) { + cloudXR2DUI.setStreamConfig(seeds); + } + }, [cloudXR2DUI]); + + // OOB WebSocket: only when oobEnable=1 and query has valid serverIP + port → wss://{serverIP}:{port}/oob/v1/ws. + useEffect(() => { + if (!cloudXR2DUI) return; + const p = new URLSearchParams(window.location.search); + const hubWsUrl = buildOobHubWsUrlFromQuery(p); + if (!hubWsUrl) { + return; + } + + console.info('[Teleop] OOB control WebSocket:', hubWsUrl); + + const channel = new HeadsetControlChannel({ + url: hubWsUrl, + token: p.get('controlToken') ?? undefined, + onConfig: (config) => { + cloudXR2DUI.setStreamConfig(config); + }, + getMetricsSnapshot: () => { + const snapshots: Array<{ cadence: string; metrics: Record }> = []; + const rm = renderMetrics.value; + if (rm) { + snapshots.push({ + cadence: 'render', + metrics: { [CloudXR.MetricsName.RenderFramerate]: rm.fps }, + }); + } + const sm = streamingMetrics.value; + if (sm) { + snapshots.push({ + cadence: 'frame', + metrics: { + [CloudXR.MetricsName.StreamingFramerate]: sm.fps, + [CloudXR.MetricsName.PoseToRenderTime]: sm.latencyMs, + }, + }); + } + return snapshots; + }, + }); + channel.connect(); + + return () => { channel.dispose(); }; + }, [cloudXR2DUI]); + // Countdown configuration handlers (0-5 seconds) const handleIncreaseCountdown = () => { if (isCountingDown) return; diff --git a/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx b/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx index 283f3a049..b233dddac 100644 --- a/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx +++ b/deps/cloudxr/webxr_client/src/CloudXR2DUI.tsx @@ -41,6 +41,7 @@ import { parseControlPanelPosition, ReactUIConfig, } from '@helpers/react/utils'; +import type { StreamConfig } from '@helpers/controlChannel'; import { CloudXRConfig, enableLocalStorage, @@ -61,6 +62,21 @@ import { /** Full config: CloudXR connection settings + React UI options. */ type AppConfig = CloudXRConfig & ReactUIConfig; +/** + * Form element ids the teleop hub / bookmark URL may set. Keys match ``StreamConfig`` and JSON + * ``config`` from the hub. To add a field: extend the allowlist, add coercion in + * ``applyRemoteUiFieldsFromHub``, and wire Python ``_stream_config_from_query`` / bookmark encoding. + * Arbitrary id→value maps from the network are intentionally not supported. + */ +const HUB_CLIENT_UI_FIELDS = [ + 'panelHiddenAtStart', + 'codec', + 'perEyeWidth', + 'perEyeHeight', +] as const; + +type HubClientUiKey = (typeof HUB_CLIENT_UI_FIELDS)[number]; + /** * 2D UI Management for CloudXR React Example * Handles the main user interface for CloudXR streaming, including form management, @@ -173,9 +189,11 @@ export class CloudXR2DUI { } /** - * Initializes the CloudXR2DUI with all necessary components and event handlers + * Initializes the CloudXR2DUI with all necessary components and event handlers. + * @param urlSeeds - Optional stream config parsed from URL query parameters. + * Applied after localStorage restore so URL params always take priority. */ - public initialize(): void { + public initialize(urlSeeds?: StreamConfig): void { if (this.initialized) { return; } @@ -183,6 +201,9 @@ export class CloudXR2DUI { try { this.initializeElements(); this.setupLocalStorage(); + if (urlSeeds && Object.keys(urlSeeds).length > 0) { + this.applyUrlSeeds(urlSeeds); + } this.setupProxyConfiguration(); this.setupEventListeners(); // Set initial display value @@ -337,6 +358,59 @@ export class CloudXR2DUI { enableLocalStorage(this.controllerModelVisibilitySelect, 'controllerModelVisibility'); } + /** + * Override form inputs with URL query seeds so they take priority over localStorage. + * Only touches keys that are actually present in {@link seeds}. + */ + private applyUrlSeeds(seeds: StreamConfig): void { + if (seeds.serverIP !== undefined) { + this.serverIpInput.value = seeds.serverIP; + try { localStorage.setItem('serverIp', seeds.serverIP); } catch (_) {} + } + if (seeds.port !== undefined) { + this.portInput.value = String(seeds.port); + try { localStorage.setItem('port', String(seeds.port)); } catch (_) {} + } + if ('proxyUrl' in seeds) { + const url = seeds.proxyUrl ?? ''; + this.proxyUrlInput.value = url; + try { localStorage.setItem('proxyUrl', url); } catch (_) {} + } + if (seeds.mediaAddress !== undefined) { + this.mediaAddressInput.value = seeds.mediaAddress; + try { localStorage.setItem('mediaAddress', seeds.mediaAddress); } catch (_) {} + } + if (seeds.mediaPort !== undefined) { + this.mediaPortInput.value = String(seeds.mediaPort); + try { localStorage.setItem('mediaPort', String(seeds.mediaPort)); } catch (_) {} + } + if (seeds.panelHiddenAtStart !== undefined) { + this.panelHiddenAtStartSelect.value = this._remoteTruthy(seeds.panelHiddenAtStart) + ? 'true' + : 'false'; + try { localStorage.setItem('panelHiddenAtStart', this.panelHiddenAtStartSelect.value); } catch (_) {} + } + if (seeds.codec !== undefined && typeof seeds.codec === 'string' && seeds.codec !== '') { + setSelectValueIfAvailable(this.codecSelect, seeds.codec); + try { localStorage.setItem('codec', this.codecSelect.value); } catch (_) {} + } + if (seeds.perEyeWidth !== undefined) { + const n = Number(seeds.perEyeWidth); + if (Number.isFinite(n)) { + this.perEyeWidthInput.value = String(Math.round(n)); + try { localStorage.setItem('perEyeWidth', this.perEyeWidthInput.value); } catch (_) {} + } + } + if (seeds.perEyeHeight !== undefined) { + const n = Number(seeds.perEyeHeight); + if (Number.isFinite(n)) { + this.perEyeHeightInput.value = String(Math.round(n)); + try { localStorage.setItem('perEyeHeight', this.perEyeHeightInput.value); } catch (_) {} + } + } + console.info('[CloudXR2DUI] URL query params applied (override localStorage):', seeds); + } + /** * Configures proxy settings based on the current protocol (HTTP/HTTPS) * Sets appropriate placeholders and help text for port and proxy URL inputs @@ -720,6 +794,116 @@ export class CloudXR2DUI { return { ...this.currentConfiguration }; } + private _remoteTruthy(v: unknown): boolean { + if (v === true || v === 1) return true; + if (typeof v === 'string') { + const s = v.trim().toLowerCase(); + return s === '1' || s === 'true' || s === 'yes' || s === 'on'; + } + return false; + } + + /** + * Applies allowlisted client UI fields from hub ``config`` / bookmark query (see + * ``HUB_CLIENT_UI_FIELDS``). + */ + private applyRemoteUiFieldsFromHub( + config: Partial> + ): void { + let touchedCodecOrResolution = false; + + if (config.panelHiddenAtStart !== undefined) { + this.panelHiddenAtStartSelect.value = this._remoteTruthy(config.panelHiddenAtStart) + ? 'true' + : 'false'; + try { + localStorage.setItem('panelHiddenAtStart', this.panelHiddenAtStartSelect.value); + } catch (_) {} + } + if (config.codec !== undefined && typeof config.codec === 'string' && config.codec !== '') { + setSelectValueIfAvailable(this.codecSelect, config.codec); + try { + localStorage.setItem('codec', this.codecSelect.value); + } catch (_) {} + touchedCodecOrResolution = true; + } + if (config.perEyeWidth !== undefined) { + const n = Number(config.perEyeWidth); + if (Number.isFinite(n)) { + this.perEyeWidthInput.value = String(Math.round(n)); + try { + localStorage.setItem('perEyeWidth', this.perEyeWidthInput.value); + } catch (_) {} + touchedCodecOrResolution = true; + } + } + if (config.perEyeHeight !== undefined) { + const n = Number(config.perEyeHeight); + if (Number.isFinite(n)) { + this.perEyeHeightInput.value = String(Math.round(n)); + try { + localStorage.setItem('perEyeHeight', this.perEyeHeightInput.value); + } catch (_) {} + touchedCodecOrResolution = true; + } + } + if (touchedCodecOrResolution) { + this.setProfileToCustomIfNeeded(); + this.persistProfileFieldsToLocalStorage(); + } + this.updateResolutionValidationMessage(); + this.updateGridValidationMessage(); + this.updateConnectButtonState(); + this.updateConfiguration(); + } + + /** + * Applies a partial stream / teleop config pushed from the hub or URL seeds. + * Updates matching form inputs, persists where appropriate, and notifies the callback. + * + * Streaming target fields and allowlisted client UI fields (``HUB_CLIENT_UI_FIELDS``) are applied; + * other hub keys are ignored by this client. + */ + public setStreamConfig(config: StreamConfig): void { + if (config.serverIP !== undefined) { + this.serverIpInput.value = config.serverIP; + try { localStorage.setItem('serverIp', config.serverIP); } catch (_) {} + } + if (config.port !== undefined) { + this.portInput.value = String(config.port); + try { localStorage.setItem('port', String(config.port)); } catch (_) {} + } + if ('proxyUrl' in config) { + const url = config.proxyUrl ?? ''; + this.proxyUrlInput.value = url; + try { localStorage.setItem('proxyUrl', url); } catch (_) {} + } + if (config.mediaAddress !== undefined) { + this.mediaAddressInput.value = config.mediaAddress; + try { localStorage.setItem('mediaAddress', config.mediaAddress); } catch (_) {} + } + if (config.mediaPort !== undefined) { + this.mediaPortInput.value = String(config.mediaPort); + try { localStorage.setItem('mediaPort', String(config.mediaPort)); } catch (_) {} + } + + const hasClientUi = + config.panelHiddenAtStart !== undefined || + config.codec !== undefined || + config.perEyeWidth !== undefined || + config.perEyeHeight !== undefined; + if (hasClientUi) { + this.applyRemoteUiFieldsFromHub({ + panelHiddenAtStart: config.panelHiddenAtStart, + codec: config.codec, + perEyeWidth: config.perEyeWidth, + perEyeHeight: config.perEyeHeight, + }); + } else { + this.updateConfiguration(); + } + } + /** * Sets the start button state * @param disabled - Whether the button should be disabled diff --git a/docs/source/index.rst b/docs/source/index.rst index 390fcb9e7..49bffce56 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -60,6 +60,7 @@ Table of Contents references/build device/index references/retargeting + references/oob_teleop_control references/license Indices and tables diff --git a/docs/source/references/oob_teleop_control.rst b/docs/source/references/oob_teleop_control.rst new file mode 100644 index 000000000..c899c101c --- /dev/null +++ b/docs/source/references/oob_teleop_control.rst @@ -0,0 +1,744 @@ +.. SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +.. SPDX-License-Identifier: Apache-2.0 + +Out-of-band teleop control +========================== + +This guide explains how to **coordinate Isaac Teleop from outside the headset**—change where the headset +streams from, read status, and (for advanced setups) push settings over HTTP—using the same TLS port as +the CloudXR proxy when you enable the feature. + +**Primary audience:** **operators** setting up a lab or demo. **Integrators** (automation, custom +dashboards, metrics) should read :ref:`teleop-oob-integrators` after the operator workflow. + +**Supported browsers and devices:** Use an **Android-based XR headset** with a **Chromium-based** browser +for WebXR. On the **PC**, use a **Chromium-based** browser for **remote inspect**—for example +**``chrome://inspect``** in Google Chrome or **``edge://inspect``** in Microsoft Edge. Other browsers or +non-Android XR stacks are outside the scope of this guide. + +**Supported operator flow:** start with **``--setup-oob``** so the bookmark opens on the headset → on +the **PC**, open **remote inspect** on that tab → click **CONNECT** in DevTools (WebXR user gesture). +There is **no** ``autoConnect`` query flag and no hub-driven session start. + +**What this is not:** it does not replace CloudXR streaming. Video, audio, and poses still use the +normal streaming path. The **control plane** (this document) coordinates **which server** the headset should use; the usual +**when to connect** path is the in-page **CONNECT** control (or refresh / disconnect in the UI), not +an HTTP trigger. **Signaling is TLS:** use **``https://``** for HTTP on the proxy port and **``wss://``** +for OOB WebSocket and CloudXR signaling through this stack—there is no supported operator path to use +plain **``http``** / **``ws``** here. + +**What you get when it is enabled** + +* A **WebXR page** on the headset talks to a small **control service** (the “hub”) over WSS. +* From a **PC** (optional for operators), you can call **HTTPS** on the proxy port to **read state** and + **update streaming settings** (``/api/oob/v1/state`` and ``/api/oob/v1/config``), or use WebSocket + **``setConfig``** from a **``dashboard``** client. **Connect** / **disconnect** are driven from the + **WebXR page** while you control it through DevTools (remote inspect), not from the hub—see + :ref:`teleop-user-activation`. +* Optional **``--setup-oob``** automates **adb**: open the right URL on the device and wire USB or + wireless debugging so you rarely type long URLs on the headset. + +The proxy’s HTTP API uses **GET** with query parameters because the WebSocket stack on this port only +accepts GET for ordinary HTTP. **``curl``** and scripts work fine for **state** and **config**. + +.. _teleop-roles-three-machines: + +Who is involved +--------------- + +Most setups look like **three roles** (they can be the same physical machine): + +.. list-table:: + :header-rows: 1 + :widths: 22 38 40 + + * - Role + - Typical software + - What it does + * - **XR headset** + - Isaac Teleop **WebXR client** in the device browser + - Runs the immersive session and streaming client. Registers with the hub, applies the + streaming **host/port** from **``hello``** / **``config``**, and starts/stops the XR session + from the **in-page** controls only. + * - **Operator machine** + - Your PC: Chromium **remote inspect**; optionally **``curl``** or scripts against **HTTPS** + - Drives **CONNECT** through the inspected Teleop tab; can change streaming target via HTTP + **config** or WebSocket **``setConfig``** (integrator-style). There is **no** built-in browser + dashboard and **no** hub API to start/stop the XR session remotely. + * - **Streaming host** + - Machine where **``python -m isaacteleop.cloudxr``** runs (CloudXR runtime + TLS proxy) + - Listens on the **proxy TLS port** (default **48322**). Serves CloudXR signaling and, if you + enable it, the teleop hub. The **``serverIP``** / **``port``** you configure name **this** + streaming endpoint—the headset must be able to reach that address on the network. + +The headset must reach **both** the **WebXR page** (often ``https://…:8080/`` during development) +**and** the **OOB WebSocket** on the streaming host. The bookmark adds **``oobEnable=1``** plus the same +**``serverIP``** and **``port``** as CloudXR signaling; the client opens **only** that +``wss://{serverIP}:{port}/oob/v1/ws`` (see :ref:`teleop-headset-control-url`). There is no implicit +default to the page origin. + +**Same PC as the headset cable?** Very often the streaming host and the PC you type on are one +computer. The split above still helps: the **WebXR tab** and **your terminal** are different +programs. + +End-to-end workflow (the usual path) +------------------------------------ + +With **``--setup-oob``**, the terminal prints an **OOB TELEOP** banner that mirrors the three steps +below (same order and links). + +**Prerequisites:** **``adb``** on PATH; USB or wireless debugging; **one** device in **``adb devices``**; +WebXR dev server reachable at **``https://127.0.0.1:8080/``** (USB) or **``https://:8080/``** +(wireless), with a certificate the headset trusts. + +**1 — Start with hub + adb automation** + +.. code-block:: bash + + python -m isaacteleop.cloudxr --setup-oob tethered + +Wireless: use **``--setup-oob :5555``** instead. Add **``--wss-only``** if you skip the CloudXR +runtime (see :ref:`teleop-wss-only-testing`). + +Wait for **``WSS proxy listening on port …``**; the bookmark is printed in the **OOB TELEOP** banner. +**``--setup-oob``** runs **``am start``** so the page opens on the headset (or open the URL yourself if +you omitted the flag). + +**2 — Inspect from the PC** + +On the **PC**, open your Chromium browser to **``chrome://inspect/#devices``** (Chrome) or +**``edge://inspect``** (Edge). Under **Remote Target**, select the headset browser tab that shows +**Isaac Teleop Web Client** (or the matching URL) and click **inspect**. + +**3 — CONNECT (required gesture)** + +In the DevTools window for that page, click **CONNECT** on the Teleop UI (same as tapping CONNECT on +the headset). WebXR needs this user gesture; see :ref:`teleop-user-activation`. + +**4 — Disconnect (from the PC)** + +To end the session without reaching for the headset, keep the **same** DevTools window focused on the +Teleop page and **reload** it—for example click the **URL** in the inspector’s address bar and press +**Enter**, or use the refresh action. That tears down the page session the same way as navigating away +on the device. + +**After that (optional):** change streaming target with **``GET /api/oob/v1/config``** or WebSocket +**``setConfig``** without rebuilding the bookmark URL (see :ref:`teleop-stream-defaults`). + +.. _teleop-setup-oob-cli: + +Choosing ``--setup-oob`` and when the hub is on +------------------------------------------------ + +**Omit** **``--setup-oob``** if you will open the teleop URL yourself and do not need adb help. In +that case the **OOB HTTP** paths return **404**, and **``/oob/v1/ws``** is forwarded to the CloudXR +runtime instead of the hub—so teleop control expects the **full** stack, not **``--wss-only``** +alone. + +**``--setup-oob tethered``** — USB. After the proxy is listening, the tool: + +1. Starts a **PC-side UDP-over-TCP relay** on TCP port **47999** (override with **``TELEOP_UDP_RELAY_PORT``**). +2. Sets up **``adb reverse``** for the proxy port (**48322**), the WebXR dev-server port (**8080**), and the + relay TCP port (**47999**). +3. Pushes and starts the **headset-side relay binary** (``udprelay``) via ``adb push`` + ``adb shell``. + The headset relay listens on UDP **47998** and tunnels datagrams over TCP to the PC relay. +4. Opens the bookmark with **``serverIP=127.0.0.1``**, **``mediaAddress=127.0.0.1``**, + **``mediaPort=47998``** — all traffic uses loopback through the ``adb reverse`` + relay tunnel. + +The relay binary must be cross-compiled for ``linux/arm64`` (Android). Build it from +``src/core/cloudxr/udprelay/``: + +.. code-block:: bash + + cd src/core/cloudxr/udprelay + GOOS=linux GOARCH=arm64 go build -o udprelay . + +Place the binary where the tool can find it (set **``TELEOP_RELAY_BINARY``** or copy to +``cloudxr/native/udprelay`` in the package). Reverse mappings and relay processes are cleaned up on +shutdown. + +**``--setup-oob HOST:PORT``** — Wireless. The tool resolves your PC’s LAN address (**``TELEOP_PROXY_HOST``** +or an automatic guess), runs **``adb connect``**, checks for a single device, then listens. After +that, it opens a bookmark using **``https://:8080/``** and **``wss://:/…``**. If +the LAN address cannot be determined, set **``TELEOP_PROXY_HOST``** before starting. + +**Requirements:** **``adb``** on PATH; exactly one ready device; the pre-built ``udprelay`` binary for +tethered mode (or set **``TELEOP_RELAY_BINARY``**). Failed **``adb reverse``**, relay setup, or +**``am start``** exits the process with code **1**. USB reverse mappings and relay processes are cleaned +up on shutdown. + +OOB HTTP and WebSocket share the **same TLS port** as the proxy (**``PROXY_PORT``**, default **48322**). + +.. _teleop-stream-defaults: + +Stream settings: defaults and how to change them +------------------------------------------------ + +The hub keeps one merged **``config``** object (often called **``StreamConfig``** in TypeScript). It +includes **streaming target** fields and optional **WebXR client UI** fields. The stock web client +applies only **allowlisted** UI keys so values are validated and mapped to real form controls; the +hub may store other top-level keys from **``setConfig``**, but the browser **ignores** unknown keys. +How to extend that allowlist is covered in :ref:`teleop-oob-integrators`. + +**Streaming target fields** + +.. list-table:: + :header-rows: 1 + :widths: 22 78 + + * - Field + - Meaning + * - **``serverIP``** + - Hostname or IP for CloudXR **signaling** over **TLS** (**``https://``** / **``wss://``** through + the proxy) the headset uses. + * - **``port``** + - TCP port for TLS signaling (normally the proxy port, default **48322**). + * - **``proxyUrl``** + - Optional full proxy URL override; empty string clears it. + * - **``mediaAddress``** + - Media (UDP) address. + * - **``mediaPort``** + - Media (UDP) port (common default **47998** with USB **``adb reverse``**). + +**Client UI fields (allowlist)** — same names as Teleop form element **ids** and bookmark query keys: + +.. list-table:: + :header-rows: 1 + :widths: 22 78 + + * - Field + - Meaning + * - **``panelHiddenAtStart``** + - **``true``** / **``false``** — hide the in-XR control panel when the session starts (query: + **``true``** / **``false``** or **``1``** / **``0``**). + * - **``codec``** + - Preferred video codec: **``h264``**, **``h265``**, or **``av1``** (must match a **````** +option sets, security if the page ever mixed admin and user content). The supported pattern is: add the +**HTML id** and hub key name (usually the same string), extend **``_stream_config_from_query``** / +bookmark encoding in **``wss.py``**, extend **``streamConfigFromUrlSearchParams``** in **``App.tsx``**, +and add coercion in **``CloudXR2DUI``** (see **``HUB_CLIENT_UI_FIELDS``** in **``CloudXR2DUI.tsx``**). + +**Source files** + +.. list-table:: + :header-rows: 1 + :widths: 30 70 + + * - Area + - Location + * - Hub logic + - :code-file:`oob_teleop_hub.py ` + * - TLS proxy, HTTP, adb, bookmark URL + - :code-file:`wss.py ` + * - PC-side UDP-over-TCP relay + - :code-file:`udp_tcp_relay.py ` + * - Headset-side UDP relay (Go) + - ``src/core/cloudxr/udprelay/main.go`` + * - CLI (**``--wss-only``**, **``--setup-oob``**, **``OobAdbError``** → exit 1) + - :code-file:`__main__.py ` + * - Headset control channel (TypeScript) + - ``deps/cloudxr/webxr_client/helpers/controlChannel.ts`` + * - Headset UI wiring (React) + - ``deps/cloudxr/webxr_client/src/App.tsx`` diff --git a/src/core/cloudxr/python/CMakeLists.txt b/src/core/cloudxr/python/CMakeLists.txt index 5e88cf747..9c695f745 100644 --- a/src/core/cloudxr/python/CMakeLists.txt +++ b/src/core/cloudxr/python/CMakeLists.txt @@ -111,6 +111,10 @@ add_custom_target(cloudxr_python ALL "${CMAKE_CURRENT_SOURCE_DIR}/launcher.py" "${CMAKE_CURRENT_SOURCE_DIR}/runtime.py" "${CMAKE_CURRENT_SOURCE_DIR}/wss.py" + "${CMAKE_CURRENT_SOURCE_DIR}/udp_tcp_relay.py" + "${CMAKE_CURRENT_SOURCE_DIR}/oob_teleop_env.py" + "${CMAKE_CURRENT_SOURCE_DIR}/oob_teleop_adb.py" + "${CMAKE_CURRENT_SOURCE_DIR}/oob_teleop_hub.py" "${CLOUDXR_PYTHON_DIR}/" COMMAND ${CMAKE_COMMAND} -E rm -rf "${CLOUDXR_PYTHON_DIR}/__pycache__" COMMENT "Copying cloudxr Python files to package structure" @@ -124,3 +128,15 @@ if(CLOUDXR_NATIVE_AVAILABLE) else() add_dependencies(cloudxr_python cloudxr_native_dir) endif() + +# Copy pre-built headset UDP relay binary into native/ at build time (if it exists). +# Build it with: cd src/core/cloudxr/udprelay && GOOS=linux GOARCH=arm64 go build -o udprelay . +set(UDPRELAY_SRC "${CMAKE_CURRENT_SOURCE_DIR}/../udprelay/udprelay") +add_custom_target(cloudxr_udprelay ALL + COMMAND ${CMAKE_COMMAND} + -DSRC=${UDPRELAY_SRC} + -DDST=${CLOUDXR_NATIVE_DIR}/udprelay + -P ${CMAKE_CURRENT_SOURCE_DIR}/copy_if_exists.cmake + COMMENT "Copying udprelay binary to native/ (skipped if not yet built)" +) +add_dependencies(cloudxr_udprelay cloudxr_native_dir) diff --git a/src/core/cloudxr/python/__main__.py b/src/core/cloudxr/python/__main__.py index b8637b90a..c9d99fbc4 100644 --- a/src/core/cloudxr/python/__main__.py +++ b/src/core/cloudxr/python/__main__.py @@ -4,14 +4,30 @@ """Entry point for python -m isaacteleop.cloudxr. Runs CloudXR runtime and WSS proxy; main process winds both down on exit.""" import argparse +import asyncio +import multiprocessing import os import signal -import time +import sys +from datetime import datetime, timezone from isaacteleop import __version__ as isaacteleop_version -from isaacteleop.cloudxr.env_config import get_env_config -from isaacteleop.cloudxr.launcher import CloudXRLauncher -from isaacteleop.cloudxr.runtime import latest_runtime_log, runtime_version +from isaacteleop.cloudxr.env_config import EnvConfig +from isaacteleop.cloudxr.runtime import ( + check_eula, + latest_runtime_log, + run as runtime_run, + runtime_version, + terminate_or_kill_runtime, + wait_for_runtime_ready, +) +from isaacteleop.cloudxr.wss import ( + OobAdbError, + parse_setup_oob_cli, + print_oob_hub_startup_banner, + resolve_lan_host_for_oob, + run as wss_run, +) def _parse_args() -> argparse.Namespace: @@ -36,53 +52,147 @@ def _parse_args() -> argparse.Namespace: action="store_true", help="Accept the NVIDIA CloudXR EULA non-interactively (e.g. for CI or containers).", ) + parser.add_argument( + "--wss-only", + action="store_true", + help=( + "Do not start the CloudXR runtime; run only the WSS TLS proxy on PROXY_PORT (default 48322). " + "Logs go to stderr." + ), + ) + parser.add_argument( + "--setup-oob", + type=parse_setup_oob_cli, + default=None, + metavar="tethered|HOST:PORT", + help=( + "Enable OOB teleop control over adb (omit to disable): from this PC, open the teleop page " + "on the headset while it sits around your neck. " + "Use `tethered` for USB adb reverse + UDP relay or `HOST:PORT` for wireless adb. " + 'See docs: "Out-of-band teleop control".' + ), + ) return parser.parse_args() -def main() -> None: +async def _main_async() -> None: """Launch the CloudXR runtime and WSS proxy, then block until interrupted.""" args = _parse_args() + env_cfg = EnvConfig.from_args(args.cloudxr_install_dir, args.cloudxr_env_config) + check_eula(accept_eula=args.accept_eula or None) - with CloudXRLauncher( - install_dir=args.cloudxr_install_dir, - env_config=args.cloudxr_env_config, - accept_eula=args.accept_eula, - ) as launcher: - cxr_ver = runtime_version() + stop = asyncio.get_running_loop().create_future() + + def on_signal() -> None: + if not stop.done(): + stop.set_result(None) + + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, on_signal) + except NotImplementedError: + pass + + if args.wss_only: print( - f"Running Isaac Teleop \033[36m{isaacteleop_version}\033[0m, CloudXR Runtime \033[36m{cxr_ver}\033[0m" + f"Isaac Teleop \033[36m{isaacteleop_version}\033[0m — WSS proxy only " + f"(CloudXR runtime \033[33mnot started\033[0m)" + ) + print("CloudXR WSS proxy: \033[36mrunning\033[0m (logs on stderr)") + if args.setup_oob is not None: + print( + " oob: on (teleop hub runs in this WSS proxy — see OOB TELEOP block)" + ) + if args.setup_oob == "tethered": + print_oob_hub_startup_banner(oob_mode="tethered") + else: + print_oob_hub_startup_banner( + oob_mode="wireless", lan_host=resolve_lan_host_for_oob() + ) + else: + print( + " oob: off (--setup-oob tethered|HOST:PORT for OOB + adb)" + ) + print( + "\033[33mNon-teleop WebSocket paths proxy to the runtime backend, which is not running.\033[0m" + ) + print("\033[33mCtrl+C to stop.\033[0m") + await wss_run( + log_file_path=None, + stop_future=stop, + setup_oob=args.setup_oob, ) + print("Stopped.") + return + + logs_dir_path = env_cfg.ensure_logs_dir() + wss_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%SZ") + wss_log_path = logs_dir_path / f"wss.{wss_ts}.log" + + runtime_proc = multiprocessing.Process(target=runtime_run) + runtime_proc.start() + + cxr_ver = runtime_version() + print( + f"Running Isaac Teleop \033[36m{isaacteleop_version}\033[0m, CloudXR Runtime \033[36m{cxr_ver}\033[0m" + ) + + try: + ready = await wait_for_runtime_ready(runtime_proc.is_alive) + if not ready: + if not runtime_proc.is_alive() and runtime_proc.exitcode != 0: + sys.exit( + runtime_proc.exitcode if runtime_proc.exitcode is not None else 1 + ) + print("CloudXR runtime failed to start, terminating...") + sys.exit(1) + + print("CloudXR runtime started, make sure load environment variables:") + print("") + print("```bash") + print(f"source {env_cfg.env_filepath()}") + print("```") + print("") - env_cfg = get_env_config() - logs_dir_path = env_cfg.ensure_logs_dir() cxr_log = latest_runtime_log() or logs_dir_path print( f"CloudXR runtime: \033[36mrunning\033[0m, log file: \033[90m{cxr_log}\033[0m" ) - wss_log = launcher.wss_log_path - print( - f"CloudXR WSS proxy: \033[36mrunning\033[0m, log file: \033[90m{wss_log}\033[0m" - ) + print("CloudXR WSS proxy: \033[36mrunning\033[0m") + print(f" logFile: \033[90m{wss_log_path}\033[0m") + if args.setup_oob is not None: + print( + " oob: on (teleop hub runs in this WSS proxy — see OOB TELEOP block)" + ) + if args.setup_oob == "tethered": + print_oob_hub_startup_banner(oob_mode="tethered") + else: + print_oob_hub_startup_banner( + oob_mode="wireless", lan_host=resolve_lan_host_for_oob() + ) + print( f"Activate CloudXR environment in another terminal: \033[1;32msource {env_cfg.env_filepath()}\033[0m" ) print("\033[33mKeep this terminal open, Ctrl+C to terminate.\033[0m") - stop = False - - def on_signal(sig, frame): - nonlocal stop - stop = True - - signal.signal(signal.SIGINT, on_signal) - signal.signal(signal.SIGTERM, on_signal) - - while not stop: - launcher.health_check() - time.sleep(0.1) + await wss_run( + log_file_path=wss_log_path, + stop_future=stop, + setup_oob=args.setup_oob, + ) + finally: + terminate_or_kill_runtime(runtime_proc) print("Stopped.") if __name__ == "__main__": - main() + try: + asyncio.run(_main_async()) + except OobAdbError as e: + print("", file=sys.stderr) + print(str(e), file=sys.stderr) + print("", file=sys.stderr) + raise SystemExit(1) from None diff --git a/src/core/cloudxr/python/copy_if_exists.cmake b/src/core/cloudxr/python/copy_if_exists.cmake new file mode 100644 index 000000000..cfad073c2 --- /dev/null +++ b/src/core/cloudxr/python/copy_if_exists.cmake @@ -0,0 +1,12 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# Build-time helper: copy SRC to DST only when SRC exists. +# Usage: cmake -DSRC= -DDST= -P copy_if_exists.cmake +if(EXISTS "${SRC}") + file(COPY_FILE "${SRC}" "${DST}") + message(STATUS "Copied ${SRC} -> ${DST}") +else() + message(STATUS "udprelay not found at ${SRC} (skipping). " + "Build with: cd src/core/cloudxr/udprelay && GOOS=linux GOARCH=arm64 go build -o udprelay .") +endif() diff --git a/src/core/cloudxr/python/oob_teleop_adb.py b/src/core/cloudxr/python/oob_teleop_adb.py new file mode 100644 index 000000000..2f017f603 --- /dev/null +++ b/src/core/cloudxr/python/oob_teleop_adb.py @@ -0,0 +1,367 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""ADB automation for OOB teleop (``--setup-oob``): adb reverse, UDP relay lifecycle, connect, and open the headset bookmark URL.""" + +from __future__ import annotations + +import argparse +import logging +import os +import shlex +import shutil +import subprocess +from dataclasses import dataclass +from pathlib import Path + +from .oob_teleop_env import ( + DEFAULT_STREAM_SIGNALING_PORT, + DEFAULT_WEB_CLIENT_ORIGIN, + TETHERED_LOCAL_WEB_CLIENT, + build_headset_bookmark_url, + client_ui_fields_from_env, + resolve_lan_host_for_oob, + web_client_base_override_from_env, +) + +log = logging.getLogger("oob-teleop-adb") + + +class OobAdbError(Exception): + """``--setup-oob`` adb step failed; ``str(exception)`` is formatted for users (print without traceback).""" + + +@dataclass(frozen=True) +class AdbAutomation: + """After WSS listens: optional ``adb connect``, ``adb reverse``, relay setup, then ``am start`` bookmark. + + * **Tethered (``tethered=True``, ``connect`` unset):** sets up ``adb reverse`` for TCP ports + and a UDP-over-TCP relay so the headset reaches all services via ``127.0.0.1``. + * **Wireless (``tethered=False``, ``connect`` = HOST:PORT):** ``adb connect`` + LAN URLs; set + ``TELEOP_PROXY_HOST`` or rely on ``guess_lan_ipv4()``. + """ + + tethered: bool = False + connect: str | None = None + + +def adb_automation_from_setup_oob(setup_oob: str) -> AdbAutomation: + """``tethered`` → adb reverse + UDP relay; ``HOST:PORT`` → wireless ``adb connect`` + LAN URLs.""" + s = setup_oob.strip() + if s.lower() == "tethered": + return AdbAutomation(tethered=True, connect=None) + if ":" not in s: + raise ValueError( + "setup_oob must be 'tethered' or HOST:PORT for wireless (e.g. 192.168.1.5:5555)" + ) + return AdbAutomation(tethered=False, connect=s) + + +def parse_setup_oob_cli(value: str) -> str: + """``argparse`` ``type=`` for ``--setup-oob`` (``tethered`` or ``HOST:PORT``).""" + v = value.strip() + if not v: + raise argparse.ArgumentTypeError("--setup-oob requires 'tethered' or HOST:PORT") + try: + adb_automation_from_setup_oob(v) + except ValueError as e: + raise argparse.ArgumentTypeError(str(e)) from e + return "tethered" if v.lower() == "tethered" else v + + +def ensure_adb_connect(connect_spec: str) -> None: + """Run plain ``adb connect`` (not ``adb -s …``).""" + c = connect_spec.strip() + if not c: + return + proc = subprocess.run( + ["adb", "connect", c], + capture_output=True, + text=True, + timeout=60, + check=False, + ) + out = _adb_output_text(proc) + if proc.returncode != 0: + log.warning("ADB connect %s failed (exit %s): %s", c, proc.returncode, out) + else: + log.info("ADB connect %s: %s", c, out or "ok") + + +def _web_client_base_for_adb_bookmark(tethered: bool = False) -> str: + ovr = web_client_base_override_from_env() + if ovr: + return ovr + if tethered: + return TETHERED_LOCAL_WEB_CLIENT + return DEFAULT_WEB_CLIENT_ORIGIN + + +def _adb_output_text(proc: subprocess.CompletedProcess[str]) -> str: + return (proc.stderr or proc.stdout or "").strip() + + +def adb_automation_failure_hint(diagnostic: str) -> str: + """Human-readable next steps for common ``adb`` failures.""" + d = diagnostic.lower() + if "unauthorized" in d: + return ( + "Device is unauthorized: unlock the headset, confirm the USB debugging (RSA) prompt, " + "and run `adb devices` until the device shows `device` not `unauthorized`. " + "If this persists, try `adb kill-server` and reconnect the cable." + ) + if ( + "no devices/emulators" in d + or "no devices found" in d + or "device not found" in d + ): + return ( + "No adb device: USB — plug in and enable debugging; wireless — run `adb connect HOST:PORT` " + "and check `adb devices`." + ) + if "more than one device" in d: + return "Multiple adb devices: unplug or disconnect extras so only one headset shows in `adb devices`." + if "offline" in d: + return "Device offline: reconnect USB and confirm USB debugging on the headset." + return "" + + +def oob_adb_automation_message(rc: int, detail: str, hint: str) -> str: + d = detail.strip() if detail else "(no output from adb)" + lines = [ + f"OOB adb automation failed (adb exit code {rc}).", + "", + d, + ] + if hint.strip(): + lines.extend(["", hint]) + lines.extend( + [ + "", + "To run the WSS proxy and OOB hub without adb, omit --setup-oob and open the teleop URL on the headset yourself.", + ] + ) + return "\n".join(lines) + + +def require_adb_on_path() -> None: + """Raise :exc:`OobAdbError` if ``adb`` is missing.""" + if shutil.which("adb"): + return + raise OobAdbError( + "Cannot use --setup-oob: `adb` was not found on PATH.\n\n" + "Install Android Platform Tools and ensure `adb` is available, or omit --setup-oob and open " + "the teleop bookmark URL on the headset yourself." + ) + + +def assert_at_most_one_adb_device() -> None: + """Fail if more than one device is in ``device`` state (OOB adb automation targets a single device).""" + try: + proc = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + timeout=30, + check=False, + ) + except FileNotFoundError as e: + raise OobAdbError( + "Cannot use --setup-oob: `adb` was not found on PATH.\n\n" + "Install Android Platform Tools and ensure `adb` is available, or omit --setup-oob." + ) from e + if proc.returncode != 0: + return + text = (proc.stdout or "") + "\n" + (proc.stderr or "") + ready: list[str] = [] + for line in text.strip().splitlines()[1:]: + line = line.strip() + if not line: + continue + parts = line.split() + if len(parts) >= 2 and parts[-1] == "device": + ready.append(parts[0]) + if len(ready) > 1: + listed = ", ".join(ready) + raise OobAdbError( + "Too many adb devices for --setup-oob.\n\n" + f"Currently connected: {listed}\n\n" + "Use exactly one headset (unplug or disconnect the others), then retry. " + "Or omit --setup-oob and open the teleop URL manually." + ) + + +HEADSET_RELAY_REMOTE_PATH = "/data/local/tmp/udprelay" +UDP_RELAY_TCP_PORT = 47999 + + +def _adb_cmd_prefix() -> list[str]: + return ["adb"] + + +def adb_reverse(remote_port: int, local_port: int) -> None: + """Run ``adb reverse tcp:REMOTE tcp:LOCAL``.""" + cmd = ["adb", "reverse", f"tcp:{remote_port}", f"tcp:{local_port}"] + log.info("adb reverse: %s", " ".join(cmd)) + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30, check=False) + if proc.returncode != 0: + raise OobAdbError( + f"adb reverse tcp:{remote_port} tcp:{local_port} failed " + f"(exit {proc.returncode}): {_adb_output_text(proc)}" + ) + + +def adb_reverse_remove_all() -> None: + """Run ``adb reverse --remove-all`` (best-effort cleanup).""" + proc = subprocess.run( + ["adb", "reverse", "--remove-all"], + capture_output=True, + text=True, + timeout=30, + check=False, + ) + if proc.returncode != 0: + log.warning( + "adb reverse --remove-all failed (exit %s): %s", + proc.returncode, + _adb_output_text(proc), + ) + else: + log.info("adb reverse --remove-all: ok") + + +def adb_push_file(local_path: str | Path, remote_path: str) -> None: + """Push a local file to the device via ``adb push``.""" + cmd = ["adb", "push", str(local_path), remote_path] + log.info("adb push: %s -> %s", local_path, remote_path) + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60, check=False) + if proc.returncode != 0: + raise OobAdbError( + f"adb push {local_path} {remote_path} failed " + f"(exit {proc.returncode}): {_adb_output_text(proc)}" + ) + + +def adb_start_relay(remote_binary: str, udp_port: int, tcp_port: int) -> None: + """Start the headset-side UDP relay binary in the background via ``adb shell``. + + The relay listens on UDP ``udp_port`` and tunnels to TCP ``tcp_port`` (adb-reversed to the PC). + """ + subprocess.run( + ["adb", "shell", "chmod", "755", remote_binary], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + cmd = [ + "adb", + "shell", + f"nohup {remote_binary} -udp-port {udp_port} -tcp-port {tcp_port} " + f"/dev/null 2>&1 &", + ] + log.info("Starting headset relay: %s", " ".join(cmd)) + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=10, check=False) + if proc.returncode != 0: + raise OobAdbError( + f"Failed to start headset relay (exit {proc.returncode}): {_adb_output_text(proc)}" + ) + log.info("Headset relay started (UDP :%d <-> TCP :%d)", udp_port, tcp_port) + + +def adb_stop_relay(remote_binary: str) -> None: + """Kill the headset-side relay process (best-effort).""" + binary_name = Path(remote_binary).name + proc = subprocess.run( + ["adb", "shell", "pkill", "-f", binary_name], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + if proc.returncode == 0: + log.info("Headset relay stopped") + else: + log.debug("pkill relay (may already be gone): exit %s", proc.returncode) + + +def setup_adb_reverse_ports( + proxy_port: int, relay_tcp_port: int, web_port: int | None = None +) -> list[int]: + """Set up ``adb reverse`` for the proxy, relay tunnel, and optionally the web dev server. + + Returns the list of remote ports that were reversed (for cleanup tracking). + """ + reversed_ports: list[int] = [] + adb_reverse(proxy_port, proxy_port) + reversed_ports.append(proxy_port) + adb_reverse(relay_tcp_port, relay_tcp_port) + reversed_ports.append(relay_tcp_port) + if web_port is not None: + adb_reverse(web_port, web_port) + reversed_ports.append(web_port) + return reversed_ports + + +def run_adb_headset_bookmark( + *, + resolved_port: int, + adb: AdbAutomation, +) -> tuple[int, str]: + """Open the teleop bookmark URL on the headset via ``am start``. + + For **tethered** mode, uses ``127.0.0.1`` for all addresses (everything tunneled via ``adb reverse``). + For **wireless** mode, uses the PC's LAN address. + + Returns ``(exit_code, diagnostic)``. + """ + cmd_adb = _adb_cmd_prefix() + + if adb.tethered: + # Signaling reaches the PC via adb reverse (127.0.0.1:port). + # Do NOT set mediaAddress/mediaPort — WebRTC's ICE rejects loopback + # as a remote candidate ("no local candidate"). Leave them unset so + # the SDK negotiates media via normal ICE (typically over WiFi). + stream_cfg: dict = { + "serverIP": "127.0.0.1", + "port": resolved_port, + **client_ui_fields_from_env(), + } + else: + signaling_port = int( + os.environ.get( + "TELEOP_STREAM_PORT", str(DEFAULT_STREAM_SIGNALING_PORT) + ).strip() + or DEFAULT_STREAM_SIGNALING_PORT + ) + proxy_host = resolve_lan_host_for_oob() + stream_cfg = { + "serverIP": proxy_host, + "port": signaling_port, + **client_ui_fields_from_env(), + } + + web_base = _web_client_base_for_adb_bookmark(tethered=adb.tethered) + token = os.environ.get("CONTROL_TOKEN") or None + url = build_headset_bookmark_url( + web_client_base=web_base, + stream_config=stream_cfg, + control_token=token, + ) + + full = cmd_adb + [ + "shell", + "am", + "start", + "-a", + "android.intent.action.VIEW", + "-d", + shlex.quote(url), + ] + log.info("ADB automation: %s", " ".join(shlex.quote(c) for c in full)) + proc = subprocess.run(full, capture_output=True, text=True) + if proc.returncode != 0: + diag = _adb_output_text(proc) + return proc.returncode, diag + log.info("ADB automation: am start completed") + return 0, "" diff --git a/src/core/cloudxr/python/oob_teleop_env.py b/src/core/cloudxr/python/oob_teleop_env.py new file mode 100644 index 000000000..92326c1c8 --- /dev/null +++ b/src/core/cloudxr/python/oob_teleop_env.py @@ -0,0 +1,273 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""OOB teleop environment: proxy port, LAN detection, stream defaults, headset bookmark URLs, startup banner.""" + +from __future__ import annotations + +import logging +import os +import socket +from urllib.parse import urlencode + +from .oob_teleop_hub import OOB_WS_PATH + +log = logging.getLogger("oob-teleop-env") + +WSS_PROXY_DEFAULT_PORT = 48322 + +# Hosted WebXR client used by default for OOB bookmark URLs (wireless mode). +DEFAULT_WEB_CLIENT_ORIGIN = "https://nvidia.github.io/IsaacTeleop/client/" + +# Local web client served via webpack dev-server, reached through adb reverse in tethered mode. +TETHERED_LOCAL_WEB_CLIENT = "https://127.0.0.1:8080/" + +# Optional override for the WebXR page origin in OOB bookmark URLs. +TELEOP_WEB_CLIENT_BASE_ENV = "TELEOP_WEB_CLIENT_BASE" + +# CloudXR runtime signaling (WebSocket) TCP port. +DEFAULT_STREAM_SIGNALING_PORT = 49100 + +# CloudXR media (RTP) default UDP port on host. +DEFAULT_STREAM_MEDIA_UDP_PORT = 47998 + +CHROME_INSPECT_DEVICES_URL = "chrome://inspect/#devices" + + +def web_client_base_override_from_env() -> str | None: + v = os.environ.get(TELEOP_WEB_CLIENT_BASE_ENV, "").strip() + return v or None + + +def wss_proxy_port() -> int: + """TCP port for the WSS proxy (``PROXY_PORT`` environment variable if set, else ``48322``).""" + raw = os.environ.get("PROXY_PORT", "").strip() + if raw: + return int(raw) + return WSS_PROXY_DEFAULT_PORT + + +def guess_lan_ipv4() -> str | None: + """Best-effort LAN IPv4 for operator URLs when headsets reach the PC by IP.""" + # Any non-loopback destination works: UDP "connect" only consults the routing table to pick a + # local address; no packet need reach the peer. Use RFC 5737 TEST-NET so we do not + # imply reliance on a public resolver or send traffic off-host. + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.settimeout(0.25) + s.connect(("192.0.2.1", 1)) + addr, _ = s.getsockname() + except OSError: + return None + if not addr or addr == "127.0.0.1": + return None + return addr + + +def default_initial_stream_config(resolved_proxy_port: int) -> dict: + """Default hub stream config from env and LAN guess (same host as proxy port by default).""" + env_ip = os.environ.get("TELEOP_STREAM_SERVER_IP", "").strip() + env_port = os.environ.get("TELEOP_STREAM_PORT", "").strip() + server_ip = env_ip or guess_lan_ipv4() or "127.0.0.1" + port = int(env_port) if env_port else resolved_proxy_port + return {"serverIP": server_ip, "port": port} + + +def client_ui_fields_from_env() -> dict: + """Optional WebXR client UI defaults merged into hub ``config`` and bookmarks (allowlist). + + Keys match Teleop form element ids / ``StreamConfig`` in the web client. + """ + out: dict = {} + codec = os.environ.get("TELEOP_CLIENT_CODEC", "").strip() + if codec: + out["codec"] = codec + ph = os.environ.get("TELEOP_CLIENT_PANEL_HIDDEN_AT_START", "").strip().lower() + if ph in ("1", "true", "yes", "on"): + out["panelHiddenAtStart"] = True + elif ph in ("0", "false", "no", "off"): + out["panelHiddenAtStart"] = False + for key, env_name in ( + ("perEyeWidth", "TELEOP_CLIENT_PER_EYE_WIDTH"), + ("perEyeHeight", "TELEOP_CLIENT_PER_EYE_HEIGHT"), + ): + raw = os.environ.get(env_name, "").strip() + if not raw: + continue + try: + out[key] = int(raw, 10) + except ValueError: + log.warning("%s invalid integer %r", env_name, raw) + return out + + +def build_headset_bookmark_url( + *, + web_client_base: str, + stream_config: dict | None = None, + control_token: str | None = None, +) -> str: + """Full WebXR page URL with OOB query params (``oobEnable=1``, stream fields, optional token). + + The client derives ``wss://{serverIP}:{port}/oob/v1/ws`` from ``serverIP`` + ``port`` in the query + (same values as CloudXR signaling) when ``oobEnable=1``; it does not open OOB without that pair. + Optional client UI keys (``codec``, ``panelHiddenAtStart``, ``perEyeWidth``, ``perEyeHeight``) are + encoded when present on ``stream_config`` (see ``client_ui_fields_from_env``). + """ + cfg = stream_config or {} + if not cfg.get("serverIP") or cfg.get("port") is None: + raise ValueError( + "build_headset_bookmark_url requires stream_config with serverIP and port (OOB WS is derived from them)" + ) + params: dict[str, str] = {"oobEnable": "1"} + if control_token: + params["controlToken"] = control_token + for key in ("serverIP", "proxyUrl", "mediaAddress"): + v = cfg.get(key) + if v is not None and v != "": + params[key] = str(v) + for key in ("port", "mediaPort"): + v = cfg.get(key) + if v is not None: + params[key] = str(int(v)) + v = cfg.get("codec") + if v is not None and str(v).strip() != "": + params["codec"] = str(v).strip() + v = cfg.get("panelHiddenAtStart") + if isinstance(v, bool): + params["panelHiddenAtStart"] = "true" if v else "false" + elif v is not None and str(v).strip() != "": + s = str(v).strip().lower() + if s in ("1", "true", "yes", "on"): + params["panelHiddenAtStart"] = "true" + elif s in ("0", "false", "no", "off"): + params["panelHiddenAtStart"] = "false" + for key in ("perEyeWidth", "perEyeHeight"): + v = cfg.get(key) + if v is not None: + try: + params[key] = str(int(v)) + except (TypeError, ValueError): + pass + q = urlencode(params) + base = web_client_base.rstrip("/") + sep = "&" if "?" in base else "?" + return f"{base}{sep}{q}" + + +def resolve_lan_host_for_oob() -> str: + """PC LAN address the headset uses for ``https://…:8080/`` and ``wss://…:PROXY_PORT`` (wireless OOB).""" + h = os.environ.get("TELEOP_PROXY_HOST", "").strip() or guess_lan_ipv4() + if not h: + raise RuntimeError( + "Wireless --setup-oob HOST:PORT needs this PC's LAN IP for WebXR/WSS URLs. " + "Set TELEOP_PROXY_HOST to an address the headset can reach (or fix routing so guess_lan_ipv4() works)." + ) + return h + + +def print_oob_hub_startup_banner(*, oob_mode: str, lan_host: str | None = None) -> None: + """Printed instructions for OOB + adb (``oob_mode``: ``tethered`` | ``wireless``). + + Wording follows ``docs/source/references/oob_teleop_control.rst`` (section *End-to-end workflow*). + """ + port = wss_proxy_port() + token = os.environ.get("CONTROL_TOKEN") or None + is_tethered = oob_mode == "tethered" + + if is_tethered: + primary_host = "127.0.0.1" + else: + if not lan_host: + lan_host = resolve_lan_host_for_oob() + primary_host = lan_host + + web_base = TETHERED_LOCAL_WEB_CLIENT if is_tethered else DEFAULT_WEB_CLIENT_ORIGIN + stream_cfg = default_initial_stream_config(port) + stream_cfg = {**stream_cfg, "serverIP": primary_host} + + web_client_base_override = web_client_base_override_from_env() + if web_client_base_override: + web_base = web_client_base_override + + stream_cfg = {**stream_cfg, **client_ui_fields_from_env()} + + primary_base = f"https://{primary_host}:{port}" + bookmark_primary = build_headset_bookmark_url( + web_client_base=web_base, + stream_config=stream_cfg, + control_token=token, + ) + wss_primary = f"wss://{primary_host}:{port}{OOB_WS_PATH}" + + bar = "=" * 72 + print(bar) + print("OOB TELEOP — enabled (out-of-band control hub is running in this WSS proxy)") + print(bar) + print() + print( + f" The hub shares the CloudXR proxy TLS port {port} on this machine " + f"(control WebSocket: {wss_primary})." + ) + print( + " Same steps as docs: references/oob_teleop_control.rst — " + '"End-to-end workflow (the usual path)".' + ) + print() + if oob_mode == "tethered": + print( + " adb: USB tethered — one device in `adb devices`; tool sets up adb reverse + UDP relay, then opens the page." + ) + else: + print( + " adb: wireless — `adb connect` to your device; this PC must be reachable at the " + "LAN addresses below." + ) + print() + print(" Step 1 — Headset: Teleop page URL (doc: start with hub + adb automation)") + print( + ' After this process logs "WSS proxy listening on port …", `--setup-oob` runs ' + "`adb` to open the page on the headset. If that fails, open this URL on the headset yourself:" + ) + print(f" {bookmark_primary}") + if web_client_base_override: + print( + f" ({TELEOP_WEB_CLIENT_BASE_ENV} overrides the WebXR origin; " + "query still targets this streaming host.)" + ) + print( + " The page loads with oobEnable=1 and the same serverIP/port as CloudXR; " + "the client connects to the hub at the wss URL above." + ) + print() + print(" Step 2 — This PC: Inspect from the PC (Chrome remote debugging)") + print(f" Open {CHROME_INSPECT_DEVICES_URL}") + print( + " Under Remote Target, select the headset tab for Isaac Teleop Web Client " + "(or the matching URL) and click inspect." + ) + print() + print(" Step 3 — This PC: CONNECT (required WebXR user gesture)") + print( + " In that DevTools window, click CONNECT on the Teleop UI (same as tapping " + "CONNECT on the headset)." + ) + print() + print("-" * 72) + print("OOB HTTP (optional — operators / curl / scripts on this PC)") + print("-" * 72) + cfg_q = urlencode( + { + "serverIP": str(stream_cfg["serverIP"]), + "port": str(int(stream_cfg["port"])), + } + ) + print(f" State: {primary_base}/api/oob/v1/state") + print(f" Config: {primary_base}/api/oob/v1/config?{cfg_q}") + if token: + print() + print( + " CONTROL_TOKEN is set: add ?token=... or header X-Control-Token on OOB HTTP requests." + ) + print(bar) + print() diff --git a/src/core/cloudxr/python/oob_teleop_hub.py b/src/core/cloudxr/python/oob_teleop_hub.py new file mode 100644 index 000000000..22eac4368 --- /dev/null +++ b/src/core/cloudxr/python/oob_teleop_hub.py @@ -0,0 +1,372 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Out-of-band (OOB) teleop hub — WebSocket application for headset ↔ operator coordination. + +Peers are the **headset** (WebXR client, role ``headset``) and optional **dashboard** +WebSocket clients (role ``dashboard``). Operators use the **OOB HTTP API** on the same TLS port. + +WebSocket: ``wss://:/oob/v1/ws`` +OOB HTTP: ``GET /api/oob/v1/state``, ``GET /api/oob/v1/config`` + +See ``docs/source/references/oob_teleop_control.rst`` (*Out-of-band teleop control*). +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, Literal + +log = logging.getLogger("oob-teleop-hub") + +OOB_WS_PATH = "/oob/v1/ws" + +# --------------------------------------------------------------------------- +# Internal state containers +# --------------------------------------------------------------------------- + + +@dataclass +class _HeadsetState: + client_id: str + ws: Any + registered_at: float + device_label: str | None = None + metrics_by_cadence: dict = field(default_factory=dict) + + +@dataclass +class _DashboardState: + client_id: str + ws: Any + registered_at: float + + +# --------------------------------------------------------------------------- +# Hub +# --------------------------------------------------------------------------- + + +class OOBControlHub: + """Routes control messages between OOB operators, optional dashboard WebSockets, and headsets. + + One instance per proxy process; WebSocket connections on ``OOB_WS_PATH`` are dispatched + via :meth:`handle_connection`. + """ + + def __init__( + self, + control_token: str | None = None, + initial_config: dict | None = None, + ) -> None: + self._token = control_token + self._headsets: dict[Any, _HeadsetState] = {} + self._dashboards: dict[Any, _DashboardState] = {} + self._stream_config: dict = dict(initial_config or {}) + self._config_version: int = 0 + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + async def handle_connection(self, ws: Any) -> None: + """Entry point for each new WebSocket client on ``OOB_WS_PATH``.""" + client_id = str(uuid.uuid4()) + role: str | None = None + + try: + async for raw in ws: + if not isinstance(raw, str): + await self._send_error(ws, "BAD_REQUEST", "Expected text frame") + continue + + try: + msg = json.loads(raw) + except json.JSONDecodeError: + await self._send_error(ws, "BAD_REQUEST", "Invalid JSON") + continue + + msg_type: str = msg.get("type", "") + payload: dict = msg.get("payload") or {} + + if role is None: + if msg_type != "register": + await self._send_error( + ws, "BAD_REQUEST", "First message must be register" + ) + return + role = await self._handle_register(ws, client_id, payload) + if role is None: + return # rejected; connection already closed + continue + + if role == "headset": + await self._dispatch_headset(ws, msg_type, payload) + else: + await self._dispatch_dashboard(ws, msg_type, payload) + + except Exception: + log.debug("Teleop WS closed", exc_info=True) + finally: + async with self._lock: + self._headsets.pop(ws, None) + self._dashboards.pop(ws, None) + log.info( + "Teleop client disconnected (clientId=%s role=%s)", client_id, role + ) + + async def get_snapshot(self) -> dict: + """Build the JSON snapshot for ``GET /api/oob/v1/state``.""" + async with self._lock: + headsets = [ + { + "clientId": s.client_id, + "connected": True, + "deviceLabel": s.device_label, + "registeredAt": int(s.registered_at * 1000), + "metricsByCadence": s.metrics_by_cadence, + } + for s in self._headsets.values() + ] + dashboards = [ + { + "clientId": s.client_id, + "connected": True, + "registeredAt": int(s.registered_at * 1000), + } + for s in self._dashboards.values() + ] + return { + "updatedAt": int(time.time() * 1000), + "configVersion": self._config_version, + "config": dict(self._stream_config), + "headsets": headsets, + "dashboards": dashboards, + } + + def check_token(self, token: str | None) -> bool: + """Return True if token satisfies the hub's auth requirement.""" + if not self._token: + return True + return token == self._token + + async def http_oob_set_config(self, payload: dict) -> tuple[int, dict]: + """Merge stream config like WebSocket ``setConfig``; for OOB HTTP (GET query).""" + if not self.check_token(payload.get("token")): + return 401, {"error": "Unauthorized"} + + new_config = payload.get("config") + if not isinstance(new_config, dict): + return 400, {"error": "config must be an object"} + + target_raw = payload.get("targetClientId") + target_id: str | None = ( + None if target_raw is None or target_raw == "" else str(target_raw) + ) + + outcome = await self._merge_stream_config(new_config, target_id) + if outcome[0] == "noop": + return 200, { + "ok": True, + "changed": False, + "configVersion": outcome[1], + } + if outcome[0] == "missing": + return 404, {"error": f"Headset '{outcome[1]}' not connected"} + + _tag, version, config_snapshot, targets = outcome + log.info( + "OOB setConfig configVersion=%d → %d headset(s)", version, len(targets) + ) + await self._push_config_to_headsets(version, config_snapshot, targets) + return 200, { + "ok": True, + "changed": True, + "configVersion": version, + "targetCount": len(targets), + } + + # ------------------------------------------------------------------ + # Private: registration + # ------------------------------------------------------------------ + + async def _handle_register( + self, ws: Any, client_id: str, payload: dict + ) -> str | None: + """Validate, register, and send hello. Returns role or None on rejection.""" + if self._token and payload.get("token") != self._token: + await self._send_error(ws, "UNAUTHORIZED", "Invalid or missing token") + try: + await ws.close(1008, "Unauthorized") + except Exception: + pass + return None + + role = payload.get("role") + if role not in ("headset", "dashboard"): + await self._send_error( + ws, "BAD_REQUEST", "role must be 'headset' or 'dashboard'" + ) + return None + + async with self._lock: + if role == "headset": + state = _HeadsetState( + client_id=client_id, + ws=ws, + registered_at=time.time(), + device_label=payload.get("deviceLabel"), + ) + self._headsets[ws] = state + log.info( + "Headset registered: clientId=%s label=%s", + client_id, + state.device_label, + ) + hello_payload = { + "clientId": client_id, + "configVersion": self._config_version, + "config": dict(self._stream_config), + } + else: + self._dashboards[ws] = _DashboardState( + client_id=client_id, + ws=ws, + registered_at=time.time(), + ) + log.info("Dashboard registered: clientId=%s", client_id) + hello_payload = {"clientId": client_id} + + await self._send(ws, "hello", hello_payload) + return role + + # ------------------------------------------------------------------ + # Private: message dispatch + # ------------------------------------------------------------------ + + async def _dispatch_headset(self, ws: Any, msg_type: str, payload: dict) -> None: + if msg_type == "clientMetrics": + await self._handle_client_metrics(ws, payload) + else: + await self._send_error( + ws, "BAD_REQUEST", f"Unknown message type for headset: {msg_type}" + ) + + async def _dispatch_dashboard(self, ws: Any, msg_type: str, payload: dict) -> None: + # Defense-in-depth: re-validate token on dashboard messages + if self._token and payload.get("token") != self._token: + await self._send_error(ws, "UNAUTHORIZED", "Invalid or missing token") + return + + if msg_type == "setConfig": + await self._handle_set_config(ws, payload) + else: + await self._send_error( + ws, "BAD_REQUEST", f"Unknown message type for dashboard: {msg_type}" + ) + + # ------------------------------------------------------------------ + # Private: message handlers + # ------------------------------------------------------------------ + + async def _merge_stream_config( + self, new_config: dict, target_id: str | None + ) -> ( + tuple[Literal["noop"], int] + | tuple[Literal["missing"], str] + | tuple[Literal["push"], int, dict, list[_HeadsetState]] + ): + """Under lock: shallow-merge ``new_config``; return noop, missing target id, or push tuple.""" + async with self._lock: + merged = {**self._stream_config, **new_config} + if merged == self._stream_config: + return ("noop", self._config_version) + + all_headsets = list(self._headsets.values()) + if target_id is not None: + targets = [s for s in all_headsets if s.client_id == target_id] + if not targets: + return ("missing", target_id) + else: + targets = all_headsets + + self._stream_config = merged + self._config_version += 1 + version = self._config_version + snapshot = dict(self._stream_config) + return ("push", version, snapshot, targets) + + async def _push_config_to_headsets( + self, version: int, config_snapshot: dict, targets: list[_HeadsetState] + ) -> None: + push_payload = {"configVersion": version, "config": config_snapshot} + for headset in targets: + await self._send(headset.ws, "config", push_payload) + + async def _handle_client_metrics(self, ws: Any, payload: dict) -> None: + async with self._lock: + state = self._headsets.get(ws) + if state is None: + return + cadence = str(payload.get("cadence", "unknown")) + raw_metrics = payload.get("metrics", {}) + if not isinstance(raw_metrics, dict): + raw_metrics = {} + state.metrics_by_cadence[cadence] = { + "at": int(payload.get("t", time.time() * 1000)), + "metrics": { + str(k): float(v) + for k, v in raw_metrics.items() + if isinstance(v, (int, float)) + }, + } + + async def _handle_set_config(self, ws: Any, payload: dict) -> None: + new_config = payload.get("config") + if not isinstance(new_config, dict): + await self._send_error( + ws, "BAD_REQUEST", "payload.config must be an object" + ) + return + + target_id: str | None = payload.get("targetClientId") + outcome = await self._merge_stream_config(new_config, target_id) + if outcome[0] == "noop": + return + if outcome[0] == "missing": + await self._send_error( + ws, "NOT_FOUND", f"Headset '{outcome[1]}' not connected" + ) + return + + _tag, version, config_snapshot, targets = outcome + log.info("setConfig configVersion=%d → %d headset(s)", version, len(targets)) + await self._push_config_to_headsets(version, config_snapshot, targets) + + # ------------------------------------------------------------------ + # Private: send helpers + # ------------------------------------------------------------------ + + async def _send(self, ws: Any, msg_type: str, payload: dict) -> None: + try: + await ws.send(json.dumps({"type": msg_type, "payload": payload})) + except Exception: + log.debug("Failed to send '%s' message", msg_type, exc_info=True) + + async def _send_error( + self, + ws: Any, + code: str, + message: str, + request_id: str | None = None, + ) -> None: + p: dict = {"code": code, "message": message} + if request_id is not None: + p["requestId"] = request_id + await self._send(ws, "error", p) diff --git a/src/core/cloudxr/python/udp_tcp_relay.py b/src/core/cloudxr/python/udp_tcp_relay.py new file mode 100644 index 000000000..425de976b --- /dev/null +++ b/src/core/cloudxr/python/udp_tcp_relay.py @@ -0,0 +1,183 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""PC-side UDP-over-TCP relay for tunneling CloudXR media (UDP) through ``adb reverse`` (TCP-only). + +Framing protocol (shared with the headset-side Go relay): +each UDP datagram is sent on the TCP stream as ``[2-byte big-endian length][payload]``. +Both directions use the same framing on a single bidirectional TCP connection. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import struct + +log = logging.getLogger("udp-tcp-relay") + +DEFAULT_RELAY_TCP_PORT = 47999 +DEFAULT_UDP_TARGET_PORT = 47998 +DEFAULT_UDP_TARGET_HOST = "127.0.0.1" + +_HEADER = struct.Struct("!H") # 2-byte big-endian uint16 +_MAX_UDP_PAYLOAD = 65535 + + +def relay_tcp_port() -> int: + raw = os.environ.get("TELEOP_UDP_RELAY_PORT", "").strip() + if raw: + return int(raw) + return DEFAULT_RELAY_TCP_PORT + + +class _RelaySession: + """Bridges one TCP connection to a UDP socket targeting the CloudXR runtime.""" + + def __init__( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + udp_host: str, + udp_port: int, + loop: asyncio.AbstractEventLoop, + ) -> None: + self._reader = reader + self._writer = writer + self._udp_host = udp_host + self._udp_port = udp_port + self._loop = loop + self._udp_transport: asyncio.DatagramTransport | None = None + self._tasks: list[asyncio.Task] = [] + self._closed = False + + async def run(self) -> None: + transport, _protocol = await self._loop.create_datagram_endpoint( + lambda: _UdpReceiver(self), + remote_addr=(self._udp_host, self._udp_port), + ) + self._udp_transport = transport + try: + tcp_task = asyncio.create_task(self._tcp_to_udp()) + self._tasks.append(tcp_task) + await tcp_task + finally: + self.close() + + async def _tcp_to_udp(self) -> None: + """Read length-prefixed datagrams from TCP, send as UDP.""" + try: + while not self._closed: + hdr = await self._reader.readexactly(_HEADER.size) + (length,) = _HEADER.unpack(hdr) + if length == 0: + continue + payload = await self._reader.readexactly(length) + if self._udp_transport and not self._closed: + self._udp_transport.sendto(payload) + except asyncio.IncompleteReadError: + log.debug("TCP peer disconnected (incomplete read)") + except ConnectionResetError: + log.debug("TCP connection reset") + except Exception: + if not self._closed: + log.exception("tcp_to_udp error") + + def udp_datagram_received(self, data: bytes) -> None: + """Called by _UdpReceiver when a UDP packet arrives from the runtime.""" + if self._closed or self._writer.is_closing(): + return + if len(data) > _MAX_UDP_PAYLOAD: + log.warning("Dropping oversized UDP datagram (%d bytes)", len(data)) + return + frame = _HEADER.pack(len(data)) + data + try: + self._writer.write(frame) + except Exception: + if not self._closed: + log.debug("Failed to write UDP->TCP frame") + + def close(self) -> None: + if self._closed: + return + self._closed = True + for t in self._tasks: + t.cancel() + if self._udp_transport: + self._udp_transport.close() + if not self._writer.is_closing(): + self._writer.close() + + +class _UdpReceiver(asyncio.DatagramProtocol): + def __init__(self, session: _RelaySession) -> None: + self._session = session + + def datagram_received(self, data: bytes, addr: tuple) -> None: + self._session.udp_datagram_received(data) + + def error_received(self, exc: Exception) -> None: + log.debug("UDP error: %s", exc) + + +class UdpTcpRelay: + """Manages the PC-side TCP server that bridges to the CloudXR runtime's UDP media port.""" + + def __init__( + self, + tcp_port: int = DEFAULT_RELAY_TCP_PORT, + udp_host: str = DEFAULT_UDP_TARGET_HOST, + udp_port: int = DEFAULT_UDP_TARGET_PORT, + ) -> None: + self._tcp_port = tcp_port + self._udp_host = udp_host + self._udp_port = udp_port + self._server: asyncio.Server | None = None + self._current_session: _RelaySession | None = None + + async def start(self) -> None: + self._server = await asyncio.start_server( + self._on_connect, + host="127.0.0.1", + port=self._tcp_port, + ) + log.info( + "UDP-TCP relay listening on TCP :%d -> UDP %s:%d", + self._tcp_port, + self._udp_host, + self._udp_port, + ) + + async def _on_connect( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + peer = writer.get_extra_info("peername") + log.info("Relay: new TCP connection from %s", peer) + if self._current_session is not None: + log.info("Relay: replacing previous session") + self._current_session.close() + session = _RelaySession( + reader, + writer, + self._udp_host, + self._udp_port, + asyncio.get_running_loop(), + ) + self._current_session = session + try: + await session.run() + finally: + if self._current_session is session: + self._current_session = None + log.info("Relay: TCP connection from %s closed", peer) + + async def stop(self) -> None: + if self._current_session is not None: + self._current_session.close() + self._current_session = None + if self._server is not None: + self._server.close() + await self._server.wait_closed() + self._server = None + log.info("UDP-TCP relay stopped") diff --git a/src/core/cloudxr/python/wss.py b/src/core/cloudxr/python/wss.py index a4bcfc511..9915114e6 100755 --- a/src/core/cloudxr/python/wss.py +++ b/src/core/cloudxr/python/wss.py @@ -5,7 +5,10 @@ import asyncio import errno +import json import logging +import os +from urllib.parse import unquote, urlparse import shutil import ssl import subprocess @@ -14,6 +17,32 @@ from pathlib import Path from .env_config import get_env_config +from .oob_teleop_adb import ( + HEADSET_RELAY_REMOTE_PATH, + OobAdbError, + UDP_RELAY_TCP_PORT, + adb_automation_failure_hint, + adb_automation_from_setup_oob, + adb_push_file, + adb_reverse_remove_all, + adb_start_relay, + adb_stop_relay, + assert_at_most_one_adb_device, + ensure_adb_connect, + oob_adb_automation_message, + parse_setup_oob_cli, # noqa: F401 — re-exported for __main__ + require_adb_on_path, + run_adb_headset_bookmark, + setup_adb_reverse_ports, +) +from .oob_teleop_env import ( + client_ui_fields_from_env, + default_initial_stream_config, + print_oob_hub_startup_banner, # noqa: F401 — re-exported for __main__ + resolve_lan_host_for_oob, + wss_proxy_port, +) +from .oob_teleop_hub import OOB_WS_PATH try: import websockets @@ -111,10 +140,134 @@ def build_ssl_context(cert_paths: CertPaths) -> ssl.SSLContext: } -def _make_http_handler(backend_host, backend_port): +def _cert_html() -> bytes: + return ( + b"" + b"" + b"

Certificate Accepted

" + b"

You can close this tab and return to the web client.

" + b"
" + ) + + +def _normalize_request_path(raw_path: str) -> str: + """Normalize HTTP request-target: query stripped, absolute-URL form, ``%``-decoding, ``//``, ``.`` / ``..``.""" + path = (raw_path or "/").split("?")[0] or "/" + if path.startswith(("http://", "https://")): + path = urlparse(path).path or "/" + path = unquote(path, errors="replace") + segments = [p for p in path.split("/") if p and p != "."] + stack: list[str] = [] + for seg in segments: + if seg == "..": + if stack: + stack.pop() + else: + stack.append(seg) + if not stack: + return "/" + return "/" + "/".join(stack) + + +def _is_oob_hub_http_path(path: str) -> bool: + """True for OOB HTTP API paths on the WSS proxy.""" + return path in ( + "/api/oob/v1/state", + "/api/oob/v1/config", + ) + + +def _parse_query_params(raw_path: str) -> dict[str, str]: + """First occurrence wins; keys and values are URL-decoded.""" + if "?" not in raw_path: + return {} + qs = raw_path.split("?", 1)[1] + out: dict[str, str] = {} + for part in qs.split("&"): + if not part: + continue + if "=" in part: + k, v = part.split("=", 1) + k = unquote(k) + if k not in out: + out[k] = unquote(v, errors="replace") + else: + k = unquote(part) + if k not in out: + out[k] = "" + return out + + +def _stream_config_from_query(q: dict[str, str]) -> tuple[dict | None, str | None]: + """Build ``StreamConfig`` patch from query string (flat ``serverIP=`` / ``port=`` / …).""" + cfg: dict[str, object] = {} + if "serverIP" in q: + cfg["serverIP"] = q["serverIP"] + if "proxyUrl" in q: + cfg["proxyUrl"] = q["proxyUrl"] + if "mediaAddress" in q: + cfg["mediaAddress"] = q["mediaAddress"] + if "port" in q and q["port"] != "": + try: + cfg["port"] = int(q["port"], 10) + except ValueError: + return None, "port must be an integer" + if "mediaPort" in q and q["mediaPort"] != "": + try: + cfg["mediaPort"] = int(q["mediaPort"], 10) + except ValueError: + return None, "mediaPort must be an integer" + + if "panelHiddenAtStart" in q and q["panelHiddenAtStart"] != "": + s = q["panelHiddenAtStart"].strip().lower() + if s in ("1", "true", "yes", "on"): + cfg["panelHiddenAtStart"] = True + elif s in ("0", "false", "no", "off"): + cfg["panelHiddenAtStart"] = False + else: + return None, "panelHiddenAtStart must be true or false" + if "codec" in q and q["codec"] != "": + cfg["codec"] = q["codec"] + if "perEyeWidth" in q and q["perEyeWidth"] != "": + try: + cfg["perEyeWidth"] = int(q["perEyeWidth"], 10) + except ValueError: + return None, "perEyeWidth must be an integer" + if "perEyeHeight" in q and q["perEyeHeight"] != "": + try: + cfg["perEyeHeight"] = int(q["perEyeHeight"], 10) + except ValueError: + return None, "perEyeHeight must be an integer" + + return cfg, None + + +def _oob_token(request, q: dict[str, str]) -> str | None: + h = request.headers.get("X-Control-Token") + if h: + return h + t = q.get("token") + return t if t else None + + +def _json_response(status: int, phrase: str, body: dict) -> Response: + return Response( + status, + phrase, + Headers({"Content-Type": "application/json", **CORS_HEADERS}), + json.dumps(body).encode(), + ) + + +def _make_http_handler(backend_host, backend_port, hub=None): async def handle_http_request(connection, request): if request.headers.get("Upgrade", "").lower() == "websocket": return None + if request.headers.get("Access-Control-Request-Method"): return Response( 200, @@ -122,19 +275,61 @@ async def handle_http_request(connection, request): Headers({"Content-Type": "text/plain", **CORS_HEADERS}), b"OK", ) + + path = _normalize_request_path(request.path or "/") + raw_path = request.path or "/" + q = _parse_query_params(raw_path) + + if hub is not None and _is_oob_hub_http_path(path): + token = _oob_token(request, q) + if path == "/api/oob/v1/state": + if not hub.check_token(token): + return _json_response( + 401, "Unauthorized", {"error": "Unauthorized"} + ) + snapshot = await hub.get_snapshot() + return Response( + 200, + "OK", + Headers({"Content-Type": "application/json", **CORS_HEADERS}), + json.dumps(snapshot).encode(), + ) + + if path == "/api/oob/v1/config": + if not hub.check_token(token): + return _json_response( + 401, "Unauthorized", {"error": "Unauthorized"} + ) + cfg, err = _stream_config_from_query(q) + if err: + return _json_response(400, "Bad Request", {"error": err}) + payload = { + "config": cfg, + "targetClientId": q.get("targetClientId"), + "token": token, + } + status, body = await hub.http_oob_set_config(payload) + phrase = { + 200: "OK", + 400: "Bad Request", + 401: "Unauthorized", + 404: "Not Found", + }.get(status, "Error") + return _json_response(status, phrase, body) + + if hub is None and _is_oob_hub_http_path(path): + return Response( + 404, + "Not Found", + Headers({"Content-Type": "text/plain", **CORS_HEADERS}), + b"Not found", + ) + return Response( 200, "OK", Headers({"Content-Type": "text/html; charset=utf-8", **CORS_HEADERS}), - b"" - b"" - b"

Certificate Accepted

" - b"

You can close this tab and return to the web client.

" - b"
", + _cert_html(), ) return handle_http_request @@ -156,6 +351,24 @@ def add_cors_headers(connection, request, response): } +def _is_backend_connection_refused(exc: BaseException) -> bool: + """True when ``ws_connect`` failed because nothing is listening (runtime not running).""" + if isinstance(exc, ConnectionRefusedError): + return True + if isinstance(exc, OSError) and exc.errno in ( + errno.ECONNREFUSED, + getattr(errno, "WSAECONNREFUSED", -1), + ): + return True + if isinstance(exc, OSError): + msg = str(exc).lower() + if "errno 61" in msg or "errno 111" in msg: + return True + if "connection refused" in msg: + return True + return False + + async def _pipe(src, dst, label: str): try: async for msg in src: @@ -204,6 +417,19 @@ async def proxy_handler(client, backend_host: str, backend_port: int): ping_timeout=None, close_timeout=10, ) + except OSError as exc: + if _is_backend_connection_refused(exc): + log.warning( + "No CloudXR runtime at ws://%s:%s (connection refused) for path %s — " + "expected when running WSS+hub without the runtime; teleop signaling uses %s.", + backend_host, + backend_port, + path, + OOB_WS_PATH, + ) + return + log.exception("Failed to connect to backend %s", backend_uri) + return except Exception: log.exception("Failed to connect to backend %s", backend_uri) return @@ -232,43 +458,118 @@ async def proxy_handler(client, backend_host: str, backend_port: int): log.info("Connection closed: %s", path) +def _find_relay_binary() -> Path | None: + """Locate the pre-built headset relay binary (``udprelay``) for ``adb push``. + + Search order (first existing file wins): + 1. ``TELEOP_RELAY_BINARY`` env var (explicit override) + 2. ``/native/udprelay`` — CMake-copied into the build output + 3. ``/../udprelay/udprelay`` — source tree when running from source + 4. ``/src/core/cloudxr/udprelay/udprelay`` — source tree via CMakeLists parent + """ + pkg = Path(__file__).resolve().parent + candidates = [ + pkg / "native" / "udprelay", + pkg.parent / "udprelay" / "udprelay", + ] + # When running from the CMake build output the source tree is far away; + # walk up to find the repo root (contains src/core/cloudxr/udprelay/). + source_root = pkg + for _ in range(10): + candidate = source_root / "src" / "core" / "cloudxr" / "udprelay" / "udprelay" + if candidate.is_file(): + candidates.append(candidate) + break + parent = source_root.parent + if parent == source_root: + break + source_root = parent + + env = os.environ.get("TELEOP_RELAY_BINARY", "").strip() + if env: + candidates.insert(0, Path(env)) + for p in candidates: + if p.is_file(): + log.info("Found relay binary: %s", p) + return p + return None + + def default_cert_paths() -> CertPaths: """Return cert paths under the default location (~/.cloudxr/certs).""" return cert_paths_from_dir(Path(get_env_config().openxr_run_dir()).parent / "certs") async def run( - log_file_path: str | Path, + log_file_path: str | Path | None, stop_future: asyncio.Future, backend_host: str = "localhost", backend_port: int = 49100, - proxy_port: int = 48322, + proxy_port: int | None = None, + setup_oob: str | None = None, ) -> None: logger = log logger.setLevel(logging.INFO) logger.propagate = False - file_handler = logging.FileHandler(log_file_path, mode="a", encoding="utf-8") - file_handler.setFormatter( - logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") - ) - logger.addHandler(file_handler) + _log_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") + if log_file_path is not None: + _handler: logging.Handler = logging.FileHandler( + log_file_path, mode="a", encoding="utf-8" + ) + else: + _handler = logging.StreamHandler(sys.stderr) + _handler.setFormatter(_log_fmt) + logger.addHandler(_handler) try: + resolved_port = wss_proxy_port() if proxy_port is None else proxy_port + adb_effective = None + if setup_oob is not None: + require_adb_on_path() + adb_effective = adb_automation_from_setup_oob(setup_oob) + if adb_effective.connect is not None: + resolve_lan_host_for_oob() + await asyncio.to_thread(ensure_adb_connect, adb_effective.connect) + await asyncio.to_thread(assert_at_most_one_adb_device) logging.getLogger("websockets").setLevel(logging.WARNING) cert_paths = default_cert_paths() ensure_certificate(cert_paths) ssl_ctx = build_ssl_context(cert_paths) + hub = None + if setup_oob is not None: + from .oob_teleop_hub import OOBControlHub # noqa: PLC0415 + + control_token = os.environ.get("CONTROL_TOKEN") or None + initial = { + **default_initial_stream_config(resolved_port), + **client_ui_fields_from_env(), + } + hub = OOBControlHub(control_token=control_token, initial_config=initial) + log.info( + "Teleop control hub enabled (token=%s) OOB_WS=%s initial_stream=%s", + "set" if control_token else "none", + OOB_WS_PATH, + initial, + ) + def handler(ws): + if hub is not None: + path = _normalize_request_path(ws.request.path or "/") + if path == OOB_WS_PATH: + return hub.handle_connection(ws) return proxy_handler(ws, backend_host, backend_port) - http_handler = _make_http_handler(backend_host, backend_port) + http_handler = _make_http_handler(backend_host, backend_port, hub=hub) + + udp_relay = None + tethered_setup_done = False async with ws_serve( handler, host="", - port=proxy_port, + port=resolved_port, ssl=ssl_ctx, process_request=http_handler, process_response=add_cors_headers, @@ -278,16 +579,69 @@ def handler(ws): ping_timeout=None, close_timeout=10, ): - log.info("WSS proxy listening on port %d", proxy_port) - await stop_future - log.info("Shutting down ...") + log.info("WSS proxy listening on port %d", resolved_port) + try: + if setup_oob is not None and adb_effective is not None: + if adb_effective.tethered: + from .udp_tcp_relay import UdpTcpRelay # noqa: PLC0415 + + relay_tcp = UDP_RELAY_TCP_PORT + udp_relay = UdpTcpRelay(tcp_port=relay_tcp) + await udp_relay.start() + + await asyncio.to_thread( + setup_adb_reverse_ports, + resolved_port, + relay_tcp, + 8080, + ) + tethered_setup_done = True + + relay_bin = _find_relay_binary() + if relay_bin: + await asyncio.to_thread( + adb_push_file, + relay_bin, + HEADSET_RELAY_REMOTE_PATH, + ) + await asyncio.to_thread( + adb_start_relay, + HEADSET_RELAY_REMOTE_PATH, + 47998, + relay_tcp, + ) + else: + log.warning( + "Headset relay binary not found; push it manually to %s " + "and start with: udprelay -udp-port 47998 -tcp-port %d", + HEADSET_RELAY_REMOTE_PATH, + relay_tcp, + ) + + rc, adb_diag = await asyncio.to_thread( + run_adb_headset_bookmark, + resolved_port=resolved_port, + adb=adb_effective, + ) + if rc != 0: + hint = adb_automation_failure_hint(adb_diag) + detail = adb_diag if adb_diag else "" + raise OobAdbError(oob_adb_automation_message(rc, detail, hint)) + await stop_future + log.info("Shutting down ...") + finally: + if tethered_setup_done: + await asyncio.to_thread(adb_stop_relay, HEADSET_RELAY_REMOTE_PATH) + await asyncio.to_thread(adb_reverse_remove_all) + if udp_relay is not None: + await udp_relay.stop() except OSError as e: if e.errno == errno.EADDRINUSE: raise RuntimeError( - f"WSS proxy port {proxy_port} is already in use. " - f"Set PROXY_PORT to a different port or stop the process using {proxy_port}." + f"WSS proxy port {resolved_port} is already in use. " + f"Set PROXY_PORT to a different port or stop the process using {resolved_port}." ) from e raise finally: - logger.removeHandler(file_handler) - file_handler.close() + logger.removeHandler(_handler) + _handler.close() diff --git a/src/core/cloudxr/udprelay/go.mod b/src/core/cloudxr/udprelay/go.mod new file mode 100644 index 000000000..8cfc95bc1 --- /dev/null +++ b/src/core/cloudxr/udprelay/go.mod @@ -0,0 +1,3 @@ +module github.com/NVIDIA/IsaacTeleop/udprelay + +go 1.21 diff --git a/src/core/cloudxr/udprelay/main.go b/src/core/cloudxr/udprelay/main.go new file mode 100644 index 000000000..d629ed50a --- /dev/null +++ b/src/core/cloudxr/udprelay/main.go @@ -0,0 +1,122 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// udprelay is the headset-side component of the UDP-over-TCP tunnel for adb reverse. +// +// It listens for UDP datagrams on a local port (default 47998, the CloudXR media port) +// and tunnels them over a TCP connection to the PC-side relay (default TCP 47999, +// reached via adb reverse). The PC relay then forwards each datagram as real UDP to +// the CloudXR runtime. +// +// Framing: each datagram is prefixed with a 2-byte big-endian uint16 length header. +// Both directions use the same framing on the single TCP connection. +// +// Build for Android headset: +// +// GOOS=linux GOARCH=arm64 go build -o udprelay . +package main + +import ( + "encoding/binary" + "flag" + "fmt" + "io" + "log" + "net" + "os" + "os/signal" + "sync" + "syscall" +) + +func main() { + udpPort := flag.Int("udp-port", 47998, "Local UDP listen port (CloudXR media)") + tcpPort := flag.Int("tcp-port", 47999, "TCP port to connect to (adb-reversed to PC relay)") + tcpHost := flag.String("tcp-host", "127.0.0.1", "TCP host to connect to") + flag.Parse() + + udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", *udpPort)) + if err != nil { + log.Fatalf("resolve UDP addr: %v", err) + } + + udpConn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + log.Fatalf("listen UDP :%d: %v", *udpPort, err) + } + defer udpConn.Close() + log.Printf("Listening UDP on :%d", *udpPort) + + tcpTarget := fmt.Sprintf("%s:%d", *tcpHost, *tcpPort) + tcpConn, err := net.Dial("tcp", tcpTarget) + if err != nil { + log.Fatalf("connect TCP %s: %v", tcpTarget, err) + } + defer tcpConn.Close() + log.Printf("Connected TCP to %s", tcpTarget) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + var ( + mu sync.Mutex + clientAddr *net.UDPAddr // last-seen UDP peer (the WebRTC client) + ) + + // UDP → TCP: read UDP datagrams, length-prefix, send on TCP. + go func() { + buf := make([]byte, 65535+2) + for { + n, addr, err := udpConn.ReadFromUDP(buf[2:]) + if err != nil { + log.Printf("UDP read error: %v", err) + return + } + mu.Lock() + clientAddr = addr + mu.Unlock() + + binary.BigEndian.PutUint16(buf[:2], uint16(n)) + if _, err := tcpConn.Write(buf[:2+n]); err != nil { + log.Printf("TCP write error: %v", err) + return + } + } + }() + + // TCP → UDP: read length-prefixed frames, send as UDP back to last-seen client. + go func() { + hdr := make([]byte, 2) + payload := make([]byte, 65535) + for { + if _, err := io.ReadFull(tcpConn, hdr); err != nil { + if err != io.EOF { + log.Printf("TCP read header error: %v", err) + } + return + } + length := binary.BigEndian.Uint16(hdr) + if length == 0 { + continue + } + if _, err := io.ReadFull(tcpConn, payload[:length]); err != nil { + log.Printf("TCP read payload error: %v", err) + return + } + + mu.Lock() + dst := clientAddr + mu.Unlock() + if dst == nil { + continue + } + if _, err := udpConn.WriteToUDP(payload[:length], dst); err != nil { + log.Printf("UDP write error: %v", err) + } + } + }() + + sig := <-sigCh + log.Printf("Received %v, shutting down", sig) + os.Exit(0) +} diff --git a/src/core/cloudxr_tests/python/conftest.py b/src/core/cloudxr_tests/python/conftest.py new file mode 100644 index 000000000..1aaaa2725 --- /dev/null +++ b/src/core/cloudxr_tests/python/conftest.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Make CloudXR python sources importable without installing ``isaacteleop``. + +* Flat ``sys.path`` entry: ``from oob_teleop_hub import …`` (no relative imports). +* Synthetic package ``cloudxr_py_test_ns``: ``from cloudxr_py_test_ns.oob_teleop_env import …`` + so modules that use sibling relative imports load correctly. +""" + +from __future__ import annotations + +import importlib.util +import sys +import types +from pathlib import Path + +_CLOUDXR_PY = Path(__file__).resolve().parents[2] / "cloudxr" / "python" +if _CLOUDXR_PY.is_dir() and str(_CLOUDXR_PY) not in sys.path: + sys.path.insert(0, str(_CLOUDXR_PY)) + +# Must match string used in test modules that import oob_teleop_env / oob_teleop_adb. +CLOUDXR_TEST_PKG = "cloudxr_py_test_ns" + + +def _ensure_cloudxr_package() -> None: + if CLOUDXR_TEST_PKG in sys.modules: + return + pkg = types.ModuleType(CLOUDXR_TEST_PKG) + pkg.__path__ = [str(_CLOUDXR_PY)] + sys.modules[CLOUDXR_TEST_PKG] = pkg + + def load(mod: str) -> None: + full = f"{CLOUDXR_TEST_PKG}.{mod}" + path = _CLOUDXR_PY / f"{mod}.py" + spec = importlib.util.spec_from_file_location(full, path) + assert spec and spec.loader + module = importlib.util.module_from_spec(spec) + sys.modules[full] = module + spec.loader.exec_module(module) + setattr(sys.modules[CLOUDXR_TEST_PKG], mod, module) + + load("oob_teleop_hub") + load("oob_teleop_env") + load("oob_teleop_adb") + + +_ensure_cloudxr_package() diff --git a/src/core/cloudxr_tests/python/pyproject.toml b/src/core/cloudxr_tests/python/pyproject.toml index 3850a5360..006c5e8c2 100644 --- a/src/core/cloudxr_tests/python/pyproject.toml +++ b/src/core/cloudxr_tests/python/pyproject.toml @@ -10,10 +10,12 @@ requires-python = ">=3.10,<3.14" [project.optional-dependencies] dev = [ "pytest", + "pytest-asyncio>=1.3.0", "numpy", ] test = [ "pytest", + "pytest-asyncio>=1.3.0", "numpy", ] @@ -22,5 +24,6 @@ pythonpath = ["."] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] +asyncio_mode = "auto" # Prevent pytest from recursing into parent directories norecursedirs = [".git", ".venv", "build", "dist", "*.egg", "__pycache__"] diff --git a/src/core/cloudxr_tests/python/test_oob_teleop_adb.py b/src/core/cloudxr_tests/python/test_oob_teleop_adb.py new file mode 100644 index 000000000..9cae4788e --- /dev/null +++ b/src/core/cloudxr_tests/python/test_oob_teleop_adb.py @@ -0,0 +1,385 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for :mod:`oob_teleop_adb` (CLI parsing, hints, adb reverse, relay, bookmark automation with mocked subprocess).""" + +from __future__ import annotations + +import argparse +from unittest.mock import MagicMock, patch + +import pytest + +from cloudxr_py_test_ns.oob_teleop_adb import ( + AdbAutomation, + OobAdbError, + adb_automation_failure_hint, + adb_automation_from_setup_oob, + adb_push_file, + adb_reverse, + adb_reverse_remove_all, + adb_start_relay, + adb_stop_relay, + assert_at_most_one_adb_device, + ensure_adb_connect, + oob_adb_automation_message, + parse_setup_oob_cli, + require_adb_on_path, + run_adb_headset_bookmark, + setup_adb_reverse_ports, +) + + +def test_adb_automation_from_setup_oob_tethered() -> None: + a = adb_automation_from_setup_oob(" TETHERED ") + assert a == AdbAutomation(tethered=True, connect=None) + + +def test_adb_automation_from_setup_oob_wireless() -> None: + a = adb_automation_from_setup_oob("192.168.1.5:5555") + assert a == AdbAutomation(tethered=False, connect="192.168.1.5:5555") + + +def test_adb_automation_from_setup_oob_invalid() -> None: + with pytest.raises(ValueError, match="tethered"): + adb_automation_from_setup_oob("nocolon") + + +def test_parse_setup_oob_cli() -> None: + assert parse_setup_oob_cli("tethered") == "tethered" + assert parse_setup_oob_cli("10.0.0.1:5555") == "10.0.0.1:5555" + with pytest.raises(argparse.ArgumentTypeError): + parse_setup_oob_cli("") + with pytest.raises(argparse.ArgumentTypeError): + parse_setup_oob_cli("bad") + + +@pytest.mark.parametrize( + "diag,needle", + [ + ("device unauthorized", "unauthorized"), + ("error: no devices/emulators found", "No adb device"), + ("error: no devices found", "No adb device"), + ("device not found", "No adb device"), + ("more than one device", "Multiple adb devices"), + ("device offline", "offline"), + ("something else", ""), + ], +) +def test_adb_automation_failure_hint(diag: str, needle: str) -> None: + h = adb_automation_failure_hint(diag) + if needle: + assert needle.lower() in h.lower() + else: + assert h == "" + + +def test_oob_adb_automation_message_format() -> None: + msg = oob_adb_automation_message(1, "adb said no", "try again") + assert "exit code 1" in msg + assert "adb said no" in msg + assert "try again" in msg + assert "omit --setup-oob" in msg + + +def test_oob_adb_automation_message_empty_detail() -> None: + msg = oob_adb_automation_message(2, "", "") + assert "(no output from adb)" in msg + + +def test_require_adb_on_path_raises_when_missing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_adb.shutil.which", lambda _x: None + ) + with pytest.raises(OobAdbError, match="adb` was not found"): + require_adb_on_path() + + +def test_require_adb_on_path_ok_when_adb_present( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_adb.shutil.which", lambda _x: "/fake/adb" + ) + require_adb_on_path() + + +def test_ensure_adb_connect_skips_empty() -> None: + with patch("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run") as run: + ensure_adb_connect(" \t ") + run.assert_not_called() + + +def test_ensure_adb_connect_invokes_adb(monkeypatch: pytest.MonkeyPatch) -> None: + called: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + called.append(cmd) + m = MagicMock() + m.returncode = 0 + m.stdout = m.stderr = "" + return m + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + ensure_adb_connect("192.168.0.5:5555") + assert called == [["adb", "connect", "192.168.0.5:5555"]] + + +def test_assert_at_most_one_adb_device_two_devices_raises( + monkeypatch: pytest.MonkeyPatch, +) -> None: + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + m = MagicMock() + m.returncode = 0 + m.stderr = "" + m.stdout = "List of devices attached\nserial1\tdevice\nserial2\tdevice\n" + return m + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + with pytest.raises(OobAdbError, match="Too many adb devices"): + assert_at_most_one_adb_device() + + +def test_assert_at_most_one_adb_device_one_ok(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + m = MagicMock() + m.returncode = 0 + m.stderr = "" + m.stdout = "List of devices attached\nabc\tdevice\n" + return m + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + assert_at_most_one_adb_device() + + +def test_assert_at_most_one_adb_file_not_found_wraps( + monkeypatch: pytest.MonkeyPatch, +) -> None: + def fake_run(*_a: object, **_kw: object) -> None: + raise FileNotFoundError("adb") + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + with pytest.raises(OobAdbError, match="adb` was not found"): + assert_at_most_one_adb_device() + + +def _proc(rc: int, out: str = "", err: str = "") -> MagicMock: + m = MagicMock() + m.returncode = rc + m.stdout = out + m.stderr = err + return m + + +# ── adb reverse tests ── + + +def test_adb_reverse_success(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + adb_reverse(48322, 48322) + assert calls == [["adb", "reverse", "tcp:48322", "tcp:48322"]] + + +def test_adb_reverse_failure_raises(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", + lambda *a, **kw: _proc(1, err="error: device offline"), + ) + with pytest.raises(OobAdbError, match="adb reverse"): + adb_reverse(48322, 48322) + + +def test_adb_reverse_remove_all_success(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + adb_reverse_remove_all() + assert calls == [["adb", "reverse", "--remove-all"]] + + +def test_setup_adb_reverse_ports_with_web(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + ports = setup_adb_reverse_ports(48322, 47999, web_port=8080) + assert ports == [48322, 47999, 8080] + assert len(calls) == 3 + + +def test_setup_adb_reverse_ports_no_web(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + ports = setup_adb_reverse_ports(48322, 47999) + assert ports == [48322, 47999] + assert len(calls) == 2 + + +# ── adb push / relay tests ── + + +def test_adb_push_file_success(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + adb_push_file("/tmp/udprelay", "/data/local/tmp/udprelay") + assert calls == [["adb", "push", "/tmp/udprelay", "/data/local/tmp/udprelay"]] + + +def test_adb_push_file_failure_raises(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", + lambda *a, **kw: _proc(1, err="push failed"), + ) + with pytest.raises(OobAdbError, match="adb push"): + adb_push_file("/tmp/udprelay", "/data/local/tmp/udprelay") + + +def test_adb_start_relay_success(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + adb_start_relay("/data/local/tmp/udprelay", 47998, 47999) + assert len(calls) == 2 # chmod + nohup + + +def test_adb_stop_relay_best_effort(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(1) # process not found is ok + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + adb_stop_relay("/data/local/tmp/udprelay") + assert any("pkill" in c for c in calls) + + +# ── bookmark tests ── + + +def test_run_adb_headset_bookmark_wireless_am_start_ok( + clear_teleop_env: None, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("TELEOP_PROXY_HOST", "192.168.0.7") + monkeypatch.delenv("TELEOP_WEB_CLIENT_BASE", raising=False) + + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + rc, diag = run_adb_headset_bookmark( + resolved_port=48322, + adb=AdbAutomation(tethered=False, connect=None), + ) + assert rc == 0 + assert diag == "" + assert len(calls) == 1 + assert calls[0][:2] == ["adb", "shell"] + assert "-d" in calls[0] + intent_url = calls[0][calls[0].index("-d") + 1] + assert "oobEnable=1" in intent_url + assert "192.168.0.7" in intent_url + + +def test_run_adb_headset_bookmark_tethered_uses_loopback( + clear_teleop_env: None, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("TELEOP_WEB_CLIENT_BASE", raising=False) + + calls: list[list[str]] = [] + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + calls.append(cmd) + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + rc, diag = run_adb_headset_bookmark( + resolved_port=48322, + adb=AdbAutomation(tethered=True, connect=None), + ) + assert rc == 0 + assert diag == "" + assert len(calls) == 1 + intent_url = calls[0][calls[0].index("-d") + 1] + assert intent_url.startswith("https://127.0.0.1:8080/"), ( + f"tethered mode should use local web client, got: {intent_url}" + ) + assert "oobEnable=1" in intent_url + assert "serverIP=127.0.0.1" in intent_url + # mediaAddress/mediaPort must NOT be set — WebRTC ICE rejects loopback + # as a remote candidate. Media negotiates via normal ICE (WiFi). + assert "mediaAddress" not in intent_url + assert "mediaPort" not in intent_url + + +def test_run_adb_headset_bookmark_am_start_failure( + clear_teleop_env: None, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("TELEOP_PROXY_HOST", "10.0.0.1") + monkeypatch.delenv("TELEOP_WEB_CLIENT_BASE", raising=False) + + def fake_run(cmd: list[str], **_kw: object) -> MagicMock: + if "am" in cmd and "VIEW" in " ".join(cmd): + return _proc(1, err="am start failed") + return _proc(0) + + monkeypatch.setattr("cloudxr_py_test_ns.oob_teleop_adb.subprocess.run", fake_run) + rc, diag = run_adb_headset_bookmark( + resolved_port=48322, + adb=AdbAutomation(tethered=False, connect=None), + ) + assert rc == 1 + assert "am start failed" in diag + + +# Re-use fixture name from test_oob_teleop_env pattern +@pytest.fixture +def clear_teleop_env(monkeypatch: pytest.MonkeyPatch) -> None: + for k in ( + "TELEOP_PROXY_HOST", + "TELEOP_WEB_CLIENT_BASE", + "TELEOP_STREAM_SERVER_IP", + "TELEOP_STREAM_PORT", + "TELEOP_STREAM_MEDIA_PORT", + "CONTROL_TOKEN", + "TELEOP_CLIENT_CODEC", + "TELEOP_CLIENT_PANEL_HIDDEN_AT_START", + "TELEOP_CLIENT_PER_EYE_WIDTH", + "TELEOP_CLIENT_PER_EYE_HEIGHT", + ): + monkeypatch.delenv(k, raising=False) diff --git a/src/core/cloudxr_tests/python/test_oob_teleop_env.py b/src/core/cloudxr_tests/python/test_oob_teleop_env.py new file mode 100644 index 000000000..bfe985c75 --- /dev/null +++ b/src/core/cloudxr_tests/python/test_oob_teleop_env.py @@ -0,0 +1,264 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for :mod:`oob_teleop_env` (bookmark URLs, env-driven defaults, LAN helpers). + +Uses ``cloudxr_py_test_ns`` from ``conftest`` so relative imports inside the package resolve. +""" + +from __future__ import annotations + +import logging +from urllib.parse import parse_qs, urlparse + +import pytest + +from cloudxr_py_test_ns.oob_teleop_env import ( + CHROME_INSPECT_DEVICES_URL, + TELEOP_WEB_CLIENT_BASE_ENV, + WSS_PROXY_DEFAULT_PORT, + build_headset_bookmark_url, + client_ui_fields_from_env, + default_initial_stream_config, + guess_lan_ipv4, + print_oob_hub_startup_banner, + resolve_lan_host_for_oob, + web_client_base_override_from_env, + wss_proxy_port, +) +from cloudxr_py_test_ns.oob_teleop_hub import OOB_WS_PATH + + +@pytest.fixture +def clear_teleop_env(monkeypatch: pytest.MonkeyPatch) -> None: + keys = ( + "PROXY_PORT", + "TELEOP_STREAM_SERVER_IP", + "TELEOP_STREAM_PORT", + "TELEOP_CLIENT_CODEC", + "TELEOP_CLIENT_PANEL_HIDDEN_AT_START", + "TELEOP_CLIENT_PER_EYE_WIDTH", + "TELEOP_CLIENT_PER_EYE_HEIGHT", + "TELEOP_WEB_CLIENT_BASE", + "TELEOP_PROXY_HOST", + "CONTROL_TOKEN", + ) + for k in keys: + monkeypatch.delenv(k, raising=False) + + +def test_wss_proxy_port_default(clear_teleop_env: None) -> None: + assert wss_proxy_port() == WSS_PROXY_DEFAULT_PORT + + +def test_wss_proxy_port_from_env( + clear_teleop_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("PROXY_PORT", "50000") + assert wss_proxy_port() == 50000 + + +def test_web_client_base_override_from_env( + clear_teleop_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + assert web_client_base_override_from_env() is None + monkeypatch.setenv(TELEOP_WEB_CLIENT_BASE_ENV, " https://example.test/app ") + assert web_client_base_override_from_env() == "https://example.test/app" + + +def test_build_headset_bookmark_url_minimal() -> None: + u = build_headset_bookmark_url( + web_client_base="https://h.test/", + stream_config={"serverIP": "10.0.0.1", "port": 48322}, + ) + assert urlparse(u).hostname == "h.test" + q = parse_qs(urlparse(u).query) + assert q["oobEnable"] == ["1"] + assert q["serverIP"] == ["10.0.0.1"] + assert q["port"] == ["48322"] + + +def test_build_headset_bookmark_url_appends_when_base_has_query() -> None: + u = build_headset_bookmark_url( + web_client_base="https://h.test/page?x=1", + stream_config={"serverIP": "1.1.1.1", "port": 1}, + ) + q = parse_qs(urlparse(u).query) + assert q["x"] == ["1"] + assert q["oobEnable"] == ["1"] + assert q["serverIP"] == ["1.1.1.1"] + + +def test_build_headset_bookmark_url_token_and_media_and_codec() -> None: + u = build_headset_bookmark_url( + web_client_base="https://x/", + stream_config={ + "serverIP": "192.168.0.2", + "port": 99, + "mediaAddress": "192.168.0.2", + "mediaPort": 47998, + "codec": "h265", + "panelHiddenAtStart": True, + "perEyeWidth": 1920, + "perEyeHeight": 1680, + }, + control_token="tok", + ) + q = parse_qs(urlparse(u).query) + assert q["controlToken"] == ["tok"] + assert q["codec"] == ["h265"] + assert q["panelHiddenAtStart"] == ["true"] + assert q["mediaPort"] == ["47998"] + assert q["perEyeWidth"] == ["1920"] + + +def test_build_headset_bookmark_url_requires_server_ip_and_port() -> None: + with pytest.raises(ValueError, match="serverIP and port"): + build_headset_bookmark_url( + web_client_base="https://x/", + stream_config={"serverIP": "", "port": 1}, + ) + with pytest.raises(ValueError, match="serverIP and port"): + build_headset_bookmark_url( + web_client_base="https://x/", + stream_config={"serverIP": "1.2.3.4"}, + ) + + +def test_default_initial_stream_config_uses_env_and_fallback( + clear_teleop_env: None, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_env.guess_lan_ipv4", lambda: None + ) + cfg = default_initial_stream_config(5555) + assert cfg == {"serverIP": "127.0.0.1", "port": 5555} + + monkeypatch.setenv("TELEOP_STREAM_SERVER_IP", "10.0.0.5") + monkeypatch.setenv("TELEOP_STREAM_PORT", "6000") + cfg2 = default_initial_stream_config(5555) + assert cfg2 == {"serverIP": "10.0.0.5", "port": 6000} + + +def test_client_ui_fields_from_env( + clear_teleop_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + assert client_ui_fields_from_env() == {} + + monkeypatch.setenv("TELEOP_CLIENT_CODEC", "av1") + monkeypatch.setenv("TELEOP_CLIENT_PANEL_HIDDEN_AT_START", "yes") + monkeypatch.setenv("TELEOP_CLIENT_PER_EYE_WIDTH", "100") + monkeypatch.setenv("TELEOP_CLIENT_PER_EYE_HEIGHT", "200") + d = client_ui_fields_from_env() + assert d["codec"] == "av1" + assert d["panelHiddenAtStart"] is True + assert d["perEyeWidth"] == 100 + assert d["perEyeHeight"] == 200 + + +def test_client_ui_fields_from_env_invalid_width_warns( + clear_teleop_env: None, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + monkeypatch.setenv("TELEOP_CLIENT_PER_EYE_WIDTH", "nope") + caplog.set_level(logging.WARNING) + d = client_ui_fields_from_env() + assert "perEyeWidth" not in d + assert any("TELEOP_CLIENT_PER_EYE_WIDTH" in r.message for r in caplog.records) + + +def test_resolve_lan_host_prefers_teleop_proxy_host( + clear_teleop_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("TELEOP_PROXY_HOST", " 10.10.10.10 ") + assert resolve_lan_host_for_oob() == "10.10.10.10" + + +def test_resolve_lan_host_runtime_error_when_unresolvable( + clear_teleop_env: None, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_env.guess_lan_ipv4", lambda: None + ) + with pytest.raises(RuntimeError, match="TELEOP_PROXY_HOST"): + resolve_lan_host_for_oob() + + +def test_guess_lan_ipv4_returns_none_on_loopback_socket( + monkeypatch: pytest.MonkeyPatch, +) -> None: + class FakeSock: + def __enter__(self) -> FakeSock: + return self + + def __exit__(self, *args: object) -> None: + return None + + def settimeout(self, _t: float) -> None: + return None + + def connect(self, _a: object) -> None: + return None + + def getsockname(self) -> tuple[str, int]: + return ("127.0.0.1", 12345) + + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_env.socket.socket", lambda *a, **k: FakeSock() + ) + assert guess_lan_ipv4() is None + + +def test_guess_lan_ipv4_returns_address_when_non_loopback( + monkeypatch: pytest.MonkeyPatch, +) -> None: + class FakeSock: + def __enter__(self) -> FakeSock: + return self + + def __exit__(self, *args: object) -> None: + return None + + def settimeout(self, _t: float) -> None: + return None + + def connect(self, _a: object) -> None: + return None + + def getsockname(self) -> tuple[str, int]: + return ("192.168.50.2", 44444) + + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_env.socket.socket", lambda *a, **k: FakeSock() + ) + assert guess_lan_ipv4() == "192.168.50.2" + + +def test_print_oob_hub_startup_banner_tethered( + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +) -> None: + monkeypatch.setenv("TELEOP_PROXY_HOST", "192.168.42.1") + print_oob_hub_startup_banner(oob_mode="tethered") + out = capsys.readouterr().out + assert "OOB TELEOP" in out + assert "tethered" in out.lower() + assert "192.168.42.1" in out + assert CHROME_INSPECT_DEVICES_URL in out + assert f"wss://192.168.42.1:{WSS_PROXY_DEFAULT_PORT}{OOB_WS_PATH}" in out + + +def test_print_oob_hub_startup_banner_wireless_with_lan_host( + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +) -> None: + monkeypatch.setattr( + "cloudxr_py_test_ns.oob_teleop_env.guess_lan_ipv4", lambda: None + ) + print_oob_hub_startup_banner(oob_mode="wireless", lan_host="172.16.0.5") + out = capsys.readouterr().out + assert "wireless" in out.lower() or "Wireless" in out + assert "172.16.0.5" in out + assert f"wss://172.16.0.5:{WSS_PROXY_DEFAULT_PORT}{OOB_WS_PATH}" in out diff --git a/src/core/cloudxr_tests/python/test_oob_teleop_hub.py b/src/core/cloudxr_tests/python/test_oob_teleop_hub.py new file mode 100644 index 000000000..45dc2c44f --- /dev/null +++ b/src/core/cloudxr_tests/python/test_oob_teleop_hub.py @@ -0,0 +1,373 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Async unit tests for :mod:`oob_teleop_hub.OOBControlHub`. + +Run from this directory (after ``pip install pytest``):: + + pytest -q + +No CloudXR runtime, TLS, or ``isaacteleop`` install required — ``conftest.py`` adds +``src/core/cloudxr/python`` to ``sys.path``. +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any + +import pytest + +from oob_teleop_hub import OOBControlHub + + +class QueueWS: + """Minimal async-iterable WebSocket stand-in for :meth:`OOBControlHub.handle_connection`.""" + + def __init__(self) -> None: + self._q: asyncio.Queue[str | None] = asyncio.Queue() + self.sent: list[str] = [] + self.close_calls: list[tuple[Any, ...]] = [] + + async def inject(self, message: str) -> None: + await self._q.put(message) + + async def end_stream(self) -> None: + await self._q.put(None) + + def __aiter__(self) -> QueueWS: + return self + + async def __anext__(self) -> str: + item = await self._q.get() + if item is None: + raise StopAsyncIteration + return item + + async def send(self, data: str) -> None: + self.sent.append(data) + + async def close(self, *_args: Any, **_kwargs: Any) -> None: + self.close_calls.append(_args) + + +def _loads_sent(ws: QueueWS) -> list[dict]: + return [json.loads(s) for s in ws.sent] + + +def test_check_token_no_requirement() -> None: + hub = OOBControlHub(control_token=None) + assert hub.check_token(None) is True + assert hub.check_token("anything") is True + + +def test_check_token_required() -> None: + hub = OOBControlHub(control_token="secret") + assert hub.check_token(None) is False + assert hub.check_token("wrong") is False + assert hub.check_token("secret") is True + + +@pytest.mark.asyncio +async def test_get_snapshot_empty() -> None: + hub = OOBControlHub() + snap = await hub.get_snapshot() + assert snap["configVersion"] == 0 + assert snap["config"] == {} + assert snap["headsets"] == [] + assert snap["dashboards"] == [] + assert "updatedAt" in snap + + +@pytest.mark.asyncio +async def test_headset_register_hello_and_snapshot() -> None: + hub = OOBControlHub(initial_config={"serverIP": "1.2.3.4", "port": 1111}) + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject( + json.dumps( + {"type": "register", "payload": {"role": "headset", "deviceLabel": "Q3"}} + ) + ) + await asyncio.sleep(0) + hello = json.loads(ws.sent[0]) + assert hello["type"] == "hello" + assert hello["payload"]["config"]["serverIP"] == "1.2.3.4" + assert hello["payload"]["config"]["port"] == 1111 + hid = hello["payload"]["clientId"] + + snap = await hub.get_snapshot() + assert len(snap["headsets"]) == 1 + assert snap["headsets"][0]["clientId"] == hid + assert snap["headsets"][0]["deviceLabel"] == "Q3" + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_register_rejects_bad_token() -> None: + hub = OOBControlHub(control_token="ok") + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject( + json.dumps({"type": "register", "payload": {"role": "headset", "token": "bad"}}) + ) + await asyncio.sleep(0) + err = json.loads(ws.sent[0]) + assert err["type"] == "error" + assert err["payload"]["code"] == "UNAUTHORIZED" + assert ws.close_calls + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_first_message_must_be_register() -> None: + hub = OOBControlHub() + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject(json.dumps({"type": "clientMetrics", "payload": {}})) + await asyncio.sleep(0) + err = json.loads(ws.sent[0]) + assert err["payload"]["code"] == "BAD_REQUEST" + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_set_config_pushes_to_headset() -> None: + hub = OOBControlHub(initial_config={"serverIP": "127.0.0.1", "port": 49100}) + hw = QueueWS() + dw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + td = asyncio.create_task(hub.handle_connection(dw)) + + await hw.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await dw.inject(json.dumps({"type": "register", "payload": {"role": "dashboard"}})) + await asyncio.sleep(0) + + hw.sent.clear() + await dw.inject( + json.dumps( + { + "type": "setConfig", + "payload": {"config": {"serverIP": "10.0.0.2", "port": 48322}}, + } + ) + ) + await asyncio.sleep(0) + + cfg_msgs = [m for m in _loads_sent(hw) if m["type"] == "config"] + assert len(cfg_msgs) == 1 + assert cfg_msgs[0]["payload"]["config"]["serverIP"] == "10.0.0.2" + assert cfg_msgs[0]["payload"]["config"]["port"] == 48322 + assert cfg_msgs[0]["payload"]["configVersion"] == 1 + + await hw.end_stream() + await dw.end_stream() + await th + await td + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "initial_cfg,config_payload", + [ + ({"serverIP": "127.0.0.1", "port": 49100}, {}), + ( + {"serverIP": "10.0.0.1", "port": 9000}, + {"serverIP": "10.0.0.1", "port": 9000}, + ), + ], +) +async def test_set_config_noop_when_unchanged( + initial_cfg: dict, config_payload: dict +) -> None: + hub = OOBControlHub(initial_config=initial_cfg) + hw = QueueWS() + dw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + td = asyncio.create_task(hub.handle_connection(dw)) + + await hw.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await dw.inject(json.dumps({"type": "register", "payload": {"role": "dashboard"}})) + await asyncio.sleep(0) + hw.sent.clear() + + await dw.inject( + json.dumps({"type": "setConfig", "payload": {"config": config_payload}}) + ) + await asyncio.sleep(0) + + assert not any(m["type"] == "config" for m in _loads_sent(hw)) + assert (await hub.get_snapshot())["configVersion"] == 0 + + await hw.end_stream() + await dw.end_stream() + await th + await td + + +@pytest.mark.asyncio +async def test_dashboard_command_message_unknown() -> None: + hub = OOBControlHub() + hw = QueueWS() + dw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + td = asyncio.create_task(hub.handle_connection(dw)) + + await hw.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await dw.inject(json.dumps({"type": "register", "payload": {"role": "dashboard"}})) + await asyncio.sleep(0) + dw.sent.clear() + + await dw.inject(json.dumps({"type": "command", "payload": {"action": "connect"}})) + await asyncio.sleep(0) + + errs = [m for m in _loads_sent(dw) if m["type"] == "error"] + assert errs and errs[0]["payload"]["code"] == "BAD_REQUEST" + assert "command" in errs[0]["payload"]["message"] + + await hw.end_stream() + await dw.end_stream() + await th + await td + + +@pytest.mark.asyncio +async def test_client_metrics_stored_in_snapshot() -> None: + hub = OOBControlHub() + ws = QueueWS() + task = asyncio.create_task(hub.handle_connection(ws)) + + await ws.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await asyncio.sleep(0) + await ws.inject( + json.dumps( + { + "type": "clientMetrics", + "payload": { + "t": 12345000, + "cadence": "frame", + "metrics": {"StreamingFramerate": 72.5}, + }, + } + ) + ) + await asyncio.sleep(0) + + snap = await hub.get_snapshot() + m = snap["headsets"][0]["metricsByCadence"]["frame"] + assert m["at"] == 12345000 + assert m["metrics"]["StreamingFramerate"] == 72.5 + + await ws.end_stream() + await task + + +@pytest.mark.asyncio +async def test_dashboard_set_config_requires_token_when_configured() -> None: + hub = OOBControlHub(control_token="t") + hw = QueueWS() + dw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + td = asyncio.create_task(hub.handle_connection(dw)) + + await hw.inject( + json.dumps({"type": "register", "payload": {"role": "headset", "token": "t"}}) + ) + await dw.inject( + json.dumps({"type": "register", "payload": {"role": "dashboard", "token": "t"}}) + ) + await asyncio.sleep(0) + dw.sent.clear() + + await dw.inject( + json.dumps( + { + "type": "setConfig", + "payload": {"config": {"port": 1}, "token": "wrong"}, + } + ) + ) + await asyncio.sleep(0) + err = [m for m in _loads_sent(dw) if m["type"] == "error"] + assert err and err[0]["payload"]["code"] == "UNAUTHORIZED" + + await hw.end_stream() + await dw.end_stream() + await th + await td + + +@pytest.mark.asyncio +async def test_http_oob_set_config_noop_returns_changed_false() -> None: + hub = OOBControlHub(initial_config={"serverIP": "10.0.0.1", "port": 9000}) + status, body = await hub.http_oob_set_config( + {"config": {"serverIP": "10.0.0.1", "port": 9000}, "token": None} + ) + assert status == 200 + assert body.get("changed") is False + + +@pytest.mark.asyncio +async def test_http_oob_set_config_updates_and_pushes() -> None: + hub = OOBControlHub(initial_config={"serverIP": "127.0.0.1", "port": 49100}) + hw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + await hw.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await asyncio.sleep(0) + hw.sent.clear() + + status, body = await hub.http_oob_set_config( + {"config": {"serverIP": "10.0.0.2"}, "token": None} + ) + assert status == 200 + assert body.get("changed") is True + cfg_msgs = [m for m in _loads_sent(hw) if m["type"] == "config"] + assert len(cfg_msgs) == 1 + assert cfg_msgs[0]["payload"]["config"]["serverIP"] == "10.0.0.2" + + await hw.end_stream() + await th + + +@pytest.mark.asyncio +async def test_http_oob_set_config_merges_client_ui_fields() -> None: + """Hub shallow-merge keeps arbitrary client UI keys for the WebXR allowlist.""" + hub = OOBControlHub(initial_config={"serverIP": "127.0.0.1", "port": 49100}) + hw = QueueWS() + th = asyncio.create_task(hub.handle_connection(hw)) + await hw.inject(json.dumps({"type": "register", "payload": {"role": "headset"}})) + await asyncio.sleep(0) + hw.sent.clear() + + status, body = await hub.http_oob_set_config( + { + "config": { + "codec": "h265", + "panelHiddenAtStart": True, + "perEyeWidth": 1920, + "perEyeHeight": 1680, + }, + "token": None, + } + ) + assert status == 200 + assert body.get("changed") is True + snap = await hub.get_snapshot() + assert snap["config"]["codec"] == "h265" + assert snap["config"]["panelHiddenAtStart"] is True + assert snap["config"]["perEyeWidth"] == 1920 + cfg_msgs = [m for m in _loads_sent(hw) if m["type"] == "config"] + assert cfg_msgs[0]["payload"]["config"]["codec"] == "h265" + + await hw.end_stream() + await th