-
Notifications
You must be signed in to change notification settings - Fork 14
Add message channel tracker #330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| // SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 OR MIT | ||
| /*! | ||
| * @file | ||
| * @brief Header for XR_NV_opaque_data_channel extension. | ||
| */ | ||
| #ifndef XR_NV_OPAQUE_DATA_CHANNEL_H | ||
| #define XR_NV_OPAQUE_DATA_CHANNEL_H 1 | ||
|
|
||
| #include "openxr_extension_helpers.h" | ||
|
|
||
| #ifdef __cplusplus | ||
| extern "C" { | ||
| #endif | ||
|
|
||
| #define XR_NV_opaque_data_channel 1 | ||
| #define XR_NV_opaque_data_channel_SPEC_VERSION 1 | ||
| #define XR_NV_OPAQUE_DATA_CHANNEL_EXTENSION_NAME "XR_NV_opaque_data_channel" | ||
|
|
||
| XR_DEFINE_HANDLE(XrOpaqueDataChannelNV) | ||
|
|
||
| XR_STRUCT_ENUM(XR_TYPE_OPAQUE_DATA_CHANNEL_CREATE_INFO_NV, 1000526001); | ||
| XR_STRUCT_ENUM(XR_TYPE_OPAQUE_DATA_CHANNEL_STATE_NV, 1000526002); | ||
|
|
||
| XR_RESULT_ENUM(XR_ERROR_CHANNEL_ALREADY_CREATED_NV, -1000526000); | ||
| XR_RESULT_ENUM(XR_ERROR_CHANNEL_NOT_CONNECTED_NV, -1000526001); | ||
|
|
||
| typedef enum XrOpaqueDataChannelStatusNV { | ||
| XR_OPAQUE_DATA_CHANNEL_STATUS_CONNECTING_NV = 0, | ||
| XR_OPAQUE_DATA_CHANNEL_STATUS_CONNECTED_NV = 1, | ||
| XR_OPAQUE_DATA_CHANNEL_STATUS_SHUTTING_NV = 2, | ||
| XR_OPAQUE_DATA_CHANNEL_STATUS_DISCONNECTED_NV = 3, | ||
| XR_OPAQUE_DATA_CHANNEL_STATUS_MAX_ENUM = 0x7FFFFFFF, | ||
| } XrOpaqueDataChannelStatusNV; | ||
|
|
||
| typedef struct XrOpaqueDataChannelCreateInfoNV { | ||
| XrStructureType type; | ||
| const void* next; | ||
| XrSystemId systemId; | ||
| XrUuidEXT uuid; | ||
| } XrOpaqueDataChannelCreateInfoNV; | ||
|
|
||
| typedef struct XrOpaqueDataChannelStateNV { | ||
| XrStructureType type; | ||
| void* next; | ||
| XrOpaqueDataChannelStatusNV state; | ||
| } XrOpaqueDataChannelStateNV; | ||
|
|
||
| typedef XrResult(XRAPI_PTR* PFN_xrCreateOpaqueDataChannelNV)(XrInstance instance, | ||
| const XrOpaqueDataChannelCreateInfoNV* createInfo, | ||
| XrOpaqueDataChannelNV* opaqueDataChannel); | ||
| typedef XrResult(XRAPI_PTR* PFN_xrDestroyOpaqueDataChannelNV)(XrOpaqueDataChannelNV opaqueDataChannel); | ||
| typedef XrResult(XRAPI_PTR* PFN_xrGetOpaqueDataChannelStateNV)(XrOpaqueDataChannelNV opaqueDataChannel, | ||
| XrOpaqueDataChannelStateNV* state); | ||
| typedef XrResult(XRAPI_PTR* PFN_xrSendOpaqueDataChannelNV)(XrOpaqueDataChannelNV opaqueDataChannel, | ||
| uint32_t opaqueDataInputCount, | ||
| const uint8_t* opaqueDatas); | ||
| typedef XrResult(XRAPI_PTR* PFN_xrReceiveOpaqueDataChannelNV)(XrOpaqueDataChannelNV opaqueDataChannel, | ||
| uint32_t opaqueDataCapacityInput, | ||
| uint32_t* opaqueDataCountOutput, | ||
| uint8_t* opaqueDatas); | ||
| typedef XrResult(XRAPI_PTR* PFN_xrShutdownOpaqueDataChannelNV)(XrOpaqueDataChannelNV opaqueDataChannel); | ||
|
|
||
| #ifdef __cplusplus | ||
| } | ||
| #endif | ||
|
|
||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| #!/usr/bin/env python3 | ||
| # SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| """ | ||
| Message channel example using TeleopSession + retargeting source/sink nodes. | ||
|
|
||
| Behavior: | ||
| - Prints any incoming messages each frame. | ||
| - Once channel status is CONNECTED, sends one message every second. | ||
| """ | ||
|
|
||
| import argparse | ||
| import sys | ||
| import time | ||
| import uuid | ||
|
|
||
| from isaacteleop.retargeting_engine.deviceio_source_nodes import ( | ||
| MessageChannelConnectionStatus, | ||
| message_channel_config, | ||
| ) | ||
| from isaacteleop.retargeting_engine.interface import TensorGroup | ||
| from isaacteleop.schema import MessageChannelMessages, MessageChannelMessagesTrackedT | ||
| from isaacteleop.teleop_session_manager import TeleopSession, TeleopSessionConfig | ||
|
|
||
|
|
||
| def _parse_uuid_bytes(uuid_text: str) -> bytes: | ||
| """Parse canonical UUID text to 16-byte payload.""" | ||
| return uuid.UUID(uuid_text).bytes | ||
|
Comment on lines
+27
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: Yes—for a “this argument must be a UUID” check, using
Example: import argparse, uuid
def uuid_type(s: str) -> uuid.UUID:
try:
return uuid.UUID(s)
except ValueError as e:
raise argparse.ArgumentTypeError(f"invalid UUID: {s!r}") from e
parser = argparse.ArgumentParser()
parser.add_argument("--id", type=uuid_type, required=True)
args = parser.parse_args()What’s not recommended (for user-friendly CLIs) is waiting until after References: 🏁 Script executed: fd message_channel_example.pyRepository: NVIDIA/IsaacTeleop Length of output: 127 🏁 Script executed: cat -n examples/teleop_session_manager/python/message_channel_example.pyRepository: NVIDIA/IsaacTeleop Length of output: 4750 Validate Invalid UUID input currently fails after Suggested update def _parse_uuid_bytes(uuid_text: str) -> bytes:
"""Parse canonical UUID text to 16-byte payload."""
- return uuid.UUID(uuid_text).bytes
+ try:
+ return uuid.UUID(uuid_text).bytes
+ except ValueError as exc:
+ raise argparse.ArgumentTypeError(
+ "--channel-uuid must be a valid UUID"
+ ) from exc
@@
parser.add_argument(
"--channel-uuid",
- type=str,
+ type=_parse_uuid_bytes,
required=True,
help="Message channel UUID (canonical form, e.g. 550e8400-e29b-41d4-a716-446655440000)",
)
@@
- channel_uuid = _parse_uuid_bytes(args.channel_uuid)
+ channel_uuid = args.channel_uuid🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| def _enqueue_outbound_message(sink, payload: bytes) -> None: | ||
| """Push one outbound message through MessageChannelSink.""" | ||
| tg = TensorGroup(sink.input_spec()["messages_tracked"]) | ||
| tg[0] = MessageChannelMessagesTrackedT([MessageChannelMessages(payload)]) | ||
| sink.compute({"messages_tracked": tg}, {}) | ||
|
|
||
|
|
||
| def main() -> int: | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| parser = argparse.ArgumentParser( | ||
| description="Message channel TeleopSession example" | ||
| ) | ||
| parser.add_argument( | ||
| "--channel-uuid", | ||
| type=str, | ||
| required=True, | ||
| help="Message channel UUID (canonical form, e.g. 550e8400-e29b-41d4-a716-446655440000)", | ||
| ) | ||
| parser.add_argument( | ||
| "--channel-name", | ||
| type=str, | ||
| default="example_message_channel", | ||
| help="Optional channel display name", | ||
| ) | ||
| parser.add_argument( | ||
| "--outbound-queue-capacity", | ||
| type=int, | ||
| default=256, | ||
| help="Bounded outbound queue length", | ||
| ) | ||
|
Comment on lines
+55
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Validate outbound queue capacity as positive. Non-positive values can silently produce unusable queue behavior for outbound messages. 💡 Suggested fix+def _positive_int(value: str) -> int:
+ ivalue = int(value)
+ if ivalue <= 0:
+ raise argparse.ArgumentTypeError("must be > 0")
+ return ivalue
+
def main() -> int:
@@
parser.add_argument(
"--outbound-queue-capacity",
- type=int,
+ type=_positive_int,
default=256,
help="Bounded outbound queue length",
)🤖 Prompt for AI Agents |
||
| args = parser.parse_args() | ||
|
|
||
| channel_uuid = _parse_uuid_bytes(args.channel_uuid) | ||
|
|
||
| source, sink = message_channel_config( | ||
| name="message_channel", | ||
| channel_uuid=channel_uuid, | ||
| channel_name=args.channel_name, | ||
| outbound_queue_capacity=args.outbound_queue_capacity, | ||
| ) | ||
|
|
||
| config = TeleopSessionConfig( | ||
| app_name="MessageChannelExample", | ||
| pipeline=source, | ||
| ) | ||
|
|
||
| print("=" * 80) | ||
| print("Message Channel TeleopSession Example") | ||
| print("=" * 80) | ||
| print(f"Channel UUID: {args.channel_uuid}") | ||
| print(f"Channel Name: {args.channel_name}") | ||
| print("Press Ctrl+C to exit.") | ||
| print() | ||
|
|
||
| send_counter = 0 | ||
| last_send_time = 0.0 | ||
|
|
||
| with TeleopSession(config) as session: | ||
| while True: | ||
| result = session.step() | ||
| status = result["status"][0] | ||
| messages_tracked = result["messages_tracked"][0] | ||
| messages = ( | ||
| messages_tracked.data if messages_tracked.data is not None else [] | ||
| ) | ||
|
|
||
| for msg in messages: | ||
| payload = bytes(msg.payload) | ||
| try: | ||
| decoded = payload.decode("utf-8") | ||
| print(f"[rx] {decoded}") | ||
| except UnicodeDecodeError: | ||
| print(f"[rx] 0x{payload.hex()}") | ||
|
|
||
| now = time.monotonic() | ||
| if ( | ||
| status == MessageChannelConnectionStatus.CONNECTED | ||
| and now - last_send_time >= 1.0 | ||
| ): | ||
| payload_text = f"hello #{send_counter} @ {time.time():.3f}" | ||
| _enqueue_outbound_message(sink, payload_text.encode("utf-8")) | ||
| print(f"[tx] {payload_text}") | ||
| last_send_time = now | ||
| send_counter += 1 | ||
|
|
||
| time.sleep(0.01) | ||
|
|
||
| return 0 | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| try: | ||
| sys.exit(main()) | ||
| except KeyboardInterrupt: | ||
| print("\nExiting.") | ||
| sys.exit(0) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| // SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #pragma once | ||
|
|
||
| #include "tracker.hpp" | ||
|
|
||
| #include <cstdint> | ||
| #include <vector> | ||
|
|
||
| namespace core | ||
| { | ||
|
|
||
| struct MessageChannelMessagesT; | ||
| struct MessageChannelMessagesTrackedT; | ||
|
|
||
| enum class MessageChannelStatus : int32_t | ||
| { | ||
| CONNECTING = 0, | ||
| CONNECTED = 1, | ||
| SHUTTING = 2, | ||
| DISCONNECTED = 3, | ||
| UNKNOWN = -1, | ||
| }; | ||
|
|
||
| class IMessageChannelTrackerImpl : public ITrackerImpl | ||
| { | ||
| public: | ||
| virtual MessageChannelStatus get_status() const = 0; | ||
| virtual const MessageChannelMessagesTrackedT& get_messages() const = 0; | ||
| virtual void send_message(const std::vector<uint8_t>& payload) const = 0; | ||
| }; | ||
|
|
||
| } // namespace core |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| // SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <deviceio_base/message_channel_tracker_base.hpp> | ||
| #include <schema/message_channel_generated.h> | ||
|
|
||
| #include <array> | ||
| #include <cstddef> | ||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <vector> | ||
|
|
||
| namespace core | ||
| { | ||
|
|
||
| class MessageChannelTracker : public ITracker | ||
| { | ||
| public: | ||
| static constexpr size_t DEFAULT_MAX_MESSAGE_SIZE = 64 * 1024; | ||
| static constexpr size_t CHANNEL_UUID_SIZE = 16; | ||
|
|
||
| explicit MessageChannelTracker(const std::array<uint8_t, CHANNEL_UUID_SIZE>& channel_uuid, | ||
| const std::string& channel_name = "", | ||
| size_t max_message_size = DEFAULT_MAX_MESSAGE_SIZE); | ||
|
|
||
| std::string_view get_name() const override | ||
| { | ||
| return TRACKER_NAME; | ||
| } | ||
|
|
||
| MessageChannelStatus get_status(const ITrackerSession& session) const; | ||
| const MessageChannelMessagesTrackedT& get_messages(const ITrackerSession& session) const; | ||
| void send_message(const ITrackerSession& session, const std::vector<uint8_t>& payload) const; | ||
|
|
||
| const std::array<uint8_t, CHANNEL_UUID_SIZE>& channel_uuid() const | ||
| { | ||
| return channel_uuid_; | ||
| } | ||
|
|
||
| const std::string& channel_name() const | ||
| { | ||
| return channel_name_; | ||
| } | ||
|
|
||
| size_t max_message_size() const | ||
| { | ||
| return max_message_size_; | ||
| } | ||
|
|
||
| private: | ||
| static constexpr const char* TRACKER_NAME = "MessageChannelTracker"; | ||
|
|
||
| std::array<uint8_t, CHANNEL_UUID_SIZE> channel_uuid_{}; | ||
| std::string channel_name_; | ||
| size_t max_message_size_{ DEFAULT_MAX_MESSAGE_SIZE }; | ||
| }; | ||
|
|
||
| } // namespace core |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| // SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #include "inc/deviceio_trackers/message_channel_tracker.hpp" | ||
|
|
||
| #include <stdexcept> | ||
|
|
||
| namespace core | ||
| { | ||
|
|
||
| MessageChannelTracker::MessageChannelTracker(const std::array<uint8_t, CHANNEL_UUID_SIZE>& channel_uuid, | ||
| const std::string& channel_name, | ||
| size_t max_message_size) | ||
| : channel_uuid_(channel_uuid), channel_name_(channel_name), max_message_size_(max_message_size) | ||
| { | ||
| if (max_message_size_ == 0) | ||
| { | ||
| throw std::invalid_argument("MessageChannelTracker: max_message_size must be > 0"); | ||
| } | ||
| } | ||
|
|
||
| MessageChannelStatus MessageChannelTracker::get_status(const ITrackerSession& session) const | ||
| { | ||
| return static_cast<const IMessageChannelTrackerImpl&>(session.get_tracker_impl(*this)).get_status(); | ||
| } | ||
|
|
||
| const MessageChannelMessagesTrackedT& MessageChannelTracker::get_messages(const ITrackerSession& session) const | ||
| { | ||
| return static_cast<const IMessageChannelTrackerImpl&>(session.get_tracker_impl(*this)).get_messages(); | ||
| } | ||
|
|
||
| void MessageChannelTracker::send_message(const ITrackerSession& session, const std::vector<uint8_t>& payload) const | ||
| { | ||
| static_cast<const IMessageChannelTrackerImpl&>(session.get_tracker_impl(*this)).send_message(payload); | ||
| } | ||
|
|
||
| } // namespace core |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set executable bit for shebang script.
Line [1] uses a shebang, but CI already reports this file is not executable. Please commit mode change to unblock pre-commit.
✅ Fix command
🧰 Tools
🪛 GitHub Actions: Run linters using pre-commit
[error] 1-1: pre-commit hook 'check-shebang-scripts-are-executable' failed (exit code 1): file has a shebang but is not marked executable. Suggested fix: 'chmod +x examples/teleop_session_manager/python/message_channel_example.py' (or adjust executable bit in git).
🤖 Prompt for AI Agents