From f8b9514d3657cf697f155d36e3c41463feb8db40 Mon Sep 17 00:00:00 2001
From: Yi-Fu Wu
Date: Wed, 2 Apr 2025 14:35:45 -0700
Subject: [PATCH 01/23] CPU offload changes

Signed-off-by: Yi-Fu Wu
---
 examples/configs/grpo_math_1B.yaml           |  2 +
 examples/configs/grpo_math_8B.yaml           |  2 +
 examples/configs/sft.yaml                    |  2 +
 nemo_reinforcer/distributed/worker_groups.py |  9 +-
 nemo_reinforcer/models/policy/__init__.py    |  2 +
 nemo_reinforcer/models/policy/hf_policy.py   | 92 ++++++++++++++------
 ray.sub                                      |  3 +-
 7 files changed, 80 insertions(+), 32 deletions(-)

diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml
index 422e869f56..c79c9bde5b 100644
--- a/examples/configs/grpo_math_1B.yaml
+++ b/examples/configs/grpo_math_1B.yaml
@@ -31,6 +31,8 @@ policy:
   logprob_batch_size: 4
   max_total_sequence_length: 512
   precision: "bfloat16"
+  expandable_segments_enabled: false
+  fsdp_offload_enabled: false
 
   optimizer:
     name: "torch.optim.AdamW"
diff --git a/examples/configs/grpo_math_8B.yaml b/examples/configs/grpo_math_8B.yaml
index 261db927b1..06005425cb 100644
--- a/examples/configs/grpo_math_8B.yaml
+++ b/examples/configs/grpo_math_8B.yaml
@@ -13,6 +13,8 @@ policy:
   logprob_batch_size: 2
   max_total_sequence_length: 4096
   precision: "bfloat16"
+  expandable_segments_enabled: false
+  fsdp_offload_enabled: false
 
   optimizer:
     name: "torch.optim.AdamW"
diff --git a/examples/configs/sft.yaml b/examples/configs/sft.yaml
index e4b116a351..3b20e4cc93 100644
--- a/examples/configs/sft.yaml
+++ b/examples/configs/sft.yaml
@@ -22,6 +22,8 @@ policy:
   train_micro_batch_size: 1
   max_total_sequence_length: 1024
   precision: "float32"
+  expandable_segments_enabled: false
+  fsdp_offload_enabled: false
 
   optimizer:
     name: "torch.optim.AdamW"
diff --git a/nemo_reinforcer/distributed/worker_groups.py b/nemo_reinforcer/distributed/worker_groups.py
index 9cc7798631..94af81b555 100644
--- a/nemo_reinforcer/distributed/worker_groups.py
+++ b/nemo_reinforcer/distributed/worker_groups.py
@@ -134,7 +134,8 @@ def __call__(
         if env_vars:
             if "runtime_env" not in options:
                 options["runtime_env"] = {}
-            options["runtime_env"]["env_vars"] = env_vars
+            options["runtime_env"].setdefault("env_vars", {})
+            options["runtime_env"]["env_vars"].update(env_vars)
 
         # Apply initialization parameters
         if init_kwargs:
@@ -178,6 +179,7 @@ def __init__(
         workers_per_node: Optional[Union[int, List[int]]] = None,
         name_prefix: str = "",
         bundle_indices_list: Optional[List[tuple]] = None,
+        additional_env_vars: Optional[Dict[str, str]] = None,
     ):
         """Initialize a group of distributed Ray workers.
 
@@ -190,13 +192,14 @@ def __init__(
             bundle_indices_list: Explicit list of (node_idx, [local_bundle_indices]) tuples.
                 Each tuple defines a tied group of workers placed on the same node.
                 If provided, workers_per_node is ignored.
+ additional_env_vars: Additional environment variables to pass to the workers """ self._workers = [] self._worker_metadata = [] self.cluster = cluster self.name_prefix = name_prefix self.tied_workers_groups = [] - + self.additional_env_vars = additional_env_vars # Maps worker indices to their corresponding tied group index # For example, if worker with index 3 belongs to tied worker group 1, # then worker_to_tied_group_index[3] = 1 @@ -286,6 +289,8 @@ def _create_workers_from_bundle_indices( "NODE_RANK": str(node_idx), } ) + if self.additional_env_vars: + env_vars.update(self.additional_env_vars) # For tensor parallel groups, only the first worker gets bundle_indices worker_bundle_indices = ( diff --git a/nemo_reinforcer/models/policy/__init__.py b/nemo_reinforcer/models/policy/__init__.py index ee2bf2389e..0e57b62ca1 100644 --- a/nemo_reinforcer/models/policy/__init__.py +++ b/nemo_reinforcer/models/policy/__init__.py @@ -25,3 +25,5 @@ class PolicyConfig(TypedDict): logprob_batch_size: int generation: GenerationConfig precision: str + expandable_segments_enabled: bool + fsdp_offload_enabled: bool diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index ebc9e879f2..9c80fbbc8b 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -22,6 +22,7 @@ import torch from torch.distributed.device_mesh import init_device_mesh from torch.distributed.fsdp import ( + CPUOffload, FullyShardedDataParallel, FullStateDictConfig, MixedPrecision, @@ -116,21 +117,28 @@ def do_fsdp(model): buffer_dtype=torch.float32, ) + cpu_offload = ( + CPUOffload(offload_params=True) + if self.cfg["fsdp_offload_enabled"] + else None + ) + return FullyShardedDataParallel( model, device_mesh=mesh, auto_wrap_policy=size_based_auto_wrap_policy, mixed_precision=mp_policy, + cpu_offload=cpu_offload, ) self.model.to("cuda") self.model = do_fsdp(self.model) - self.model = self.move_to_cpu(self.model) + self.model = self.manual_offload_to_cpu(self.model) if self.reference_model is not None: self.reference_model.to("cuda") self.reference_model = do_fsdp(self.reference_model) - self.reference_model = self.move_to_cpu(self.reference_model) - self.model.to("cuda") + self.reference_model = self.manual_offload_to_cpu(self.reference_model) + self.model = self.manual_load_to_gpu(self.model) self._held_reference_model_params = None # register_fsdp_forward_method(self.model, "generate") if init_optimizer: @@ -435,8 +443,8 @@ def use_reference_model(self): original_model = self.model original_reference_model = self.reference_model - self.model = self.move_to_cpu(self.model) - self.reference_model = self.reference_model.to("cuda") + self.model = self.manual_offload_to_cpu(self.model) + self.reference_model = self.manual_load_to_gpu(self.reference_model) # Swap the references self.model, self.reference_model = self.reference_model, self.model @@ -449,8 +457,8 @@ def use_reference_model(self): finally: # Restore original references and device placement - self.reference_model = self.move_to_cpu(original_reference_model) - self.model = original_model.to("cuda") + self.reference_model = self.manual_offload_to_cpu(original_reference_model) + self.model = self.manual_load_to_gpu(original_model) gc.collect() torch.cuda.empty_cache() @@ -724,27 +732,28 @@ def get_weight_ipc_handles(self, offload_model=True): data[name] = reduce_tensor(p.detach()) if offload_model: - self.model = self.move_to_cpu(self.model) + self.model = 
self.manual_offload_to_cpu(self.model) gc.collect() torch.cuda.empty_cache() return {device_uuid: data} def prepare_for_lp_inference(self): - self.model.to("cuda") + self.model = self.manual_load_to_gpu(self.model) self.model.eval() self.offload_before_refit() def prepare_for_training(self, *args, **kwargs): # onload models and optimizer state to cuda - self.model.to("cuda") + self.model = self.manual_load_to_gpu(self.model) self.model.train() - # Move optimizer state to CUDA if it exists - if hasattr(self, "optimizer") and self.optimizer is not None: - for state in self.optimizer.state.values(): - for k, v in state.items(): - if torch.is_tensor(v) and not v.is_cuda: - state[k] = v.to("cuda") + if not self.cfg["fsdp_offload_enabled"]: + # Move optimizer state to CUDA if it exists + if hasattr(self, "optimizer") and self.optimizer is not None: + for state in self.optimizer.state.values(): + for k, v in state.items(): + if torch.is_tensor(v) and not v.is_cuda: + state[k] = v.to("cuda") torch.cuda.empty_cache() @@ -752,14 +761,15 @@ def prepare_for_training(self, *args, **kwargs): def offload_before_refit(self): """Offload the optimizer and buffers to the CPU.""" torch.randn(1).cuda() # wake up torch allocator - if hasattr(self, "optimizer") and self.optimizer is not None: - for state in self.optimizer.state.values(): - for k, v in state.items(): - if torch.is_tensor(v): - state[k] = v.to("cpu") + if not self.cfg["fsdp_offload_enabled"]: + if hasattr(self, "optimizer") and self.optimizer is not None: + for state in self.optimizer.state.values(): + for k, v in state.items(): + if torch.is_tensor(v): + state[k] = v.to("cpu") - for buffer in self.model.buffers(): - buffer.data = buffer.data.to("cpu") + for buffer in self.model.buffers(): + buffer.data = buffer.data.to("cpu") gc.collect() torch.cuda.empty_cache() @@ -774,7 +784,7 @@ def offload_before_refit(self): @torch.no_grad() def offload_after_refit(self): # Offload as much as possible on the CPU - self.model = self.move_to_cpu(self.model) + self.model = self.manual_offload_to_cpu(self.model) self.model.eval() torch.randn(1).cuda() # wake up torch allocator self.offload_before_refit() # rerun the old offload function @@ -792,15 +802,35 @@ def offload_after_refit(self): f"GPU Memory after refit complete: {allocated:.2f}GB allocated, {reserved:.2f}GB reserved" ) - def move_to_cpu(self, model): + def manual_offload_to_cpu(self, model): + if self.cfg["fsdp_offload_enabled"]: + return model + for param in model.parameters(): - param.data = param.data.to("cpu") + param.data = param.data.to("cpu", non_blocking=True) + if hasattr(param, "_local_shard"): + param._local_shard = param.data + if param.grad is not None: + param.grad = param.grad.to("cpu", non_blocking=True) - for buffer in model.buffers(): - buffer.data = buffer.data.to("cpu") + if hasattr(model, "_fsdp_wrapped_module"): + self.manual_offload_to_cpu(model._fsdp_wrapped_module) + + return model + + def manual_load_to_gpu(self, model): + if self.cfg["fsdp_offload_enabled"]: + return model + + for param in model.parameters(): + param.data = param.data.to("cuda", non_blocking=True) + if hasattr(param, "_local_shard"): + param._local_shard = param.data + if param.grad is not None: + param.grad = param.grad.to("cuda", non_blocking=True) if hasattr(model, "_fsdp_wrapped_module"): - model._fsdp_wrapped_module.to("cpu") + self.manual_load_to_gpu(model._fsdp_wrapped_module) return model @@ -912,11 +942,15 @@ def __init__( optimizer_path=optimizer_path, init_reference_model=init_reference_model, ) 
+ additional_env_vars = {} + if config["expandable_segments_enabled"]: + additional_env_vars["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" self.worker_group = RayWorkerGroup( cluster, worker_builder, name_prefix=name_prefix, workers_per_node=workers_per_node, + additional_env_vars=additional_env_vars, ) self.dp_size = self.worker_group.world_size self.cfg = config diff --git a/ray.sub b/ray.sub index 7258aec045..3fb8b76585 100644 --- a/ray.sub +++ b/ray.sub @@ -17,6 +17,7 @@ set -eoux pipefail CONTAINER=$CONTAINER MOUNTS=$MOUNTS COMMAND=${COMMAND:-} # This is a script relative to the SLURM_SUBMIT_DIR. If left empty, it will leave the cluster idle after it's brought up. +PORT=${PORT:-41993} ######################################################## COMMON_SRUN_ARGS="" @@ -54,7 +55,7 @@ done head_node=${nodes_array[0]} head_node_ip=${ip_addresses_array[0]} -port=41993 +port=$PORT ip_head=$head_node_ip:$port # First we start the head of the ray cluster on one of the physical nodes From c02d45474c81589c54681d0c24c8076d87f7e26e Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 3 Apr 2025 10:42:59 -0700 Subject: [PATCH 02/23] Fix unit tests Signed-off-by: Yi-Fu Wu --- tests/unit/models/generation/test_vllm_generation.py | 4 ++++ tests/unit/models/policy/test_hf_ray_policy.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index 8c810e31dc..b9d97d6ce2 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -225,6 +225,8 @@ def test_vllm_generation_with_hf_training(cluster, tokenizer): "max_new_tokens": 16, "do_sample": False, "precision": "float32", + "expandable_segments_enabled": False, + "fsdp_offload_enabled": False, "optimizer": { "name": "torch.optim.AdamW", "kwargs": { @@ -481,6 +483,8 @@ def test_vllm_policy_weight_update(cluster, tokenizer, tensor_parallel_size): "max_new_tokens": 16, "do_sample": False, "precision": "float32", + "expandable_segments_enabled": False, + "fsdp_offload_enabled": False, "optimizer": { "name": "torch.optim.AdamW", "kwargs": { diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index ded244feac..dba9b1a734 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -33,6 +33,8 @@ "learning_rate": 5e-6, "logprob_batch_size": 1, "precision": "float32", + "expandable_segments_enabled": False, + "fsdp_offload_enabled": False, "generation": { "backend": "hf", "temperature": 1.0, From 7229b8747f6bf0659102a1a6403ee2f75a0fdb9f Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 3 Apr 2025 20:00:10 -0700 Subject: [PATCH 03/23] Add memory tracking in unit test Signed-off-by: Yi-Fu Wu --- .../unit/models/policy/test_hf_ray_policy.py | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index dba9b1a734..68af549e7e 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -241,9 +241,17 @@ def training_setup(): cluster.shutdown() policy.worker_group.shutdown() +def get_max_gpu_utilization(policy): + max_memory_allocated = 0 + max_memory_reserved = 0 + gpu_infos = ray.get([w.get_gpu_info.remote() for w in policy.worker_group.workers]) + for info in gpu_infos: + max_memory_allocated = 
max(max_memory_allocated, info["memory_allocated_mb"]) + max_memory_reserved = max(max_memory_reserved, info["memory_reserved_mb"]) + return max_memory_allocated, max_memory_reserved @pytest.mark.timeout(180) -def test_hf_policy_training(training_setup): +def test_hf_policy_training(training_setup, tracker): def verify_loss_tensor(loss_tensor): assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf" @@ -274,10 +282,26 @@ def verify_loss_tensor(loss_tensor): print(f"Training loss: {results['loss']}") policy.finish_training() + assert losses[0] > losses[-1], "Loss should decrease over training iterations" + + after_training_mem_allocated, after_training_mem_reserved = get_max_gpu_utilization(policy) + print(f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, " \ + f"{after_training_mem_reserved:,.1f} MB reserved") + tracker.track("after_training_mem_allocated", after_training_mem_allocated) + tracker.track("after_training_mem_reserved", after_training_mem_reserved) + + policy.offload_after_refit() + after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization(policy) + print(f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " \ + f"{after_offload_mem_reserved:,.1f} MB reserved") + tracker.track("after_offload_mem_allocated", after_offload_mem_allocated) + tracker.track("after_offload_mem_reserved", after_offload_mem_reserved) # Verify loss changed between iterations (model parameters were updated) - assert losses[0] > losses[-1], "Loss should decrease over training iterations" + # Compare memory after offload to memory after training + assert after_training_mem_allocated > 10_000, "Memory after training should be more than 10GB" + assert after_offload_mem_allocated < 1_200, "Memory after offload should be less than 1.2GB" @pytest.fixture def generation_setup(request): From 3b13a9af5d51a9b1482b5b6f153c201bcded2d22 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 4 Apr 2025 11:26:32 -0700 Subject: [PATCH 04/23] ruff Signed-off-by: Yi-Fu Wu --- .../unit/models/policy/test_hf_ray_policy.py | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index 68af549e7e..ed102fa5e9 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -241,6 +241,7 @@ def training_setup(): cluster.shutdown() policy.worker_group.shutdown() + def get_max_gpu_utilization(policy): max_memory_allocated = 0 max_memory_reserved = 0 @@ -250,6 +251,7 @@ def get_max_gpu_utilization(policy): max_memory_reserved = max(max_memory_reserved, info["memory_reserved_mb"]) return max_memory_allocated, max_memory_reserved + @pytest.mark.timeout(180) def test_hf_policy_training(training_setup, tracker): def verify_loss_tensor(loss_tensor): @@ -284,24 +286,37 @@ def verify_loss_tensor(loss_tensor): policy.finish_training() assert losses[0] > losses[-1], "Loss should decrease over training iterations" - after_training_mem_allocated, after_training_mem_reserved = get_max_gpu_utilization(policy) - print(f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, " \ - f"{after_training_mem_reserved:,.1f} MB reserved") + after_training_mem_allocated, after_training_mem_reserved = get_max_gpu_utilization( + policy + ) + print( + f"Max GPU Utilization after 
training: {after_training_mem_allocated:,.1f} MB allocated, " + f"{after_training_mem_reserved:,.1f} MB reserved" + ) tracker.track("after_training_mem_allocated", after_training_mem_allocated) tracker.track("after_training_mem_reserved", after_training_mem_reserved) policy.offload_after_refit() - after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization(policy) - print(f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " \ - f"{after_offload_mem_reserved:,.1f} MB reserved") + after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization( + policy + ) + print( + f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " + f"{after_offload_mem_reserved:,.1f} MB reserved" + ) tracker.track("after_offload_mem_allocated", after_offload_mem_allocated) tracker.track("after_offload_mem_reserved", after_offload_mem_reserved) # Verify loss changed between iterations (model parameters were updated) # Compare memory after offload to memory after training - assert after_training_mem_allocated > 10_000, "Memory after training should be more than 10GB" - assert after_offload_mem_allocated < 1_200, "Memory after offload should be less than 1.2GB" + assert after_training_mem_allocated > 10_000, ( + "Memory after training should be more than 10GB" + ) + assert after_offload_mem_allocated < 1_200, ( + "Memory after offload should be less than 1.2GB" + ) + @pytest.fixture def generation_setup(request): From 95c4508fbd816879c77fbdfb0bed1ab89bdbc814 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 4 Apr 2025 11:57:44 -0700 Subject: [PATCH 05/23] Remove expandable_segments_enabled Signed-off-by: Yi-Fu Wu --- examples/configs/grpo_math_1B.yaml | 1 - examples/configs/sft.yaml | 1 - nemo_reinforcer/models/policy/__init__.py | 1 - nemo_reinforcer/models/policy/hf_policy.py | 4 ---- tests/unit/models/generation/test_vllm_generation.py | 2 -- tests/unit/models/policy/test_hf_ray_policy.py | 1 - 6 files changed, 10 deletions(-) diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml index c79c9bde5b..04ae81b569 100644 --- a/examples/configs/grpo_math_1B.yaml +++ b/examples/configs/grpo_math_1B.yaml @@ -31,7 +31,6 @@ policy: logprob_batch_size: 4 max_total_sequence_length: 512 precision: "bfloat16" - expandable_segments_enabled: false fsdp_offload_enabled: false optimizer: diff --git a/examples/configs/sft.yaml b/examples/configs/sft.yaml index 3b20e4cc93..59d6cc259e 100644 --- a/examples/configs/sft.yaml +++ b/examples/configs/sft.yaml @@ -22,7 +22,6 @@ policy: train_micro_batch_size: 1 max_total_sequence_length: 1024 precision: "float32" - expandable_segments_enabled: false fsdp_offload_enabled: false optimizer: diff --git a/nemo_reinforcer/models/policy/__init__.py b/nemo_reinforcer/models/policy/__init__.py index 0e57b62ca1..273356b5f6 100644 --- a/nemo_reinforcer/models/policy/__init__.py +++ b/nemo_reinforcer/models/policy/__init__.py @@ -25,5 +25,4 @@ class PolicyConfig(TypedDict): logprob_batch_size: int generation: GenerationConfig precision: str - expandable_segments_enabled: bool fsdp_offload_enabled: bool diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index 519eb455c0..b8b1ee349a 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -945,15 +945,11 @@ def __init__( optimizer_path=optimizer_path, init_reference_model=init_reference_model, ) - additional_env_vars = 
{} - if config["expandable_segments_enabled"]: - additional_env_vars["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" self.worker_group = RayWorkerGroup( cluster, worker_builder, name_prefix=name_prefix, workers_per_node=workers_per_node, - additional_env_vars=additional_env_vars, ) self.dp_size = self.worker_group.world_size self.cfg = config diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index cc08ba5c46..80d6e09d56 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -226,7 +226,6 @@ def test_vllm_generation_with_hf_training(cluster, tokenizer): "max_new_tokens": 16, "do_sample": False, "precision": "float32", - "expandable_segments_enabled": False, "fsdp_offload_enabled": False, "optimizer": { "name": "torch.optim.AdamW", @@ -516,7 +515,6 @@ def test_vllm_weight_update_and_prefix_cache_reset( "max_new_tokens": 16, "do_sample": False, "precision": "float32", - "expandable_segments_enabled": False, "fsdp_offload_enabled": False, "optimizer": {"name": "torch.optim.AdamW", "kwargs": {"lr": 1e-6}}, } diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index ed102fa5e9..05b6c27b17 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -33,7 +33,6 @@ "learning_rate": 5e-6, "logprob_batch_size": 1, "precision": "float32", - "expandable_segments_enabled": False, "fsdp_offload_enabled": False, "generation": { "backend": "hf", From 44edbe836726efb4767ab08088c47c9fe11791a5 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 4 Apr 2025 14:09:27 -0700 Subject: [PATCH 06/23] whitespace Signed-off-by: Yi-Fu Wu --- tests/unit/models/generation/test_vllm_generation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index 80d6e09d56..260a6de4ae 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -515,7 +515,7 @@ def test_vllm_weight_update_and_prefix_cache_reset( "max_new_tokens": 16, "do_sample": False, "precision": "float32", - "fsdp_offload_enabled": False, + "fsdp_offload_enabled": False, "optimizer": {"name": "torch.optim.AdamW", "kwargs": {"lr": 1e-6}}, } From 412f57dd24a1af53649f1db7fbd24cae1a03732d Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 4 Apr 2025 18:21:05 -0700 Subject: [PATCH 07/23] Offload/onload buffers Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index b8b1ee349a..96429fe321 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -815,6 +815,8 @@ def manual_offload_to_cpu(self, model): param._local_shard = param.data if param.grad is not None: param.grad = param.grad.to("cpu", non_blocking=True) + for buffer in model.buffers(): + buffer.data = buffer.data.to("cpu", non_blocking=True) if hasattr(model, "_fsdp_wrapped_module"): self.manual_offload_to_cpu(model._fsdp_wrapped_module) @@ -831,6 +833,8 @@ def manual_load_to_gpu(self, model): param._local_shard = param.data if param.grad is not None: param.grad = param.grad.to("cuda", non_blocking=True) + for buffer in model.buffers(): + buffer.data = 
buffer.data.to("cuda", non_blocking=True) if hasattr(model, "_fsdp_wrapped_module"): self.manual_load_to_gpu(model._fsdp_wrapped_module) From ba48445ddd8e7c1a8e7c63dc4a3752c8640858b6 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 4 Apr 2025 19:40:46 -0700 Subject: [PATCH 08/23] Activation checkpointing Signed-off-by: Yi-Fu Wu --- examples/configs/grpo_math_1B.yaml | 1 + examples/configs/grpo_math_8B.yaml | 1 + examples/configs/sft.yaml | 1 + nemo_reinforcer/models/policy/hf_policy.py | 2 ++ 4 files changed, 5 insertions(+) diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml index 04ae81b569..6d5d46be2a 100644 --- a/examples/configs/grpo_math_1B.yaml +++ b/examples/configs/grpo_math_1B.yaml @@ -32,6 +32,7 @@ policy: max_total_sequence_length: 512 precision: "bfloat16" fsdp_offload_enabled: false + activation_checkpointing_enabled: false optimizer: name: "torch.optim.AdamW" diff --git a/examples/configs/grpo_math_8B.yaml b/examples/configs/grpo_math_8B.yaml index 06005425cb..1d3f96214b 100644 --- a/examples/configs/grpo_math_8B.yaml +++ b/examples/configs/grpo_math_8B.yaml @@ -15,6 +15,7 @@ policy: precision: "bfloat16" expandable_segments_enabled: false fsdp_offload_enabled: false + activation_checkpointing_enabled: false optimizer: name: "torch.optim.AdamW" diff --git a/examples/configs/sft.yaml b/examples/configs/sft.yaml index 59d6cc259e..0c6dcefdb3 100644 --- a/examples/configs/sft.yaml +++ b/examples/configs/sft.yaml @@ -23,6 +23,7 @@ policy: max_total_sequence_length: 1024 precision: "float32" fsdp_offload_enabled: false + activation_checkpointing_enabled: false optimizer: name: "torch.optim.AdamW" diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index 96429fe321..9706be292f 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -132,6 +132,8 @@ def do_fsdp(model): ) self.model.to("cuda") + if self.cfg["activation_checkpointing_enabled"]: + self.model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={'use_reentrant': False}) self.model = do_fsdp(self.model) self.model = self.manual_offload_to_cpu(self.model) if self.reference_model is not None: From ef25d4be81853c24688bf07b7332a0f7da84752b Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 4 Apr 2025 20:31:45 -0700 Subject: [PATCH 09/23] ruff + tests Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 4 +++- tests/unit/models/generation/test_vllm_generation.py | 2 ++ tests/unit/models/policy/test_hf_ray_policy.py | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index 9706be292f..ac9fa8dc80 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -133,7 +133,9 @@ def do_fsdp(model): self.model.to("cuda") if self.cfg["activation_checkpointing_enabled"]: - self.model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={'use_reentrant': False}) + self.model.gradient_checkpointing_enable( + gradient_checkpointing_kwargs={"use_reentrant": False} + ) self.model = do_fsdp(self.model) self.model = self.manual_offload_to_cpu(self.model) if self.reference_model is not None: diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index 260a6de4ae..7de7f9307a 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ 
b/tests/unit/models/generation/test_vllm_generation.py @@ -227,6 +227,7 @@ def test_vllm_generation_with_hf_training(cluster, tokenizer): "do_sample": False, "precision": "float32", "fsdp_offload_enabled": False, + "activation_checkpointing_enabled": False, "optimizer": { "name": "torch.optim.AdamW", "kwargs": { @@ -516,6 +517,7 @@ def test_vllm_weight_update_and_prefix_cache_reset( "do_sample": False, "precision": "float32", "fsdp_offload_enabled": False, + "activation_checkpointing_enabled": False, "optimizer": {"name": "torch.optim.AdamW", "kwargs": {"lr": 1e-6}}, } diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index 05b6c27b17..1f95296346 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -34,6 +34,7 @@ "logprob_batch_size": 1, "precision": "float32", "fsdp_offload_enabled": False, + "activation_checkpointing_enabled": False, "generation": { "backend": "hf", "temperature": 1.0, From b26b86a62b0c502f92c4f124ef39ad850997c9f8 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 10 Apr 2025 02:26:37 -0700 Subject: [PATCH 10/23] No fsdp for single gpu Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 17 ++- .../unit/models/policy/test_hf_ray_policy.py | 109 ++++++++++-------- 2 files changed, 75 insertions(+), 51 deletions(-) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index ac9fa8dc80..d6ba9331bd 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -109,6 +109,10 @@ def __init__( # ------------------------------------------------ def do_fsdp(model): + if world_size == 1: + print("world_size == 1, skipping FSDP") + return model + # Create a device mesh with 'world_size' GPUs in a 1D arrangement. 
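+        # (All ranks form a single sharding group on this 1-D mesh; when
+        # cpu_offload is passed below, FSDP keeps each rank's parameter
+        # shard on the CPU between steps and streams it in on demand.)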
mesh = init_device_mesh("cuda", (world_size,)) mp_policy = MixedPrecision( @@ -542,7 +546,11 @@ def generate( # Set attention mask for the actual tokens (at the end for left padding) left_padded_attention_mask[i, seq_len - length :] = 1 - outputs = self.model.module.generate( + if isinstance(self.model, torch.distributed.fsdp.FullyShardedDataParallel): + generation_module = self.model.module + else: + generation_module = self.model + outputs = generation_module.generate( input_ids=left_padded_input_ids, attention_mask=left_padded_attention_mask, max_new_tokens=gen_cfg["max_new_tokens"], @@ -721,6 +729,11 @@ def report_device_id(self) -> str: def get_weight_ipc_handles(self, offload_model=True): from torch.multiprocessing.reductions import reduce_tensor + # If the model is not FSDP, then we need to manually move it to the GPU + # For an FSDP model, model.state_dict() will move the params to the GPU + if not isinstance(self.model, torch.distributed.fsdp.FullyShardedDataParallel): + self.model = self.manual_load_to_gpu(self.model) + # TODO @sahilj: do this without an allgather (maybe FSDP2) params = self.model.state_dict() @@ -775,8 +788,6 @@ def offload_before_refit(self): if torch.is_tensor(v): state[k] = v.to("cpu") - for buffer in self.model.buffers(): - buffer.data = buffer.data.to("cpu") gc.collect() torch.cuda.empty_cache() diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index 1f95296346..900603a41d 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -69,20 +69,20 @@ def gc_collect(): @pytest.fixture -def policy_setup(): +def policy_setup(num_gpus): """Setup and teardown for policy tests - creates a virtual cluster and policy.""" policy = None cluster = None - cluster_name = "test" - print(f"Creating virtual cluster '{cluster_name}'...") + cluster_name = f"test-init-{num_gpus}gpu" + print(f"Creating virtual cluster '{cluster_name}' for {num_gpus} GPUs...") cluster = RayVirtualCluster( name=cluster_name, - bundle_ct_per_node_list=[2], # Single node, 2 gpus + bundle_ct_per_node_list=[num_gpus], use_gpus=True, - num_gpus_per_node=2, # Using both GPUs - max_colocated_worker_groups=1, # Only one worker group + num_gpus_per_node=num_gpus, + max_colocated_worker_groups=1, ) config = basic_llama_test_config @@ -99,15 +99,18 @@ def policy_setup(): @pytest.mark.timeout(180) -def test_hf_policy_init(policy_setup): +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) +def test_hf_policy_init(policy_setup, num_gpus): policy, cluster = policy_setup # Verify cluster and policy were properly created assert policy is not None, "Policy was not created properly" assert cluster is not None, "Cluster was not created properly" - # Verify we have two workers, one per GPU - assert len(policy.worker_group.workers) == 2, "Should have 2 workers, one per GPU" + # Verify we have workers matching the GPU count + assert len(policy.worker_group.workers) == num_gpus, ( + f"Should have {num_gpus} worker(s), one per GPU" + ) # Check workers are alive worker_alive = ray.get([w.is_alive.remote() for w in policy.worker_group.workers]) @@ -123,42 +126,47 @@ def test_hf_policy_init(policy_setup): # Check 1: Verify workers have different ranks gpu_ranks = [info["rank"] for info in gpu_infos] - assert len(set(gpu_ranks)) == 2, f"Expected 2 different ranks, got {gpu_ranks}" - assert set(gpu_ranks) == {0, 1}, f"Expected ranks 0 and 1, got {gpu_ranks}" + assert len(set(gpu_ranks)) == 
num_gpus, f"Expected {num_gpus} different ranks, got {gpu_ranks}" + assert set(gpu_ranks) == set(range(num_gpus)), f"Expected ranks {set(range(num_gpus))}, got {gpu_ranks}" # Check 2: Verify workers have different local_ranks local_ranks = [info["local_rank"] for info in gpu_infos] - assert len(set(local_ranks)) == 2, ( - f"Expected 2 different local_ranks, got {local_ranks}" + assert len(set(local_ranks)) == num_gpus, ( + f"Expected {num_gpus} different local_ranks, got {local_ranks}" ) - assert set(local_ranks) == {0, 1}, ( - f"Expected local_ranks 0 and 1, got {local_ranks}" + assert set(local_ranks) == set(range(num_gpus)), ( + f"Expected local_ranks {set(range(num_gpus))}, got {local_ranks}" ) # Check 3: Verify workers have different CUDA_VISIBLE_DEVICES cuda_visible_devices = [ info["env_vars"].get("CUDA_VISIBLE_DEVICES") for info in gpu_infos ] - assert len(set(cuda_visible_devices)) == 2, ( - f"Expected different CUDA_VISIBLE_DEVICES, got {cuda_visible_devices}" - ) + if num_gpus > 1: + assert len(set(cuda_visible_devices)) == num_gpus, ( + f"Expected different CUDA_VISIBLE_DEVICES, got {cuda_visible_devices}" + ) + else: + assert len(set(cuda_visible_devices)) == 1, ( + f"Expected one CUDA_VISIBLE_DEVICES for 1 GPU, got {cuda_visible_devices}" + ) # Check 4: Verify all workers report correct world_size for info in gpu_infos: - assert info["world_size"] == 2, ( - f"Expected world_size=2, got {info['world_size']}" + assert info["world_size"] == num_gpus, ( + f"Expected world_size={num_gpus}, got {info['world_size']}" ) - assert info["env_vars"]["WORLD_SIZE"] == "2", ( - f"Expected WORLD_SIZE=2, got {info['env_vars']['WORLD_SIZE']}" + assert info["env_vars"]["WORLD_SIZE"] == str(num_gpus), ( + f"Expected WORLD_SIZE={num_gpus}, got {info['env_vars']['WORLD_SIZE']}" ) - # Check 5: Verify significant GPU memory is allocated (at least 1GB) on both GPUs + # Check 5: Verify significant GPU memory is allocated (at least 1GB) on all GPUs for info in gpu_infos: assert info["memory_allocated_mb"] > 1000, ( f"Not enough memory allocated on GPU for rank {info['rank']}: {info['memory_allocated_mb']:.2f} MB" ) - # Check 6: Verify model parameters are on CUDA devices for both workers + # Check 6: Verify model parameters are on CUDA devices for all workers for info in gpu_infos: param_sample = list(info["parameter_sample"].values())[0] assert "cuda" in param_sample["device"], ( @@ -180,7 +188,7 @@ def test_hf_policy_init(policy_setup): @pytest.fixture -def training_setup(): +def training_setup(num_gpus): """Setup and teardown specifically for training tests.""" policy = None cluster = None @@ -189,15 +197,15 @@ def training_setup(): try: # Create resources with unique name - cluster_name = "test-train" - print(f"Creating training virtual cluster '{cluster_name}'...") + cluster_name = f"test-train-{num_gpus}gpu" + print(f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs...") cluster = RayVirtualCluster( name=cluster_name, - bundle_ct_per_node_list=[2], # Single node, 2 gpus + bundle_ct_per_node_list=[num_gpus], use_gpus=True, - num_gpus_per_node=2, # Using both GPUs - max_colocated_worker_groups=1, # Only one worker group + num_gpus_per_node=num_gpus, + max_colocated_worker_groups=1, ) config = basic_llama_test_config @@ -253,7 +261,8 @@ def get_max_gpu_utilization(policy): @pytest.mark.timeout(180) -def test_hf_policy_training(training_setup, tracker): +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) +def test_hf_policy_training(training_setup, tracker, 
num_gpus): def verify_loss_tensor(loss_tensor): assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf" @@ -293,8 +302,8 @@ def verify_loss_tensor(loss_tensor): f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, " f"{after_training_mem_reserved:,.1f} MB reserved" ) - tracker.track("after_training_mem_allocated", after_training_mem_allocated) - tracker.track("after_training_mem_reserved", after_training_mem_reserved) + tracker.track(f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated) + tracker.track(f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved) policy.offload_after_refit() after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization( @@ -304,10 +313,8 @@ def verify_loss_tensor(loss_tensor): f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " f"{after_offload_mem_reserved:,.1f} MB reserved" ) - tracker.track("after_offload_mem_allocated", after_offload_mem_allocated) - tracker.track("after_offload_mem_reserved", after_offload_mem_reserved) - - # Verify loss changed between iterations (model parameters were updated) + tracker.track(f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated) + tracker.track(f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved) # Compare memory after offload to memory after training assert after_training_mem_allocated > 10_000, ( @@ -319,30 +326,34 @@ def verify_loss_tensor(loss_tensor): @pytest.fixture -def generation_setup(request): +def generation_setup(request, num_gpus): """Setup and teardown specifically for generation tests.""" policy = None cluster = None data = None + init_reference_model = request.param try: # Create resources with unique name - cluster_name = "test-generate" - print(f"Creating generation virtual cluster '{cluster_name}'...") + cluster_name = f"test-gen-{num_gpus}gpu-ref{init_reference_model}" + print( + f"Creating generation virtual cluster '{cluster_name}' for {num_gpus} GPUs " + f"(ref_model={init_reference_model})..." 
+ ) cluster = RayVirtualCluster( name=cluster_name, - bundle_ct_per_node_list=[2], # Single node, 2 gpus + bundle_ct_per_node_list=[num_gpus], use_gpus=True, - num_gpus_per_node=2, # Using both GPUs - max_colocated_worker_groups=1, # Only one worker group + num_gpus_per_node=num_gpus, + max_colocated_worker_groups=1, ) config = basic_llama_test_config print("Creating generation HfPolicy...") policy = HfPolicy( - cluster=cluster, config=config, init_reference_model=request.param + cluster=cluster, config=config, init_reference_model=init_reference_model ) # Create a test batch @@ -407,8 +418,9 @@ def generation_setup(request): @pytest.mark.timeout(180) +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) @pytest.mark.parametrize("generation_setup", [False], indirect=True) -def test_hf_policy_generation(generation_setup, tracker): +def test_hf_policy_generation(generation_setup, tracker, num_gpus): policy, cluster, data, tokenizer, prompts, expected_generations = generation_setup # Verify resources were created properly @@ -459,7 +471,7 @@ def test_hf_policy_generation(generation_setup, tracker): torch.exp(torch.abs(results["logprobs"] - fprop_results["logprobs"])) ) print(f"avg prob mult error: {avg_prob_mult_error}") - tracker.track("avg_prob_mult_error", float(avg_prob_mult_error)) + tracker.track(f"avg_prob_mult_error_{num_gpus}gpu", float(avg_prob_mult_error)) assert avg_prob_mult_error <= 1.025 # get logprobs for the expected generations @@ -484,7 +496,7 @@ def test_hf_policy_generation(generation_setup, tracker): expected_logprobs = policy.get_logprobs(expected_data)["logprobs"] mean_lps = torch.mean(expected_logprobs * expected_tokenized["attention_mask"]) - tracker.track("mean_lps", float(mean_lps)) + tracker.track(f"mean_lps_{num_gpus}gpu", float(mean_lps)) assert mean_lps > -1.7, "Expected logprobs should be greater than -1.7" assert mean_lps < -1.4, "Expected logprobs should be less than -1.4" @@ -494,8 +506,9 @@ def test_hf_policy_generation(generation_setup, tracker): @pytest.mark.timeout(180) +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) @pytest.mark.parametrize("generation_setup", [True], indirect=True) -def test_all_hf_policy_generation_lps_ref_training(generation_setup): +def test_all_hf_policy_generation_lps_ref_training(generation_setup, num_gpus): policy, cluster, data, tokenizer, prompts, expected_generations = generation_setup # Verify resources were created properly From 65d47dea6bae9cfe8ed4f1dd51a28c9c988f8ce8 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 10 Apr 2025 02:29:06 -0700 Subject: [PATCH 11/23] Cleanup Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/distributed/worker_groups.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/nemo_reinforcer/distributed/worker_groups.py b/nemo_reinforcer/distributed/worker_groups.py index 63a142a894..192c62a818 100644 --- a/nemo_reinforcer/distributed/worker_groups.py +++ b/nemo_reinforcer/distributed/worker_groups.py @@ -190,7 +190,6 @@ def __init__( workers_per_node: Optional[Union[int, List[int]]] = None, name_prefix: str = "", bundle_indices_list: Optional[List[tuple]] = None, - additional_env_vars: Optional[Dict[str, str]] = None, ): """Initialize a group of distributed Ray workers. @@ -203,14 +202,12 @@ def __init__( bundle_indices_list: Explicit list of (node_idx, [local_bundle_indices]) tuples. Each tuple defines a tied group of workers placed on the same node. If provided, workers_per_node is ignored. 
- additional_env_vars: Additional environment variables to pass to the workers """ self._workers = [] self._worker_metadata = [] self.cluster = cluster self.name_prefix = name_prefix self.tied_workers_groups = [] - self.additional_env_vars = additional_env_vars # Maps worker indices to their corresponding tied group index # For example, if worker with index 3 belongs to tied worker group 1, # then worker_to_tied_group_index[3] = 1 @@ -300,8 +297,6 @@ def _create_workers_from_bundle_indices( "NODE_RANK": str(node_idx), } ) - if self.additional_env_vars: - env_vars.update(self.additional_env_vars) # For tensor parallel groups, only the first worker gets bundle_indices worker_bundle_indices = ( From 45f6de406410d4c292c319507ca03d69ee46395a Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 10 Apr 2025 09:51:05 -0700 Subject: [PATCH 12/23] ruff Signed-off-by: Yi-Fu Wu --- .../unit/models/policy/test_hf_ray_policy.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index 8e789ab990..c23221d036 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -137,8 +137,12 @@ def test_hf_policy_init(policy_setup, num_gpus): # Check 1: Verify workers have different ranks gpu_ranks = [info["rank"] for info in gpu_infos] - assert len(set(gpu_ranks)) == num_gpus, f"Expected {num_gpus} different ranks, got {gpu_ranks}" - assert set(gpu_ranks) == set(range(num_gpus)), f"Expected ranks {set(range(num_gpus))}, got {gpu_ranks}" + assert len(set(gpu_ranks)) == num_gpus, ( + f"Expected {num_gpus} different ranks, got {gpu_ranks}" + ) + assert set(gpu_ranks) == set(range(num_gpus)), ( + f"Expected ranks {set(range(num_gpus))}, got {gpu_ranks}" + ) # Check 2: Verify workers have different local_ranks local_ranks = [info["local_rank"] for info in gpu_infos] @@ -159,7 +163,7 @@ def test_hf_policy_init(policy_setup, num_gpus): ) else: assert len(set(cuda_visible_devices)) == 1, ( - f"Expected one CUDA_VISIBLE_DEVICES for 1 GPU, got {cuda_visible_devices}" + f"Expected one CUDA_VISIBLE_DEVICES for 1 GPU, got {cuda_visible_devices}" ) # Check 4: Verify all workers report correct world_size @@ -209,7 +213,9 @@ def training_setup(num_gpus): try: # Create resources with unique name cluster_name = f"test-train-{num_gpus}gpu" - print(f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs...") + print( + f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..." 
+ ) cluster = RayVirtualCluster( name=cluster_name, @@ -313,8 +319,12 @@ def verify_loss_tensor(loss_tensor): f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, " f"{after_training_mem_reserved:,.1f} MB reserved" ) - tracker.track(f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated) - tracker.track(f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved) + tracker.track( + f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated + ) + tracker.track( + f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved + ) policy.offload_after_refit() after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization( @@ -324,8 +334,12 @@ def verify_loss_tensor(loss_tensor): f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " f"{after_offload_mem_reserved:,.1f} MB reserved" ) - tracker.track(f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated) - tracker.track(f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved) + tracker.track( + f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated + ) + tracker.track( + f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved + ) # Compare memory after offload to memory after training assert after_training_mem_allocated > 10_000, ( From b087ae2c23b3f3da402d1018ebe1a173acb2aabe Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 10 Apr 2025 10:22:06 -0700 Subject: [PATCH 13/23] ruff Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index 281e535628..2fa0ec83f9 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -552,7 +552,9 @@ def generate( # Set attention mask for the actual tokens (at the end for left padding) left_padded_attention_mask[i, seq_len - length :] = 1 - if isinstance(self.model, torch.distributed.fsdp.FullyShardedDataParallel): + if isinstance( + self.model, torch.distributed.fsdp.FullyShardedDataParallel + ): generation_module = self.model.module else: generation_module = self.model @@ -800,7 +802,6 @@ def offload_before_refit(self): if torch.is_tensor(v): state[k] = v.to("cpu") - gc.collect() torch.cuda.empty_cache() From 9194a570b87163746f85526daccf74b7f53e7bbc Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 10 Apr 2025 11:24:13 -0700 Subject: [PATCH 14/23] Fix unit tests Signed-off-by: Yi-Fu Wu --- tests/unit/utils/test_native_checkpoint.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/utils/test_native_checkpoint.py b/tests/unit/utils/test_native_checkpoint.py index 8f71badea1..5f4657f746 100755 --- a/tests/unit/utils/test_native_checkpoint.py +++ b/tests/unit/utils/test_native_checkpoint.py @@ -37,6 +37,8 @@ "logprob_batch_size": 1, "max_total_sequence_length": 1024, "precision": "float32", + "fsdp_offload_enabled": False, + "activation_checkpointing_enabled": False, } From baf8396bb972fb032414b186be9be1322550dd9d Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 10 Apr 2025 12:37:38 -0700 Subject: [PATCH 15/23] Fix checkpoint Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/utils/native_checkpoint.py | 5 +- tests/unit/utils/test_native_checkpoint.py | 60 +++++++++++++--------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git 
a/nemo_reinforcer/utils/native_checkpoint.py b/nemo_reinforcer/utils/native_checkpoint.py index 6f22ea82fd..c6fdd15c73 100644 --- a/nemo_reinforcer/utils/native_checkpoint.py +++ b/nemo_reinforcer/utils/native_checkpoint.py @@ -150,7 +150,10 @@ def save_checkpoint( save_hf: Whether to save in HuggingFace format """ if save_hf: - model_state_dict = model._fsdp_wrapped_module.state_dict() + if hasattr(model, "_fsdp_wrapped_module"): + model_state_dict = model._fsdp_wrapped_module.state_dict() + else: + model_state_dict = model.state_dict() if torch.distributed.get_rank() == 0: # Create a new path by appending "-hf" to the weights path diff --git a/tests/unit/utils/test_native_checkpoint.py b/tests/unit/utils/test_native_checkpoint.py index 5f4657f746..ae34c4b228 100755 --- a/tests/unit/utils/test_native_checkpoint.py +++ b/tests/unit/utils/test_native_checkpoint.py @@ -59,18 +59,20 @@ def mock_experiment(): return model, optimizer, scheduler -@pytest.fixture(scope="module") -def cluster(): +@pytest.fixture(scope="function") +def cluster(num_gpus): """Create a virtual cluster for testing.""" - # Create a cluster with 2 GPU + cluster_name = f"test-cluster-{num_gpus}gpu" + print(f"Creating virtual cluster '{cluster_name}' for {num_gpus} GPUs...") + # Create a cluster with num_gpus GPU virtual_cluster = RayVirtualCluster( - bundle_ct_per_node_list=[2], # 1 node with 2 GPU bundle + bundle_ct_per_node_list=[num_gpus], # 1 node with num_gpus GPU bundle use_gpus=True, max_colocated_worker_groups=1, - num_gpus_per_node=2, # Use available GPUs - name="test-cluster", + num_gpus_per_node=num_gpus, # Use available GPUs + name=cluster_name, ) - yield virtual_cluster + yield virtual_cluster # Yield only the cluster object virtual_cluster.shutdown() @@ -239,7 +241,8 @@ def test_save_and_load_model_and_optimizer(mock_experiment): check_dict_equality(new_optimizer.state_dict(), optimizer.state_dict()) -def test_save_and_load_hf_checkpoint(policy): +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) +def test_save_and_load_hf_checkpoint(policy, num_gpus): ## warm up with a forward pass ## this is needed before saving a checkpoint because FSDP does some lazy initialization input_ids = torch.randint(0, 16000, (4, 128)) # 4 sequences, each of length 128 @@ -263,23 +266,34 @@ def test_save_and_load_hf_checkpoint(policy): ) ## make sure we save both HF and DCP checkpoints - assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == { - "__0_0.distcp", - "__1_0.distcp", - ".metadata", - } - ## 1B model has two shards - assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp-hf"))) == { - "config.json", - "generation_config.json", - "model-00001-of-00002.safetensors", - "model-00002-of-00002.safetensors", - "model.safetensors.index.json", - } + # Dynamically create the expected set of distcp files based on num_gpus + expected_distcp_files = {f"__{rank}_0.distcp" for rank in range(num_gpus)} + expected_files = expected_distcp_files.union({".metadata"}) - coverted_model = AutoModelForCausalLM.from_pretrained( - os.path.join(tmp_dir, "test_hf_and_dcp-hf") + assert ( + set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == expected_files ) + + hf_save_dir = os.path.join(tmp_dir, "test_hf_and_dcp-hf") + hf_files = set(os.listdir(hf_save_dir)) + + # Check the HF saved files structure: could be single or sharded + expected_common_hf_files = {"config.json", "generation_config.json"} + if "model.safetensors" in hf_files: + # Single file format (1 GPU or smaller model) + expected_hf_files = 
expected_common_hf_files.union({"model.safetensors"}) + else: + # Sharded format (>=2 GPUs or larger model) + expected_hf_files = expected_common_hf_files.union( + { + "model-00001-of-00002.safetensors", + "model-00002-of-00002.safetensors", + "model.safetensors.index.json", + } + ) + assert hf_files == expected_hf_files + + coverted_model = AutoModelForCausalLM.from_pretrained(hf_save_dir) original_model = AutoModelForCausalLM.from_pretrained( simple_policy_config["model_name"] ) From 3faa60eb1a46dcae72d006ed1e76d58b00253ca9 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 11 Apr 2025 02:45:46 -0700 Subject: [PATCH 16/23] Increase cicd timeout Signed-off-by: Yi-Fu Wu --- .github/workflows/cicd-main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index 4041f31bc3..a367beec76 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -137,7 +137,7 @@ jobs: if: ${{ needs.pre-flight.outputs.run_ci == 'true' }} with: RUNNER: self-hosted-azure - TIMEOUT: 15 + TIMEOUT: 20 UNIT_TEST_SCRIPT: | cd /opt/reinforcer uv run --no-sync bash -x ./tests/run_unit.sh From 2eef1a5dbab56fd7180e02708a45319734aa6310 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 11 Apr 2025 09:29:37 -0700 Subject: [PATCH 17/23] Update nemo_reinforcer/models/policy/hf_policy.py Co-authored-by: Parth Chadha Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index 2fa0ec83f9..12effddbba 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -111,7 +111,7 @@ def __init__( def do_fsdp(model): if world_size == 1: - print("world_size == 1, skipping FSDP") + print("[INFO] Using a single GPU - skipping FSDP wrapper to avoid GPU memory offloading issues") return model # Create a device mesh with 'world_size' GPUs in a 1D arrangement. 
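For reference: on the single-GPU path above the module is left unwrapped, so
host/device placement is handled entirely by the manual_offload_to_cpu /
manual_load_to_gpu pair from PATCH 01. A minimal standalone sketch of that
round-trip (the model name and the printed numbers are illustrative, not taken
from this series):

    import torch
    from transformers import AutoModelForCausalLM

    def offload_to_cpu(model: torch.nn.Module) -> torch.nn.Module:
        # Retarget parameter, gradient, and buffer storage to the CPU in
        # place, keeping the module object itself intact so the same
        # tensors can be moved back to the GPU later.
        for param in model.parameters():
            param.data = param.data.to("cpu", non_blocking=True)
            if param.grad is not None:
                param.grad = param.grad.to("cpu", non_blocking=True)
        for buffer in model.buffers():
            buffer.data = buffer.data.to("cpu", non_blocking=True)
        return model

    model = AutoModelForCausalLM.from_pretrained("gpt2").to("cuda")
    print(f"{torch.cuda.memory_allocated() / 2**20:.0f} MB allocated")
    offload_to_cpu(model)
    torch.cuda.synchronize()  # let the non_blocking copies finish
    torch.cuda.empty_cache()
    print(f"{torch.cuda.memory_allocated() / 2**20:.0f} MB after offload")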
From e926f92e882cb035e020016aa12831b0ae4ecb84 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 11 Apr 2025 17:06:24 -0700 Subject: [PATCH 18/23] Fix configs Signed-off-by: Yi-Fu Wu --- examples/configs/grpo_math_8B.yaml | 1 - nemo_reinforcer/models/policy/__init__.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/configs/grpo_math_8B.yaml b/examples/configs/grpo_math_8B.yaml index 1ead08eacf..5f98129e32 100644 --- a/examples/configs/grpo_math_8B.yaml +++ b/examples/configs/grpo_math_8B.yaml @@ -14,7 +14,6 @@ policy: logprob_batch_size: 2 max_total_sequence_length: 4096 precision: "bfloat16" - expandable_segments_enabled: false fsdp_offload_enabled: false activation_checkpointing_enabled: false diff --git a/nemo_reinforcer/models/policy/__init__.py b/nemo_reinforcer/models/policy/__init__.py index 417294a034..1111c023fb 100644 --- a/nemo_reinforcer/models/policy/__init__.py +++ b/nemo_reinforcer/models/policy/__init__.py @@ -27,3 +27,4 @@ class PolicyConfig(TypedDict): generation: GenerationConfig precision: str fsdp_offload_enabled: bool + activation_checkpointing_enabled: bool From 2887c56dcd57af73edd9ed88174e1ee40ee27cb8 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 11 Apr 2025 19:34:00 -0700 Subject: [PATCH 19/23] ruff Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index 12effddbba..8a205c5078 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -111,7 +111,9 @@ def __init__( def do_fsdp(model): if world_size == 1: - print("[INFO] Using a single GPU - skipping FSDP wrapper to avoid GPU memory offloading issues") + print( + "[INFO] Using a single GPU - skipping FSDP wrapper to avoid GPU memory offloading issues" + ) return model # Create a device mesh with 'world_size' GPUs in a 1D arrangement. 
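When fsdp_offload_enabled is set instead, the manual helpers above return
early and offloading is delegated to FSDP itself via
CPUOffload(offload_params=True), as wired up in PATCH 01. A minimal sketch of
that wrapping (the toy module and the process-group bootstrap are
illustrative):

    import torch
    import torch.distributed as dist
    from torch.distributed.fsdp import CPUOffload, FullyShardedDataParallel

    # Assumes a torchrun-style launch that sets RANK/WORLD_SIZE/MASTER_ADDR.
    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(dist.get_rank() % torch.cuda.device_count())

    model = torch.nn.Linear(4096, 4096).cuda()
    fsdp_model = FullyShardedDataParallel(
        model,
        # Parameter shards stay on the CPU between forward/backward passes;
        # FSDP moves them to the GPU for computation automatically.
        cpu_offload=CPUOffload(offload_params=True),
    )

This is also why the optimizer-state moves in prepare_for_training and
offload_before_refit are guarded by "if not self.cfg['fsdp_offload_enabled']":
with FSDP-managed offload, the optimizer state already lives on the CPU.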
From be4a3fa763f9d93859be9228eace4ccfd55fff99 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Tue, 15 Apr 2025 09:11:34 -0700 Subject: [PATCH 20/23] Comment about held params Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/hf_policy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py index a29a6e0788..4a35ee7abd 100644 --- a/nemo_reinforcer/models/policy/hf_policy.py +++ b/nemo_reinforcer/models/policy/hf_policy.py @@ -763,6 +763,8 @@ def get_weight_ipc_handles(self, offload_model=True): # Replace the original params with the converted ones params = dtype_params + # For FSDP1, params may get GC'ed before sending to vllm, + # so we need to hold a reference to them self._held_reference_model_params = params data = {} device_uuid = self.report_device_id() From 989ea205a76fd57fc4cf628185e3508e212ea97e Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Wed, 16 Apr 2025 10:39:55 -0700 Subject: [PATCH 21/23] Fix test Signed-off-by: Yi-Fu Wu --- tests/unit/utils/test_native_checkpoint.py | 43 +++++++++++++++------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/tests/unit/utils/test_native_checkpoint.py b/tests/unit/utils/test_native_checkpoint.py index efbd36fa59..8d96476ae4 100755 --- a/tests/unit/utils/test_native_checkpoint.py +++ b/tests/unit/utils/test_native_checkpoint.py @@ -326,7 +326,9 @@ def test_save_and_load_hf_checkpoint(policy, num_gpus): ## make sure converted model matches the original check_dict_equality(converted_model.state_dict(), original_model.state_dict()) -def test_convert_dcp_to_hf(policy): + +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) +def test_convert_dcp_to_hf(policy, num_gpus): ## warm up with a forward pass ## this is needed before saving a checkpoint because FSDP does some lazy initialization input_ids = torch.randint(0, 16000, (4, 128)) # 4 sequences, each of length 128 @@ -349,20 +351,33 @@ def test_convert_dcp_to_hf(policy): save_torch_dist=True, ) + # Dynamically create the expected set of distcp files based on num_gpus + expected_distcp_files = {f"__{rank}_0.distcp" for rank in range(num_gpus)} + expected_files = expected_distcp_files.union({".metadata"}) + ## make sure we save both HF and DCP checkpoints - assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == { - "__0_0.distcp", - "__1_0.distcp", - ".metadata", - } - ## 1B model has two shards - assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp-hf"))) == { - "config.json", - "generation_config.json", - "model-00001-of-00002.safetensors", - "model-00002-of-00002.safetensors", - "model.safetensors.index.json", - } + assert ( + set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == expected_files + ) + + # Check the HF saved files structure: could be single or sharded + hf_save_dir = os.path.join(tmp_dir, "test_hf_and_dcp-hf") + hf_files = set(os.listdir(hf_save_dir)) + expected_common_hf_files = {"config.json", "generation_config.json"} + + if "model.safetensors" in hf_files: + # Single file format (1 GPU or smaller model) + expected_hf_files = expected_common_hf_files.union({"model.safetensors"}) + else: + # Sharded format (>=2 GPUs or larger model) + expected_hf_files = expected_common_hf_files.union( + { + "model-00001-of-00002.safetensors", + "model-00002-of-00002.safetensors", + "model.safetensors.index.json", + } + ) + assert hf_files == expected_hf_files offline_converted_model_path = convert_dcp_to_hf( os.path.join(tmp_dir, 
"test_hf_and_dcp"), From 6dd553cd0c7c9add423f11902839b1a8cbec00ce Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Wed, 16 Apr 2025 11:36:16 -0700 Subject: [PATCH 22/23] Longer cicd timeout Signed-off-by: Yi-Fu Wu --- .github/workflows/cicd-main.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index 7d2e77028c..db3ed5015c 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -145,7 +145,7 @@ jobs: if: ${{ needs.pre-flight.outputs.test_level != 'none' }} with: RUNNER: self-hosted-azure - TIMEOUT: 20 + TIMEOUT: 25 UNIT_TEST_SCRIPT: | cd /opt/reinforcer if [[ "${{ needs.pre-flight.outputs.test_level }}" =~ ^(L0|L1|L2)$ ]]; then @@ -182,12 +182,12 @@ jobs: cd /opt/reinforcer cat <- ${{ (needs.pre-flight.outputs.test_level == 'none') || - (needs.pre-flight.outputs.test_level != 'none' && - needs.lint-check.result == 'success' && - needs.sphinx-build.result == 'success' && + (needs.pre-flight.outputs.test_level != 'none' && + needs.lint-check.result == 'success' && + needs.sphinx-build.result == 'success' && needs.tests.result == 'success') }} CI_SKIP: ${{ github.event.label.name == 'Skip CICD' }} From 31223ac310196d9237064dcc4004987466885cab Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Wed, 16 Apr 2025 12:17:59 -0700 Subject: [PATCH 23/23] 30 min timeout Signed-off-by: Yi-Fu Wu --- .github/workflows/cicd-main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index db3ed5015c..64a0fd37e3 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -145,7 +145,7 @@ jobs: if: ${{ needs.pre-flight.outputs.test_level != 'none' }} with: RUNNER: self-hosted-azure - TIMEOUT: 25 + TIMEOUT: 30 UNIT_TEST_SCRIPT: | cd /opt/reinforcer if [[ "${{ needs.pre-flight.outputs.test_level }}" =~ ^(L0|L1|L2)$ ]]; then