From d8da9096b9ba2d72e86d34089b6450b3d86ea987 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Thu, 17 Apr 2025 18:56:18 -0700 Subject: [PATCH 1/4] Fix missing import that got lost in merge Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/models/policy/fsdp1_policy_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py index 7358e519e4..c06d738929 100644 --- a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py +++ b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py @@ -22,6 +22,7 @@ import torch from torch.distributed.device_mesh import init_device_mesh from torch.distributed.fsdp import ( + CPUOffload, FullyShardedDataParallel, MixedPrecision, ) From ea1345b452fdf7b4a3e244f41f18f9ce180f68b8 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 18 Apr 2025 11:19:02 -0700 Subject: [PATCH 2/4] Add unit tests Signed-off-by: Yi-Fu Wu --- tests/unit/models/policy/test_fsdp1_worker.py | 157 +++++++++++++++++- 1 file changed, 151 insertions(+), 6 deletions(-) diff --git a/tests/unit/models/policy/test_fsdp1_worker.py b/tests/unit/models/policy/test_fsdp1_worker.py index 4ea14f739a..dea18501eb 100644 --- a/tests/unit/models/policy/test_fsdp1_worker.py +++ b/tests/unit/models/policy/test_fsdp1_worker.py @@ -261,18 +261,32 @@ def test_hf_policy_init(policy_setup, num_gpus): @pytest.fixture -def training_setup(tokenizer, num_gpus): - """Setup and teardown specifically for training tests.""" +def training_setup(tokenizer, request, num_gpus): + """ + Setup and teardown specifically for training tests. + + When used without parameterization, uses the default config. + When parameterized, takes any config updates as a dictionary in request.param + and applies them to the basic config. 
+ """ policy = None cluster = None data = None loss_fn = None + # Get config updates from request.param if available + config_updates = {} + config_suffix = "" + if hasattr(request, "param"): + config_updates = request.param + config_suffix = "-" + "-".join([f"{k}={v}" for k, v in config_updates.items()]) + try: # Create resources with unique name - cluster_name = f"test-train-{num_gpus}gpu" + cluster_name = f"test-train-{num_gpus}gpu{config_suffix}" print( - f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..." + f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs" + f"{' with config updates: ' + str(config_updates) if config_updates else ''}" ) cluster = RayVirtualCluster( @@ -283,7 +297,10 @@ def training_setup(tokenizer, num_gpus): max_colocated_worker_groups=1, ) - config = basic_llama_test_config + # Create a config with optional modifications + config = deepcopy(basic_llama_test_config) + if config_updates: + config.update(config_updates) print("Creating training HfPolicy...") policy = HfPolicy( @@ -318,7 +335,11 @@ def training_setup(tokenizer, num_gpus): loss_fn: LossFunction = simple_loss # Provide the resources to the test - yield policy, cluster, data, loss_fn + # Default config is used if no config updates are provided + if config_updates: + yield policy, cluster, data, loss_fn, config + else: + yield policy, cluster, data, loss_fn except Exception as e: print(f"Error during training setup: {e}") @@ -700,3 +721,127 @@ def test_hf_policy_generation_with_stop(test_input_data, tokenizer): print("Cleaning up resources for test") cluster.shutdown() policy.worker_group.shutdown() + + +@pytest.mark.timeout(180) +@pytest.mark.parametrize("num_gpus", [2], ids=["2gpu"]) +@pytest.mark.parametrize( + "training_setup, config_name", + [ + ( + {"fsdp_offload_enabled": True, "activation_checkpointing_enabled": False}, + "fsdp_offload", + ), + ( + {"fsdp_offload_enabled": False, "activation_checkpointing_enabled": True}, + 
"activation_checkpointing", + ), + ], + indirect=["training_setup"], + ids=["fsdp_offload", "activation_checkpointing"], +) +def test_hf_policy_training_with_modified_setup( + training_setup, tracker, num_gpus, config_name +): + """Test training with 2 GPUs and special FSDP configurations (offload or activation checkpointing).""" + + def verify_loss_tensor(loss_tensor): + assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" + assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf" + return loss_tensor + + policy, cluster, data, loss_fn, config = training_setup + + # Verify resources were created properly + assert policy is not None, "Training policy was not created properly" + assert cluster is not None, "Training cluster was not created properly" + assert data is not None, "Test data was not created properly" + assert loss_fn is not None, "Loss function was not created properly" + assert num_gpus == 2, "This test is specifically for 2 GPUs" + + # Verify we have exactly 2 workers/GPUs + assert len(policy.worker_group.workers) == 2, ( + "Should have exactly 2 workers for this test" + ) + + # Check GPU distribution + gpu_infos = ray.get([w.get_gpu_info.remote() for w in policy.worker_group.workers]) + assert len(gpu_infos) == 2, "Should have information from 2 GPUs" + assert gpu_infos[0]["rank"] != gpu_infos[1]["rank"], ( + "Workers should have different ranks" + ) + + # Call prepare_for_training + print(f"\nPreparing for training with 2 GPUs and {config_name}...") + policy.prepare_for_training() + + # Get initial GPU utilization across both GPUs + initial_mem_allocated, initial_mem_reserved = get_max_gpu_utilization(policy) + print( + f"Initial GPU Utilization: {initial_mem_allocated:,.1f} MB allocated, " + f"{initial_mem_reserved:,.1f} MB reserved" + ) + tracker.track(f"2gpu_{config_name}_initial_mem_allocated", initial_mem_allocated) + + # Training loop + losses = [] + for steps in range(4): + results = policy.train(data, loss_fn) + + # 
Verify results + assert "loss" in results, "Training results should contain 'loss'" + loss_tensor = results["loss"] + verify_loss_tensor(loss_tensor) + losses.append(loss_tensor[-1].item()) + + print(f"Training loss with 2 GPUs and {config_name}: {results['loss']}") + + policy.finish_training() + assert losses[0] > losses[-1], "Loss should decrease over training iterations" + + # Check GPU memory usage after training + after_training_mem_allocated, after_training_mem_reserved = get_max_gpu_utilization( + policy + ) + print( + f"Max GPU Utilization after training with 2 GPUs and {config_name}: {after_training_mem_allocated:,.1f} MB allocated, " + f"{after_training_mem_reserved:,.1f} MB reserved" + ) + tracker.track( + f"2gpu_{config_name}_after_training_mem_allocated", after_training_mem_allocated + ) + tracker.track( + f"2gpu_{config_name}_after_training_mem_reserved", after_training_mem_reserved + ) + + policy.offload_after_refit() + after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization( + policy + ) + print( + f"Max GPU Utilization after offload with 2 GPUs and {config_name}: {after_offload_mem_allocated:,.1f} MB allocated, " + f"{after_offload_mem_reserved:,.1f} MB reserved" + ) + tracker.track( + f"2gpu_{config_name}_after_offload_mem_allocated", after_offload_mem_allocated + ) + tracker.track( + f"2gpu_{config_name}_after_offload_mem_reserved", after_offload_mem_reserved + ) + + # Memory usage assertions based on configuration + if config_name == "fsdp_offload": + # With FSDP offload, memory usage after training should already be low + assert after_training_mem_allocated < 1_200, ( + "FSDP offload after training should be less than 1.2GB" + ) + else: + assert after_training_mem_allocated > 10_000, ( + "Memory after training should be more than 10GB" + ) + + + # Common assertion for both configurations + assert after_offload_mem_allocated < 1_200, ( + "Memory after offload should be less than 1.2GB" + ) From 
e70f1200c82c677f6c66e5deedb1405e5b482b8a Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 18 Apr 2025 11:22:56 -0700 Subject: [PATCH 3/4] Don't need to set defaults Signed-off-by: Yi-Fu Wu --- tests/unit/models/policy/test_fsdp1_worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/models/policy/test_fsdp1_worker.py b/tests/unit/models/policy/test_fsdp1_worker.py index dea18501eb..ea7cd44f98 100644 --- a/tests/unit/models/policy/test_fsdp1_worker.py +++ b/tests/unit/models/policy/test_fsdp1_worker.py @@ -729,11 +729,11 @@ def test_hf_policy_generation_with_stop(test_input_data, tokenizer): "training_setup, config_name", [ ( - {"fsdp_offload_enabled": True, "activation_checkpointing_enabled": False}, + {"fsdp_offload_enabled": True}, "fsdp_offload", ), ( - {"fsdp_offload_enabled": False, "activation_checkpointing_enabled": True}, + {"activation_checkpointing_enabled": True}, "activation_checkpointing", ), ], @@ -743,7 +743,7 @@ def test_hf_policy_generation_with_stop(test_input_data, tokenizer): def test_hf_policy_training_with_modified_setup( training_setup, tracker, num_gpus, config_name ): - """Test training with 2 GPUs and special FSDP configurations (offload or activation checkpointing).""" + """Test training with 2 GPUs and modified configs (offload or activation checkpointing).""" def verify_loss_tensor(loss_tensor): assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" From 9dcb3a700a7ea62694201a30086b91faea3a2cb1 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Fri, 18 Apr 2025 13:32:36 -0700 Subject: [PATCH 4/4] Unify with previous test Signed-off-by: Yi-Fu Wu --- tests/unit/models/policy/test_fsdp1_worker.py | 188 +++++------------- 1 file changed, 45 insertions(+), 143 deletions(-) diff --git a/tests/unit/models/policy/test_fsdp1_worker.py b/tests/unit/models/policy/test_fsdp1_worker.py index ea7cd44f98..0ace3084dc 100644 --- a/tests/unit/models/policy/test_fsdp1_worker.py +++ 
b/tests/unit/models/policy/test_fsdp1_worker.py @@ -277,7 +277,7 @@ def training_setup(tokenizer, request, num_gpus): # Get config updates from request.param if available config_updates = {} config_suffix = "" - if hasattr(request, "param"): + if hasattr(request, "param") and request.param is not None: config_updates = request.param config_suffix = "-" + "-".join([f"{k}={v}" for k, v in config_updates.items()]) @@ -335,11 +335,7 @@ def training_setup(tokenizer, request, num_gpus): loss_fn: LossFunction = simple_loss # Provide the resources to the test - # Default config is used if no config updates are provided - if config_updates: - yield policy, cluster, data, loss_fn, config - else: - yield policy, cluster, data, loss_fn + yield policy, cluster, data, loss_fn except Exception as e: print(f"Error during training setup: {e}") @@ -362,8 +358,23 @@ def get_max_gpu_utilization(policy): @pytest.mark.timeout(180) -@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"]) -def test_hf_policy_training(training_setup, tracker, num_gpus): +@pytest.mark.parametrize( + "num_gpus, training_setup, config_name", + [ + (1, None, "default"), + (2, None, "default"), + (2, {"fsdp_offload_enabled": True}, "fsdp_offload"), + (2, {"activation_checkpointing_enabled": True}, "activation_checkpointing"), + ], + indirect=["training_setup"], + ids=[ + "1gpu_default", + "2gpu_default", + "2gpu_fsdp_offload", + "2gpu_activation_checkpointing", + ], +) +def test_hf_policy_training(training_setup, tracker, num_gpus, config_name): def verify_loss_tensor(loss_tensor): assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf" @@ -378,7 +389,9 @@ def verify_loss_tensor(loss_tensor): assert loss_fn is not None, "Loss function was not created properly" # Call prepare_for_training if available - print("\nPreparing for training...") + print( + f"\nPreparing for training with {num_gpus} GPU(s) and {config_name} 
config..." + ) policy.prepare_for_training() losses = [] @@ -391,7 +404,9 @@ def verify_loss_tensor(loss_tensor): verify_loss_tensor(loss_tensor) losses.append(loss_tensor[-1].item()) - print(f"Training loss: {results['loss']}") + print( + f"Training loss with {num_gpus} GPU(s) and {config_name} config: {results['loss']}" + ) policy.finish_training() assert losses[0] > losses[-1], "Loss should decrease over training iterations" @@ -400,14 +415,16 @@ def verify_loss_tensor(loss_tensor): policy ) print( - f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, " + f"Max GPU Utilization after training with {num_gpus} GPU(s) and {config_name} config: {after_training_mem_allocated:,.1f} MB allocated, " f"{after_training_mem_reserved:,.1f} MB reserved" ) tracker.track( - f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated + f"{num_gpus}gpu_{config_name}_after_training_mem_allocated", + after_training_mem_allocated, ) tracker.track( - f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved + f"{num_gpus}gpu_{config_name}_after_training_mem_reserved", + after_training_mem_reserved, ) policy.offload_after_refit() @@ -415,20 +432,29 @@ def verify_loss_tensor(loss_tensor): policy ) print( - f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, " + f"Max GPU Utilization after offload with {num_gpus} GPU(s) and {config_name} config: {after_offload_mem_allocated:,.1f} MB allocated, " f"{after_offload_mem_reserved:,.1f} MB reserved" ) tracker.track( - f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated + f"{num_gpus}gpu_{config_name}_after_offload_mem_allocated", + after_offload_mem_allocated, ) tracker.track( - f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved + f"{num_gpus}gpu_{config_name}_after_offload_mem_reserved", + after_offload_mem_reserved, ) # Compare memory after offload to memory after training - assert 
after_training_mem_allocated > 10_000, ( - "Memory after training should be more than 10GB" - ) + if config_name == "fsdp_offload": + # With FSDP offload, memory usage after training should already be low + assert after_training_mem_allocated < 1_200, ( + "FSDP offload after training should be less than 1.2GB" + ) + else: + assert after_training_mem_allocated > 10_000, ( + f"Memory after training with {config_name} config should be more than 10GB" + ) + assert after_offload_mem_allocated < 1_200, ( "Memory after offload should be less than 1.2GB" ) @@ -721,127 +747,3 @@ def test_hf_policy_generation_with_stop(test_input_data, tokenizer): print("Cleaning up resources for test") cluster.shutdown() policy.worker_group.shutdown() - - -@pytest.mark.timeout(180) -@pytest.mark.parametrize("num_gpus", [2], ids=["2gpu"]) -@pytest.mark.parametrize( - "training_setup, config_name", - [ - ( - {"fsdp_offload_enabled": True}, - "fsdp_offload", - ), - ( - {"activation_checkpointing_enabled": True}, - "activation_checkpointing", - ), - ], - indirect=["training_setup"], - ids=["fsdp_offload", "activation_checkpointing"], -) -def test_hf_policy_training_with_modified_setup( - training_setup, tracker, num_gpus, config_name -): - """Test training with 2 GPUs and modified configs (offload or activation checkpointing).""" - - def verify_loss_tensor(loss_tensor): - assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN" - assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf" - return loss_tensor - - policy, cluster, data, loss_fn, config = training_setup - - # Verify resources were created properly - assert policy is not None, "Training policy was not created properly" - assert cluster is not None, "Training cluster was not created properly" - assert data is not None, "Test data was not created properly" - assert loss_fn is not None, "Loss function was not created properly" - assert num_gpus == 2, "This test is specifically for 2 GPUs" - - # Verify we have 
exactly 2 workers/GPUs - assert len(policy.worker_group.workers) == 2, ( - "Should have exactly 2 workers for this test" - ) - - # Check GPU distribution - gpu_infos = ray.get([w.get_gpu_info.remote() for w in policy.worker_group.workers]) - assert len(gpu_infos) == 2, "Should have information from 2 GPUs" - assert gpu_infos[0]["rank"] != gpu_infos[1]["rank"], ( - "Workers should have different ranks" - ) - - # Call prepare_for_training - print(f"\nPreparing for training with 2 GPUs and {config_name}...") - policy.prepare_for_training() - - # Get initial GPU utilization across both GPUs - initial_mem_allocated, initial_mem_reserved = get_max_gpu_utilization(policy) - print( - f"Initial GPU Utilization: {initial_mem_allocated:,.1f} MB allocated, " - f"{initial_mem_reserved:,.1f} MB reserved" - ) - tracker.track(f"2gpu_{config_name}_initial_mem_allocated", initial_mem_allocated) - - # Training loop - losses = [] - for steps in range(4): - results = policy.train(data, loss_fn) - - # Verify results - assert "loss" in results, "Training results should contain 'loss'" - loss_tensor = results["loss"] - verify_loss_tensor(loss_tensor) - losses.append(loss_tensor[-1].item()) - - print(f"Training loss with 2 GPUs and {config_name}: {results['loss']}") - - policy.finish_training() - assert losses[0] > losses[-1], "Loss should decrease over training iterations" - - # Check GPU memory usage after training - after_training_mem_allocated, after_training_mem_reserved = get_max_gpu_utilization( - policy - ) - print( - f"Max GPU Utilization after training with 2 GPUs and {config_name}: {after_training_mem_allocated:,.1f} MB allocated, " - f"{after_training_mem_reserved:,.1f} MB reserved" - ) - tracker.track( - f"2gpu_{config_name}_after_training_mem_allocated", after_training_mem_allocated - ) - tracker.track( - f"2gpu_{config_name}_after_training_mem_reserved", after_training_mem_reserved - ) - - policy.offload_after_refit() - after_offload_mem_allocated, after_offload_mem_reserved 
= get_max_gpu_utilization( - policy - ) - print( - f"Max GPU Utilization after offload with 2 GPUs and {config_name}: {after_offload_mem_allocated:,.1f} MB allocated, " - f"{after_offload_mem_reserved:,.1f} MB reserved" - ) - tracker.track( - f"2gpu_{config_name}_after_offload_mem_allocated", after_offload_mem_allocated - ) - tracker.track( - f"2gpu_{config_name}_after_offload_mem_reserved", after_offload_mem_reserved - ) - - # Memory usage assertions based on configuration - if config_name == "fsdp_offload": - # With FSDP offload, memory usage after training should already be low - assert after_training_mem_allocated < 1_200, ( - "FSDP offload after training should be less than 1.2GB" - ) - else: - assert after_training_mem_allocated > 10_000, ( - "Memory after training should be more than 10GB" - ) - - - # Common assertion for both configurations - assert after_offload_mem_allocated < 1_200, ( - "Memory after offload should be less than 1.2GB" - )