diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml
index 8fc9308931..a71c267cdd 100644
--- a/.github/workflows/cicd-main.yml
+++ b/.github/workflows/cicd-main.yml
@@ -150,7 +150,7 @@ jobs:
     if: ${{ needs.pre-flight.outputs.test_level != 'none' }}
     with:
       RUNNER: self-hosted-azure
-      TIMEOUT: 20
+      TIMEOUT: 30
       UNIT_TEST_SCRIPT: |
         cd /opt/reinforcer
         if [[ "${{ needs.pre-flight.outputs.test_level }}" =~ ^(L0|L1|L2)$ ]]; then
diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml
index ac37b0f0ac..91dcb96893 100644
--- a/examples/configs/grpo_math_1B.yaml
+++ b/examples/configs/grpo_math_1B.yaml
@@ -33,6 +33,8 @@ policy:
   logprob_batch_size: 4
   max_total_sequence_length: 512
   precision: "bfloat16"
+  fsdp_offload_enabled: false
+  activation_checkpointing_enabled: false

   optimizer:
     name: "torch.optim.AdamW"
diff --git a/examples/configs/grpo_math_8B.yaml b/examples/configs/grpo_math_8B.yaml
index 56d65a465c..e791c66a34 100644
--- a/examples/configs/grpo_math_8B.yaml
+++ b/examples/configs/grpo_math_8B.yaml
@@ -15,6 +15,8 @@ policy:
   logprob_batch_size: 2
   max_total_sequence_length: 4096
   precision: "bfloat16"
+  fsdp_offload_enabled: false
+  activation_checkpointing_enabled: false

   optimizer:
     name: "torch.optim.AdamW"
diff --git a/examples/configs/sft.yaml b/examples/configs/sft.yaml
index dad99b2479..a7b4efbbcc 100644
--- a/examples/configs/sft.yaml
+++ b/examples/configs/sft.yaml
@@ -25,6 +25,8 @@ policy:
   train_micro_batch_size: 1
   max_total_sequence_length: 1024
   precision: "float32"
+  fsdp_offload_enabled: false
+  activation_checkpointing_enabled: false

   optimizer:
     name: "torch.optim.AdamW"
diff --git a/nemo_reinforcer/distributed/worker_groups.py b/nemo_reinforcer/distributed/worker_groups.py
index 4e3bbbf2a6..383ad1ed15 100644
--- a/nemo_reinforcer/distributed/worker_groups.py
+++ b/nemo_reinforcer/distributed/worker_groups.py
@@ -135,7 +135,10 @@ def __call__(
         if env_vars:
             if "runtime_env" not in options:
                 options["runtime_env"] = {}
-            options["runtime_env"]["env_vars"] = env_vars
+            # Merge rather than overwrite, and create the key if absent so the
+            # per-item assignment below cannot raise KeyError.
+            options["runtime_env"].setdefault("env_vars", {})
+            for k, v in env_vars.items():
+                options["runtime_env"]["env_vars"][k] = v

         # Apply initialization parameters
         if init_kwargs:
@@ -207,7 +208,6 @@ def __init__(
         self.cluster = cluster
         self.name_prefix = name_prefix
         self.tied_workers_groups = []
-
         # Maps worker indices to their corresponding tied group index
         # For example, if worker with index 3 belongs to tied worker group 1,
         # then worker_to_tied_group_index[3] = 1
diff --git a/nemo_reinforcer/models/policy/__init__.py b/nemo_reinforcer/models/policy/__init__.py
index 43c9422e2a..e14f9fc3eb 100644
--- a/nemo_reinforcer/models/policy/__init__.py
+++ b/nemo_reinforcer/models/policy/__init__.py
@@ -31,3 +31,5 @@ class PolicyConfig(TypedDict):
     logprob_batch_size: int
     generation: GenerationConfig
     precision: str
+    fsdp_offload_enabled: bool
+    activation_checkpointing_enabled: bool
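Note on the config plumbing above: `fsdp_offload_enabled` and `activation_checkpointing_enabled` are added to `PolicyConfig` as required keys, which is why every YAML config and test config in this diff gains the same two lines. A defensive accessor like the sketch below (hypothetical, not part of this PR) would let configs written before this change keep loading:

```python
# Hypothetical sketch: read the new flags with defaults so pre-existing
# configs that lack the keys still work. `cfg` is the parsed policy config.
from typing import Any


def fsdp_flags(cfg: dict[str, Any]) -> tuple[bool, bool]:
    """Return (fsdp_offload_enabled, activation_checkpointing_enabled)."""
    return (
        bool(cfg.get("fsdp_offload_enabled", False)),
        bool(cfg.get("activation_checkpointing_enabled", False)),
    )
```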
diff --git a/nemo_reinforcer/models/policy/hf_policy.py b/nemo_reinforcer/models/policy/hf_policy.py
index 0141a382ef..d8462370fa 100644
--- a/nemo_reinforcer/models/policy/hf_policy.py
+++ b/nemo_reinforcer/models/policy/hf_policy.py
@@ -22,6 +22,7 @@
 import torch
 from torch.distributed.device_mesh import init_device_mesh
 from torch.distributed.fsdp import (
+    CPUOffload,
     FullyShardedDataParallel,
     MixedPrecision,
 )
@@ -110,6 +111,12 @@ def __init__(
         # ------------------------------------------------

         def do_fsdp(model):
+            if world_size == 1:
+                print(
+                    "[INFO] Using a single GPU - skipping FSDP wrapper to avoid GPU memory offloading issues"
+                )
+                return model
+
             # Create a device mesh with 'world_size' GPUs in a 1D arrangement.
             mesh = init_device_mesh("cuda", (world_size,))
             mp_policy = MixedPrecision(
@@ -118,21 +125,32 @@ def __init__(
                 buffer_dtype=torch.float32,
             )

+            cpu_offload = (
+                CPUOffload(offload_params=True)
+                if self.cfg["fsdp_offload_enabled"]
+                else None
+            )
+
             return FullyShardedDataParallel(
                 model,
                 device_mesh=mesh,
                 auto_wrap_policy=size_based_auto_wrap_policy,
                 mixed_precision=mp_policy,
+                cpu_offload=cpu_offload,
             )

         self.model.to("cuda")
+        if self.cfg["activation_checkpointing_enabled"]:
+            self.model.gradient_checkpointing_enable(
+                gradient_checkpointing_kwargs={"use_reentrant": False}
+            )
         self.model = do_fsdp(self.model)
-        self.model = self.move_to_cpu(self.model)
+        self.model = self.manual_offload_to_cpu(self.model)

         if self.reference_model is not None:
             self.reference_model.to("cuda")
             self.reference_model = do_fsdp(self.reference_model)
-            self.reference_model = self.move_to_cpu(self.reference_model)
-        self.model.to("cuda")
+            self.reference_model = self.manual_offload_to_cpu(self.reference_model)
+        self.model = self.manual_load_to_gpu(self.model)
         self._held_reference_model_params = None
         # register_fsdp_forward_method(self.model, "generate")
         if init_optimizer:
@@ -440,8 +458,8 @@ def use_reference_model(self):
             original_model = self.model
             original_reference_model = self.reference_model

-            self.model = self.move_to_cpu(self.model)
-            self.reference_model = self.reference_model.to("cuda")
+            self.model = self.manual_offload_to_cpu(self.model)
+            self.reference_model = self.manual_load_to_gpu(self.reference_model)

             # Swap the references
             self.model, self.reference_model = self.reference_model, self.model
@@ -454,8 +472,8 @@ def use_reference_model(self):

         finally:
             # Restore original references and device placement
-            self.reference_model = self.move_to_cpu(original_reference_model)
-            self.model = original_model.to("cuda")
+            self.reference_model = self.manual_offload_to_cpu(original_reference_model)
+            self.model = self.manual_load_to_gpu(original_model)

             gc.collect()
             torch.cuda.empty_cache()
@@ -537,7 +555,13 @@ def generate(
             # Set attention mask for the actual tokens (at the end for left padding)
             left_padded_attention_mask[i, seq_len - length :] = 1

-        outputs = self.model.module.generate(
+        if isinstance(
+            self.model, torch.distributed.fsdp.FullyShardedDataParallel
+        ):
+            generation_module = self.model.module
+        else:
+            generation_module = self.model
+        outputs = generation_module.generate(
             input_ids=left_padded_input_ids,
             attention_mask=left_padded_attention_mask,
             max_new_tokens=gen_cfg["max_new_tokens"],
@@ -724,6 +748,11 @@ def report_device_id(self) -> str:
     def get_weight_ipc_handles(self, offload_model=True):
         from torch.multiprocessing.reductions import reduce_tensor

+        # If the model is not FSDP, then we need to manually move it to the GPU
+        # For an FSDP model, model.state_dict() will move the params to the GPU
+        if not isinstance(self.model, torch.distributed.fsdp.FullyShardedDataParallel):
+            self.model = self.manual_load_to_gpu(self.model)
+
         # TODO @sahilj: do this without an allgather (maybe FSDP2)
         params = self.model.state_dict()

@@ -735,6 +764,8 @@ def get_weight_ipc_handles(self, offload_model=True):
         # Replace the original params with the converted ones
         params = dtype_params

+        # For FSDP1, params may get GC'ed before sending to vllm,
+        # so we need to hold a reference to them
         self._held_reference_model_params = params
         data = {}
         device_uuid = self.report_device_id()
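The held-reference comment in `get_weight_ipc_handles` is load-bearing: `reduce_tensor` serializes a CUDA IPC handle, not the data itself, so the producer-side tensor must stay alive until the consumer has rebuilt it. A minimal sketch of that contract (illustrative only; requires a CUDA device):

```python
# Sketch of the CUDA IPC pattern used above. reduce_tensor returns a
# (rebuild_fn, args) pair that a consumer process can call to map the same
# device memory; it does not copy the data.
import torch
from torch.multiprocessing.reductions import reduce_tensor

t = torch.arange(4, dtype=torch.float32, device="cuda")
rebuild_fn, rebuild_args = reduce_tensor(t.detach())
# rebuild_fn(*rebuild_args) in the consumer yields a view of t's storage.
# If t is garbage-collected first, the handle dangles - which is exactly
# why the worker pins `self._held_reference_model_params`.
```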
@@ -742,27 +773,28 @@ def get_weight_ipc_handles(self, offload_model=True):
             data[name] = reduce_tensor(p.detach())

         if offload_model:
-            self.model = self.move_to_cpu(self.model)
+            self.model = self.manual_offload_to_cpu(self.model)

         gc.collect()
         torch.cuda.empty_cache()
         return {device_uuid: data}

     def prepare_for_lp_inference(self):
-        self.model.to("cuda")
+        self.model = self.manual_load_to_gpu(self.model)
         self.model.eval()
         self.offload_before_refit()

     def prepare_for_training(self, *args, **kwargs):
         # onload models and optimizer state to cuda
-        self.model.to("cuda")
+        self.model = self.manual_load_to_gpu(self.model)
         self.model.train()
-        # Move optimizer state to CUDA if it exists
-        if hasattr(self, "optimizer") and self.optimizer is not None:
-            for state in self.optimizer.state.values():
-                for k, v in state.items():
-                    if torch.is_tensor(v) and not v.is_cuda:
-                        state[k] = v.to("cuda")
+        if not self.cfg["fsdp_offload_enabled"]:
+            # Move optimizer state to CUDA if it exists
+            if hasattr(self, "optimizer") and self.optimizer is not None:
+                for state in self.optimizer.state.values():
+                    for k, v in state.items():
+                        if torch.is_tensor(v) and not v.is_cuda:
+                            state[k] = v.to("cuda")

         torch.cuda.empty_cache()

@@ -770,14 +802,12 @@ def prepare_for_training(self, *args, **kwargs):
     def offload_before_refit(self):
         """Offload the optimizer and buffers to the CPU."""
         torch.randn(1).cuda()  # wake up torch allocator
-        if hasattr(self, "optimizer") and self.optimizer is not None:
-            for state in self.optimizer.state.values():
-                for k, v in state.items():
-                    if torch.is_tensor(v):
-                        state[k] = v.to("cpu")
-
-        for buffer in self.model.buffers():
-            buffer.data = buffer.data.to("cpu")
+        if not self.cfg["fsdp_offload_enabled"]:
+            if hasattr(self, "optimizer") and self.optimizer is not None:
+                for state in self.optimizer.state.values():
+                    for k, v in state.items():
+                        if torch.is_tensor(v):
+                            state[k] = v.to("cpu")

         gc.collect()
         torch.cuda.empty_cache()
@@ -792,7 +822,7 @@ def offload_before_refit(self):
     @torch.no_grad()
     def offload_after_refit(self):
         # Offload as much as possible on the CPU
-        self.model = self.move_to_cpu(self.model)
+        self.model = self.manual_offload_to_cpu(self.model)
         self.model.eval()
         torch.randn(1).cuda()  # wake up torch allocator
         self.offload_before_refit()  # rerun the old offload function
@@ -810,15 +840,39 @@ def offload_after_refit(self):
             f"GPU Memory after refit complete: {allocated:.2f}GB allocated, {reserved:.2f}GB reserved"
         )

-    def move_to_cpu(self, model):
+    def manual_offload_to_cpu(self, model):
+        if self.cfg["fsdp_offload_enabled"]:
+            return model
+
         for param in model.parameters():
-            param.data = param.data.to("cpu")
+            param.data = param.data.to("cpu", non_blocking=True)
+            if hasattr(param, "_local_shard"):
+                param._local_shard = param.data
+            if param.grad is not None:
+                param.grad = param.grad.to("cpu", non_blocking=True)
+        for buffer in model.buffers():
+            buffer.data = buffer.data.to("cpu", non_blocking=True)
+
+        if hasattr(model, "_fsdp_wrapped_module"):
+            self.manual_offload_to_cpu(model._fsdp_wrapped_module)
+        return model
+
+    def manual_load_to_gpu(self, model):
+        if self.cfg["fsdp_offload_enabled"]:
+            return model
+
+        for param in model.parameters():
+            param.data = param.data.to("cuda", non_blocking=True)
+            if hasattr(param, "_local_shard"):
+                param._local_shard = param.data
+            if param.grad is not None:
+                param.grad = param.grad.to("cuda", non_blocking=True)
         for buffer in model.buffers():
-            buffer.data = buffer.data.to("cpu")
+            buffer.data = buffer.data.to("cuda", non_blocking=True)

         if hasattr(model, "_fsdp_wrapped_module"):
-            model._fsdp_wrapped_module.to("cpu")
+            self.manual_load_to_gpu(model._fsdp_wrapped_module)
         return model
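The `manual_offload_to_cpu` / `manual_load_to_gpu` pair works by rebinding `param.data`, so FSDP1's private `_local_shard` pointer has to be rebound too, or the wrapper keeps referencing the old storage. A condensed, illustrative sketch of that invariant follows; `_local_shard` is private FSDP1 API, and the `torch.cuda.synchronize()` caveat is an assumption worth testing, since `non_blocking=True` device-to-host copies may still be in flight when the GPU memory is reused:

```python
# Illustrative only - mirrors what manual_offload_to_cpu does, not a public API.
import torch


def move_params(module: torch.nn.Module, device: str) -> None:
    for p in module.parameters():
        p.data = p.data.to(device, non_blocking=True)
        if hasattr(p, "_local_shard"):  # keep FSDP1's shard pointer in sync
            p._local_shard = p.data
    if device == "cpu":
        # Assumption: wait for async copies before the caller frees GPU memory.
        torch.cuda.synchronize()
```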
model._fsdp_wrapped_module.to("cpu") + self.manual_load_to_gpu(model._fsdp_wrapped_module) return model diff --git a/nemo_reinforcer/utils/native_checkpoint.py b/nemo_reinforcer/utils/native_checkpoint.py index d163b2afd0..ede0300025 100644 --- a/nemo_reinforcer/utils/native_checkpoint.py +++ b/nemo_reinforcer/utils/native_checkpoint.py @@ -154,7 +154,10 @@ def save_checkpoint( save_hf: Whether to save in HuggingFace format """ if save_hf: - model_state_dict = model._fsdp_wrapped_module.state_dict() + if hasattr(model, "_fsdp_wrapped_module"): + model_state_dict = model._fsdp_wrapped_module.state_dict() + else: + model_state_dict = model.state_dict() if torch.distributed.get_rank() == 0: # Create a new path by appending "-hf" to the weights path diff --git a/tests/unit/models/generation/test_vllm_generation.py b/tests/unit/models/generation/test_vllm_generation.py index d86f4983cc..8bd3960f73 100644 --- a/tests/unit/models/generation/test_vllm_generation.py +++ b/tests/unit/models/generation/test_vllm_generation.py @@ -61,6 +61,8 @@ "max_new_tokens": 16, "do_sample": False, "precision": "float32", + "fsdp_offload_enabled": False, + "activation_checkpointing_enabled": False, "optimizer": { "name": "torch.optim.AdamW", "kwargs": { diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py index f56d3b8df3..88f17e1c29 100644 --- a/tests/unit/models/policy/test_hf_ray_policy.py +++ b/tests/unit/models/policy/test_hf_ray_policy.py @@ -36,6 +36,8 @@ "learning_rate": 5e-6, "logprob_batch_size": 1, "precision": "float32", + "fsdp_offload_enabled": False, + "activation_checkpointing_enabled": False, "generation": { "backend": "hf", "temperature": 1.0, @@ -127,20 +129,20 @@ def test_input_data(tokenizer): @pytest.fixture -def policy_setup(tokenizer): +def policy_setup(tokenizer, num_gpus): """Setup and teardown for policy tests - creates a virtual cluster and policy.""" policy = None cluster = None - cluster_name = "test" - print(f"Creating virtual cluster '{cluster_name}'...") + cluster_name = f"test-init-{num_gpus}gpu" + print(f"Creating virtual cluster '{cluster_name}' for {num_gpus} GPUs...") cluster = RayVirtualCluster( name=cluster_name, - bundle_ct_per_node_list=[2], # Single node, 2 gpus + bundle_ct_per_node_list=[num_gpus], use_gpus=True, - num_gpus_per_node=2, # Using both GPUs - max_colocated_worker_groups=1, # Only one worker group + num_gpus_per_node=num_gpus, + max_colocated_worker_groups=1, ) config = basic_llama_test_config @@ -158,15 +160,18 @@ def policy_setup(tokenizer): @pytest.mark.timeout(180) -def test_hf_policy_init(policy_setup): +@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) +def test_hf_policy_init(policy_setup, num_gpus): policy, cluster = policy_setup # Verify cluster and policy were properly created assert policy is not None, "Policy was not created properly" assert cluster is not None, "Cluster was not created properly" - # Verify we have two workers, one per GPU - assert len(policy.worker_group.workers) == 2, "Should have 2 workers, one per GPU" + # Verify we have workers matching the GPU count + assert len(policy.worker_group.workers) == num_gpus, ( + f"Should have {num_gpus} worker(s), one per GPU" + ) # Check workers are alive worker_alive = ray.get([w.is_alive.remote() for w in policy.worker_group.workers]) @@ -182,42 +187,51 @@ def test_hf_policy_init(policy_setup): # Check 1: Verify workers have different ranks gpu_ranks = [info["rank"] for info in gpu_infos] - assert len(set(gpu_ranks)) == 
2, f"Expected 2 different ranks, got {gpu_ranks}" - assert set(gpu_ranks) == {0, 1}, f"Expected ranks 0 and 1, got {gpu_ranks}" + assert len(set(gpu_ranks)) == num_gpus, ( + f"Expected {num_gpus} different ranks, got {gpu_ranks}" + ) + assert set(gpu_ranks) == set(range(num_gpus)), ( + f"Expected ranks {set(range(num_gpus))}, got {gpu_ranks}" + ) # Check 2: Verify workers have different local_ranks local_ranks = [info["local_rank"] for info in gpu_infos] - assert len(set(local_ranks)) == 2, ( - f"Expected 2 different local_ranks, got {local_ranks}" + assert len(set(local_ranks)) == num_gpus, ( + f"Expected {num_gpus} different local_ranks, got {local_ranks}" ) - assert set(local_ranks) == {0, 1}, ( - f"Expected local_ranks 0 and 1, got {local_ranks}" + assert set(local_ranks) == set(range(num_gpus)), ( + f"Expected local_ranks {set(range(num_gpus))}, got {local_ranks}" ) # Check 3: Verify workers have different CUDA_VISIBLE_DEVICES cuda_visible_devices = [ info["env_vars"].get("CUDA_VISIBLE_DEVICES") for info in gpu_infos ] - assert len(set(cuda_visible_devices)) == 2, ( - f"Expected different CUDA_VISIBLE_DEVICES, got {cuda_visible_devices}" - ) + if num_gpus > 1: + assert len(set(cuda_visible_devices)) == num_gpus, ( + f"Expected different CUDA_VISIBLE_DEVICES, got {cuda_visible_devices}" + ) + else: + assert len(set(cuda_visible_devices)) == 1, ( + f"Expected one CUDA_VISIBLE_DEVICES for 1 GPU, got {cuda_visible_devices}" + ) # Check 4: Verify all workers report correct world_size for info in gpu_infos: - assert info["world_size"] == 2, ( - f"Expected world_size=2, got {info['world_size']}" + assert info["world_size"] == num_gpus, ( + f"Expected world_size={num_gpus}, got {info['world_size']}" ) - assert info["env_vars"]["WORLD_SIZE"] == "2", ( - f"Expected WORLD_SIZE=2, got {info['env_vars']['WORLD_SIZE']}" + assert info["env_vars"]["WORLD_SIZE"] == str(num_gpus), ( + f"Expected WORLD_SIZE={num_gpus}, got {info['env_vars']['WORLD_SIZE']}" ) - # Check 5: Verify significant GPU memory is allocated (at least 1GB) on both GPUs + # Check 5: Verify significant GPU memory is allocated (at least 1GB) on all GPUs for info in gpu_infos: assert info["memory_allocated_mb"] > 1000, ( f"Not enough memory allocated on GPU for rank {info['rank']}: {info['memory_allocated_mb']:.2f} MB" ) - # Check 6: Verify model parameters are on CUDA devices for both workers + # Check 6: Verify model parameters are on CUDA devices for all workers for info in gpu_infos: param_sample = list(info["parameter_sample"].values())[0] assert "cuda" in param_sample["device"], ( @@ -239,7 +253,7 @@ def test_hf_policy_init(policy_setup): @pytest.fixture -def training_setup(tokenizer): +def training_setup(tokenizer, num_gpus): """Setup and teardown specifically for training tests.""" policy = None cluster = None @@ -248,15 +262,17 @@ def training_setup(tokenizer): try: # Create resources with unique name - cluster_name = "test-train" - print(f"Creating training virtual cluster '{cluster_name}'...") + cluster_name = f"test-train-{num_gpus}gpu" + print( + f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..." 
@@ -239,7 +253,7 @@


 @pytest.fixture
-def training_setup(tokenizer):
+def training_setup(tokenizer, num_gpus):
     """Setup and teardown specifically for training tests."""
     policy = None
     cluster = None
@@ -248,15 +262,17 @@ def training_setup(tokenizer):

     try:
         # Create resources with unique name
-        cluster_name = "test-train"
-        print(f"Creating training virtual cluster '{cluster_name}'...")
+        cluster_name = f"test-train-{num_gpus}gpu"
+        print(
+            f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..."
+        )
         cluster = RayVirtualCluster(
             name=cluster_name,
-            bundle_ct_per_node_list=[2],  # Single node, 2 gpus
+            bundle_ct_per_node_list=[num_gpus],
             use_gpus=True,
-            num_gpus_per_node=2,  # Using both GPUs
-            max_colocated_worker_groups=1,  # Only one worker group
+            num_gpus_per_node=num_gpus,
+            max_colocated_worker_groups=1,
         )

         config = basic_llama_test_config
@@ -306,8 +322,19 @@ def training_setup(tokenizer):
             policy.worker_group.shutdown()


+def get_max_gpu_utilization(policy):
+    max_memory_allocated = 0
+    max_memory_reserved = 0
+    gpu_infos = ray.get([w.get_gpu_info.remote() for w in policy.worker_group.workers])
+    for info in gpu_infos:
+        max_memory_allocated = max(max_memory_allocated, info["memory_allocated_mb"])
+        max_memory_reserved = max(max_memory_reserved, info["memory_reserved_mb"])
+    return max_memory_allocated, max_memory_reserved
+
+
 @pytest.mark.timeout(180)
-def test_hf_policy_training(training_setup):
+@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
+def test_hf_policy_training(training_setup, tracker, num_gpus):
     def verify_loss_tensor(loss_tensor):
         assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN"
         assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf"
@@ -338,29 +365,68 @@ def verify_loss_tensor(loss_tensor):
         print(f"Training loss: {results['loss']}")

     policy.finish_training()
-
-    # Verify loss changed between iterations (model parameters were updated)
     assert losses[0] > losses[-1], "Loss should decrease over training iterations"

+    after_training_mem_allocated, after_training_mem_reserved = get_max_gpu_utilization(
+        policy
+    )
+    print(
+        f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, "
+        f"{after_training_mem_reserved:,.1f} MB reserved"
+    )
+    tracker.track(
+        f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated
+    )
+    tracker.track(
+        f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved
+    )
+
+    policy.offload_after_refit()
+    after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization(
+        policy
+    )
+    print(
+        f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, "
+        f"{after_offload_mem_reserved:,.1f} MB reserved"
+    )
+    tracker.track(
+        f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated
+    )
+    tracker.track(
+        f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved
+    )
+
+    # Compare memory after offload to memory after training
+    assert after_training_mem_allocated > 10_000, (
+        "Memory after training should be more than 10GB"
+    )
+    assert after_offload_mem_allocated < 1_200, (
+        "Memory after offload should be less than 1.2GB"
+    )
+
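The `tracker` fixture used by `test_hf_policy_training` is defined elsewhere in the suite; a minimal stand-in with the same `track(name, value)` surface might look like the following (hypothetical, for readers running these tests in isolation):

```python
# Hypothetical stand-in for the suite's metric tracker fixture.
import json
import pytest


@pytest.fixture
def tracker(tmp_path):
    class Tracker:
        def __init__(self):
            self.metrics = {}

        def track(self, name, value):
            self.metrics[name] = value

    t = Tracker()
    yield t
    # Persist whatever the test recorded, e.g. for CI artifact collection.
    (tmp_path / "metrics.json").write_text(json.dumps(t.metrics))
```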

 @pytest.fixture
-def generation_setup(request, test_input_data, tokenizer):
+def generation_setup(request, test_input_data, tokenizer, num_gpus):
     """Setup and teardown specifically for generation tests."""
     policy = None
     cluster = None
     data = None
+    init_reference_model = request.param

     try:
         # Create resources with unique name
-        cluster_name = "test-generate"
-        print(f"Creating generation virtual cluster '{cluster_name}'...")
+        cluster_name = f"test-gen-{num_gpus}gpu-ref{init_reference_model}"
+        print(
+            f"Creating generation virtual cluster '{cluster_name}' for {num_gpus} GPUs "
+            f"(ref_model={init_reference_model})..."
+        )
         cluster = RayVirtualCluster(
             name=cluster_name,
-            bundle_ct_per_node_list=[2],  # Single node, 2 gpus
+            bundle_ct_per_node_list=[num_gpus],
             use_gpus=True,
-            num_gpus_per_node=2,  # Using both GPUs
-            max_colocated_worker_groups=1,  # Only one worker group
+            num_gpus_per_node=num_gpus,
+            max_colocated_worker_groups=1,
         )

         config = basic_llama_test_config
@@ -397,8 +463,9 @@ def generation_setup(request, test_input_data, tokenizer):


 @pytest.mark.timeout(180)
+@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
 @pytest.mark.parametrize("generation_setup", [False], indirect=True)
-def test_hf_policy_generation(generation_setup, tokenizer, tracker):
+def test_hf_policy_generation(generation_setup, tokenizer, num_gpus, tracker):
     policy, cluster, data, prompts, expected_generations = generation_setup

     # Verify resources were created properly
@@ -453,7 +520,7 @@ def test_hf_policy_generation(generation_setup, tokenizer, tracker):
         torch.exp(torch.abs(results["logprobs"] - fprop_results["logprobs"]))
     )
     print(f"avg prob mult error: {avg_prob_mult_error}")
-    tracker.track("avg_prob_mult_error", float(avg_prob_mult_error))
+    tracker.track(f"avg_prob_mult_error_{num_gpus}gpu", float(avg_prob_mult_error))
     assert avg_prob_mult_error <= 1.025

     # get logprobs for the expected generations
@@ -478,7 +545,7 @@ def test_hf_policy_generation(generation_setup, tokenizer, tracker):

     expected_logprobs = policy.get_logprobs(expected_data)["logprobs"]
     mean_lps = torch.mean(expected_logprobs * expected_tokenized["attention_mask"])
-    tracker.track("mean_lps", float(mean_lps))
+    tracker.track(f"mean_lps_{num_gpus}gpu", float(mean_lps))
     assert mean_lps > -1.7, "Expected logprobs should be greater than -1.7"
     assert mean_lps < -1.4, "Expected logprobs should be less than -1.4"
@@ -488,6 +555,7 @@


 @pytest.mark.timeout(180)
+@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
 @pytest.mark.parametrize("generation_setup", [True], indirect=True)
 def test_all_hf_policy_generation_lps_ref_training(generation_setup):
     policy, cluster, data, prompts, expected_generations = generation_setup
diff --git a/tests/unit/utils/test_native_checkpoint.py b/tests/unit/utils/test_native_checkpoint.py
index 7da9ee6947..41d73000b8 100755
--- a/tests/unit/utils/test_native_checkpoint.py
+++ b/tests/unit/utils/test_native_checkpoint.py
@@ -42,6 +42,8 @@
     "logprob_batch_size": 1,
     "max_total_sequence_length": 1024,
     "precision": "float32",
+    "fsdp_offload_enabled": False,
+    "activation_checkpointing_enabled": False,
     "optimizer": {
         "name": "torch.optim.AdamW",
         "kwargs": {
@@ -71,18 +73,20 @@ def mock_experiment():
     return model, optimizer, scheduler


-@pytest.fixture(scope="module")
-def cluster():
+@pytest.fixture(scope="function")
+def cluster(num_gpus):
     """Create a virtual cluster for testing."""
-    # Create a cluster with 2 GPU
+    cluster_name = f"test-cluster-{num_gpus}gpu"
+    print(f"Creating virtual cluster '{cluster_name}' for {num_gpus} GPUs...")
+    # Create a cluster with num_gpus GPU
     virtual_cluster = RayVirtualCluster(
-        bundle_ct_per_node_list=[2],  # 1 node with 2 GPU bundle
+        bundle_ct_per_node_list=[num_gpus],  # 1 node with num_gpus GPU bundle
         use_gpus=True,
         max_colocated_worker_groups=1,
-        num_gpus_per_node=2,  # Use available GPUs
-        name="test-cluster",
+        num_gpus_per_node=num_gpus,  # Use available GPUs
+        name=cluster_name,
     )
-    yield virtual_cluster
+    yield virtual_cluster  # Yield only the cluster object
     virtual_cluster.shutdown()
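For contrast with the direct parametrization above, `generation_setup` uses `indirect=True`, so the parametrized value arrives on `request.param` inside the fixture, where it becomes `init_reference_model`. A minimal sketch of the mechanics (illustrative names):

```python
# indirect=True routes the parameter to the fixture via request.param
# instead of passing it straight to the test function.
import pytest


@pytest.fixture
def generation_mode(request):
    return {"init_reference_model": request.param}


@pytest.mark.parametrize("generation_mode", [True, False], indirect=True)
def test_mode(generation_mode):
    assert "init_reference_model" in generation_mode
```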
@@ -260,7 +264,8 @@ def test_save_and_load_model_and_optimizer(mock_experiment):
     check_dict_equality(new_optimizer.state_dict(), optimizer.state_dict())


-def test_save_and_load_hf_checkpoint(policy):
+@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
+def test_save_and_load_hf_checkpoint(policy, num_gpus):
     ## warm up with a forward pass
     ## this is needed before saving a checkpoint because FSDP does some lazy initialization
     input_ids = torch.randint(0, 16000, (4, 128))  # 4 sequences, each of length 128
@@ -285,20 +290,13 @@ def test_save_and_load_hf_checkpoint(policy):
     )

     ## make sure we save both HF and DCP checkpoints
-    assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == {
-        "__0_0.distcp",
-        "__1_0.distcp",
-        ".metadata",
-    }
-    ## 1B model has two shards
-    assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp-hf"))) == {
-        "config.json",
-        "generation_config.json",
-        "model-00001-of-00002.safetensors",
-        "model-00002-of-00002.safetensors",
-        "model.safetensors.index.json",
-    }
+    # Dynamically create the expected set of distcp files based on num_gpus
+    expected_distcp_files = {f"__{rank}_0.distcp" for rank in range(num_gpus)}
+    expected_files = expected_distcp_files.union({".metadata"})
+    assert (
+        set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == expected_files
+    )

     assert set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp_tokenizer"))) == {
         "tokenizer_config.json",
         "tokenizer.json",
@@ -308,6 +306,25 @@
     converted_model = AutoModelForCausalLM.from_pretrained(
         os.path.join(tmp_dir, "test_hf_and_dcp-hf")
     )
+
+    hf_save_dir = os.path.join(tmp_dir, "test_hf_and_dcp-hf")
+    hf_files = set(os.listdir(hf_save_dir))
+
+    # Check the HF saved files structure: could be single or sharded
+    expected_common_hf_files = {"config.json", "generation_config.json"}
+    if "model.safetensors" in hf_files:
+        # Single file format (1 GPU or smaller model)
+        expected_hf_files = expected_common_hf_files.union({"model.safetensors"})
+    else:
+        # Sharded format (>=2 GPUs or larger model)
+        expected_hf_files = expected_common_hf_files.union(
+            {
+                "model-00001-of-00002.safetensors",
+                "model-00002-of-00002.safetensors",
+                "model.safetensors.index.json",
+            }
+        )
+    assert hf_files == expected_hf_files
+
     original_model = AutoModelForCausalLM.from_pretrained(
         simple_policy_config["model_name"]
     )
@@ -316,7 +335,8 @@ def test_save_and_load_hf_checkpoint(policy):
     check_dict_equality(converted_model.state_dict(), original_model.state_dict())


-def test_convert_dcp_to_hf(policy):
+@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
+def test_convert_dcp_to_hf(policy, num_gpus):
     ## warm up with a forward pass
     ## this is needed before saving a checkpoint because FSDP does some lazy initialization
     input_ids = torch.randint(0, 16000, (4, 128))  # 4 sequences, each of length 128
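Both checkpoint tests now duplicate the single-file-vs-sharded safetensors check; a shared helper along these lines (hypothetical, not in this PR) would capture the convention HF's `save_pretrained` follows:

```python
# Hypothetical helper: compute the expected weight-file set for an HF save
# directory, for either a single-file or a sharded safetensors checkpoint.
import os


def expected_hf_weight_files(hf_dir: str) -> set[str]:
    files = set(os.listdir(hf_dir))
    if "model.safetensors" in files:  # single-file checkpoint
        return {"model.safetensors"}
    shards = {
        f for f in files if f.startswith("model-") and f.endswith(".safetensors")
    }
    return shards | {"model.safetensors.index.json"}  # sharded checkpoint
```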
"generation_config.json", - "model-00001-of-00002.safetensors", - "model-00002-of-00002.safetensors", - "model.safetensors.index.json", - } + assert ( + set(os.listdir(os.path.join(tmp_dir, "test_hf_and_dcp"))) == expected_files + ) + + # Check the HF saved files structure: could be single or sharded + hf_save_dir = os.path.join(tmp_dir, "test_hf_and_dcp-hf") + hf_files = set(os.listdir(hf_save_dir)) + expected_common_hf_files = {"config.json", "generation_config.json"} + + if "model.safetensors" in hf_files: + # Single file format (1 GPU or smaller model) + expected_hf_files = expected_common_hf_files.union({"model.safetensors"}) + else: + # Sharded format (>=2 GPUs or larger model) + expected_hf_files = expected_common_hf_files.union( + { + "model-00001-of-00002.safetensors", + "model-00002-of-00002.safetensors", + "model.safetensors.index.json", + } + ) + assert hf_files == expected_hf_files offline_converted_model_path = convert_dcp_to_hf( os.path.join(tmp_dir, "test_hf_and_dcp"),