diff --git a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py
index 7358e519e4..c06d738929 100644
--- a/nemo_reinforcer/models/policy/fsdp1_policy_worker.py
+++ b/nemo_reinforcer/models/policy/fsdp1_policy_worker.py
@@ -22,6 +22,7 @@ import torch
 
 from torch.distributed.device_mesh import init_device_mesh
 from torch.distributed.fsdp import (
+    CPUOffload,
     FullyShardedDataParallel,
     MixedPrecision,
 )
diff --git a/tests/unit/models/policy/test_fsdp1_worker.py b/tests/unit/models/policy/test_fsdp1_worker.py
index 4ea14f739a..0ace3084dc 100644
--- a/tests/unit/models/policy/test_fsdp1_worker.py
+++ b/tests/unit/models/policy/test_fsdp1_worker.py
@@ -261,18 +261,32 @@ def test_hf_policy_init(policy_setup, num_gpus):
 
 
 @pytest.fixture
-def training_setup(tokenizer, num_gpus):
-    """Setup and teardown specifically for training tests."""
+def training_setup(tokenizer, request, num_gpus):
+    """
+    Setup and teardown specifically for training tests.
+
+    When used without parameterization, uses the default config.
+    When parameterized, takes any config updates as a dictionary in request.param
+    and applies them to the basic config.
+    """
     policy = None
     cluster = None
     data = None
     loss_fn = None
 
+    # Get config updates from request.param if available
+    config_updates = {}
+    config_suffix = ""
+    if hasattr(request, "param") and request.param is not None:
+        config_updates = request.param
+        config_suffix = "-" + "-".join([f"{k}={v}" for k, v in config_updates.items()])
+
     try:
         # Create resources with unique name
-        cluster_name = f"test-train-{num_gpus}gpu"
+        cluster_name = f"test-train-{num_gpus}gpu{config_suffix}"
         print(
-            f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..."
+            f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs"
+            f"{' with config updates: ' + str(config_updates) if config_updates else ''}"
         )
 
         cluster = RayVirtualCluster(
@@ -283,7 +297,10 @@ def training_setup(tokenizer, num_gpus):
             max_colocated_worker_groups=1,
         )
 
-        config = basic_llama_test_config
+        # Create a config with optional modifications
+        config = deepcopy(basic_llama_test_config)
+        if config_updates:
+            config.update(config_updates)
 
         print("Creating training HfPolicy...")
         policy = HfPolicy(
@@ -341,8 +358,23 @@ def get_max_gpu_utilization(policy):
 
 
 @pytest.mark.timeout(180)
-@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
-def test_hf_policy_training(training_setup, tracker, num_gpus):
+@pytest.mark.parametrize(
+    "num_gpus, training_setup, config_name",
+    [
+        (1, None, "default"),
+        (2, None, "default"),
+        (2, {"fsdp_offload_enabled": True}, "fsdp_offload"),
+        (2, {"activation_checkpointing_enabled": True}, "activation_checkpointing"),
+    ],
+    indirect=["training_setup"],
+    ids=[
+        "1gpu_default",
+        "2gpu_default",
+        "2gpu_fsdp_offload",
+        "2gpu_activation_checkpointing",
+    ],
+)
+def test_hf_policy_training(training_setup, tracker, num_gpus, config_name):
     def verify_loss_tensor(loss_tensor):
         assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN"
         assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf"
@@ -357,7 +389,9 @@ def verify_loss_tensor(loss_tensor):
     assert loss_fn is not None, "Loss function was not created properly"
 
     # Call prepare_for_training if available
-    print("\nPreparing for training...")
+    print(
+        f"\nPreparing for training with {num_gpus} GPU(s) and {config_name} config..."
+    )
     policy.prepare_for_training()
 
     losses = []
@@ -370,7 +404,9 @@ def verify_loss_tensor(loss_tensor):
         verify_loss_tensor(loss_tensor)
         losses.append(loss_tensor[-1].item())
 
-        print(f"Training loss: {results['loss']}")
+        print(
+            f"Training loss with {num_gpus} GPU(s) and {config_name} config: {results['loss']}"
+        )
 
     policy.finish_training()
     assert losses[0] > losses[-1], "Loss should decrease over training iterations"
@@ -379,14 +415,16 @@ def verify_loss_tensor(loss_tensor):
         policy
     )
     print(
-        f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, "
+        f"Max GPU Utilization after training with {num_gpus} GPU(s) and {config_name} config: {after_training_mem_allocated:,.1f} MB allocated, "
         f"{after_training_mem_reserved:,.1f} MB reserved"
     )
     tracker.track(
-        f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated
+        f"{num_gpus}gpu_{config_name}_after_training_mem_allocated",
+        after_training_mem_allocated,
     )
     tracker.track(
-        f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved
+        f"{num_gpus}gpu_{config_name}_after_training_mem_reserved",
+        after_training_mem_reserved,
     )
 
     policy.offload_after_refit()
@@ -394,20 +432,29 @@ def verify_loss_tensor(loss_tensor):
         policy
     )
     print(
-        f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, "
+        f"Max GPU Utilization after offload with {num_gpus} GPU(s) and {config_name} config: {after_offload_mem_allocated:,.1f} MB allocated, "
         f"{after_offload_mem_reserved:,.1f} MB reserved"
     )
     tracker.track(
-        f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated
+        f"{num_gpus}gpu_{config_name}_after_offload_mem_allocated",
+        after_offload_mem_allocated,
    )
     tracker.track(
-        f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved
+        f"{num_gpus}gpu_{config_name}_after_offload_mem_reserved",
+        after_offload_mem_reserved,
     )
 
     # Compare memory after offload to memory after training
-    assert after_training_mem_allocated > 10_000, (
-        "Memory after training should be more than 10GB"
-    )
+    if config_name == "fsdp_offload":
+        # With FSDP offload, memory usage after training should already be low
+        assert after_training_mem_allocated < 1_200, (
+            "Memory after training with FSDP offload should be less than 1.2GB"
+        )
+    else:
+        assert after_training_mem_allocated > 10_000, (
+            f"Memory after training with {config_name} config should be more than 10GB"
+        )
+
     assert after_offload_mem_allocated < 1_200, (
         "Memory after offload should be less than 1.2GB"
    )
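Reviewer notes (sketches for context, not part of the patch):

The worker-side change only adds the `CPUOffload` import; the wiring into the FSDP constructor sits outside the hunks shown. Below is a minimal sketch of how FSDP1's `CPUOffload` is typically passed to `FullyShardedDataParallel`, gated on the `fsdp_offload_enabled` key the tests toggle. The `wrap_model` helper and `cfg` dict are hypothetical, not code from this repository.

```python
import torch
from torch.distributed.fsdp import (
    CPUOffload,
    FullyShardedDataParallel,
    MixedPrecision,
)


def wrap_model(model: torch.nn.Module, cfg: dict) -> FullyShardedDataParallel:
    """Hypothetical helper: wrap a module in FSDP1 with optional CPU offload."""
    return FullyShardedDataParallel(
        model,
        mixed_precision=MixedPrecision(param_dtype=torch.bfloat16),
        # CPUOffload(offload_params=True) keeps sharded parameters (and their
        # gradients) in CPU memory, trading GPU memory for host/device
        # transfer time. That is why the fsdp_offload case above asserts
        # under 1.2 GB allocated right after training.
        cpu_offload=CPUOffload(offload_params=cfg.get("fsdp_offload_enabled", False)),
    )
```

The test change relies on pytest's indirect parametrization: with `indirect=["training_setup"]`, the `training_setup` element of each tuple is routed to the fixture as `request.param` rather than passed to the test function. A self-contained illustration of the same pattern, using hypothetical names (`setup`, `expected_lr`):

```python
import pytest


@pytest.fixture
def setup(request):
    # Under indirect parametrization, the parametrize value arrives as
    # request.param; the attribute is absent when the fixture is used
    # without parameterization.
    overrides = getattr(request, "param", None) or {}
    config = {"lr": 1e-4, **overrides}
    yield config  # teardown code would follow the yield


@pytest.mark.parametrize(
    "setup, expected_lr",
    [(None, 1e-4), ({"lr": 5e-5}, 5e-5)],
    indirect=["setup"],  # route the first element to the fixture, not the test
)
def test_config(setup, expected_lr):
    assert setup["lr"] == expected_lr
```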