1 change: 1 addition & 0 deletions nemo_reinforcer/models/policy/fsdp1_policy_worker.py
@@ -22,6 +22,7 @@
 import torch
 from torch.distributed.device_mesh import init_device_mesh
 from torch.distributed.fsdp import (
+    CPUOffload,
     FullyShardedDataParallel,
     MixedPrecision,
 )
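
For context on the new import: CPUOffload is the FSDP1 config object that keeps sharded parameters in host memory between uses. A minimal sketch of how the worker presumably wires it up when fsdp_offload_enabled is set (the config key, helper name, and wrapping details here are assumptions, not the PR's actual code):

import torch
from torch.distributed.fsdp import CPUOffload, FullyShardedDataParallel, MixedPrecision

def wrap_policy_model(model, config):
    # Hypothetical helper. offload_params=True keeps FSDP-managed parameters
    # on CPU between forward/backward uses, trading GPU memory for
    # host-device transfer time.
    cpu_offload = CPUOffload(offload_params=True) if config.get("fsdp_offload_enabled") else None
    return FullyShardedDataParallel(
        model,
        cpu_offload=cpu_offload,
        mixed_precision=MixedPrecision(param_dtype=torch.bfloat16),
    )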
83 changes: 65 additions & 18 deletions tests/unit/models/policy/test_fsdp1_worker.py
@@ -261,18 +261,32 @@ def test_hf_policy_init(policy_setup, num_gpus):


 @pytest.fixture
-def training_setup(tokenizer, num_gpus):
-    """Setup and teardown specifically for training tests."""
+def training_setup(tokenizer, request, num_gpus):
+    """
+    Setup and teardown specifically for training tests.
+
+    When used without parameterization, uses the default config.
+    When parameterized, takes any config updates as a dictionary in request.param
+    and applies them to the basic config.
+    """
     policy = None
     cluster = None
     data = None
     loss_fn = None
 
+    # Get config updates from request.param if available
+    config_updates = {}
+    config_suffix = ""
+    if hasattr(request, "param") and request.param is not None:
+        config_updates = request.param
+        config_suffix = "-" + "-".join([f"{k}={v}" for k, v in config_updates.items()])
+
     try:
         # Create resources with unique name
-        cluster_name = f"test-train-{num_gpus}gpu"
+        cluster_name = f"test-train-{num_gpus}gpu{config_suffix}"
         print(
-            f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs..."
+            f"Creating training virtual cluster '{cluster_name}' for {num_gpus} GPUs"
+            f"{' with config updates: ' + str(config_updates) if config_updates else ''}"
         )
 
         cluster = RayVirtualCluster(
@@ -283,7 +297,10 @@ def training_setup(tokenizer, num_gpus):
             max_colocated_worker_groups=1,
         )
 
-        config = basic_llama_test_config
+        # Create a config with optional modifications
+        config = deepcopy(basic_llama_test_config)
+        if config_updates:
+            config.update(config_updates)
 
         print("Creating training HfPolicy...")
         policy = HfPolicy(
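
A note on the deepcopy above: assigning config = basic_llama_test_config directly would alias the shared module-level dict, so a later config.update(config_updates) would leak overrides into every subsequent parametrized run. A minimal illustration of that hazard (dict contents are illustrative):

from copy import deepcopy

base = {"fsdp_offload_enabled": False}

aliased = base                      # plain assignment shares the same dict object
aliased["fsdp_offload_enabled"] = True
assert base["fsdp_offload_enabled"] is True   # the shared config was mutated

base["fsdp_offload_enabled"] = False
copied = deepcopy(base)             # independent copy, safe to update per test
copied["fsdp_offload_enabled"] = True
assert base["fsdp_offload_enabled"] is False  # original left untouched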
@@ -341,8 +358,23 @@ def get_max_gpu_utilization(policy):


 @pytest.mark.timeout(180)
-@pytest.mark.parametrize("num_gpus", [1, 2], ids=["1gpu", "2gpu"])
-def test_hf_policy_training(training_setup, tracker, num_gpus):
+@pytest.mark.parametrize(
+    "num_gpus, training_setup, config_name",
+    [
+        (1, None, "default"),
+        (2, None, "default"),
+        (2, {"fsdp_offload_enabled": True}, "fsdp_offload"),
+        (2, {"activation_checkpointing_enabled": True}, "activation_checkpointing"),
+    ],
+    indirect=["training_setup"],
+    ids=[
+        "1gpu_default",
+        "2gpu_default",
+        "2gpu_fsdp_offload",
+        "2gpu_activation_checkpointing",
+    ],
+)
+def test_hf_policy_training(training_setup, tracker, num_gpus, config_name):
     def verify_loss_tensor(loss_tensor):
         assert not torch.isnan(loss_tensor).any(), "Loss should not be NaN"
         assert not torch.isinf(loss_tensor).any(), "Loss should not be Inf"
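
For readers unfamiliar with indirect parametrization: pytest routes each tuple's training_setup entry into the fixture as request.param rather than passing it to the test directly, which is how the None/dict values above select a config. A self-contained sketch of the mechanism (fixture and test names are illustrative, not from this repo):

import pytest

@pytest.fixture
def setup(request):
    # With indirect parametrization, the parametrize value lands here;
    # a None entry falls back to the defaults, mirroring training_setup.
    overrides = getattr(request, "param", None) or {}
    yield {"lr": 1e-4, **overrides}

@pytest.mark.parametrize(
    "setup",
    [None, {"lr": 1e-5}],
    indirect=["setup"],
    ids=["default", "low_lr"],
)
def test_config(setup):
    assert "lr" in setup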
@@ -357,7 +389,9 @@ def verify_loss_tensor(loss_tensor):
     assert loss_fn is not None, "Loss function was not created properly"
 
     # Call prepare_for_training if available
-    print("\nPreparing for training...")
+    print(
+        f"\nPreparing for training with {num_gpus} GPU(s) and {config_name} config..."
+    )
     policy.prepare_for_training()
 
     losses = []
@@ -370,7 +404,9 @@ def verify_loss_tensor(loss_tensor):
             verify_loss_tensor(loss_tensor)
             losses.append(loss_tensor[-1].item())
 
-        print(f"Training loss: {results['loss']}")
+        print(
+            f"Training loss with {num_gpus} GPU(s) and {config_name} config: {results['loss']}"
+        )
 
     policy.finish_training()
     assert losses[0] > losses[-1], "Loss should decrease over training iterations"
@@ -379,35 +415,46 @@ def verify_loss_tensor(loss_tensor):
         policy
     )
     print(
-        f"Max GPU Utilization after training: {after_training_mem_allocated:,.1f} MB allocated, "
+        f"Max GPU Utilization after training with {num_gpus} GPU(s) and {config_name} config: {after_training_mem_allocated:,.1f} MB allocated, "
         f"{after_training_mem_reserved:,.1f} MB reserved"
     )
     tracker.track(
-        f"after_training_mem_allocated_{num_gpus}gpu", after_training_mem_allocated
+        f"{num_gpus}gpu_{config_name}_after_training_mem_allocated",
+        after_training_mem_allocated,
     )
     tracker.track(
-        f"after_training_mem_reserved_{num_gpus}gpu", after_training_mem_reserved
+        f"{num_gpus}gpu_{config_name}_after_training_mem_reserved",
+        after_training_mem_reserved,
    )
 
     policy.offload_after_refit()
     after_offload_mem_allocated, after_offload_mem_reserved = get_max_gpu_utilization(
         policy
     )
     print(
-        f"Max GPU Utilization after offload: {after_offload_mem_allocated:,.1f} MB allocated, "
+        f"Max GPU Utilization after offload with {num_gpus} GPU(s) and {config_name} config: {after_offload_mem_allocated:,.1f} MB allocated, "
         f"{after_offload_mem_reserved:,.1f} MB reserved"
     )
     tracker.track(
-        f"after_offload_mem_allocated_{num_gpus}gpu", after_offload_mem_allocated
+        f"{num_gpus}gpu_{config_name}_after_offload_mem_allocated",
+        after_offload_mem_allocated,
    )
     tracker.track(
-        f"after_offload_mem_reserved_{num_gpus}gpu", after_offload_mem_reserved
+        f"{num_gpus}gpu_{config_name}_after_offload_mem_reserved",
+        after_offload_mem_reserved,
    )
 
     # Compare memory after offload to memory after training
-    assert after_training_mem_allocated > 10_000, (
-        "Memory after training should be more than 10GB"
-    )
+    if config_name == "fsdp_offload":
+        # With FSDP offload, memory usage after training should already be low
+        assert after_training_mem_allocated < 1_200, (
+            "Memory after training with FSDP offload should be less than 1.2GB"
+        )
+    else:
+        assert after_training_mem_allocated > 10_000, (
+            f"Memory after training with {config_name} config should be more than 10GB"
+        )
 
     assert after_offload_mem_allocated < 1_200, (
         "Memory after offload should be less than 1.2GB"
    )
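
The MB thresholds above rely on get_max_gpu_utilization, whose body is collapsed out of this diff; a plausible sketch built on the standard torch.cuda peak counters (the real helper likely aggregates these across the policy's workers, so treat this as an assumption rather than the repo's implementation):

import torch

def max_gpu_utilization_mb() -> tuple[float, float]:
    # Peak memory since the last torch.cuda.reset_peak_memory_stats(), in MB.
    allocated = torch.cuda.max_memory_allocated() / 1024**2
    reserved = torch.cuda.max_memory_reserved() / 1024**2
    return allocated, reserved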