
cp: fix: Fix policy worker placement when using unified placement group (1341) into r0.4.0 #1416

Merged

terrykong merged 1 commit into r0.4.0 from cherry-pick-1341-r0.4.0 on Oct 25, 2025

Conversation

Contributor

@chtruong814 chtruong814 commented Oct 23, 2025

beep boop [🤖]: Hi @guyueh1 👋,

we've cherry-picked #1341 into r0.4.0 for you! 🚀

Please review and approve this cherry-pick at your convenience!

Summary by CodeRabbit

  • New Features

    • Added debugging output to display node IP and GPU ID information for policy workers across DPO, GRPO, RM, and SFT algorithms.
    • Enhanced GPU-aware bundle sorting for unified placement groups to improve worker distribution.
  • Tests

    • Added tests for placement group handling with unified and per-node configurations.

…1341)

Signed-off-by: Guyue Huang <guyueh@nvidia.com>
Signed-off-by: Guyue Huang <140554423+guyueh1@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Signed-off-by: NeMo Bot <nemo-bot@nvidia.com>
@github-actions

ℹ️ File Consistency Check

Check based on commit: 3b50693 (PR #1416 from cherry-pick-1341-r0.4.0)

✅ DTensor Policy Worker Synchronization Check

Both DTensor policy worker files were modified in this PR:

  • nemo_rl/models/policy/dtensor_policy_worker.py
  • nemo_rl/models/policy/dtensor_policy_worker_v2.py

Please ensure that the changes are consistent between both files where applicable.


This check ensures that related file implementations remain synchronized across the codebase. If you believe this warning is incorrect or the files should intentionally differ, please add a comment explaining the reasoning.

@terrykong terrykong added the CI:L1 Run doctests, unit tests, and functional tests label Oct 23, 2025
@terrykong terrykong enabled auto-merge (squash) October 23, 2025 05:45
Contributor

coderabbitai Bot commented Oct 23, 2025

📝 Walkthrough

This PR introduces debugging capabilities for distributed training by enabling node IP and GPU ID reporting across policy workers. Changes include debug logging calls in algorithm files, infrastructure for GPU-aware bundle sorting in RayVirtualCluster, worker methods to report placement information, and a Policy method to gather and display worker locations.

Changes

  • Algorithm debug logging — nemo_rl/algorithms/{dpo,grpo,rm,sft}.py
    Added calls to policy.print_node_ip_and_gpu_id() after Policy instantiation for debugging worker placement.
  • Policy worker node/GPU reporting — nemo_rl/models/policy/{dtensor_policy_worker,dtensor_policy_worker_v2,megatron_policy_worker}.py
    Added a report_node_ip_and_gpu_id() method to each worker class to fetch and return the node IP and first GPU ID.
  • Virtual cluster infrastructure — nemo_rl/distributed/virtual_cluster.py
    Added a GetGPUIDActor remote actor, _sorted_bundle_indices state, and a _get_sorted_bundle_indices() method to compute GPU-aware bundle ordering; updated get_master_address_and_port() and _init_placement_groups() to use the sorted indices for unified placement groups.
  • Policy worker group construction — nemo_rl/models/policy/lm_policy.py
    Added a print_node_ip_and_gpu_id() method to gather and format worker placement info; updated __init__ to conditionally construct worker groups using _sorted_bundle_indices when available.
  • Placement group tests — tests/unit/distributed/test_virtual_cluster.py
    Added tests for sorted bundle indices creation with unified and per-node placement groups.
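The GPU-aware ordering this enables can be sketched in plain Python. In the actual change, _get_sorted_bundle_indices() probes each bundle of the unified placement group with a short-lived Ray actor to learn its (node IP, GPU ID); here the probe data is hypothetical and the helper name is illustrative:

```python
def gpu_sorted_bundle_indices(bundle_placements):
    """Order bundle indices by (node IP, GPU id) so that workers fill
    consecutive GPUs on one node before spilling to the next node."""
    return sorted(
        range(len(bundle_placements)),
        key=lambda i: (bundle_placements[i][0], bundle_placements[i][1]),
    )

# Hypothetical per-bundle probe results: bundle index -> (node_ip, gpu_id)
placements = [("10.0.0.2", 1), ("10.0.0.1", 0), ("10.0.0.2", 0), ("10.0.0.1", 1)]
print(gpu_sorted_bundle_indices(placements))  # [1, 3, 2, 0]
```

Sorting the bundle indices rather than the bundles themselves keeps the original Ray bundle IDs intact, which is what the worker-group construction needs downstream.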

Sequence Diagram

```mermaid
sequenceDiagram
    participant Algo as Algorithm Setup
    participant Policy as Policy Init
    participant VCluster as Virtual Cluster
    participant Workers as Policy Workers
    participant Table as Debug Output

    Algo->>Policy: Create Policy instance
    Policy->>VCluster: Check _sorted_bundle_indices
    alt Unified PG with GPUs
        VCluster->>VCluster: Compute GPU-sorted bundles
        VCluster-->>Policy: Return sorted indices
        Policy->>Policy: Create worker groups (sorted)
    else Per-node or no GPU
        Policy->>Policy: Create worker groups (default)
    end

    Algo->>Policy: Call print_node_ip_and_gpu_id()
    Policy->>Workers: Gather node IPs & GPU IDs
    Workers-->>Policy: Report placement info
    Policy->>Table: Build & format table
    Table-->>Algo: Print worker placement details
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

The PR contains substantial repetition across algorithm files and worker implementations (simple debug calls and similar method stubs), which reduces review complexity. However, the core infrastructure changes in virtual_cluster.py (GPU-aware bundle sorting with Ray actor integration) and conditional worker group construction in lm_policy.py require careful attention. Additionally, there is a type annotation mismatch in worker methods (declared return type list[tuple[str, int]] vs. actual return tuple[str, int]) that needs verification.

Possibly related PRs

Suggested labels

r0.4.0, CI:L1

Suggested reviewers

  • terrykong
  • ZhiyuLi-Nvidia

Pre-merge checks and finishing touches

✅ Passed checks (4 passed)

  • Description Check — ✅ Passed. Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed. The pull request title "fix: Fix policy worker placement when using unified placement group (1341)" accurately reflects the primary changes in the changeset: GPU-aware bundle sorting logic in virtual_cluster.py, conditional worker group construction in lm_policy.py based on sorted bundle indices, and supporting infrastructure for detecting and using placement group information. The title is specific and descriptive, names the exact problem being addressed, and the reference to #1341 provides useful context about the cherry-pick source.
  • Docstring Coverage — ✅ Passed. Docstring coverage is 86.96%, which meets the required threshold of 80.00%.
  • Test Results For Major Changes — ✅ Passed. This PR is a cherry-pick of #1341, a bug fix for policy worker placement when using unified placement groups. The changes include infrastructure improvements (placement group sorting in virtual_cluster.py), debugging features (print methods in policy classes), and worker reporting methods. Two unit tests cover the new functionality: test_create_sorted_bundle_indices_for_unified_pg() and test_not_create_sorted_bundle_indices_for_per_node_pg() in test_virtual_cluster.py. This is a minor infrastructure and bug-fix change, not a major algorithm change affecting numerics, convergence, or performance. The minimal PR description is typical for cherry-pick PRs and does not indicate missing test coverage.


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (10)
nemo_rl/distributed/virtual_cluster.py (5)

62-64: Place pragma on the def line for remote functions.

Guideline requires '# pragma: no cover' on the class/def line itself for any @ray.remote target. Move/duplicate the pragma to the def line. [As per coding guidelines]

-@ray.remote  # pragma: no cover
-def _get_node_ip_and_free_port() -> tuple[str, int]:
+@ray.remote
+def _get_node_ip_and_free_port() -> tuple[str, int]:  # pragma: no cover
     return _get_node_ip_local(), _get_free_port_local()

417-426: Guard against empty/invalid sorted bundle indices.

If _sorted_bundle_indices is set but empty (unexpected but safer), fall back to bundle 0 to avoid IndexError.

-        if self._sorted_bundle_indices is not None:
-            return self.get_available_address_and_port(
-                pg_idx=0, bundle_idx=self._sorted_bundle_indices[0]
-            )
+        if self._sorted_bundle_indices:
+            first_idx = self._sorted_bundle_indices[0]
+            return self.get_available_address_and_port(pg_idx=0, bundle_idx=first_idx)

430-476: Always cleanup info actors via try/finally; minor robustness.

If ray.get fails for any reason, the loop killing actors is skipped. Wrap creation/collection in try/finally and ensure kill ignores already-dead actors.

-        info_actors = []
-        for i in range(num_bundles):
-            info_actors.append(
+        info_actors = []
+        for i in range(num_bundles):
+            info_actors.append(
                 GetGPUIDActor.options(
                     num_cpus=0.01,  # set both num_cpus and num_gpus to be small values to enable assignment in colocated case
                     num_gpus=0.01,
                     resources=None,
                     scheduling_strategy=PlacementGroupSchedulingStrategy(
                         placement_group=pg,
                         placement_group_bundle_index=i,
                     ),
                 ).remote()
             )
-        gpu_ids = ray.get([actor.get_gpu_id.remote() for actor in info_actors])
-        for actor in info_actors:
-            ray.kill(actor)
+        try:
+            gpu_ids = ray.get([actor.get_gpu_id.remote() for actor in info_actors])
+        finally:
+            for actor in info_actors:
+                try:
+                    ray.kill(actor)
+                except Exception:
+                    pass

484-496: Reset derived bundle ordering on shutdown.

Also clear self._sorted_bundle_indices to avoid stale state on reuse after shutdown().

             # Reset internal state
             self._node_placement_groups = None
+            self._sorted_bundle_indices = None

174-181: Remove the conflicting num_gpus default in the decorator for clarity.

The class decorator sets num_gpus=1, but instantiation always overrides it to 0.01. While Ray 2.x correctly supports .options() overriding decorator parameters across all versions, the conflicting default in the decorator is misleading and serves no purpose. Remove it to make the actual resource requirements explicit at the call site:

-@ray.remote(num_gpus=1)
+@ray.remote
 class GetGPUIDActor:  # pragma: no cover

The num_gpus=0.01 override at instantiation (line 452) is the authoritative resource specification and correctly enables placement in colocated scenarios.

nemo_rl/algorithms/dpo.py (1)

250-252: Gate debug print and fail gracefully if prettytable or Ray state is unavailable.

Avoid a hard runtime dependency for a non-essential debug print. Gate by env flag and catch exceptions.

-    # print the node IP and GPU ID of the policy workers for debugging
-    policy.print_node_ip_and_gpu_id()
+    # Optional: print worker placement (enable with NRL_DEBUG_WORKER_PLACEMENT=1)
+    if os.getenv("NRL_DEBUG_WORKER_PLACEMENT", "0") == "1":
+        try:
+            policy.print_node_ip_and_gpu_id()
+        except Exception as e:
+            print(f"⚠️ Skipping worker placement print: {e}")
nemo_rl/algorithms/grpo.py (1)

438-440: Gate debug print and make it non-fatal.

Same rationale as DPO: make optional and resilient.

-    # print the node IP and GPU ID of the policy workers for debugging
-    policy.print_node_ip_and_gpu_id()
+    # Optional: print worker placement (enable with NRL_DEBUG_WORKER_PLACEMENT=1)
+    if os.getenv("NRL_DEBUG_WORKER_PLACEMENT", "0") == "1":
+        try:
+            policy.print_node_ip_and_gpu_id()
+        except Exception as e:
+            print(f"⚠️ Skipping worker placement print: {e}", flush=True)
nemo_rl/algorithms/sft.py (1)

205-207: Gate debug print and make it non-fatal.

Same pattern for SFT.

-    # print the node IP and GPU ID of the policy workers for debugging
-    policy.print_node_ip_and_gpu_id()
+    # Optional: print worker placement (enable with NRL_DEBUG_WORKER_PLACEMENT=1)
+    if os.getenv("NRL_DEBUG_WORKER_PLACEMENT", "0") == "1":
+        try:
+            policy.print_node_ip_and_gpu_id()
+        except Exception as e:
+            print(f"⚠️ Skipping worker placement print: {e}")
nemo_rl/algorithms/rm.py (1)

225-227: Gate debug print and make it non-fatal.

Same pattern for RM.

-    # print the node IP and GPU ID of the policy workers for debugging
-    policy.print_node_ip_and_gpu_id()
+    # Optional: print worker placement (enable with NRL_DEBUG_WORKER_PLACEMENT=1)
+    if os.getenv("NRL_DEBUG_WORKER_PLACEMENT", "0") == "1":
+        try:
+            policy.print_node_ip_and_gpu_id()
+        except Exception as e:
+            print(f"⚠️ Skipping worker placement print: {e}")
nemo_rl/models/policy/lm_policy.py (1)

170-186: Avoid relying on a private attribute (_sorted_bundle_indices).

Prefer a public accessor on RayVirtualCluster (e.g., get_sorted_bundle_indices()) to reduce coupling.
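A minimal sketch of such an accessor, with the class body reduced to the one attribute in question (the class name here is an illustrative stand-in; everything else about the real RayVirtualCluster is elided):

```python
class RayVirtualClusterSketch:
    """Illustrative stand-in for RayVirtualCluster."""

    def __init__(self):
        # In the real class this is computed by _get_sorted_bundle_indices()
        # and stays None when no unified placement group is in use.
        self._sorted_bundle_indices = None

    def get_sorted_bundle_indices(self):
        """Public read-only view of the GPU-sorted bundle ordering."""
        return self._sorted_bundle_indices
```

Callers such as Policy.__init__ could then branch on cluster.get_sorted_bundle_indices() is not None instead of reaching into the private attribute.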

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3ae2afa and 3b50693.

📒 Files selected for processing (10)
  • nemo_rl/algorithms/dpo.py (1 hunks)
  • nemo_rl/algorithms/grpo.py (1 hunks)
  • nemo_rl/algorithms/rm.py (1 hunks)
  • nemo_rl/algorithms/sft.py (1 hunks)
  • nemo_rl/distributed/virtual_cluster.py (5 hunks)
  • nemo_rl/models/policy/dtensor_policy_worker.py (1 hunks)
  • nemo_rl/models/policy/dtensor_policy_worker_v2.py (1 hunks)
  • nemo_rl/models/policy/lm_policy.py (2 hunks)
  • nemo_rl/models/policy/megatron_policy_worker.py (1 hunks)
  • tests/unit/distributed/test_virtual_cluster.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

**/*.py: Follow the Google Python Style Guide for all Python code
Target Python 3.12+ for all Python code in NeMo-RL
Indent Python code with 4 spaces; do not use tabs
Python filenames should be snake_case (e.g., some_file.py)
Class names should be PascalCase
Function and method names should be snake_case
Local variable names should be snake_case; if starting with a number, prefix with k (e.g., k_99th_percentile)
Global variables should be UPPER_SNAKE_CASE and prefixed with G_ (e.g., G_MY_GLOBAL)
Constants should be UPPER_SNAKE_CASE
Avoid shadowing variables declared in an outer scope
Initialize all externally visible members of a class in the constructor
For public interfaces used outside a file, prefer docstrings over comments
Use comments mainly for code within a function or interfaces local to a file
Commented-out code must include a nearby comment explaining usage and why it is commented out; otherwise remove before merging
Use Google-style docstrings for classes and functions (Sphinx-parseable)
Avoid using reflection when functionality can be easily achieved without it
Limit except clauses to the smallest specific set of exceptions possible
For duck-typing via try/except, keep the try body minimal and use else for main logic
Add the NVIDIA copyright header (with current year) at the top of all Python files, excluding tests/ and test-only scripts

Files:

  • nemo_rl/algorithms/sft.py
  • nemo_rl/models/policy/dtensor_policy_worker_v2.py
  • tests/unit/distributed/test_virtual_cluster.py
  • nemo_rl/algorithms/rm.py
  • nemo_rl/algorithms/dpo.py
  • nemo_rl/models/policy/dtensor_policy_worker.py
  • nemo_rl/algorithms/grpo.py
  • nemo_rl/models/policy/megatron_policy_worker.py
  • nemo_rl/models/policy/lm_policy.py
  • nemo_rl/distributed/virtual_cluster.py
nemo_rl/**/*.py

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

nemo_rl/**/*.py: Do not set non-None configuration defaults in code; YAML is the single source of truth for defaults
Access required config attributes directly (e.g., policy_cfg["precision"]) and assume presence; do not introduce hidden defaults
Express configuration optionality via TypedDict using typing.NotRequired
When adding a new config key to a TypedDict subclass, document the key’s purpose, valid values/types, and recommended default in code
For any class or function decorated with @ray.remote, add '# pragma: no cover' on the class/def line (and on remote functions)

Files:

  • nemo_rl/algorithms/sft.py
  • nemo_rl/models/policy/dtensor_policy_worker_v2.py
  • nemo_rl/algorithms/rm.py
  • nemo_rl/algorithms/dpo.py
  • nemo_rl/models/policy/dtensor_policy_worker.py
  • nemo_rl/algorithms/grpo.py
  • nemo_rl/models/policy/megatron_policy_worker.py
  • nemo_rl/models/policy/lm_policy.py
  • nemo_rl/distributed/virtual_cluster.py
🧬 Code graph analysis (9)
nemo_rl/algorithms/sft.py (2)
tests/unit/utils/test_native_checkpoint.py (1)
  • policy (120-130)
nemo_rl/models/policy/lm_policy.py (1)
  • print_node_ip_and_gpu_id (778-809)
nemo_rl/models/policy/dtensor_policy_worker_v2.py (2)
nemo_rl/models/policy/dtensor_policy_worker.py (1)
  • report_node_ip_and_gpu_id (1930-1934)
nemo_rl/models/policy/megatron_policy_worker.py (1)
  • report_node_ip_and_gpu_id (1939-1943)
tests/unit/distributed/test_virtual_cluster.py (2)
tests/unit/models/generation/test_vllm_generation.py (1)
  • cluster (223-234)
nemo_rl/distributed/virtual_cluster.py (2)
  • RayVirtualCluster (186-505)
  • _init_placement_groups (234-276)
nemo_rl/algorithms/rm.py (1)
nemo_rl/models/policy/lm_policy.py (1)
  • print_node_ip_and_gpu_id (778-809)
nemo_rl/algorithms/dpo.py (3)
tests/unit/models/generation/test_vllm_generation.py (1)
  • policy (245-260)
tests/unit/utils/test_native_checkpoint.py (1)
  • policy (120-130)
nemo_rl/models/policy/lm_policy.py (1)
  • print_node_ip_and_gpu_id (778-809)
nemo_rl/models/policy/dtensor_policy_worker.py (2)
nemo_rl/models/policy/dtensor_policy_worker_v2.py (1)
  • report_node_ip_and_gpu_id (1914-1918)
nemo_rl/models/policy/megatron_policy_worker.py (1)
  • report_node_ip_and_gpu_id (1939-1943)
nemo_rl/algorithms/grpo.py (1)
nemo_rl/models/policy/lm_policy.py (1)
  • print_node_ip_and_gpu_id (778-809)
nemo_rl/models/policy/megatron_policy_worker.py (2)
nemo_rl/models/policy/dtensor_policy_worker.py (1)
  • report_node_ip_and_gpu_id (1930-1934)
nemo_rl/models/policy/dtensor_policy_worker_v2.py (1)
  • report_node_ip_and_gpu_id (1914-1918)
nemo_rl/models/policy/lm_policy.py (3)
tests/unit/models/generation/test_vllm_generation.py (1)
  • cluster (223-234)
tests/unit/utils/test_native_checkpoint.py (1)
  • cluster (96-109)
nemo_rl/distributed/worker_groups.py (2)
  • RayWorkerGroup (303-1004)
  • run_all_workers_single_data (728-772)
🪛 Ruff (0.14.1)
nemo_rl/distributed/virtual_cluster.py

433-435: Avoid specifying long messages outside the exception class

(TRY003)

Comment thread nemo_rl/algorithms/rm.py
Comment on lines +225 to +227
# print the node IP and GPU ID of the policy workers for debugging
policy.print_node_ip_and_gpu_id()

Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Type contract of Policy.print_node_ip_and_gpu_id may not match usage.

The method is annotated to return list[tuple[str, int]] in nemo_rl/models/policy/lm_policy.py but appears to only print. Since it’s called for side effects, ensure its annotation/return aligns to avoid confusion.


🏁 Script executed:

```shell
#!/bin/bash
# Show the definition and any return statements
rg -nP -C2 --type=py 'def\s+print_node_ip_and_gpu_id\s*\('
rg -nP -C1 --type=py 'return\s+.*' nemo_rl/models/policy/lm_policy.py
```

Length of output: 1669


I need to see the complete implementation of the print_node_ip_and_gpu_id method to verify the actual return value against its annotation.


---

🏁 Script executed:

```shell
#!/bin/bash
# Extract the full print_node_ip_and_gpu_id method implementation
sed -n '778,710p' nemo_rl/models/policy/lm_policy.py
```

Length of output: 64



---

🏁 Script executed:

```shell
#!/bin/bash
# Extract the full print_node_ip_and_gpu_id method implementation (lines 778-790)
sed -n '778,790p' nemo_rl/models/policy/lm_policy.py
```

Length of output: 550



---

🏁 Script executed:

```shell
#!/bin/bash
# Extract the full print_node_ip_and_gpu_id method implementation
sed -n '778,810p' nemo_rl/models/policy/lm_policy.py
```

Length of output: 1276



---

🏁 Script executed:

```shell
#!/bin/bash
# Get the return statement
sed -n '810,815p' nemo_rl/models/policy/lm_policy.py
```

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Find all lines between the method definition and next method definition
awk '/def print_node_ip_and_gpu_id/,/^    def / {print NR": "$0}' nemo_rl/models/policy/lm_policy.py | head -50
```

Length of output: 69



---

🏁 Script executed:

```shell
#!/bin/bash
# Get broader range to find return statement
sed -n '800,825p' nemo_rl/models/policy/lm_policy.py | cat -n
```

Length of output: 421



---

🏁 Script executed:

```shell
#!/bin/bash
# Verify the usage context at line 225-227 in rm.py
sed -n '220,230p' nemo_rl/algorithms/rm.py
```

Length of output: 298


Return type annotation should be None.

The method print_node_ip_and_gpu_id() in nemo_rl/models/policy/lm_policy.py (line 778) is annotated as returning list[tuple[str, int]], but the implementation prints a table and has no explicit return statement, implicitly returning None. The usage at line 225-227 of nemo_rl/algorithms/rm.py confirms it's called purely for side effects without capturing any return value.

Change the annotation at line 778 from -> list[tuple[str, int]]: to -> None:.

🤖 Prompt for AI Agents
In nemo_rl/models/policy/lm_policy.py around line 778, the method
print_node_ip_and_gpu_id is annotated to return list[tuple[str, int]] but only
prints a table and has no return statement; update the return type annotation to
-> None and adjust any docstring or type hints to reflect that it returns
nothing (side-effect only).

Comment on lines +1913 to +1918

def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
"""Report the node IP and GPU ID of the current worker."""
ip = ray._private.services.get_node_ip_address()
gpu_id = ray.get_gpu_ids()[0]
return (ip, gpu_id)

⚠️ Potential issue | 🟠 Major

Fix return type and guard for CPU-only scheduling.

The method returns a single tuple but is annotated as list[tuple[str, int]]. Also, ray.get_gpu_ids()[0] will raise if no GPU is assigned.

Apply this diff:

-def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
+def report_node_ip_and_gpu_id(self) -> tuple[str, int]:
     """Report the node IP and GPU ID of the current worker."""
-    ip = ray._private.services.get_node_ip_address()
-    gpu_id = ray.get_gpu_ids()[0]
-    return (ip, gpu_id)
+    ip = ray._private.services.get_node_ip_address()
+    gpu_ids = ray.get_gpu_ids()
+    gpu_id = int(gpu_ids[0]) if gpu_ids else -1
+    return (ip, gpu_id)
🤖 Prompt for AI Agents
In nemo_rl/models/policy/dtensor_policy_worker_v2.py around lines 1913 to 1918,
the method is annotated to return list[tuple[str, int]] but actually returns a
single tuple and it will crash if no GPU is assigned; change the return
annotation to tuple[str, int], call ray.get_gpu_ids() into a local variable,
check if the list is non-empty and set gpu_id = int(gpu_ids[0]) otherwise set
gpu_id = -1 (or another sentinel for CPU-only), then return (ip, gpu_id).
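The guarded indexing can be isolated as a small helper (the name is hypothetical; ray.get_gpu_ids() returns an empty list when the actor holds no GPU, and the IDs may arrive as strings):

```python
def pick_gpu_id(gpu_ids):
    """First assigned GPU id as an int, or -1 as a CPU-only sentinel."""
    return int(gpu_ids[0]) if gpu_ids else -1

print(pick_gpu_id(["2", "3"]))  # 2
print(pick_gpu_id([]))          # -1
```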

Comment on lines +1930 to +1934
def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
"""Report the node IP and GPU ID of the current worker."""
ip = ray._private.services.get_node_ip_address()
gpu_id = ray.get_gpu_ids()[0]
return (ip, gpu_id)

⚠️ Potential issue | 🟠 Major

Align signature with return value and make GPU ID robust.

Returns a tuple but annotated as list[…]. Also unsafe indexing when no GPU is assigned.

Apply this diff:

-def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
+def report_node_ip_and_gpu_id(self) -> tuple[str, int]:
     """Report the node IP and GPU ID of the current worker."""
-    ip = ray._private.services.get_node_ip_address()
-    gpu_id = ray.get_gpu_ids()[0]
-    return (ip, gpu_id)
+    ip = ray._private.services.get_node_ip_address()
+    gpu_ids = ray.get_gpu_ids()
+    gpu_id = int(gpu_ids[0]) if gpu_ids else -1
+    return (ip, gpu_id)
🤖 Prompt for AI Agents
In nemo_rl/models/policy/dtensor_policy_worker.py around lines 1930 to 1934, the
function is annotated to return a list[tuple[str, int]] but actually returns a
single tuple and also unsafely indexes ray.get_gpu_ids(); change the return type
annotation to tuple[str, int], retrieve gpu_ids = ray.get_gpu_ids(), handle the
case where gpu_ids is empty by using a sentinel (e.g. -1) and otherwise parse
the first id to int (int(gpu_ids[0])), and return (ip, gpu_id) as a tuple with
the adjusted types.

Comment on lines +170 to +186
if cluster._sorted_bundle_indices is not None:
# The cluster has initialized a unified placement group across nodes
# In this case, we need to create workers based on sorted bundle indices
group_size = cluster.num_gpus_per_node
tied_groups = [
(i // group_size, [bundle_idx])
for i, bundle_idx in enumerate(cluster._sorted_bundle_indices)
]

self.worker_group = RayWorkerGroup(
cluster,
worker_builder,
name_prefix=name_prefix,
bundle_indices_list=tied_groups,
sharding_annotations=self.sharding_annotations,
env_vars=env_vars or {},
)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Confirm index semantics for bundle_indices_list with unified placement groups.

For unified PG there is only one placement group (pg_idx=0). Here the first element in each (…, [bundle_idx]) tuple is computed as i // group_size, which becomes 0,1,2,… across nodes. If RayWorkerGroup interprets that element as a PG index (not a node index), any value >0 would be invalid and could misplace workers or crash. Verify that RayWorkerGroup treats the first tuple element as a node index for unified PG and internally uses pg_idx=0 when calling cluster.get_available_address_and_port.

Run this to confirm expected behavior in the repo:

If the first element is indeed used directly as pg_idx, change the construction to keep pg_idx=0 for unified PG, e.g.:
[(0, [bundle_idx]) for bundle_idx in cluster._sorted_bundle_indices]


🏁 Script executed:

```shell
#!/bin/bash
# Inspect how bundle_indices_list is consumed.
rg -n -C3 "def _create_workers_from_bundle_indices\(" nemo_rl/distributed/worker_groups.py
rg -n -C2 "bundle_indices_list" nemo_rl/distributed/worker_groups.py
rg -n -C3 "get_available_address_and_port\(" nemo_rl/distributed/worker_groups.py
```

Length of output: 3587


I need to find the implementation of get_available_address_and_port to confirm the semantics of the first parameter.



🏁 Script executed:

#!/bin/bash
# Find the implementation of get_available_address_and_port
rg -n "def get_available_address_and_port" nemo_rl/
# Also check if there's a cluster class that defines this method
rg -n "class.*Cluster" nemo_rl/distributed/

Length of output: 224


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Extract the get_available_address_and_port implementation
rg -n -A20 "def get_available_address_and_port" nemo_rl/distributed/virtual_cluster.py

Length of output: 825


Confirm bundle_indices_list first element semantics for unified placement groups.

The implementation at nemo_rl/distributed/virtual_cluster.py:389-392 shows that get_available_address_and_port only uses the pg_idx parameter when multiple placement groups exist; for unified PG (single placement group), it always uses placement_groups[0] and ignores the pg_idx value.

However, the problematic code in nemo_rl/models/policy/lm_policy.py:170-186 constructs tied_groups with first elements as i // group_size, which represent node indices (0, 1, 2...). The documentation at nemo_rl/distributed/worker_groups.py:333 and line 423 explicitly calls this node_idx, yet it's unpacked as pg_idx at line 466. While the conditional logic prevents crashes (since unified PG ignores pg_idx values), this creates a semantic mismatch: a node index is being passed where a placement group index is expected and named.

To maintain semantic clarity, ensure the first tuple element for unified PG consistently represents a placement group index (stay 0), not a node index:

tied_groups = [(0, [bundle_idx]) for bundle_idx in cluster._sorted_bundle_indices]

Alternatively, rename pg_idx to node_idx throughout worker_groups.py to match the documented contract.

🤖 Prompt for AI Agents
In nemo_rl/models/policy/lm_policy.py around lines 170 to 186, the tied_groups
list currently uses i // group_size (node indices) as the first tuple element
while that position is semantically a placement-group index (pg_idx) for unified
placement groups; change tied_groups so the first element is 0 for all entries
(i.e., use the placement-group index) when creating RayWorkerGroup, or
alternatively update the worker_groups API to consistently use node_idx there;
ensure the chosen fix keeps the tuple shape (pg_idx, [bundle_idx]) and preserves
existing behavior for unified placement groups.
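Under the suggested fix, the construction reduces to the following (the sorted ordering is hypothetical sample data):

```python
# Hypothetical GPU-sorted bundle ordering reported by the cluster.
sorted_bundle_indices = [1, 3, 0, 2]

# A unified placement group is a single PG, so the first tuple element
# (pg_idx) stays 0 for every entry instead of encoding a node index.
tied_groups = [(0, [bundle_idx]) for bundle_idx in sorted_bundle_indices]
print(tied_groups)  # [(0, [1]), (0, [3]), (0, [0]), (0, [2])]
```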

Comment on lines +778 to +809
def print_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
"""Print the node IP and GPU ID of the current worker."""
results = ray.get(
self.worker_group.run_all_workers_single_data(
"report_node_ip_and_gpu_id",
)
)
all_node_ips = sorted(set([result[0] for result in results]))
all_gpu_ids = sorted(set([result[1] for result in results]))

worker_id_list = [
[list() for _ in range(len(all_gpu_ids))] for _ in range(len(all_node_ips))
]
for worker_id, (ip, gpu_id) in enumerate(results):
node_idx = all_node_ips.index(ip)
gpu_idx = all_gpu_ids.index(gpu_id)
worker_id_list[node_idx][gpu_idx].append("worker-" + str(worker_id))

from prettytable import PrettyTable

table = PrettyTable()
table.title = "Policy worker mapping to Nodes and GPUs"
table.field_names = ["Node_IP"] + [
"GPU_ID=" + str(gpu_id) for gpu_id in all_gpu_ids
]
for i, node_idx in enumerate(all_node_ips):
row = [node_idx]
for j in range(len(all_gpu_ids)):
row.append(tuple(worker_id_list[i][j]))
table.add_row(row)

print(table)

⚠️ Potential issue | 🟠 Major

Return the gathered results to match the annotation (and keep printing).

Method is annotated as list[tuple[str, int]] but returns nothing. Return results so callers can consume the mapping.

Apply this diff:

```diff
 def print_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
     """Print the node IP and GPU ID of the current worker."""
-    results = ray.get(
+    results = ray.get(
         self.worker_group.run_all_workers_single_data(
             "report_node_ip_and_gpu_id",
         )
     )
@@
-    for i, node_idx in enumerate(all_node_ips):
-        row = [node_idx]
+    for i, node_ip in enumerate(all_node_ips):
+        row = [node_ip]
         for j in range(len(all_gpu_ids)):
             row.append(tuple(worker_id_list[i][j]))
         table.add_row(row)

     print(table)
+    return results
```
🤖 Prompt for AI Agents
In nemo_rl/models/policy/lm_policy.py around lines 778 to 809 the method
print_node_ip_and_gpu_id is annotated to return list[tuple[str, int]] but
currently only prints a PrettyTable and returns None; after printing the table,
return the gathered results variable (the list of (ip, gpu_id) tuples retrieved
from ray.get) so callers can consume the mapping while preserving the existing
print behavior.
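
The grouping the reviewed method performs can be exercised standalone. Below is a minimal sketch of the same node/GPU bucketing logic, substituting hypothetical `(ip, gpu_id)` sample results for the `ray.get(...)` call (the IPs and worker counts here are made up for illustration):

```python
# Hypothetical per-worker results; in the real method these come from
# ray.get(run_all_workers_single_data("report_node_ip_and_gpu_id")).
results = [("10.0.0.1", 0), ("10.0.0.1", 1), ("10.0.0.2", 0), ("10.0.0.2", 1)]

all_node_ips = sorted({ip for ip, _ in results})
all_gpu_ids = sorted({gpu for _, gpu in results})

# worker_id_list[node][gpu] collects the worker ids placed on that slot.
worker_id_list = [[[] for _ in all_gpu_ids] for _ in all_node_ips]
for worker_id, (ip, gpu_id) in enumerate(results):
    node_idx = all_node_ips.index(ip)
    gpu_idx = all_gpu_ids.index(gpu_id)
    worker_id_list[node_idx][gpu_idx].append("worker-" + str(worker_id))

# Flatten into a dict keyed by node IP, mirroring the PrettyTable rows.
mapping = {
    ip: {gpu: tuple(worker_id_list[i][j]) for j, gpu in enumerate(all_gpu_ids)}
    for i, ip in enumerate(all_node_ips)
}
```

With the sample data above, `mapping` places one worker per (node, GPU) slot, which is the layout the PrettyTable renders.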

Comment on lines +1939 to +1943
```python
def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
    """Report the node IP and GPU ID of the current worker."""
    ip = ray._private.services.get_node_ip_address()
    gpu_id = ray.get_gpu_ids()[0]
    return (ip, gpu_id)
```

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Fix the return type mismatch.

The declared return type is list[tuple[str, int]], but the implementation returns a single tuple (ip, gpu_id) of type tuple[str, int]. This type mismatch will cause issues for callers expecting a list.

Fix option 1 (if a single tuple is intended): Change the return type annotation:

```diff
-    def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
+    def report_node_ip_and_gpu_id(self) -> tuple[str, int]:
         """Report the node IP and GPU ID of the current worker."""
         ip = ray._private.services.get_node_ip_address()
         gpu_id = ray.get_gpu_ids()[0]
         return (ip, gpu_id)
```

Fix option 2 (if a list is intended): Wrap the return value in a list:

```diff
     def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
         """Report the node IP and GPU ID of the current worker."""
         ip = ray._private.services.get_node_ip_address()
         gpu_id = ray.get_gpu_ids()[0]
-        return (ip, gpu_id)
+        return [(ip, gpu_id)]
```

Additional concerns:

  1. Line 1941 uses ray._private.services.get_node_ip_address(), which is a private API that may change without warning in future Ray versions.
  2. Line 1942 accesses ray.get_gpu_ids()[0] without bounds checking, which will raise an IndexError if no GPU IDs are available.

Note: The same type mismatch pattern exists in dtensor_policy_worker.py and dtensor_policy_worker_v2.py (see relevant code snippets), suggesting this is a systematic issue across the codebase.

Run the following script to check how this method is called and verify which fix option is appropriate:


🏁 Script executed:

```shell
#!/bin/bash
# Search for usages of report_node_ip_and_gpu_id to determine expected return type
rg -nP --type=py -C3 'report_node_ip_and_gpu_id\s*\(' -g '!**/test/**'
```

Length of output: 2019



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for call sites (usages, not definitions) of report_node_ip_and_gpu_id
# Look for method invocation patterns like .report_node_ip_and_gpu_id() or variable assignment
rg -nP --type=py '\.report_node_ip_and_gpu_id\(\)' -g '!**/test/**' -C5
```

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Find the class containing report_node_ip_and_gpu_id in megatron_policy_worker.py
# and check its base classes/interfaces
rg -nP --type=py 'class\s+\w+' nemo_rl/models/policy/megatron_policy_worker.py | head -5
```

Length of output: 31


🏁 Script executed:

```shell
#!/bin/bash
# Search for test files that might invoke report_node_ip_and_gpu_id
rg -nP --type=py 'report_node_ip_and_gpu_id' --include='*test*.py'
```

Length of output: 441



---

🏁 Script executed:

```shell
#!/bin/bash
# View the class definition of MegatronPolicyWorker to see base classes
sed -n '432,470p' nemo_rl/models/policy/megatron_policy_worker.py
```

Length of output: 1481


🏁 Script executed:

```shell
#!/bin/bash
# Search for test files correctly using -g glob pattern
rg -nP --type=py 'report_node_ip_and_gpu_id' -g '*test*.py'
```

Length of output: 0


🏁 Script executed:

```shell
#!/bin/bash
# Look for .remote() calls on objects that might have report_node_ip_and_gpu_id
# This method might be called via Ray actor remote execution
fd -e py -type f nemo_rl tests | xargs rg -l 'report_node_ip_and_gpu_id'
```

Length of output: 350



---

🏁 Script executed:

```shell
#!/bin/bash
# Check lm_policy.py for report_node_ip_and_gpu_id definition or interface
rg -nP --type=py 'report_node_ip_and_gpu_id' nemo_rl/models/policy/lm_policy.py -B5 -A10
```

Length of output: 720


🏁 Script executed:

```shell
#!/bin/bash
# Check if lm_policy.py defines a base class or protocol
sed -n '1,50p' nemo_rl/models/policy/lm_policy.py | grep -E 'class|Protocol|ABC'
```

Length of output: 0


Fix the return type annotation to tuple[str, int].

The method is called via Ray's run_all_workers_single_data() which collects results from all workers into a list. The caller in lm_policy.py (lines 785–791) expects each worker's return value to be a single tuple that can be unpacked as (ip, gpu_id):

```python
all_node_ips = sorted(set([result[0] for result in results]))  # result is a tuple
for worker_id, (ip, gpu_id) in enumerate(results):              # unpacks tuple
```

The declared return type list[tuple[str, int]] is incorrect; it should be tuple[str, int]:

```diff
-    def report_node_ip_and_gpu_id(self) -> list[tuple[str, int]]:
+    def report_node_ip_and_gpu_id(self) -> tuple[str, int]:
         """Report the node IP and GPU ID of the current worker."""
         ip = ray._private.services.get_node_ip_address()
         gpu_id = ray.get_gpu_ids()[0]
         return (ip, gpu_id)
```

Additional concerns:

  1. Line 1941 uses ray._private.services.get_node_ip_address(), a private API that may change without warning.
  2. Line 1942 accesses ray.get_gpu_ids()[0] without bounds checking; will raise IndexError if unavailable.

Note: The same type mismatch exists in dtensor_policy_worker.py and dtensor_policy_worker_v2.py.

🤖 Prompt for AI Agents
In nemo_rl/models/policy/megatron_policy_worker.py around lines 1939 to 1943,
change the return type annotation from list[tuple[str, int]] to tuple[str, int]
and return a single tuple (ip, gpu_id) so callers that unpack the worker result
will work; also replace the private call
ray._private.services.get_node_ip_address() with the public API
ray.util.get_node_ip_address() (or ray._private.services.get_node_ip_address if
public API not available, but prefer the public one), and add bounds checking
for GPU IDs (e.g., call ray.get_gpu_ids() into a variable, raise/log a clear
error or return a sentinel value if empty before accessing [0]) to avoid
IndexError. Also apply the same type-annotation and safety fixes to
dtensor_policy_worker.py and dtensor_policy_worker_v2.py at the corresponding
lines.
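
The per-worker/aggregate type shape the review describes can be sketched without Ray, using hypothetical mock workers (the class and helper names here are illustrative, not the real NeMo RL API). The guard around an empty GPU list mirrors the bounds check the review asks for:

```python
def first_gpu_id(gpu_ids: list[int]) -> int:
    # Bounds check suggested by the review: fail with a clear error
    # instead of an IndexError when no GPUs are assigned.
    if not gpu_ids:
        raise RuntimeError("No GPU IDs assigned to this worker")
    return gpu_ids[0]


class MockWorker:
    """Hypothetical stand-in for a Ray actor worker."""

    def __init__(self, ip: str, gpu_ids: list[int]):
        self.ip = ip
        self.gpu_ids = gpu_ids

    def report_node_ip_and_gpu_id(self) -> tuple[str, int]:
        # Each worker returns ONE tuple; the aggregate list comes from
        # the worker-group call gathering one result per worker.
        return (self.ip, first_gpu_id(self.gpu_ids))


workers = [MockWorker("10.0.0.1", [0]), MockWorker("10.0.0.1", [1])]
# Mimics run_all_workers_single_data(...): one result per worker,
# so the aggregate type is list[tuple[str, int]].
results = [w.report_node_ip_and_gpu_id() for w in workers]
unpacked = [(wid, ip, gpu) for wid, (ip, gpu) in enumerate(results)]
```

This is why `tuple[str, int]` is the right per-worker annotation: the caller's `for worker_id, (ip, gpu_id) in enumerate(results)` unpacks each element as a single tuple.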

Comment on lines +234 to +248
```python
def test_create_sorted_bundle_indices_for_unified_pg():
    """Test that sorted bundle indices are created for a unified placement group."""
    cluster = RayVirtualCluster(bundle_ct_per_node_list=[2], use_gpus=True)
    cluster._init_placement_groups(strategy=None, use_unified_pg=True)
    assert cluster._sorted_bundle_indices is not None
    assert len(cluster._sorted_bundle_indices) == 2
    assert 0 in cluster._sorted_bundle_indices
    assert 1 in cluster._sorted_bundle_indices


def test_not_create_sorted_bundle_indices_for_per_node_pg():
    """Test that sorted bundle indices are not created for a per-node placement group."""
    cluster = RayVirtualCluster(bundle_ct_per_node_list=[2], use_gpus=True)
    cluster._init_placement_groups(strategy=None, use_unified_pg=False)
    assert cluster._sorted_bundle_indices is None
```

⚠️ Potential issue | 🟠 Major

Add teardown and guard for GPU availability to avoid flaky/leaky tests.

Both tests create a RayVirtualCluster but never call shutdown(), and they assume >=2 GPUs. This can leak PGs/workers across tests and fail on CPU-only CI. Add a skip-if and ensure shutdown in finally.

Apply this diff:

```diff
@@
-def test_create_sorted_bundle_indices_for_unified_pg():
+def test_create_sorted_bundle_indices_for_unified_pg():
     """Test that sorted bundle indices are created for a unified placement group."""
-    cluster = RayVirtualCluster(bundle_ct_per_node_list=[2], use_gpus=True)
-    cluster._init_placement_groups(strategy=None, use_unified_pg=True)
-    assert cluster._sorted_bundle_indices is not None
-    assert len(cluster._sorted_bundle_indices) == 2
-    assert 0 in cluster._sorted_bundle_indices
-    assert 1 in cluster._sorted_bundle_indices
+    # Skip if the Ray cluster does not have >=2 GPUs
+    if ray.cluster_resources().get("GPU", 0) < 2:
+        pytest.skip("Requires >=2 GPUs to create a unified PG with 2 bundles")
+    cluster = RayVirtualCluster(bundle_ct_per_node_list=[2], use_gpus=True)
+    try:
+        cluster._init_placement_groups(strategy=None, use_unified_pg=True)
+        assert cluster._sorted_bundle_indices is not None
+        assert len(cluster._sorted_bundle_indices) == 2
+        assert 0 in cluster._sorted_bundle_indices
+        assert 1 in cluster._sorted_bundle_indices
+    finally:
+        cluster.shutdown()
@@
-def test_not_create_sorted_bundle_indices_for_per_node_pg():
+def test_not_create_sorted_bundle_indices_for_per_node_pg():
     """Test that sorted bundle indices are not created for a per-node placement group."""
-    cluster = RayVirtualCluster(bundle_ct_per_node_list=[2], use_gpus=True)
-    cluster._init_placement_groups(strategy=None, use_unified_pg=False)
-    assert cluster._sorted_bundle_indices is None
+    # Skip if the Ray cluster does not have >=2 GPUs
+    if ray.cluster_resources().get("GPU", 0) < 2:
+        pytest.skip("Requires >=2 GPUs to create per-node PGs with 2 bundles")
+    cluster = RayVirtualCluster(bundle_ct_per_node_list=[2], use_gpus=True)
+    try:
+        cluster._init_placement_groups(strategy=None, use_unified_pg=False)
+        assert cluster._sorted_bundle_indices is None
+    finally:
+        cluster.shutdown()
```
🤖 Prompt for AI Agents
In tests/unit/distributed/test_virtual_cluster.py around lines 234 to 248, the
two tests create RayVirtualCluster instances but never shut them down and assume
at least 2 GPUs which makes them flaky on CPU-only CI and can leak placement
groups/workers; update each test to skip when fewer than 2 GPUs are available
(use pytest.mark.skipif with a helper like torch.cuda.device_count() < 2 or Ray
GPU query) and wrap cluster creation/usage in try/finally so cluster.shutdown()
is always called in the finally block to ensure cleanup.
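
The cleanup pattern the review suggests is generic try/finally teardown. A minimal sketch with a hypothetical `DummyCluster` standing in for `RayVirtualCluster` (no Ray required) shows why shutdown runs even when the test body fails:

```python
class DummyCluster:
    """Hypothetical stand-in for RayVirtualCluster; tracks whether shutdown ran."""

    def __init__(self):
        self.shut_down = False

    def shutdown(self):
        self.shut_down = True


def run_with_cleanup() -> DummyCluster:
    cluster = DummyCluster()
    try:
        # ... test body: assertions against the cluster go here ...
        assert not cluster.shut_down
    finally:
        # Always executes, even if an assertion above raises, so
        # placement groups / workers are not leaked across tests.
        cluster.shutdown()
    return cluster


cluster = run_with_cleanup()
```

After the call returns, `cluster.shut_down` is `True`, confirming teardown ran; the same structure with `pytest.skip` guarding GPU availability is what the suggested diff applies to the real tests.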

@guyueh1 guyueh1 added CI:L1 Run doctests, unit tests, and functional tests and removed CI:L1 Run doctests, unit tests, and functional tests labels Oct 24, 2025
@terrykong terrykong merged commit 1242003 into r0.4.0 Oct 25, 2025
103 of 120 checks passed
@terrykong terrykong deleted the cherry-pick-1341-r0.4.0 branch October 25, 2025 07:21
terrykong pushed a commit that referenced this pull request Nov 19, 2025
…oup (1341)` into `r0.4.0` (#1416)

Signed-off-by: Guyue Huang <guyueh@nvidia.com>
Signed-off-by: Guyue Huang <140554423+guyueh1@users.noreply.github.com>
Signed-off-by: NeMo Bot <nemo-bot@nvidia.com>
Co-authored-by: Guyue Huang <140554423+guyueh1@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

Labels

cherry-pick CI:L1 Run doctests, unit tests, and functional tests Run CICD

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants