From 7929d397f64ca65ea363ebfed19a8ed9016c2475 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Thu, 14 Aug 2025 12:38:30 -0700 Subject: [PATCH 01/19] memory optimizations for Nemotron12B 12k seqlen DPO training Signed-off-by: Yubo Gao --- examples/configs/dpo_nemotron12b.yaml | 101 ++++++++++++++++++ nemo_rl/models/dtensor/parallelize.py | 20 ++++ .../models/policy/dtensor_policy_worker.py | 2 + nemo_rl/models/policy/utils.py | 3 + 4 files changed, 126 insertions(+) create mode 100644 examples/configs/dpo_nemotron12b.yaml diff --git a/examples/configs/dpo_nemotron12b.yaml b/examples/configs/dpo_nemotron12b.yaml new file mode 100644 index 0000000000..ba14098823 --- /dev/null +++ b/examples/configs/dpo_nemotron12b.yaml @@ -0,0 +1,101 @@ +# DPO Algorithm Configuration +dpo: + max_num_epochs: 1 + max_num_steps: 100 + val_period: 10 + val_batches: 1 + val_global_batch_size: 16 + val_micro_batch_size: 1 + val_at_start: true + seed: 42 + + reference_policy_kl_penalty: 0.1 + preference_average_log_probs: False # whether normalizing log probs according to the sequence length in preference_loss + sft_average_log_probs: ${.preference_average_log_probs} # whether normalizing log probs according to the sequence length in sft_loss + + preference_loss_weight: 1 # the coefficient of the preference loss + sft_loss_weight: 0 # the coefficient of the SFT loss + +checkpointing: + enabled: true + checkpoint_dir: "results/dpo" + metric_name: "val_loss" + higher_is_better: false + keep_top_k: null + save_period: 50 + +policy: + model_name: "mistralai/Mistral-Nemo-Instruct-2407" + tokenizer: + name: ${policy.model_name} + + # number of preference samples per batch + # each preference sample corresponds to a pair of chosen and rejected responses + # so the actual batch size processed by the model is train_global_batch_size * 2 + train_global_batch_size: 8 + train_micro_batch_size: 1 + + + #logprob_batch_size: ${policy.train_micro_batch_size} + max_total_sequence_length: 12288 + precision: "bfloat16" + fsdp_offload_enabled: false + + dtensor_cfg: + enabled: true + cpu_offload: false + sequence_parallel: false + activation_checkpointing: true + tensor_parallel_size: 8 + context_parallel_size: 1 + custom_parallel_plan: null + + dynamic_batching: + enabled: false + + sequence_packing: + enabled: false + + # makes the training sequence length divisible by the tensor parallel size + # this is useful for sequence parallel training + make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size} + max_grad_norm: 1.0 + + optimizer: + name: "torch.optim.AdamW" + kwargs: + lr: 1.0e-6 + weight_decay: 0.01 + betas: [0.9, 0.999] + eps: 1e-8 + # when using Dtensor, we need to set foreach + # and fused to False + foreach: False + fused: False + + scheduler: + - name: "torch.optim.lr_scheduler.ConstantLR" + kwargs: + factor: 1.0 + total_iters: 10000000000 + - milestones: [] + +data: + dataset_name: "HelpSteer3" + max_input_seq_length: ${policy.max_total_sequence_length} +logger: + log_dir: "logs" # Base directory for all logs + wandb_enabled: true # Make sure you do a ``wandb login [Your API key]'' before running + tensorboard_enabled: false + mlflow_enabled: false + monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard + wandb: + project: "ybgao-aug7-dpo-oom-test" + name: "baseline" + gpu_monitoring: + collection_interval: 10 # How often to collect GPU usage metrics (in seconds) + flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds) + +cluster: + gpus_per_node: 8 + num_nodes: 1 diff --git a/nemo_rl/models/dtensor/parallelize.py b/nemo_rl/models/dtensor/parallelize.py index 25ecaf8051..64b8c52f80 100644 --- a/nemo_rl/models/dtensor/parallelize.py +++ b/nemo_rl/models/dtensor/parallelize.py @@ -538,6 +538,26 @@ def _parallelize_model( for i in range(len(layers)): layers[i].mlp = checkpoint_wrapper(layers[i].mlp) # type: ignore + """ + the extra memory overhead for layer norm seems to be only present + in mistral models, where some intermediate state is converted to float32 + + need to find a better solution for checkpointing + """ + if "self_attn" in layers[i].__dict__: + layers[i].self_attn = checkpoint_wrapper(layers[i].self_attn) # type: ignore + + if ( + "input_layernorm" in layers[i].__dict__ + and "post_attention_layernorm" in layers[i].__dict__ + ): + layers[i].input_layernorm = checkpoint_wrapper( + layers[i].input_layernorm # type: ignore + ) + layers[i].post_attention_layernorm = checkpoint_wrapper( + layers[i].post_attention_layernorm # type: ignore + ) + mp_policy = MixedPrecisionPolicy( param_dtype=param_dtype, reduce_dtype=torch.float32, diff --git a/nemo_rl/models/policy/dtensor_policy_worker.py b/nemo_rl/models/policy/dtensor_policy_worker.py index 62bd12cfe3..17f2c6406c 100644 --- a/nemo_rl/models/policy/dtensor_policy_worker.py +++ b/nemo_rl/models/policy/dtensor_policy_worker.py @@ -630,6 +630,8 @@ def train( for mb_idx, mb in enumerate( itertools.chain(mb_iterator, dummy_iterator) ): + torch.cuda.empty_cache() + with torch.autocast(device_type="cuda", dtype=self.dtype): if self.enable_seq_packing: input_ids = mb.get("input_ids").cuda() diff --git a/nemo_rl/models/policy/utils.py b/nemo_rl/models/policy/utils.py index c09a201268..755fd5a541 100644 --- a/nemo_rl/models/policy/utils.py +++ b/nemo_rl/models/policy/utils.py @@ -160,6 +160,9 @@ def configure_expandable_segments() -> None: # Add expandable_segments to existing configuration if existing_conf: # Append to existing configuration + + # max_split_size_mb:64 reduces fragmentation, enables 12B models + # with 12K context to fit on a single node new_conf = f"{existing_conf},expandable_segments:True" else: # Set new configuration From 02bee2a5197c512ca6e01ae02270777eaa9357ba Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Mon, 18 Aug 2025 14:44:22 -0700 Subject: [PATCH 02/19] implement suggested changes Signed-off-by: Yubo Gao --- ...ruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml} | 15 ++++-- examples/run_dpo.py | 4 ++ examples/run_grpo_math.py | 4 ++ examples/run_grpo_sliding_puzzle.py | 4 ++ examples/run_rm.py | 4 ++ examples/run_sft.py | 4 ++ nemo_rl/models/dtensor/parallelize.py | 9 ++-- nemo_rl/models/policy/utils.py | 54 +++++++------------ nemo_rl/utils/envvars.py | 13 +++++ 9 files changed, 65 insertions(+), 46 deletions(-) rename examples/configs/{dpo_nemotron12b.yaml => recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml} (84%) create mode 100644 nemo_rl/utils/envvars.py diff --git a/examples/configs/dpo_nemotron12b.yaml b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml similarity index 84% rename from examples/configs/dpo_nemotron12b.yaml rename to examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml index ba14098823..2a1a5709c6 100644 --- a/examples/configs/dpo_nemotron12b.yaml +++ b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml @@ -18,11 +18,12 @@ dpo: checkpointing: enabled: true - checkpoint_dir: "results/dpo" + checkpoint_dir: "results/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long" metric_name: "val_loss" higher_is_better: false keep_top_k: null save_period: 50 + checkpoint_must_save_by: null policy: model_name: "mistralai/Mistral-Nemo-Instruct-2407" @@ -35,6 +36,8 @@ policy: train_global_batch_size: 8 train_micro_batch_size: 1 + env: + PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" #logprob_batch_size: ${policy.train_micro_batch_size} max_total_sequence_length: 12288 @@ -82,16 +85,18 @@ policy: data: dataset_name: "HelpSteer3" + shuffle: False max_input_seq_length: ${policy.max_total_sequence_length} + logger: - log_dir: "logs" # Base directory for all logs - wandb_enabled: true # Make sure you do a ``wandb login [Your API key]'' before running + log_dir: "logs/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long" # Base directory for all logs + wandb_enabled: false # Make sure you do a ``wandb login [Your API key]'' before running tensorboard_enabled: false mlflow_enabled: false monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard wandb: - project: "ybgao-aug7-dpo-oom-test" - name: "baseline" + project: "nemo-rl" + name: "dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long" gpu_monitoring: collection_interval: 10 # How often to collect GPU usage metrics (in seconds) flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds) diff --git a/examples/run_dpo.py b/examples/run_dpo.py index a9702ed93e..160e9967de 100644 --- a/examples/run_dpo.py +++ b/examples/run_dpo.py @@ -29,6 +29,7 @@ from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.models.policy import PolicyConfig from nemo_rl.utils.config import load_config, parse_hydra_overrides +from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir @@ -236,6 +237,9 @@ def main(): init_ray() + if "env" in config["policy"]: + set_envvars(config["policy"]["env"]) + # setup data train_dataset, val_dataset, tokenizer, dpo_task_spec = setup_data( config["data"], config["policy"] diff --git a/examples/run_grpo_math.py b/examples/run_grpo_math.py index f31c2c212c..4694a8d3b9 100644 --- a/examples/run_grpo_math.py +++ b/examples/run_grpo_math.py @@ -41,6 +41,7 @@ from nemo_rl.environments.math_environment import MathEnvironment from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides +from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir OmegaConf.register_new_resolver("mul", lambda a, b: a * b) @@ -222,6 +223,9 @@ def main() -> None: init_ray() + if "env" in config["policy"]: + set_envvars(config["policy"]["env"]) + # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) assert config["policy"]["generation"] is not None, ( diff --git a/examples/run_grpo_sliding_puzzle.py b/examples/run_grpo_sliding_puzzle.py index ca2359d0d2..701387fe0b 100644 --- a/examples/run_grpo_sliding_puzzle.py +++ b/examples/run_grpo_sliding_puzzle.py @@ -35,6 +35,7 @@ ) from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides +from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir OmegaConf.register_new_resolver("mul", lambda a, b: a * b) @@ -223,6 +224,9 @@ def main(): init_ray() + if "env" in config["policy"]: + set_envvars(config["policy"]["env"]) + set_seed(config["grpo"]["seed"]) # setup tokenizer diff --git a/examples/run_rm.py b/examples/run_rm.py index 6586d8edb7..5dacb301fb 100644 --- a/examples/run_rm.py +++ b/examples/run_rm.py @@ -29,6 +29,7 @@ from nemo_rl.data.llm_message_utils import get_formatted_message_log from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.utils.config import load_config, parse_hydra_overrides +from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir @@ -175,6 +176,9 @@ def main(): init_ray() + if "env" in config["policy"]: + set_envvars(config["policy"]["env"]) + # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) diff --git a/examples/run_sft.py b/examples/run_sft.py index fc2956b48a..ce78df3319 100644 --- a/examples/run_sft.py +++ b/examples/run_sft.py @@ -29,6 +29,7 @@ from nemo_rl.data.llm_message_utils import get_formatted_message_log from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.utils.config import load_config, parse_hydra_overrides +from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir OmegaConf.register_new_resolver("mul", lambda a, b: a * b) @@ -192,6 +193,9 @@ def main(): init_ray() + if "env" in config["policy"]: + set_envvars(config["policy"]["env"]) + # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) diff --git a/nemo_rl/models/dtensor/parallelize.py b/nemo_rl/models/dtensor/parallelize.py index 64b8c52f80..e640f020b6 100644 --- a/nemo_rl/models/dtensor/parallelize.py +++ b/nemo_rl/models/dtensor/parallelize.py @@ -544,16 +544,15 @@ def _parallelize_model( need to find a better solution for checkpointing """ - if "self_attn" in layers[i].__dict__: + if hasattr(layers[i], "self_attn"): layers[i].self_attn = checkpoint_wrapper(layers[i].self_attn) # type: ignore - if ( - "input_layernorm" in layers[i].__dict__ - and "post_attention_layernorm" in layers[i].__dict__ - ): + if hasattr(layers[i], "input_layernorm"): layers[i].input_layernorm = checkpoint_wrapper( layers[i].input_layernorm # type: ignore ) + + if hasattr(layers[i], "post_attention_layernorm"): layers[i].post_attention_layernorm = checkpoint_wrapper( layers[i].post_attention_layernorm # type: ignore ) diff --git a/nemo_rl/models/policy/utils.py b/nemo_rl/models/policy/utils.py index 755fd5a541..7751098c9a 100644 --- a/nemo_rl/models/policy/utils.py +++ b/nemo_rl/models/policy/utils.py @@ -146,44 +146,26 @@ def configure_expandable_segments() -> None: This helps with memory allocation but causes crashes on Ampere GPUs, so we only enable it on newer architectures. If PYTORCH_CUDA_ALLOC_CONF is already set, preserves existing values. """ + conf = { + k: v + for k, v in ( + item.split(":") + for item in os.environ.get("PYTORCH_CUDA_ALLOC_CONF", "").split(",") + ) + } compute_capability = torch.cuda.get_device_properties(0).major - - if compute_capability >= 9: # Hopper+ - existing_conf = os.environ.get("PYTORCH_CUDA_ALLOC_CONF", "") - - # Check if expandable_segments is already configured - if "expandable_segments" in existing_conf: - print(f"expandable_segments already configured: {existing_conf}") - # Already configured, don't override - return - - # Add expandable_segments to existing configuration - if existing_conf: - # Append to existing configuration - - # max_split_size_mb:64 reduces fragmentation, enables 12B models - # with 12K context to fit on a single node - new_conf = f"{existing_conf},expandable_segments:True" - else: - # Set new configuration - new_conf = "expandable_segments:True" - - print(f"Setting PYTORCH_CUDA_ALLOC_CONF to {new_conf}") - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = new_conf - + if compute_capability >= 9: + conf["expandable_segments"] = conf.get("expandable_segments", "True") else: - ## make sure that expandable_segments is not set to True - if "expandable_segments" in os.environ.get("PYTORCH_CUDA_ALLOC_CONF", ""): - conf_items = os.environ["PYTORCH_CUDA_ALLOC_CONF"].split(",") - for item in conf_items: - if item.strip().startswith("expandable_segments"): - key_value = item.split(":") - if len(key_value) == 2 and key_value[1].strip().lower() == "true": - raise RuntimeError( - "expandable_segments is enabled in PYTORCH_CUDA_ALLOC_CONF, " - "but this is not supported on architectures older than Hopper (compute capability < 9). " - "Please set expandable_segments to False." - ) + if conf.get("expandable_segments", "").lower() == "true": + raise RuntimeError( + "expandable_segments is enabled in PYTORCH_CUDA_ALLOC_CONF, " + "but this is not supported on architectures older than Hopper (compute capability < 9). " + "Please set expandable_segments to False." + ) + os.environ["PYTORCH_CUDA_ALLOC_CONF"] = ",".join( + f"{k}:{v}" for k, v in conf.items() + ) def configure_dynamo_cache() -> None: diff --git a/nemo_rl/utils/envvars.py b/nemo_rl/utils/envvars.py new file mode 100644 index 0000000000..1519c82a4a --- /dev/null +++ b/nemo_rl/utils/envvars.py @@ -0,0 +1,13 @@ + +import os + +def set_envvars(envvars: dict[str, str]): + """Set environment variables. + + Args: + envvars: Dictionary of environment variables to set. + """ + print("Setting environment variables:") + for key, value in envvars.items(): + os.environ[key] = value + print(f" - {key}: {value}") From fb8c1bbb1297d42061a135514f9c1aebe4b27ca7 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Mon, 18 Aug 2025 14:46:45 -0700 Subject: [PATCH 03/19] add copyright Signed-off-by: Yubo Gao --- nemo_rl/utils/envvars.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/nemo_rl/utils/envvars.py b/nemo_rl/utils/envvars.py index 1519c82a4a..96a6020d9f 100644 --- a/nemo_rl/utils/envvars.py +++ b/nemo_rl/utils/envvars.py @@ -1,6 +1,20 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import os + def set_envvars(envvars: dict[str, str]): """Set environment variables. From 87f858b0cfac94d9cdbaf3d61c523af3c1f18a69 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Mon, 18 Aug 2025 15:11:37 -0700 Subject: [PATCH 04/19] make lint pass Signed-off-by: Yubo Gao --- pyrefly.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyrefly.toml b/pyrefly.toml index 3a195b1494..259826544c 100644 --- a/pyrefly.toml +++ b/pyrefly.toml @@ -101,6 +101,7 @@ project-includes = [ "nemo_rl/models/policy/interfaces.py", "nemo_rl/models/policy/utils.py", "nemo_rl/utils/__init__.py", + "nemo_rl/utils/envvars.py", "nemo_rl/utils/checkpoint.py", "nemo_rl/utils/config.py", "nemo_rl/utils/native_checkpoint.py", From 00d5d9aa069225130df583cd3b080367a2df8403 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Tue, 19 Aug 2025 11:34:27 -0700 Subject: [PATCH 05/19] update configuration key and README Signed-off-by: Yubo Gao --- README.md | 13 +++++++++++++ ...mo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml | 2 +- examples/run_dpo.py | 4 ++-- examples/run_grpo_math.py | 4 ++-- examples/run_grpo_sliding_puzzle.py | 4 ++-- examples/run_rm.py | 4 ++-- examples/run_sft.py | 4 ++-- 7 files changed, 24 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 75bf02f4fe..c3f7d01e12 100644 --- a/README.md +++ b/README.md @@ -473,6 +473,19 @@ For detailed instructions on how to set up and launch NeMo RL on Slurm or Kubern NRL_FORCE_REBUILD_VENVS=true uv run examples/run_grpo.py ... ``` +- Large amounts of memory fragmentation might occur when running models without support for FlashAttention2. If OOM occurs after a few iterations of training, it may help to tweak the allocator settings to reduce memory fragmentation. To do so, **either**: + 1. Launch training with: + ```sh + PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:64 uv run python examples/run_dpo.py ... + ``` + 2. Make the change more permanently by adding this flag in the training configuration: + ```yaml + policy: + # set GLOBAL environment variables + env_vars: + PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" + ``` + ## Citation If you use NeMo RL in your research, please cite it using the following BibTeX entry: diff --git a/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml index 2a1a5709c6..99b4dc6591 100644 --- a/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml +++ b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml @@ -36,7 +36,7 @@ policy: train_global_batch_size: 8 train_micro_batch_size: 1 - env: + env_vars: PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" #logprob_batch_size: ${policy.train_micro_batch_size} diff --git a/examples/run_dpo.py b/examples/run_dpo.py index 160e9967de..86a482572b 100644 --- a/examples/run_dpo.py +++ b/examples/run_dpo.py @@ -237,8 +237,8 @@ def main(): init_ray() - if "env" in config["policy"]: - set_envvars(config["policy"]["env"]) + if "env_vars" in config["policy"]: + set_envvars(config["policy"]["env_vars"]) # setup data train_dataset, val_dataset, tokenizer, dpo_task_spec = setup_data( diff --git a/examples/run_grpo_math.py b/examples/run_grpo_math.py index 4694a8d3b9..baa126fa0c 100644 --- a/examples/run_grpo_math.py +++ b/examples/run_grpo_math.py @@ -223,8 +223,8 @@ def main() -> None: init_ray() - if "env" in config["policy"]: - set_envvars(config["policy"]["env"]) + if "env_vars" in config["policy"]: + set_envvars(config["policy"]["env_vars"]) # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) diff --git a/examples/run_grpo_sliding_puzzle.py b/examples/run_grpo_sliding_puzzle.py index 701387fe0b..04014d5bf1 100644 --- a/examples/run_grpo_sliding_puzzle.py +++ b/examples/run_grpo_sliding_puzzle.py @@ -224,8 +224,8 @@ def main(): init_ray() - if "env" in config["policy"]: - set_envvars(config["policy"]["env"]) + if "env_vars" in config["policy"]: + set_envvars(config["policy"]["env_vars"]) set_seed(config["grpo"]["seed"]) diff --git a/examples/run_rm.py b/examples/run_rm.py index 5dacb301fb..fb1dbb463e 100644 --- a/examples/run_rm.py +++ b/examples/run_rm.py @@ -176,8 +176,8 @@ def main(): init_ray() - if "env" in config["policy"]: - set_envvars(config["policy"]["env"]) + if "env_vars" in config["policy"]: + set_envvars(config["policy"]["env_vars"]) # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) diff --git a/examples/run_sft.py b/examples/run_sft.py index ce78df3319..a6dd0697b2 100644 --- a/examples/run_sft.py +++ b/examples/run_sft.py @@ -193,8 +193,8 @@ def main(): init_ray() - if "env" in config["policy"]: - set_envvars(config["policy"]["env"]) + if "env_vars" in config["policy"]: + set_envvars(config["policy"]["env_vars"]) # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) From 9be4a2cce56ecd0bafbd479f22d23be9f5c702ee Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Tue, 19 Aug 2025 16:25:19 -0700 Subject: [PATCH 06/19] fix allocator setting Signed-off-by: Yubo Gao --- ...truct-2407-1n8g-fsdp2tp8-actckpt-long.yaml | 4 +-- examples/run_dpo.py | 4 --- examples/run_grpo_math.py | 4 --- examples/run_grpo_sliding_puzzle.py | 4 --- examples/run_rm.py | 4 --- examples/run_sft.py | 4 --- nemo_rl/models/policy/utils.py | 25 ++++++++++------- nemo_rl/utils/envvars.py | 27 ------------------- pyrefly.toml | 1 - 9 files changed, 18 insertions(+), 59 deletions(-) delete mode 100644 nemo_rl/utils/envvars.py diff --git a/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml index 99b4dc6591..59aac4f950 100644 --- a/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml +++ b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml @@ -36,8 +36,6 @@ policy: train_global_batch_size: 8 train_micro_batch_size: 1 - env_vars: - PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" #logprob_batch_size: ${policy.train_micro_batch_size} max_total_sequence_length: 12288 @@ -52,6 +50,8 @@ policy: tensor_parallel_size: 8 context_parallel_size: 1 custom_parallel_plan: null + env_vars: + PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" dynamic_batching: enabled: false diff --git a/examples/run_dpo.py b/examples/run_dpo.py index 86a482572b..a9702ed93e 100644 --- a/examples/run_dpo.py +++ b/examples/run_dpo.py @@ -29,7 +29,6 @@ from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.models.policy import PolicyConfig from nemo_rl.utils.config import load_config, parse_hydra_overrides -from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir @@ -237,9 +236,6 @@ def main(): init_ray() - if "env_vars" in config["policy"]: - set_envvars(config["policy"]["env_vars"]) - # setup data train_dataset, val_dataset, tokenizer, dpo_task_spec = setup_data( config["data"], config["policy"] diff --git a/examples/run_grpo_math.py b/examples/run_grpo_math.py index baa126fa0c..f31c2c212c 100644 --- a/examples/run_grpo_math.py +++ b/examples/run_grpo_math.py @@ -41,7 +41,6 @@ from nemo_rl.environments.math_environment import MathEnvironment from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides -from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir OmegaConf.register_new_resolver("mul", lambda a, b: a * b) @@ -223,9 +222,6 @@ def main() -> None: init_ray() - if "env_vars" in config["policy"]: - set_envvars(config["policy"]["env_vars"]) - # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) assert config["policy"]["generation"] is not None, ( diff --git a/examples/run_grpo_sliding_puzzle.py b/examples/run_grpo_sliding_puzzle.py index 04014d5bf1..ca2359d0d2 100644 --- a/examples/run_grpo_sliding_puzzle.py +++ b/examples/run_grpo_sliding_puzzle.py @@ -35,7 +35,6 @@ ) from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides -from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir OmegaConf.register_new_resolver("mul", lambda a, b: a * b) @@ -224,9 +223,6 @@ def main(): init_ray() - if "env_vars" in config["policy"]: - set_envvars(config["policy"]["env_vars"]) - set_seed(config["grpo"]["seed"]) # setup tokenizer diff --git a/examples/run_rm.py b/examples/run_rm.py index fb1dbb463e..6586d8edb7 100644 --- a/examples/run_rm.py +++ b/examples/run_rm.py @@ -29,7 +29,6 @@ from nemo_rl.data.llm_message_utils import get_formatted_message_log from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.utils.config import load_config, parse_hydra_overrides -from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir @@ -176,9 +175,6 @@ def main(): init_ray() - if "env_vars" in config["policy"]: - set_envvars(config["policy"]["env_vars"]) - # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) diff --git a/examples/run_sft.py b/examples/run_sft.py index a6dd0697b2..fc2956b48a 100644 --- a/examples/run_sft.py +++ b/examples/run_sft.py @@ -29,7 +29,6 @@ from nemo_rl.data.llm_message_utils import get_formatted_message_log from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.utils.config import load_config, parse_hydra_overrides -from nemo_rl.utils.envvars import set_envvars from nemo_rl.utils.logger import get_next_experiment_dir OmegaConf.register_new_resolver("mul", lambda a, b: a * b) @@ -193,9 +192,6 @@ def main(): init_ray() - if "env_vars" in config["policy"]: - set_envvars(config["policy"]["env_vars"]) - # setup tokenizer tokenizer = get_tokenizer(config["policy"]["tokenizer"]) diff --git a/nemo_rl/models/policy/utils.py b/nemo_rl/models/policy/utils.py index 7751098c9a..b206084833 100644 --- a/nemo_rl/models/policy/utils.py +++ b/nemo_rl/models/policy/utils.py @@ -146,13 +146,18 @@ def configure_expandable_segments() -> None: This helps with memory allocation but causes crashes on Ampere GPUs, so we only enable it on newer architectures. If PYTORCH_CUDA_ALLOC_CONF is already set, preserves existing values. """ - conf = { - k: v - for k, v in ( - item.split(":") - for item in os.environ.get("PYTORCH_CUDA_ALLOC_CONF", "").split(",") - ) - } + conf = ( + { + k: v + for k, v in ( + item.split(":") + for item in os.environ.get("PYTORCH_CUDA_ALLOC_CONF").split(",") + ) + } + if os.environ.get("PYTORCH_CUDA_ALLOC_CONF", None) + else {} + ) + compute_capability = torch.cuda.get_device_properties(0).major if compute_capability >= 9: conf["expandable_segments"] = conf.get("expandable_segments", "True") @@ -163,8 +168,10 @@ def configure_expandable_segments() -> None: "but this is not supported on architectures older than Hopper (compute capability < 9). " "Please set expandable_segments to False." ) - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = ",".join( - f"{k}:{v}" for k, v in conf.items() + + # don't write back to the environment variable since torch is already loaded + torch.cuda.memory._set_allocator_settings( + ",".join(f"{k}:{v}" for k, v in conf.items()) ) diff --git a/nemo_rl/utils/envvars.py b/nemo_rl/utils/envvars.py deleted file mode 100644 index 96a6020d9f..0000000000 --- a/nemo_rl/utils/envvars.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os - - -def set_envvars(envvars: dict[str, str]): - """Set environment variables. - - Args: - envvars: Dictionary of environment variables to set. - """ - print("Setting environment variables:") - for key, value in envvars.items(): - os.environ[key] = value - print(f" - {key}: {value}") diff --git a/pyrefly.toml b/pyrefly.toml index 259826544c..3a195b1494 100644 --- a/pyrefly.toml +++ b/pyrefly.toml @@ -101,7 +101,6 @@ project-includes = [ "nemo_rl/models/policy/interfaces.py", "nemo_rl/models/policy/utils.py", "nemo_rl/utils/__init__.py", - "nemo_rl/utils/envvars.py", "nemo_rl/utils/checkpoint.py", "nemo_rl/utils/config.py", "nemo_rl/utils/native_checkpoint.py", From 63a82de99e543df86378497451e35cbf86238da6 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Tue, 19 Aug 2025 16:28:28 -0700 Subject: [PATCH 07/19] update readme and lint Signed-off-by: Yubo Gao --- README.md | 7 ++++--- nemo_rl/models/policy/utils.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c3f7d01e12..7aa050de0f 100644 --- a/README.md +++ b/README.md @@ -481,9 +481,10 @@ For detailed instructions on how to set up and launch NeMo RL on Slurm or Kubern 2. Make the change more permanently by adding this flag in the training configuration: ```yaml policy: - # set GLOBAL environment variables - env_vars: - PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" + # ... + dtensor_cfg: + env_vars: + PYTORCH_CUDA_ALLOC_CONF: "max_split_size_mb:64" ``` ## Citation diff --git a/nemo_rl/models/policy/utils.py b/nemo_rl/models/policy/utils.py index b206084833..b6832bafd7 100644 --- a/nemo_rl/models/policy/utils.py +++ b/nemo_rl/models/policy/utils.py @@ -168,7 +168,7 @@ def configure_expandable_segments() -> None: "but this is not supported on architectures older than Hopper (compute capability < 9). " "Please set expandable_segments to False." ) - + # don't write back to the environment variable since torch is already loaded torch.cuda.memory._set_allocator_settings( ",".join(f"{k}:{v}" for k, v in conf.items()) From a2cdc5a273f019fa1f97406a84759a151c7239d9 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Tue, 19 Aug 2025 21:50:24 -0700 Subject: [PATCH 08/19] disable expandable segments by default Signed-off-by: Yubo Gao --- nemo_rl/models/policy/dtensor_policy_worker.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nemo_rl/models/policy/dtensor_policy_worker.py b/nemo_rl/models/policy/dtensor_policy_worker.py index c4e84b6839..fb4613e52a 100644 --- a/nemo_rl/models/policy/dtensor_policy_worker.py +++ b/nemo_rl/models/policy/dtensor_policy_worker.py @@ -65,7 +65,6 @@ ) from nemo_rl.models.policy.utils import ( configure_dynamo_cache, - configure_expandable_segments, get_gpu_info, get_handle_from_tensor, get_runtime_env_for_policy_worker, @@ -165,9 +164,6 @@ def __init__( # with different order of node_bundles configure_dynamo_cache() - # Only enable expandable_segments on Hopper and newer architectures (compute capability 9.x+) - configure_expandable_segments() - self.cfg = config # torch distributed init. Envars for rank, world_size, and master_addr and master_port are set from the ray remote call torch.distributed.init_process_group(backend="nccl") From d3c9ad7b6e4eb08bb82f14af93738f65f62a61b5 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Wed, 20 Aug 2025 14:47:26 -0400 Subject: [PATCH 09/19] Update README.md Co-authored-by: Shang Wang Signed-off-by: Yubo Gao --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7aa050de0f..caf42e51b4 100644 --- a/README.md +++ b/README.md @@ -473,7 +473,10 @@ For detailed instructions on how to set up and launch NeMo RL on Slurm or Kubern NRL_FORCE_REBUILD_VENVS=true uv run examples/run_grpo.py ... ``` -- Large amounts of memory fragmentation might occur when running models without support for FlashAttention2. If OOM occurs after a few iterations of training, it may help to tweak the allocator settings to reduce memory fragmentation. To do so, **either**: +- Large amounts of memory fragmentation might occur when running models without support for FlashAttention2. + If OOM occurs after a few iterations of training, it may help to tweak the allocator settings to reduce memory fragmentation. + To do so, specify [`max_split_size_mb`](https://docs.pytorch.org/docs/stable/notes/cuda.html#optimizing-memory-usage-with-pytorch-cuda-alloc-conf) + at **either** one of the following places: 1. Launch training with: ```sh PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:64 uv run python examples/run_dpo.py ... From ae128ccee2e76705edf7b7858105be30ccfac6f4 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Wed, 20 Aug 2025 12:12:14 -0700 Subject: [PATCH 10/19] remove configure_expandable_segments Signed-off-by: Yubo Gao --- .../models/policy/megatron_policy_worker.py | 4 - nemo_rl/models/policy/utils.py | 35 ----- tests/unit/models/policy/test_utils.py | 139 ------------------ 3 files changed, 178 deletions(-) diff --git a/nemo_rl/models/policy/megatron_policy_worker.py b/nemo_rl/models/policy/megatron_policy_worker.py index 4ee48901bc..7245fb5073 100644 --- a/nemo_rl/models/policy/megatron_policy_worker.py +++ b/nemo_rl/models/policy/megatron_policy_worker.py @@ -118,7 +118,6 @@ ) from nemo_rl.models.policy.utils import ( configure_dynamo_cache, - configure_expandable_segments, get_gpu_info, get_handle_from_tensor, get_megatron_checkpoint_dir, @@ -410,9 +409,6 @@ def __init__( # with different order of node_bundles configure_dynamo_cache() - # Only enable expandable_segments on Hopper and newer architectures (compute capability 9.x+) - configure_expandable_segments() - # cfg["model_name"] is allowed to be either an HF model name or a path to an HF checkpoint # check if hf_model_name is a path hf_model_name = self.cfg["model_name"] diff --git a/nemo_rl/models/policy/utils.py b/nemo_rl/models/policy/utils.py index 4d8b70bd08..42662f7a44 100644 --- a/nemo_rl/models/policy/utils.py +++ b/nemo_rl/models/policy/utils.py @@ -165,41 +165,6 @@ def sliding_window_overwrite(model_name: str) -> dict[str, Any]: return overwrite_dict -def configure_expandable_segments() -> None: - """Configure expandable_segments on Hopper and newer architectures (compute capability 9.x+). - - This helps with memory allocation but causes crashes on Ampere GPUs, so we only enable it - on newer architectures. If PYTORCH_CUDA_ALLOC_CONF is already set, preserves existing values. - """ - conf = ( - { - k: v - for k, v in ( - item.split(":") - for item in os.environ.get("PYTORCH_CUDA_ALLOC_CONF").split(",") - ) - } - if os.environ.get("PYTORCH_CUDA_ALLOC_CONF", None) - else {} - ) - - compute_capability = torch.cuda.get_device_properties(0).major - if compute_capability >= 9: - conf["expandable_segments"] = conf.get("expandable_segments", "True") - else: - if conf.get("expandable_segments", "").lower() == "true": - raise RuntimeError( - "expandable_segments is enabled in PYTORCH_CUDA_ALLOC_CONF, " - "but this is not supported on architectures older than Hopper (compute capability < 9). " - "Please set expandable_segments to False." - ) - - # don't write back to the environment variable since torch is already loaded - torch.cuda.memory._set_allocator_settings( - ",".join(f"{k}:{v}" for k, v in conf.items()) - ) - - def configure_dynamo_cache() -> None: """Disable dynamo autotune_local_cache. diff --git a/tests/unit/models/policy/test_utils.py b/tests/unit/models/policy/test_utils.py index 5712985cd3..8fb4d8f8b2 100644 --- a/tests/unit/models/policy/test_utils.py +++ b/tests/unit/models/policy/test_utils.py @@ -14,151 +14,12 @@ import os import unittest.mock -from unittest.mock import MagicMock, patch from nemo_rl.models.policy.utils import ( - configure_expandable_segments, get_megatron_checkpoint_dir, ) -class TestConfigureExpandableSegments(unittest.TestCase): - """Test cases for configure_expandable_segments function.""" - - def setUp(self): - """Set up test environment.""" - # Store original environment variable - self.original_pytorch_cuda_alloc_conf = os.environ.get( - "PYTORCH_CUDA_ALLOC_CONF" - ) - - def tearDown(self): - """Clean up after tests.""" - # Restore original environment variable - if self.original_pytorch_cuda_alloc_conf is not None: - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = ( - self.original_pytorch_cuda_alloc_conf - ) - elif "PYTORCH_CUDA_ALLOC_CONF" in os.environ: - del os.environ["PYTORCH_CUDA_ALLOC_CONF"] - - @patch("torch.cuda.get_device_properties") - def test_hopper_gpu_no_existing_config(self, mock_get_device_properties): - """Test Hopper+ GPU (compute capability >= 9) with no existing PYTORCH_CUDA_ALLOC_CONF.""" - # Mock GPU properties for Hopper+ architecture - mock_device_properties = MagicMock() - mock_device_properties.major = 9 - mock_get_device_properties.return_value = mock_device_properties - - # Ensure no existing config - if "PYTORCH_CUDA_ALLOC_CONF" in os.environ: - del os.environ["PYTORCH_CUDA_ALLOC_CONF"] - - # Call the function - configure_expandable_segments() - - # Verify the environment variable was set correctly - self.assertEqual( - os.environ["PYTORCH_CUDA_ALLOC_CONF"], "expandable_segments:True" - ) - - @patch("torch.cuda.get_device_properties") - def test_hopper_gpu_with_existing_config(self, mock_get_device_properties): - """Test Hopper+ GPU with existing PYTORCH_CUDA_ALLOC_CONF.""" - # Mock GPU properties for Hopper+ architecture - mock_device_properties = MagicMock() - mock_device_properties.major = 9 - mock_get_device_properties.return_value = mock_device_properties - - # Set existing config - existing_config = "max_split_size_mb:128" - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = existing_config - - # Call the function - configure_expandable_segments() - - # Verify the environment variable was updated correctly - expected_config = f"{existing_config},expandable_segments:True" - self.assertEqual(os.environ["PYTORCH_CUDA_ALLOC_CONF"], expected_config) - - @patch("torch.cuda.get_device_properties") - def test_hopper_gpu_already_configured(self, mock_get_device_properties): - """Test Hopper+ GPU with existing config that already has expandable_segments.""" - # Mock GPU properties for Hopper+ architecture - mock_device_properties = MagicMock() - mock_device_properties.major = 9 - mock_get_device_properties.return_value = mock_device_properties - - # Set existing config with expandable_segments already present - existing_config = "max_split_size_mb:128,expandable_segments:False" - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = existing_config - - # Call the function - configure_expandable_segments() - - # Verify the environment variable was not changed - self.assertEqual(os.environ["PYTORCH_CUDA_ALLOC_CONF"], existing_config) - - @patch("torch.cuda.get_device_properties") - def test_ampere_gpu_no_config_change(self, mock_get_device_properties): - """Test Ampere GPU (compute capability < 9) should not modify config.""" - # Mock GPU properties for Ampere architecture - mock_device_properties = MagicMock() - mock_device_properties.major = 8 # Ampere - mock_get_device_properties.return_value = mock_device_properties - - # Set existing config - existing_config = "max_split_size_mb:128" - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = existing_config - - # Call the function - configure_expandable_segments() - - # Verify the environment variable was not changed - self.assertEqual(os.environ["PYTORCH_CUDA_ALLOC_CONF"], existing_config) - - @patch("torch.cuda.get_device_properties") - def test_ampere_gpu_no_existing_config(self, mock_get_device_properties): - """Test Ampere GPU with no existing config should not set anything.""" - # Mock GPU properties for Ampere architecture - mock_device_properties = MagicMock() - mock_device_properties.major = 8 # Ampere - mock_get_device_properties.return_value = mock_device_properties - - # Ensure no existing config - if "PYTORCH_CUDA_ALLOC_CONF" in os.environ: - del os.environ["PYTORCH_CUDA_ALLOC_CONF"] - - # Call the function - configure_expandable_segments() - - # Verify the environment variable was not set - self.assertNotIn("PYTORCH_CUDA_ALLOC_CONF", os.environ) - - @patch("torch.cuda.get_device_properties") - def test_ampere_gpu_with_expandable_segments_true_raises_error( - self, mock_get_device_properties - ): - """Test Ampere GPU with expandable_segments:True in config raises RuntimeError.""" - # Mock GPU properties for Ampere architecture - mock_device_properties = MagicMock() - mock_device_properties.major = 8 # Ampere - mock_get_device_properties.return_value = mock_device_properties - - # Set config with expandable_segments:True - os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" - - # Call the function and expect RuntimeError - with self.assertRaises(RuntimeError) as context: - configure_expandable_segments() - - # Verify the error message - self.assertIn("expandable_segments is enabled", str(context.exception)) - self.assertIn( - "not supported on architectures older than Hopper", str(context.exception) - ) - - class TestGetMegatronCheckpointDir: """Test cases for the get_megatron_checkpoint_dir function.""" From caaa87f9cf40cc4bea923d9472cdc78f66f82f88 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Wed, 20 Aug 2025 16:20:38 -0400 Subject: [PATCH 11/19] Update README.md Co-authored-by: Terry Kong Signed-off-by: Yubo Gao --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index caf42e51b4..77ec8274eb 100644 --- a/README.md +++ b/README.md @@ -479,7 +479,8 @@ For detailed instructions on how to set up and launch NeMo RL on Slurm or Kubern at **either** one of the following places: 1. Launch training with: ```sh - PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:64 uv run python examples/run_dpo.py ... + # This will globally apply to all ray actors + PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:64 uv run python examples/run_dpo.py ... ``` 2. Make the change more permanently by adding this flag in the training configuration: ```yaml From 60e2909bdf1c309b1d85805eba0ad612fd3f1bcd Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Wed, 20 Aug 2025 16:21:59 -0700 Subject: [PATCH 12/19] fix config schema Signed-off-by: Yubo Gao --- ...o-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml | 2 +- nemo_rl/models/policy/__init__.py | 1 + nemo_rl/utils/logger.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml index 59aac4f950..084ea843f2 100644 --- a/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml +++ b/examples/configs/recipes/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.yaml @@ -40,7 +40,6 @@ policy: #logprob_batch_size: ${policy.train_micro_batch_size} max_total_sequence_length: 12288 precision: "bfloat16" - fsdp_offload_enabled: false dtensor_cfg: enabled: true @@ -94,6 +93,7 @@ logger: tensorboard_enabled: false mlflow_enabled: false monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard + num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal wandb: project: "nemo-rl" name: "dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long" diff --git a/nemo_rl/models/policy/__init__.py b/nemo_rl/models/policy/__init__.py index 872fff35ff..bd751c3161 100644 --- a/nemo_rl/models/policy/__init__.py +++ b/nemo_rl/models/policy/__init__.py @@ -19,6 +19,7 @@ class DTensorConfig(TypedDict): enabled: bool + env_vars: NotRequired[dict[str, str]] cpu_offload: NotRequired[bool] sequence_parallel: NotRequired[bool] activation_checkpointing: NotRequired[bool] diff --git a/nemo_rl/utils/logger.py b/nemo_rl/utils/logger.py index 4cf2621cd4..e81ed037f3 100644 --- a/nemo_rl/utils/logger.py +++ b/nemo_rl/utils/logger.py @@ -72,7 +72,7 @@ class LoggerConfig(TypedDict): tensorboard_enabled: bool mlflow_enabled: bool wandb: WandbConfig - tensorboard: TensorboardConfig + tensorboard: NotRequired[TensorboardConfig] mlflow: NotRequired[MLflowConfig] monitor_gpus: bool gpu_monitoring: GPUMonitoringConfig From 35918c76b1ddcbaa66bfedb1ef6521006a164bc7 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Thu, 21 Aug 2025 11:04:16 -0700 Subject: [PATCH 13/19] add test script Signed-off-by: Yubo Gao --- ...nstruct-2407-1n8g-fsdp2tp8-actckpt-long.sh | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100755 tests/test_suites/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.sh diff --git a/tests/test_suites/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.sh b/tests/test_suites/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.sh new file mode 100755 index 0000000000..8f9e22f337 --- /dev/null +++ b/tests/test_suites/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.sh @@ -0,0 +1,40 @@ +#!/bin/bash +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) +source $SCRIPT_DIR/common.env + +# ===== BEGIN CONFIG ===== +NUM_NODES=1 +STEPS_PER_RUN=100 +MAX_STEPS=100 +NUM_RUNS=$(( (MAX_STEPS + STEPS_PER_RUN - 1) / STEPS_PER_RUN )) # Round up +NUM_MINUTES=45 +# ===== END CONFIG ===== + +exit_if_max_steps_reached + +# Run the experiment +cd $PROJECT_ROOT +uv run examples/run_dpo.py \ + --config $CONFIG_PATH \ + dpo.max_num_steps=$MAX_STEPS \ + logger.log_dir=$LOG_DIR \ + logger.wandb_enabled=True \ + logger.wandb.project=nemo-rl \ + logger.wandb.name=$EXP_NAME \ + logger.monitor_gpus=True \ + logger.tensorboard_enabled=True \ + checkpointing.enabled=True \ + checkpointing.checkpoint_dir=$CKPT_DIR \ + $@ \ + 2>&1 | tee $RUN_LOG + +# Convert tensorboard logs to json +uv run tests/json_dump_tb_logs.py $LOG_DIR --output_path $JSON_METRICS + +# Only run metrics if the target step is reached +if [[ $(jq 'to_entries | .[] | select(.key == "train/loss") | .value | keys | map(tonumber) | max' $JSON_METRICS) -ge $MAX_STEPS ]]; then + uv run tests/check_metrics.py $JSON_METRICS \ + 'data["train/loss"]["1"] > 0.6990' \ + 'data["train/loss"]["1"] < 0.6992' \ + 'data["train/loss"]["100"] < 0.60' +fi From e1d0447926eef79450acd5b2413fb8509b04069d Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Thu, 21 Aug 2025 13:34:12 -0700 Subject: [PATCH 14/19] will tests pass now? Signed-off-by: Yubo Gao --- tests/test_suites/nightly.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_suites/nightly.txt b/tests/test_suites/nightly.txt index f6a82a8bb7..c53f5017ea 100644 --- a/tests/test_suites/nightly.txt +++ b/tests/test_suites/nightly.txt @@ -53,3 +53,6 @@ tests/test_suites/llm/dpo-llama3.1-8b-instruct-4n8g-fsdp2tp2-quick.v2.sh # Short megatron tests/test_suites/llm/dpo-llama3.1-8b-instruct-4n8g-megatrontp2pp2-quick.sh + +# Long dtensor +tests/test_suites/llm/dpo-mistral-nemo-instruct-2407-1n8g-fsdp2tp8-actckpt-long.sh From 51a26072cd1010b8e257abbedda716ceb3e0f40e Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Fri, 22 Aug 2025 18:52:44 -0700 Subject: [PATCH 15/19] make tests pass Signed-off-by: Yubo Gao --- examples/configs/dpo.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/configs/dpo.yaml b/examples/configs/dpo.yaml index ecc159b484..85bcd72910 100755 --- a/examples/configs/dpo.yaml +++ b/examples/configs/dpo.yaml @@ -45,6 +45,8 @@ policy: precision: "bfloat16" dtensor_cfg: + env_vars: + PYTORCH_CUDA_ALLOC_CONF: "" # Refers to https://docs.pytorch.org/docs/stable/notes/cuda.html#optimizing-memory-usage-with-pytorch-cuda-alloc-conf enabled: true cpu_offload: False sequence_parallel: false From 28cae686e9b5bbef181cf7f5685beb27308b14d8 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Mon, 25 Aug 2025 10:30:30 -0700 Subject: [PATCH 16/19] include field in logger config Signed-off-by: Yubo Gao --- nemo_rl/utils/logger.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_rl/utils/logger.py b/nemo_rl/utils/logger.py index ac4886cfd3..711b8fd596 100644 --- a/nemo_rl/utils/logger.py +++ b/nemo_rl/utils/logger.py @@ -76,6 +76,7 @@ class LoggerConfig(TypedDict): mlflow: NotRequired[MLflowConfig] monitor_gpus: bool gpu_monitoring: GPUMonitoringConfig + num_val_samples_to_print: NotRequired[int] class LoggerInterface(ABC): From 38aca9c66828fff68f83a5661f4cad98a1b7efa2 Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Mon, 25 Aug 2025 12:56:07 -0700 Subject: [PATCH 17/19] remove expandable segments from v2 Signed-off-by: Yubo Gao --- nemo_rl/models/policy/dtensor_policy_worker_v2.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nemo_rl/models/policy/dtensor_policy_worker_v2.py b/nemo_rl/models/policy/dtensor_policy_worker_v2.py index de435cfda0..41027bb8cf 100644 --- a/nemo_rl/models/policy/dtensor_policy_worker_v2.py +++ b/nemo_rl/models/policy/dtensor_policy_worker_v2.py @@ -75,7 +75,6 @@ ) from nemo_rl.models.policy.utils import ( configure_dynamo_cache, - configure_expandable_segments, get_gpu_info, get_handle_from_tensor, get_runtime_env_for_policy_worker, @@ -126,9 +125,6 @@ def __init__( # with different order of node_bundles configure_dynamo_cache() - # Only enable expandable_segments on Hopper and newer architectures (compute capability 9.x+) - configure_expandable_segments() - self.cfg = config # torch distributed init. Envars for rank, world_size, and master_addr and master_port are set from the ray remote call torch.distributed.init_process_group(backend="nccl") From 189868b570adc4ab59f5761fe85de9e697a0158a Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Mon, 25 Aug 2025 14:57:25 -0700 Subject: [PATCH 18/19] please pass :( Signed-off-by: Yubo Gao --- examples/configs/dpo.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/configs/dpo.yaml b/examples/configs/dpo.yaml index 83179bd0c3..cfe2c011e3 100755 --- a/examples/configs/dpo.yaml +++ b/examples/configs/dpo.yaml @@ -157,9 +157,11 @@ data: logger: log_dir: "logs" # Base directory for all logs wandb_enabled: false # Make sure you do a ``wandb login [Your API key]'' before running + tensorboard_enabled: false mlflow_enabled: false # Disable MLflow logging monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard + num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal wandb: project: "dpo-dev" name: "dpo" From 3271a0886e47f76d88172ad1ada78eb9768aa1dd Mon Sep 17 00:00:00 2001 From: Yubo Gao Date: Tue, 26 Aug 2025 08:16:40 -0700 Subject: [PATCH 19/19] empty cache Signed-off-by: Yubo Gao --- nemo_rl/models/policy/dtensor_policy_worker_v2.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_rl/models/policy/dtensor_policy_worker_v2.py b/nemo_rl/models/policy/dtensor_policy_worker_v2.py index 41027bb8cf..dbe07990ac 100644 --- a/nemo_rl/models/policy/dtensor_policy_worker_v2.py +++ b/nemo_rl/models/policy/dtensor_policy_worker_v2.py @@ -566,6 +566,8 @@ def train( for mb_idx, mb in enumerate( itertools.chain(mb_iterator, dummy_iterator) ): + torch.cuda.empty_cache() + with torch.autocast(device_type="cuda", dtype=self.dtype): if self.enable_seq_packing: input_ids = mb.get("input_ids").cuda()