From cd12be078cfce46ef9888026349a370d11a8042f Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Fri, 18 Apr 2025 12:20:41 -0700 Subject: [PATCH 01/12] fix: raise error if tied weights model is being trained with fsdp1 or dtensor+tp>1 Signed-off-by: Parth Chadha --- examples/configs/grpo_math_1B.yaml | 4 +-- examples/configs/grpo_math_8B.yaml | 3 +++ .../models/policy/dtensor_policy_worker.py | 8 ++++++ .../models/policy/fsdp1_policy_worker.py | 8 ++++++ .../models/generation/test_vllm_generation.py | 25 +++++++++++++++++++ 5 files changed, 46 insertions(+), 2 deletions(-) diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml index 4cf474df01..744a522c4c 100644 --- a/examples/configs/grpo_math_1B.yaml +++ b/examples/configs/grpo_math_1B.yaml @@ -24,7 +24,7 @@ checkpointing: save_period: 10 policy: - model_name: "meta-llama/Llama-3.2-1B-Instruct" + model_name: "Qwen/Qwen2.5-1.5B" tokenizer: name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default train_global_batch_size: 512 @@ -37,7 +37,7 @@ policy: activation_checkpointing_enabled: false dtensor_cfg: - enabled: false + enabled: true cpu_offload: False sequence_parallel: false activation_checkpointing: false diff --git a/examples/configs/grpo_math_8B.yaml b/examples/configs/grpo_math_8B.yaml index e791c66a34..fd19794598 100644 --- a/examples/configs/grpo_math_8B.yaml +++ b/examples/configs/grpo_math_8B.yaml @@ -18,6 +18,9 @@ policy: fsdp_offload_enabled: false activation_checkpointing_enabled: false + dtensor_cfg: + enabled: False + optimizer: name: "torch.optim.AdamW" kwargs: diff --git a/nemo_reinforcer/models/policy/dtensor_policy_worker.py b/nemo_reinforcer/models/policy/dtensor_policy_worker.py index c967a53c97..c4874816ce 100644 --- a/nemo_reinforcer/models/policy/dtensor_policy_worker.py +++ b/nemo_reinforcer/models/policy/dtensor_policy_worker.py @@ -24,6 +24,7 @@ FSDPModule, ) from transformers import AutoModelForCausalLM, AutoTokenizer +from transformers.modeling_utils import _get_tied_weight_keys from nemo_reinforcer.models.dtensor.parallelize import _parallelize_model from nemo_reinforcer.algorithms.interfaces import LossFunction @@ -140,6 +141,7 @@ def __init__( device_map="cpu", # load weights onto CPU initially torch_dtype=torch.float32, # use full precision in sft until https://github.com/NVIDIA/reinforcer/issues/13 is fixed ) + self.tokenizer = tokenizer # ------------------------------------------------ # 3) Move to GPU + Composable FSDP @@ -152,6 +154,12 @@ def __init__( f"World size({world_size}) must be divisible by TP size({tp_size}) to use DTensor" ) + num_tied_weights = len(_get_tied_weight_keys(self.model)) + if num_tied_weights != 0 and tp_size > 1: + raise ValueError( + f"Using dtensor policy with tp size {tp_size} for model ({model_name}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." + ) + mesh_2d = torch.distributed.device_mesh.init_device_mesh( "cuda", (dp_size, tp_size), mesh_dim_names=("dp", "tp") ) diff --git a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py index 7358e519e4..c98993a1f3 100644 --- a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py +++ b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py @@ -37,6 +37,7 @@ ) from transformers import AutoModelForCausalLM, AutoTokenizer +from transformers.modeling_utils import _get_tied_weight_keys from nemo_reinforcer.models.policy import PolicyConfig from nemo_reinforcer.models.policy.utils import import_class_from_path from nemo_reinforcer.distributed.virtual_cluster import ( @@ -91,6 +92,13 @@ def __init__( device_map="cpu", # load weights onto CPU initially torch_dtype=torch.float32, # use full precision in sft until https://github.com/NVIDIA/reinforcer/issues/13 is fixed ) + + num_tied_weights = len(_get_tied_weight_keys(self.model)) + if num_tied_weights != 0: + raise ValueError( + f"Using FSP1 with a model ({model_name}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." + ) + if init_reference_model: self.reference_model = AutoModelForCausalLM.from_pretrained( model_name, diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index 04e0cd5969..a29f64ea4e 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -17,6 +17,8 @@ import pytest import torch import ray +from _pytest.monkeypatch import MonkeyPatch + from nemo_reinforcer.algorithms.utils import get_tokenizer from nemo_reinforcer.distributed.virtual_cluster import RayVirtualCluster @@ -26,6 +28,29 @@ from nemo_reinforcer.models.policy import PolicyConfig +# Remove this once https://github.com/NVIDIA/reinforcer/issues/227 is fixed +@pytest.fixture(scope="module", autouse=True) +def patch_tied_weights_for_module(): + """ + Module-scoped fixture to automatically monkeypatch + _get_tied_weight_keys from transformers for all tests in this file. + """ + # Create a module-scoped MonkeyPatch object + mpatch = MonkeyPatch() + print( + f"\nApplying module-level patch to transformers.modeling_utils._get_tied_weight_keys for {__name__}" + ) + mpatch.setattr( + "transformers.modeling_utils._get_tied_weight_keys", + lambda model, **kwargs: [], # Always return empty list + raising=True, + ) + # Yield control to the tests in the module + yield + # MonkeyPatch object handles cleanup automatically when the fixture scope ends + mpatch.undo() + + # Define basic vLLM test config basic_vllm_test_config: VllmConfig = { "backend": "vllm", From e96d0afc32f999e30132af3483f08f890417701b Mon Sep 17 00:00:00 2001 From: Sahil Jain <48468750+SahilJain314@users.noreply.github.com> Date: Fri, 18 Apr 2025 17:06:54 -0700 Subject: [PATCH 02/12] feat: Add total logging of generations in training (#172) Signed-off-by: Sahil Jain Signed-off-by: Parth Chadha --- nemo_reinforcer/algorithms/grpo.py | 11 ++++++++++- nemo_reinforcer/utils/logger.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/nemo_reinforcer/algorithms/grpo.py b/nemo_reinforcer/algorithms/grpo.py index 0b5b81b289..84e02b39a9 100644 --- a/nemo_reinforcer/algorithms/grpo.py +++ b/nemo_reinforcer/algorithms/grpo.py @@ -290,7 +290,7 @@ def generate_responses( tokenizer, input_lengths: torch.Tensor, include_logprobs: bool = True, -) -> Tuple[List[torch.Tensor], List[str], torch.Tensor]: +) -> Tuple[BatchedDataDict[DatumSpec], List[List[int]], Dict[str, float | int]]: """Generate responses from policy.""" # Generate responses generation_outputs = policy_generation.generate(generation_input_data) @@ -452,6 +452,7 @@ def grpo_train( logger.log_metrics(validation_timings, step, prefix="timing/validation") # Run grpo training (single-turn) + batch: BatchedDataDict[DatumSpec] for batch in dataloader: print( f"\n{'=' * 25} Step {step + 1}/{min(len(dataloader), master_config['grpo']['max_num_steps'])} {'=' * 25}" @@ -645,6 +646,14 @@ def grpo_train( policy.offload_after_refit() # Logging + # Log training data + log_data = {"content": flat_messages["content"]} + log_data["rewards"] = rewards.tolist() + log_data["generation_logprobs"] = train_data["generation_logprobs"].tolist() + log_data["prev_logprobs"] = train_data["prev_logprobs"].tolist() + log_data["input_lengths"] = input_lengths.tolist() + logger.log_batched_dict_as_jsonl(log_data, f"train_data_step{step}.jsonl") + print("\nšŸ“Š Training Results:") metrics = { "loss": train_results["loss"].numpy(), diff --git a/nemo_reinforcer/utils/logger.py b/nemo_reinforcer/utils/logger.py index 3564ae6f0f..4df96172f9 100644 --- a/nemo_reinforcer/utils/logger.py +++ b/nemo_reinforcer/utils/logger.py @@ -19,6 +19,7 @@ import time import threading import requests +import json from abc import ABC, abstractmethod import logging from typing import List, Any, Dict, Optional, TypedDict, Union @@ -27,8 +28,10 @@ from rich.panel import Panel from rich.box import ROUNDED from rich.logging import RichHandler +import torch from nemo_reinforcer.data.interfaces import LLMMessageLogType +from nemo_reinforcer.distributed.batched_data_dict import BatchedDataDict from torch.utils.tensorboard import SummaryWriter import ray @@ -568,6 +571,32 @@ def log_hyperparams(self, params: Dict[str, Any]) -> None: for logger in self.loggers: logger.log_hyperparams(params) + def log_batched_dict_as_jsonl( + self, to_log: BatchedDataDict | Dict[str, Any], filename: str + ) -> None: + """Log a list of dictionaries to a JSONL file. + + Args: + to_log: BatchedDataDict to log + filename: Filename to log to (within the log directory) + """ + if not isinstance(to_log, BatchedDataDict): + to_log = BatchedDataDict(to_log) + + # Create full path within log directory + filepath = os.path.join(self.base_log_dir, filename) + os.makedirs(os.path.dirname(filepath), exist_ok=True) + + # Write to JSONL file + with open(filepath, "w") as f: + for i, sample in enumerate(to_log.make_microbatch_iterator(1)): + for key, value in sample.items(): + if isinstance(value, torch.Tensor): + sample[key] = value.tolist() + f.write(json.dumps({**sample, "idx": i}) + "\n") + + print(f"Logged data to {filepath}") + def __del__(self): """Clean up resources when the logger is destroyed.""" if self.gpu_monitor: From 936cab0d7d92fbefde04d35eb0188d4b26ecce0e Mon Sep 17 00:00:00 2001 From: Terry Kong Date: Sun, 20 Apr 2025 07:56:55 -0700 Subject: [PATCH 03/12] feat: introduce a debug API for backoff and retries for RayVirtualCluster (#234) Signed-off-by: Terry Kong Signed-off-by: Parth Chadha --- .../distributed/virtual_cluster.py | 29 ++++++- .../unit/distributed/test_virtual_cluster.py | 81 +++++++++++++++++++ 2 files changed, 107 insertions(+), 3 deletions(-) diff --git a/nemo_reinforcer/distributed/virtual_cluster.py b/nemo_reinforcer/distributed/virtual_cluster.py index 4f19fb821f..8b6600353c 100644 --- a/nemo_reinforcer/distributed/virtual_cluster.py +++ b/nemo_reinforcer/distributed/virtual_cluster.py @@ -19,6 +19,7 @@ import os import ray import logging +import time from ray.util.placement_group import placement_group, remove_placement_group from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -101,6 +102,10 @@ def init_ray(log_dir: Optional[str] = None): logger.info(f"Started local cluster with: {ray.cluster_resources()}") +class ResourceInsufficientError(Exception): + """Exception raised when the cluster does not have enough resources to satisfy the requested configuration.""" + + class RayVirtualCluster: """Creates a virtual distributed cluster using Ray placement groups. @@ -146,7 +151,25 @@ def __init__( ) self.max_colocated_worker_groups = max_colocated_worker_groups self.name = name - self._init_placement_groups(placement_group_strategy) + max_retries = int(os.environ.get("NRL_VIRTUAL_CLUSTER_MAX_RETRIES", 6)) + assert max_retries > 0, ( + f"NRL_VIRTUAL_CLUSTER_MAX_RETRIES={max_retries} must be an integer greater than 0" + ) + for i in range(max_retries): + try: + self._init_placement_groups(placement_group_strategy) + # Reaching here means we were successful + break + except ResourceInsufficientError: + print( + f"Retrying placement group creation... {i + 1}/{max_retries}. Next retry in {2**i} seconds." + ) + time.sleep(2**i) + continue + else: + raise ResourceInsufficientError( + f"Maximum number of retries reached ({max_retries}). Cluster resources may be insufficient or cluster itself is highly unstable. Please check your cluster configuration and your cluster logs." + ) def _init_placement_groups(self, strategy: str): """Creates placement groups for each node in the cluster. Has empty groups for nodes that don't have any bundles. @@ -175,12 +198,12 @@ def _init_placement_groups(self, strategy: str): # Validate resources if self.use_gpus and total_requested_gpus > total_available_gpus: - raise ValueError( + raise ResourceInsufficientError( f"Not enough GPUs available. Requested {total_requested_gpus} GPUs, but only {total_available_gpus} are available in the cluster." ) if total_requested_cpus > total_available_cpus: - raise ValueError( + raise ResourceInsufficientError( f"Not enough CPUs available. Requested {total_requested_cpus} CPUs, but only {total_available_cpus} are available in the cluster." ) diff --git a/tests/unit/distributed/test_virtual_cluster.py b/tests/unit/distributed/test_virtual_cluster.py index 4d01dd24f0..bdf24f3b18 100644 --- a/tests/unit/distributed/test_virtual_cluster.py +++ b/tests/unit/distributed/test_virtual_cluster.py @@ -14,8 +14,14 @@ from nemo_reinforcer.distributed.virtual_cluster import ( _get_node_ip_and_free_port, PY_EXECUTABLES, + RayVirtualCluster, + ResourceInsufficientError, ) import ray +import pytest +import os +from unittest.mock import patch, MagicMock +import importlib def test_get_node_ip_and_free_port_does_not_start_with_zero(): @@ -30,3 +36,78 @@ def test_get_node_ip_and_free_port_does_not_start_with_zero(): ).remote() ) assert not node_ip.startswith("0."), "Node IP should not start with 0.*.*.*" + + +def test_env_max_retries_invalid_value(): + """Test that NRL_VIRTUAL_CLUSTER_MAX_RETRIES rejects invalid values (less than or equal to zero).""" + + # Mock environment with invalid max_retries value + env_vars = {"NRL_VIRTUAL_CLUSTER_MAX_RETRIES": "0"} + + with patch.dict(os.environ, env_vars, clear=True): + with pytest.raises(AssertionError): + RayVirtualCluster(bundle_ct_per_node_list=[1]) + + +def test_env_max_retries_non_integer(): + """Test that NRL_VIRTUAL_CLUSTER_MAX_RETRIES handles non-integer values properly.""" + + # Mock environment with non-integer max_retries value + env_vars = {"NRL_VIRTUAL_CLUSTER_MAX_RETRIES": "not_a_number"} + + with patch.dict(os.environ, env_vars, clear=True): + with pytest.raises(ValueError): + RayVirtualCluster(bundle_ct_per_node_list=[1]) + + +def test_env_max_retries_default_value(): + """Test that default value for NRL_VIRTUAL_CLUSTER_MAX_RETRIES is used when not set.""" + + # Ensure environment variable is not set + with ( + patch.dict(os.environ, {}, clear=True), + patch( + "nemo_reinforcer.distributed.virtual_cluster.RayVirtualCluster._init_placement_groups" + ) as mock_init, + ): + # Mock successful initialization + mock_init.return_value = [MagicMock()] + + # Create cluster + cluster = RayVirtualCluster(bundle_ct_per_node_list=[1]) + + # Default value should be 6 (as seen in the code) + # We can't directly verify this, but we can check that initialization was attempted + assert mock_init.call_count == 1 + + +def test_env_max_retries_exhausted(): + """Test that NRL_VIRTUAL_CLUSTER_MAX_RETRIES correctly handles the case where all retries fail.""" + + # Set specific retry count to 4 + retry_count = 4 + env_vars = {"NRL_VIRTUAL_CLUSTER_MAX_RETRIES": str(retry_count)} + + with ( + patch.dict(os.environ, env_vars, clear=True), + patch( + "nemo_reinforcer.distributed.virtual_cluster.RayVirtualCluster._init_placement_groups" + ) as mock_init, + patch("time.sleep") as mock_sleep, + ): + # Make _init_placement_groups raise ResourceInsufficientError each time + mock_init.side_effect = ResourceInsufficientError("Not enough resources") + + # Create cluster - should retry retry_count times and then fail + with pytest.raises(ResourceInsufficientError): + RayVirtualCluster(bundle_ct_per_node_list=[1]) + + # Verify _init_placement_groups was called exactly retry_count times + assert mock_init.call_count == retry_count + + # Verify time.sleep was called with exponentially increasing values + assert mock_sleep.call_count == retry_count + mock_sleep.assert_any_call(1) # 2^0 + mock_sleep.assert_any_call(2) # 2^1 + mock_sleep.assert_any_call(4) # 2^2 + mock_sleep.assert_any_call(8) # 2^3 From 36963cffafdb3085f6dd38a897443701590bc54c Mon Sep 17 00:00:00 2001 From: Terry Kong Date: Sun, 20 Apr 2025 10:06:09 -0700 Subject: [PATCH 04/12] docs: update docs everywhere to remove uv pip install which isn't reliable (#217) Signed-off-by: Terry Kong Signed-off-by: Parth Chadha --- CONTRIBUTING.md | 6 --- README.md | 34 ++++++----------- docs/cluster.md | 38 ++++++++++--------- docs/guides/grpo.md | 2 +- docs/guides/sft.md | 2 +- docs/testing.md | 2 - nemo_reinforcer/models/generation/vllm.py | 4 +- .../models/generation/vllm_backend.py | 5 +-- 8 files changed, 38 insertions(+), 55 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2b02a7fb63..85559eabc6 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,12 +13,6 @@ docker buildx build -t nemo-reinforcer -f Dockerfile . docker run -it --gpus all -v /path/to/nemo-reinforcer:/workspace/nemo-reinforcer nemo-reinforcer ``` -2. **Install the package in development mode**: -```bash -cd /workspace/nemo-reinforcer -pip install -e . -``` - ## Making Changes ### Workflow: Clone and Branch (No Fork Required) diff --git a/README.md b/README.md index a0aec8cde4..14391c5883 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ - [Nemo-Reinforcer: A Scalable and Efficient Post-Training Library for Models Ranging from tiny to \>100B Parameters, scaling from 1 GPU to 100s](#nemo-reinforcer-a-scalable-and-efficient-post-training-library-for-models-ranging-from-tiny-to-100b-parameters-scaling-from-1-gpu-to-100s) - [Features](#features) - - [Installation](#installation) + - [Prerequisuites](#prerequisuites) - [Quick start](#quick-start) - [SFT](#sft) - [Single Node](#single-node) @@ -38,28 +38,26 @@ What you can expect: - šŸ”œ **Environment Isolation** - Dependency isolation between components - šŸ”œ **DPO Algorithm** - Direct Preference Optimization for alignment -## Installation +## Prerequisuites ```sh -# For faster setup we use `uv` +# For faster setup and environment isolation, we use `uv` pip install uv -# Specify a virtual env that uses Python 3.12 -uv venv -p python3.12.9 .venv -# Install NeMo-Reinforcer with vllm -uv pip install -e .[vllm] -# Install NeMo-Reinforcer with dev/test dependencies -uv pip install -e '.[dev,test]' +# If you cannot install at the system level, you can install for your user with +# pip install --user uv -# Use uv run to launch any runs. -# Note that it is recommended to not activate the venv and instead use `uv run` since +# Use `uv run` to launch all commands. It handles pip installing implicitly and +# ensures your environment is up to date with our lock file. + +# Note that it is not recommended to activate the venv and instead use `uv run` since # it ensures consistent environment usage across different shells and sessions. # Example: uv run python examples/run_grpo_math.py ``` ## Quick start -**Reminder**: Don't forget to set your HF_HOME and WANDB_API_KEY (if needed). You'll need to do a `huggingface-cli login` as well for Llama models. +**Reminder**: Don't forget to set your `HF_HOME`, `WANDB_API_KEY`, and `HF_DATASETS_CACHE` (if needed). You'll need to do a `huggingface-cli login` as well for Llama models. ### SFT @@ -91,12 +89,6 @@ Refer to `examples/configs/sft.yaml` for a full list of parameters that can be o For distributed training across multiple nodes: -Set `UV_CACHE_DIR` to a directory that can be read from all workers before running any uv run command. - -```sh -export UV_CACHE_DIR=/path/that/all/workers/can/access/uv_cache -``` - ```sh # Run from the root of NeMo-Reinforcer repo NUM_ACTOR_NODES=2 @@ -104,8 +96,7 @@ NUM_ACTOR_NODES=2 TIMESTAMP=$(date +%Y%m%d_%H%M%S) # SFT experiment uses Llama-3.1-8B model -COMMAND="uv pip install -e .; uv run ./examples/run_sft.py --config examples/configs/sft.yaml cluster.num_nodes=2 cluster.gpus_per_node=8 checkpointing.checkpoint_dir='results/sft_llama8b_2nodes' logger.wandb_enabled=True logger.wandb.name='sft-llama8b'" \ -UV_CACHE_DIR=YOUR_UV_CACHE_DIR \ +COMMAND="uv run ./examples/run_sft.py --config examples/configs/sft.yaml cluster.num_nodes=2 cluster.gpus_per_node=8 checkpointing.checkpoint_dir='results/sft_llama8b_2nodes' logger.wandb_enabled=True logger.wandb.name='sft-llama8b'" \ CONTAINER=YOUR_CONTAINER \ MOUNTS="$PWD:$PWD" \ sbatch \ @@ -159,8 +150,7 @@ NUM_ACTOR_NODES=2 TIMESTAMP=$(date +%Y%m%d_%H%M%S) # grpo_math_8b uses Llama-3.1-8B-Instruct model -COMMAND="uv pip install -e .; uv run ./examples/run_grpo_math.py --config examples/configs/grpo_math_8B.yaml cluster.num_nodes=2 checkpointing.checkpoint_dir='results/llama8b_2nodes' logger.wandb_enabled=True logger.wandb.name='grpo-llama8b_math'" \ -UV_CACHE_DIR=YOUR_UV_CACHE_DIR \ +COMMAND="uv run ./examples/run_grpo_math.py --config examples/configs/grpo_math_8B.yaml cluster.num_nodes=2 checkpointing.checkpoint_dir='results/llama8b_2nodes' logger.wandb_enabled=True logger.wandb.name='grpo-llama8b_math'" \ CONTAINER=YOUR_CONTAINER \ MOUNTS="$PWD:$PWD" \ sbatch \ diff --git a/docs/cluster.md b/docs/cluster.md index c949b5eb77..8a73288e09 100644 --- a/docs/cluster.md +++ b/docs/cluster.md @@ -4,6 +4,7 @@ - [Slurm](#slurm) - [Batched Job Submission](#batched-job-submission) - [Interactive Launching](#interactive-launching) + - [Slurm UV\_CACHE\_DIR](#slurm-uv_cache_dir) - [Kubernetes](#kubernetes) ## Slurm @@ -14,7 +15,7 @@ # Run from the root of NeMo-Reinforcer repo NUM_ACTOR_NODES=1 # Total nodes requested (head is colocated on ray-worker-0) -COMMAND="uv pip install -e .; uv run ./examples/run_grpo_math.py" \ +COMMAND="uv run ./examples/run_grpo_math.py" \ CONTAINER=YOUR_CONTAINER \ MOUNTS="$PWD:$PWD" \ sbatch \ @@ -39,21 +40,6 @@ Make note of the the job submission number. Once the job begins you can track it tail -f 1980204-logs/ray-driver.log ``` -:::{note} -`UV_CACHE_DIR` defaults to `$SLURM_SUBMIT_DIR/uv_cache` and is mounted to head and worker nodes -to ensure fast `venv` creation. - -If you would like to override it to somewhere else all head/worker nodes can access, you may set it -via: - -```sh -... -UV_CACHE_DIR=/path/that/all/workers/and/head/can/access \ -sbatch ... \ - ray.sub -``` -::: - ### Interactive Launching :::{tip} @@ -87,11 +73,27 @@ bash 1980204-attach.sh ``` Now that you are on the head node, you can launch the command like so: ```sh -uv venv .venv -uv pip install -e . uv run ./examples/run_grpo_math.py ``` +### Slurm UV_CACHE_DIR + +There several choices for `UV_CACHE_DIR` when using `ray.sub`: + +1. (default) `UV_CACHE_DIR` defaults to `$SLURM_SUBMIT_DIR/uv_cache` when not specified the shell environment, and is mounted to head and worker nodes to serve as a persistent cache between runs. +2. Use the warm uv cache from our docker images + ```sh + ... + UV_CACHE_DIR=/home/ray/.cache/uv \ + sbatch ... \ + ray.sub + ``` + +(1) is more efficient in general since the cache is not ephemeral and is persisted run to run; but for users that +don't want to persist the cache, you can use (2), which is just as performant as (1) if the `uv.lock` is +covered by warmed cache. + + ## Kubernetes TBD diff --git a/docs/guides/grpo.md b/docs/guides/grpo.md index b84cbf9f0c..3010295846 100644 --- a/docs/guides/grpo.md +++ b/docs/guides/grpo.md @@ -12,7 +12,7 @@ uv run examples/run_grpo_math.py --config {overrides} If not specified, `config` will default to [examples/configs/grpo.yaml](../../examples/configs/grpo_math_1B.yaml) -**Reminder**: Don't forget to set your HF_HOME and WANDB_API_KEY (if needed). You'll need to do a `huggingface-cli login` as well for Llama models. +**Reminder**: Don't forget to set your HF_HOME, WANDB_API_KEY, and HF_DATASETS_CACHE (if needed). You'll need to do a `huggingface-cli login` as well for Llama models. ## Now, for the details: diff --git a/docs/guides/sft.md b/docs/guides/sft.md index 54ebe4d341..4d22b71427 100644 --- a/docs/guides/sft.md +++ b/docs/guides/sft.md @@ -21,7 +21,7 @@ uv run examples/run_sft.py \ cluster.gpus_per_node=1 \ logger.wandb.name="sft-dev-1-gpu" ``` -**Reminder**: Don't forget to set your HF_HOME and WANDB_API_KEY (if needed). You'll need to do a `huggingface-cli login` as well for Llama models. +**Reminder**: Don't forget to set your `HF_HOME`, `WANDB_API_KEY`, and `HF_DATASETS_CACHE` (if needed). You'll need to do a `huggingface-cli login` as well for Llama models. ## Datasets diff --git a/docs/testing.md b/docs/testing.md index 474b6fa0b1..466789dfd6 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -103,8 +103,6 @@ Functional tests may require multiple GPUs to run. See each script to understand Functional tests are located under `tests/functional/`. ```sh -# Install the project and the test dependencies -uv pip install -e '.[test]' # Run the functional test for sft uv run bash tests/functional/sft.sh ``` diff --git a/nemo_reinforcer/models/generation/vllm.py b/nemo_reinforcer/models/generation/vllm.py index 041da525e3..1483245259 100644 --- a/nemo_reinforcer/models/generation/vllm.py +++ b/nemo_reinforcer/models/generation/vllm.py @@ -153,8 +153,8 @@ def __init__( self.SamplingParams = vllm.SamplingParams except ImportError: raise ImportError( - "vLLM is not installed. Please install it with `pip install nemo-reinforcer[vllm]` " - "or `pip install vllm --no-build-isolation` separately." + f"vLLM is not installed. Please check that VllmGenerationWorker.DEFAULT_PY_EXECUTABLE covers the vllm dependency. " + "If you are working interactively, you can install by running `uv sync --extra vllm` anywhere in the repo." ) vllm_kwargs = self.cfg.get("vllm_kwargs", {}).copy() diff --git a/nemo_reinforcer/models/generation/vllm_backend.py b/nemo_reinforcer/models/generation/vllm_backend.py index a7fd12aa26..1e5fa21a33 100644 --- a/nemo_reinforcer/models/generation/vllm_backend.py +++ b/nemo_reinforcer/models/generation/vllm_backend.py @@ -17,9 +17,8 @@ import vllm except ImportError: raise ImportError( - "vLLM is not installed. Please install it with `pip install nemo-reinforcer[vllm]` " - "or `pip install vllm` separately. This issue may also occur if worker is using incorrect " - "py_executable." + f"vLLM is not installed. Please check that VllmGenerationWorker.DEFAULT_PY_EXECUTABLE covers the vllm dependency. " + "If you are working interactively, you can install by running `uv sync --extra vllm` anywhere in the repo." ) From cda3a3bf747a81c0299fbb63243d60abad09ba3c Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Mon, 21 Apr 2025 12:41:24 -0700 Subject: [PATCH 05/12] fix: Fix missing import (#222) Signed-off-by: Yi-Fu Wu Signed-off-by: Parth Chadha --- .../models/policy/fsdp1_policy_worker.py | 1 + tests/unit/models/policy/test_fsdp1_worker.py | 83 +++++++++++++++---- 2 files changed, 66 insertions(+), 18 deletions(-) diff --git a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py index c98993a1f3..1cb9baed5b 100644 --- a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py +++ b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py @@ -22,6 +22,7 @@ import torch from torch.distributed.device_mesh import init_device_mesh from torch.distributed.fsdp import ( + CPUOffload, FullyShardedDataParallel, MixedPrecision, ) diff --git a/tests/unit/models/policy/test_fsdp1_worker.py b/tests/unit/models/policy/test_fsdp1_worker.py index 4ea14f739a..0ace3084dc 100644 --- a/tests/unit/models/policy/test_fsdp1_worker.py +++ b/tests/unit/models/policy/test_fsdp1_worker.py @@ -261,18 +261,32 @@ def test_hf_policy_init(policy_setup, num_gpus): @pytest.fixture -def training_setup(tokenizer, num_gpus): - """Setup and teardown specifically for training tests.""" +def training_setup(tokenizer, request, num_gpus): + """ + Setup and teardown specifically for training tests. + + When used without parameterization, uses the default config. + When parameterized, takes any config updates as a dictionary in request.param + and applies them to the basic config. + """ policy = None cluster = None data = None loss_fn = None + # Get config updates from request.param if available + config_updates = {} + config_suffix = "" + if hasattr(request, "param") and request.param is not None: + config_updates = request.param + config_suffix = "-" + "-".join([f"{k}={v}" for k, v in config_updates.items()]) + try: # Create resources with unique name - cluster_name = f"test-train-{num_gpus}gpu" + cluster_name = f"test-train-{num_gpus}gpu{config_suffix}" print( - f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..." + f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs" + f"{' with config updates: ' + str(config_updates) if config_updates else ''}" ) cluster = RayVirtualCluster( @@ -283,7 +297,10 @@ def training_setup(tokenizer, num_gpus): max_colocated_worker_groups=1, ) - config = basic_llama_test_config + # Create a config with optional modifications + config = deepcopy(basic_llama_test_config) + if config_updates: + config.update(config_updates) print("Creating training HfPolicy...") policy = HfPolicy( @@ -341,8 +358,23 @@ def get_max_gpu_utilization(policy): @pytest.mark.timeout(180) -@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) -def test_hf_policy_training(training_setup, tracker, num_gpus): +@pytest.mark.parametrize( + "num_gpus, training_setup, config_name", + [ + (1, None, "default"), + (2, None, "default"), + (2, {"fsdp_offload_enabled": True}, "fsdp_offload"), + (2, {"activation_checkpointing_enabled": True}, "activation_checkpointing"), + ], + indirect=["training_setup"], + ids=[ + "1gpu_default", + "2gpu_default", + "2gpu_fsdp_offload", + "2gpu_activation_checkpointing", + ], +) +def test_hf_policy_training(training_setup, tracker, num_gpus, config_name): def verify_loss_tensor(loss_tensor): assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf" @@ -357,7 +389,9 @@ def verify_loss_tensor(loss_tensor): assert loss_fn is not None, "Loss function was not created properly" # Call prepare_for_training if available - print("\nPreparing for training...") + print( + f"\nPreparing for training with {num_gpus} GPU(s) and {config_name} config..." + ) policy.prepare_for_training() losses = [] @@ -370,7 +404,9 @@ def verify_loss_tensor(loss_tensor): verify_loss_tensor(loss_tensor) losses.append(loss_tensor[-1].item()) - print(f"Training loss: {results['loss']}") + print( + f"Training loss with {num_gpus} GPU(s) and {config_name} config: {results['loss']}" + ) policy.finish_training() assert losses[0] > losses[-1], "Loss should decrease over training iterations" @@ -379,14 +415,16 @@ def verify_loss_tensor(loss_tensor): policy ) print( - f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, " + f"Max GPU Utilization after training with {num_gpus} GPU(s) and {config_name} config: {after_training_mem_allocated:,.1f} MB allocated, " f"{after_training_mem_reserved:,.1f} MB reserved" ) tracker.track( - f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated + f"{num_gpus}gpu_{config_name}_after_training_mem_allocated", + after_training_mem_allocated, ) tracker.track( - f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved + f"{num_gpus}gpu_{config_name}_after_training_mem_reserved", + after_training_mem_reserved, ) policy.offload_after_refit() @@ -394,20 +432,29 @@ def verify_loss_tensor(loss_tensor): policy ) print( - f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " + f"Max GPU Utilization after offload with {num_gpus} GPU(s) and {config_name} config: {after_offload_mem_allocated:,.1f} MB allocated, " f"{after_offload_mem_reserved:,.1f} MB reserved" ) tracker.track( - f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated + f"{num_gpus}gpu_{config_name}_after_offload_mem_allocated", + after_offload_mem_allocated, ) tracker.track( - f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved + f"{num_gpus}gpu_{config_name}_after_offload_mem_reserved", + after_offload_mem_reserved, ) # Compare memory after offload to memory after training - assert after_training_mem_allocated > 10_000, ( - "Memory after training should be more than 10GB" - ) + if config_name == "fsdp_offload": + # With FSDP offload, memory usage after training should already be low + assert after_training_mem_allocated < 1_200, ( + "FSDP offload after training should be less than 1.2GB)" + ) + else: + assert after_training_mem_allocated > 10_000, ( + f"Memory after training with {config_name} config should be more than 10GB" + ) + assert after_offload_mem_allocated < 1_200, ( "Memory after offload should be less than 1.2GB" ) From 1f4a2a39275b97ceef95c8983324b752dffc8dfd Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Mon, 21 Apr 2025 12:41:29 -0700 Subject: [PATCH 06/12] feat: FSDP2 SFT (#206) Signed-off-by: Yi-Fu Wu Signed-off-by: Parth Chadha --- examples/configs/sft.yaml | 4 ++++ nemo_reinforcer/algorithms/loss_functions.py | 22 +++++++++++++------ nemo_reinforcer/algorithms/sft.py | 7 ++++++ .../models/policy/dtensor_policy_worker.py | 3 ++- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/examples/configs/sft.yaml b/examples/configs/sft.yaml index 785b6e0d2e..28126b526c 100644 --- a/examples/configs/sft.yaml +++ b/examples/configs/sft.yaml @@ -47,6 +47,10 @@ policy: weight_decay: 0.1 betas: [0.9, 0.98] eps: 1e-5 + # when using Dtensor, we need to set foreach + # and fused to False + foreach: False + fused: False data: max_input_seq_length: ${policy.max_total_sequence_length} diff --git a/nemo_reinforcer/algorithms/loss_functions.py b/nemo_reinforcer/algorithms/loss_functions.py index d674e7deb0..ef5a698678 100644 --- a/nemo_reinforcer/algorithms/loss_functions.py +++ b/nemo_reinforcer/algorithms/loss_functions.py @@ -112,7 +112,7 @@ def __call__( next_token_logprobs = torch.nn.functional.log_softmax( next_token_logits, dim=-1 ) - next_tokens = data["input_ids"][:, 1:] # Skip first token + next_tokens = data.get("input_ids")[:, 1:].cuda() # Skip first token curr_logprobs = next_token_logprobs.gather( dim=-1, index=next_tokens.unsqueeze(-1) ).squeeze(-1) @@ -168,14 +168,22 @@ def __call__( sample_mask = data["sample_mask"] mask = token_mask * sample_mask.unsqueeze(-1) - next_tokens = data.get("input_ids")[:, 1:].cuda() # Skip first token - next_token_logprobs = torch.nn.functional.log_softmax(next_token_logits, dim=-1) - logprobs = next_token_logprobs[:, :-1] # Remove last position's logits + next_token_logits = next_token_logits.to(torch.float32) # Gather the logprobs for the actual next tokens - token_logprobs = logprobs.gather( - dim=-1, index=next_tokens.unsqueeze(-1) - ).squeeze(-1) + if isinstance(next_token_logits, torch.distributed.tensor.DTensor): + token_logprobs = get_logprobs_from_vocab_parallel_logits( + next_token_logits, data["input_ids"] + ) + else: + next_tokens = data.get("input_ids")[:, 1:].cuda() # Skip first token + next_token_logprobs = torch.nn.functional.log_softmax( + next_token_logits, dim=-1 + ) + logprobs = next_token_logprobs[:, :-1] # Remove last position's logits + token_logprobs = logprobs.gather( + dim=-1, index=next_tokens.unsqueeze(-1) + ).squeeze(-1) # Only compute loss on generated tokens (not input tokens) # by applying the token_loss_mask (shifted by 1 since we're predicting next tokens) diff --git a/nemo_reinforcer/algorithms/sft.py b/nemo_reinforcer/algorithms/sft.py index 45f4f08575..e6a6b3f418 100644 --- a/nemo_reinforcer/algorithms/sft.py +++ b/nemo_reinforcer/algorithms/sft.py @@ -237,6 +237,7 @@ def validate( val_metrics = {"val_loss": 0.0} + policy.prepare_for_training() for batch_idx, val_batch in enumerate(val_dataloader): ## add loss mask based on role to every message add_loss_mask_to_message_log( @@ -247,6 +248,9 @@ def validate( cat_and_padded, input_lengths = batched_message_log_to_flat_message( val_batch["message_log"], pad_value_dict={"token_ids": tokenizer.pad_token_id}, + make_sequence_length_divisible_by=master_config["policy"][ + "make_sequence_length_divisible_by" + ], ) val_data: BatchedDataDict = BatchedDataDict( @@ -358,6 +362,9 @@ def sft_train( cat_and_padded, input_lengths = batched_message_log_to_flat_message( batch["message_log"], pad_value_dict={"token_ids": tokenizer.pad_token_id}, + make_sequence_length_divisible_by=master_config["policy"][ + "make_sequence_length_divisible_by" + ], ) train_data: BatchedDataDict = BatchedDataDict( diff --git a/nemo_reinforcer/models/policy/dtensor_policy_worker.py b/nemo_reinforcer/models/policy/dtensor_policy_worker.py index c4874816ce..23f4efe165 100644 --- a/nemo_reinforcer/models/policy/dtensor_policy_worker.py +++ b/nemo_reinforcer/models/policy/dtensor_policy_worker.py @@ -329,6 +329,7 @@ def train( mb_losses.append(loss.item()) all_mb_metrics.append(loss_metrics) + grad_norm = None if not eval_mode: with torch.no_grad(): grad_norm = get_grad_norm( @@ -355,7 +356,7 @@ def train( with torch.no_grad(): local_loss = torch.tensor(losses, device="cuda") global_loss = torch.zeros_like(local_loss) - torch.distributed.all_reduce(local_loss) + torch.distributed.all_reduce(local_loss, group=self.dp_mesh.get_group()) global_loss = local_loss / self.dp_size # Aggregate metrics across all microbatches From 4683069971735d9b1b3fee271d166317da47a165 Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Mon, 21 Apr 2025 17:34:59 -0400 Subject: [PATCH 07/12] fix: skip vllm p2p check since its flaky (#238) Signed-off-by: Parth Chadha --- nemo_reinforcer/models/generation/vllm.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_reinforcer/models/generation/vllm.py b/nemo_reinforcer/models/generation/vllm.py index 1483245259..ada0bf2623 100644 --- a/nemo_reinforcer/models/generation/vllm.py +++ b/nemo_reinforcer/models/generation/vllm.py @@ -109,6 +109,8 @@ def configure_worker( init_kwargs["fraction_of_gpus"] = num_gpus env_vars["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0" + # Skip vllm P2P check and rely on driver to report peer to peer capability. + env_vars["VLLM_SKIP_P2P_CHECK"] = "1" return resources, env_vars, init_kwargs From f73e786f4b46b1cb970f3f079062dd2f41054e81 Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Mon, 21 Apr 2025 19:57:34 -0700 Subject: [PATCH 08/12] Skip valueerror in case of unit tests Signed-off-by: Parth Chadha --- .../models/policy/dtensor_policy_worker.py | 3 ++- .../models/policy/fsdp1_policy_worker.py | 4 ++- .../models/generation/test_vllm_generation.py | 25 +------------------ 3 files changed, 6 insertions(+), 26 deletions(-) diff --git a/nemo_reinforcer/models/policy/dtensor_policy_worker.py b/nemo_reinforcer/models/policy/dtensor_policy_worker.py index 23f4efe165..b877035a4a 100644 --- a/nemo_reinforcer/models/policy/dtensor_policy_worker.py +++ b/nemo_reinforcer/models/policy/dtensor_policy_worker.py @@ -155,7 +155,8 @@ def __init__( ) num_tied_weights = len(_get_tied_weight_keys(self.model)) - if num_tied_weights != 0 and tp_size > 1: + skip_tie_check = self.cfg.get("skip_tie_check", False) + if num_tied_weights != 0 and tp_size > 1 and not skip_tie_check: raise ValueError( f"Using dtensor policy with tp size {tp_size} for model ({model_name}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." ) diff --git a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py index 1cb9baed5b..dc60bbac5c 100644 --- a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py +++ b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py @@ -94,8 +94,10 @@ def __init__( torch_dtype=torch.float32, # use full precision in sft until https://github.com/NVIDIA/reinforcer/issues/13 is fixed ) + # Check if the model has tied weights num_tied_weights = len(_get_tied_weight_keys(self.model)) - if num_tied_weights != 0: + skip_tie_check = self.cfg.get("skip_tie_check", False) + if num_tied_weights != 0 and not skip_tie_check: raise ValueError( f"Using FSP1 with a model ({model_name}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." ) diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index a29f64ea4e..5a26db0391 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -27,30 +27,6 @@ from nemo_reinforcer.models.generation.vllm import VllmGeneration, VllmConfig from nemo_reinforcer.models.policy import PolicyConfig - -# Remove this once https://github.com/NVIDIA/reinforcer/issues/227 is fixed -@pytest.fixture(scope="module", autouse=True) -def patch_tied_weights_for_module(): - """ - Module-scoped fixture to automatically monkeypatch - _get_tied_weight_keys from transformers for all tests in this file. - """ - # Create a module-scoped MonkeyPatch object - mpatch = MonkeyPatch() - print( - f"\nApplying module-level patch to transformers.modeling_utils._get_tied_weight_keys for {__name__}" - ) - mpatch.setattr( - "transformers.modeling_utils._get_tied_weight_keys", - lambda model, **kwargs: [], # Always return empty list - raising=True, - ) - # Yield control to the tests in the module - yield - # MonkeyPatch object handles cleanup automatically when the fixture scope ends - mpatch.undo() - - # Define basic vLLM test config basic_vllm_test_config: VllmConfig = { "backend": "vllm", @@ -80,6 +56,7 @@ def get_basic_hf_test_config(enable_dtensor: bool = False) -> PolicyConfig: "tokenizer": { "name": basic_vllm_test_config["tokenizer"]["name"], }, + "skip_tie_check": True, # Required training parameters "train_global_batch_size": 1, "train_micro_batch_size": 1, From 00a11bc7374f106688b20cd5d116ee5143ba034a Mon Sep 17 00:00:00 2001 From: mckimn Date: Mon, 21 Apr 2025 17:31:23 -0700 Subject: [PATCH 09/12] ci: Remove external config from project (#200) Signed-off-by: Nathan McKimpson Signed-off-by: Parth Chadha --- .gitlab-ci.yml | 35 ----------------------------------- 1 file changed, 35 deletions(-) delete mode 100644 .gitlab-ci.yml diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index 29aca28dc5..0000000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,35 +0,0 @@ -default: - image: ${DOC_BUILD_IMAGE} - tags: - - os/linux - - type/docker - -stages: - - build - - deploy - -.sphinx-build: &sphinx-build - - cd docs - - uv run --group docs sphinx-build --fail-on-warning --builder html . _build/html - -build-docs: - stage: build - script: - - *sphinx-build - artifacts: - name: ${CI_PROJECT_NAME}-${CI_COMMIT_SHORT_SHA} - paths: - - docs/_build - -pages: - stage: deploy - needs: ["build-docs"] - script: - - echo "Publishing HTML to GitLab Pages" - - rm -rf public - - mkdir public - - cp -r docs/_build/html/* public/ - rules: - - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - - if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH - environment: main From 6b20a92e3cf4f901e8286c037ea49d661fbfc305 Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Tue, 22 Apr 2025 13:44:41 -0700 Subject: [PATCH 10/12] Update readme to reflect the new 1b model Signed-off-by: Parth Chadha --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index eef9fc1b39..1dacff454c 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ We have a reference GRPO experiment config set up trained for math benchmarks us #### Single Node -To run GRPO on a single GPU for `Llama-3.2-1B-Instruct`: +To run GRPO on a single GPU for `Qwen/Qwen2.5-1.5B`: ```sh # Run the GRPO math example using a 1B parameter model @@ -87,10 +87,10 @@ You can override any of the parameters listed in the yaml configuration file. Fo ```sh uv run python examples/run_grpo_math.py \ - policy.model_name="Qwen/Qwen2-1.5B" \ - checkpointing.checkpoint_dir="results/qwen1_5b_math" \ + policy.model_name="Llama-3.2-1B-Instruct" \ + checkpointing.checkpoint_dir="results/llama1b_math" \ logger.wandb_enabled=True \ - logger.wandb.name="grpo-qwen1_5b_math" \ + logger.wandb.name="grpo-llama1b_math" \ logger.num_val_samples_to_print=10 \ ``` From 3a4c94fdb4263ed6bd204250899751c26a6aa30d Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Tue, 22 Apr 2025 16:26:05 -0700 Subject: [PATCH 11/12] Move error to train step; use env variable to skip test Signed-off-by: Parth Chadha --- examples/configs/grpo_math_1B.yaml | 1 + .../models/policy/dtensor_policy_worker.py | 19 ++++++++++++------- .../models/policy/fsdp1_policy_worker.py | 17 +++++++++-------- .../models/generation/test_vllm_generation.py | 17 ++++++++++++++++- .../unit/models/policy/test_dtensor_worker.py | 13 +++++++++++-- tests/unit/models/policy/test_fsdp1_worker.py | 13 +++++++++++++ 6 files changed, 62 insertions(+), 18 deletions(-) diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml index c71f3c277b..3bd1978787 100644 --- a/examples/configs/grpo_math_1B.yaml +++ b/examples/configs/grpo_math_1B.yaml @@ -28,6 +28,7 @@ checkpointing: save_period: 10 policy: + # Qwen/Qwen2.5-1.5B has tied weights which are only supported with dtensor policy with tp size 1 (https://github.com/NVIDIA/reinforcer/issues/227) model_name: "Qwen/Qwen2.5-1.5B" tokenizer: name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default diff --git a/nemo_reinforcer/models/policy/dtensor_policy_worker.py b/nemo_reinforcer/models/policy/dtensor_policy_worker.py index e2f761907f..71dda574c8 100644 --- a/nemo_reinforcer/models/policy/dtensor_policy_worker.py +++ b/nemo_reinforcer/models/policy/dtensor_policy_worker.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import gc from collections import defaultdict @@ -154,13 +155,6 @@ def __init__( f"World size({world_size}) must be divisible by TP size({tp_size}) to use DTensor" ) - num_tied_weights = len(_get_tied_weight_keys(self.model)) - skip_tie_check = self.cfg.get("skip_tie_check", False) - if num_tied_weights != 0 and tp_size > 1 and not skip_tie_check: - raise ValueError( - f"Using dtensor policy with tp size {tp_size} for model ({model_name}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." - ) - mesh_2d = torch.distributed.device_mesh.init_device_mesh( "cuda", (dp_size, tp_size), mesh_dim_names=("dp", "tp") ) @@ -257,6 +251,17 @@ def train( mbs: Optional[int] = None, ) -> Dict[str, Any]: """Train the policy on a batch of data with a given loss function.""" + num_tied_weights = len(_get_tied_weight_keys(self.model)) + skip_tie_check = os.environ.get("NRL_SKIP_TIED_WEIGHT_CHECK") + if ( + num_tied_weights != 0 + and self.cfg["dtensor_cfg"]["tensor_parallel_size"] > 1 + and not skip_tie_check + ): + raise ValueError( + f"Using dtensor policy with tp size {self.cfg['dtensor_cfg']['tensor_parallel_size']} for model ({self.cfg['model_name']}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." + ) + if gbs is None: gbs = self.cfg["train_global_batch_size"] if mbs is None: diff --git a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py index a207927cd3..f1ba91dab1 100644 --- a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py +++ b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py @@ -17,6 +17,7 @@ from collections import defaultdict from contextlib import contextmanager, nullcontext from typing import Any, Dict, Optional +import os import ray import torch @@ -94,14 +95,6 @@ def __init__( torch_dtype=torch.float32, # use full precision in sft until https://github.com/NVIDIA/reinforcer/issues/13 is fixed ) - # Check if the model has tied weights - num_tied_weights = len(_get_tied_weight_keys(self.model)) - skip_tie_check = self.cfg.get("skip_tie_check", False) - if num_tied_weights != 0 and not skip_tie_check: - raise ValueError( - f"Using FSP1 with a model ({model_name}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." - ) - if init_reference_model: self.reference_model = AutoModelForCausalLM.from_pretrained( model_name, @@ -228,6 +221,14 @@ def train( mbs: Optional[int] = None, ) -> Dict[str, Any]: """Train the policy on a batch of data with a given loss function.""" + # Check if the model has tied weights + num_tied_weights = len(_get_tied_weight_keys(self.model)) + skip_tie_check = os.environ.get("NRL_SKIP_TIED_WEIGHT_CHECK") + if num_tied_weights != 0 and not skip_tie_check: + raise ValueError( + f"Using FSP1 with a model ({self.cfg['model_name']}) that has tied weights (num_tied_weights={num_tied_weights}) is not supported (https://github.com/NVIDIA/reinforcer/issues/227). Please use dtensor policy with tensor parallel == 1 instead." + ) + if gbs is None: gbs = self.cfg["train_global_batch_size"] if mbs is None: diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index cd3a22aefa..5cd3fe3792 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -17,6 +17,7 @@ import pytest import torch import ray +import os from nemo_reinforcer.algorithms.utils import get_tokenizer from nemo_reinforcer.distributed.virtual_cluster import RayVirtualCluster @@ -54,7 +55,6 @@ def get_basic_hf_test_config(enable_dtensor: bool = False) -> PolicyConfig: "tokenizer": { "name": basic_vllm_test_config["tokenizer"]["name"], }, - "skip_tie_check": True, # Required training parameters "train_global_batch_size": 1, "train_micro_batch_size": 1, @@ -159,6 +159,21 @@ def test_input_data(tokenizer): ) +@pytest.fixture(scope="module", autouse=True) +def skip_tied_weight_check_for_all(): + """Automatically skip tied weight check for all tests in this module.""" + original_env_value = os.environ.get("NRL_SKIP_TIED_WEIGHT_CHECK", None) + os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = "1" + + yield + + # Restore the original value + if original_env_value is not None: + os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = original_env_value + else: + os.environ.pop("NRL_SKIP_TIED_WEIGHT_CHECK", None) + + def test_vllm_missing_required_config_key(cluster): """Test that an assertion error is raised when a required config key is missing.""" # Create a config missing a required key by removing 'model_name' diff --git a/tests/unit/models/policy/test_dtensor_worker.py b/tests/unit/models/policy/test_dtensor_worker.py index ffa644b91f..50c8d146cc 100644 --- a/tests/unit/models/policy/test_dtensor_worker.py +++ b/tests/unit/models/policy/test_dtensor_worker.py @@ -16,8 +16,6 @@ import pprint import torch import os -import unittest.mock -import torch.distributed as dist # Define a custom marker for model configuration tests pytestmark = pytest.mark.modelconfig @@ -88,6 +86,17 @@ def create_test_config( } +@pytest.fixture(scope="module", autouse=True) +def skip_tied_weight_check_for_all(): + """Automatically skip tied weight check for all tests in this module.""" + os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = "1" + + yield + + # Restore the original value + os.environ.pop("NRL_SKIP_TIED_WEIGHT_CHECK", None) + + @pytest.fixture(scope="module") def two_gpu_virtual_cluster(): cluster_name = "test" diff --git a/tests/unit/models/policy/test_fsdp1_worker.py b/tests/unit/models/policy/test_fsdp1_worker.py index 0ace3084dc..2d30c90612 100644 --- a/tests/unit/models/policy/test_fsdp1_worker.py +++ b/tests/unit/models/policy/test_fsdp1_worker.py @@ -11,10 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import ray import pytest import pprint import torch +import os from copy import deepcopy from nemo_reinforcer.algorithms.interfaces import LossFunction @@ -73,6 +75,17 @@ } +@pytest.fixture(scope="module", autouse=True) +def skip_tied_weight_check_for_all(): + """Automatically skip tied weight check for all tests in this module.""" + os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = "1" + + yield + + # Restore the original value + os.environ.pop("NRL_SKIP_TIED_WEIGHT_CHECK", None) + + @pytest.fixture(scope="function") def gc_collect(): """Helper function to force garbage collection after a test""" From 6f43e605f2f0038e91e5376a3bab52082a87d80a Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Tue, 22 Apr 2025 18:27:14 -0700 Subject: [PATCH 12/12] Fix failing checkpoint unit test Signed-off-by: Parth Chadha --- tests/unit/models/generation/test_vllm_generation.py | 6 +----- tests/unit/utils/test_native_checkpoint.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index 5cd3fe3792..d8ebbbcf89 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -162,16 +162,12 @@ def test_input_data(tokenizer): @pytest.fixture(scope="module", autouse=True) def skip_tied_weight_check_for_all(): """Automatically skip tied weight check for all tests in this module.""" - original_env_value = os.environ.get("NRL_SKIP_TIED_WEIGHT_CHECK", None) os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = "1" yield # Restore the original value - if original_env_value is not None: - os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = original_env_value - else: - os.environ.pop("NRL_SKIP_TIED_WEIGHT_CHECK", None) + os.environ.pop("NRL_SKIP_TIED_WEIGHT_CHECK", None) def test_vllm_missing_required_config_key(cluster): diff --git a/tests/unit/utils/test_native_checkpoint.py b/tests/unit/utils/test_native_checkpoint.py index 18568fed0a..979d2786b8 100755 --- a/tests/unit/utils/test_native_checkpoint.py +++ b/tests/unit/utils/test_native_checkpoint.py @@ -119,6 +119,17 @@ def policy(cluster, tokenizer): policy.worker_group.shutdown() +@pytest.fixture(scope="module", autouse=True) +def skip_tied_weight_check_for_all(): + """Automatically skip tied weight check for all tests in this module.""" + os.environ["NRL_SKIP_TIED_WEIGHT_CHECK"] = "1" + + yield + + # Restore the original value + os.environ.pop("NRL_SKIP_TIED_WEIGHT_CHECK", None) + + def get_dummy_state_dict(state_dict, dummy_dict={}): """Recursively get the dummy state dict by replacing tensors with random ones of the same shape.