Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rdparty/NeMo-workspace/NeMo
Submodule NeMo updated from 0e0894 to 17b920
3 changes: 3 additions & 0 deletions nemo_rl/algorithms/grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ def refit_policy_generation(
If it is None, the buffer size will be computed by the remaining memory.
This parameter is primarily used for testing.
"""
# Jimmy: This a workaround given the mcore policy refit memory use is high.
# TODO: implement mcore-> HF inplace weight conversion
_refit_buffer_size_gb = 4
if colocated_inference:
policy.offload_before_refit()
policy_generation.prepare_for_generation(tags=["weights"])
Expand Down
8 changes: 8 additions & 0 deletions nemo_rl/models/megatron/community_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ def import_model_from_hf_name(hf_model_name: str, output_path: str):
hf_model_name,
output_path=output_path,
)
elif hf_config.model_type in "nemotron-nas":
from nemo.tron.converter.llama_nemotron import HFLlamaNemotronImporter

print(f" Importing model {hf_model_name} to {output_path}...")
importer = HFLlamaNemotronImporter(
hf_model_name,
output_path=output_path,
)
else:
raise ValueError(f"Unknown model_type: {hf_config.model_type}")
importer.apply()
Expand Down
2 changes: 1 addition & 1 deletion nemo_rl/models/megatron/converters/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def __init__(self, hf_model_name, megatron_model):
self.get_source_fn = lambda source_state_dict, _: _ModelState(
source_state_dict
)
elif config.model_type == "llama":
elif config.model_type == "llama" or "nemotron-nas":
self.export_mapping = llama_converter.get_export_mapping()
self.export_transforms = llama_converter.get_export_transforms(config)
self.get_source_fn = lambda source_state_dict, _: _ModelState(
Expand Down
165 changes: 86 additions & 79 deletions nemo_rl/models/policy/megatron_policy_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,79 @@ def __repr__(self):
else:
return f"{self.__class__.__qualname__}"

def _get_model_cfg(self, pretrained_run_config_path):
    """Build the Megatron model config for this worker.

    Loads the serialized run config saved alongside the imported
    pretrained checkpoint, then overrides its parallelism, precision,
    MoE, and activation-checkpointing fields from
    ``self.cfg["megatron_cfg"]`` and ``self.cfg["precision"]``.

    Args:
        pretrained_run_config_path: Path to the ``run_config.yaml`` of
            the imported pretrained checkpoint.

    Returns:
        The patched model config object extracted from the loaded
        ``ConfigContainer``.

    Raises:
        AssertionError: If ``context_parallel_size`` != 1 (context
            parallelism is not supported), or if
            ``gated_linear_unit`` is off but no ``activation_func``
            survived config serialization.
    """
    cfg_from_pretrained = ConfigContainer.from_yaml(pretrained_run_config_path)
    model_cfg = cfg_from_pretrained.model_config
    # Replace whatever logger config was serialized with a fresh default.
    cfg_from_pretrained.logger_config = LoggerConfig()

    # Hoist the repeated sub-dict lookup; every override below reads from it.
    megatron_cfg = self.cfg["megatron_cfg"]

    model_cfg.tensor_model_parallel_size = megatron_cfg["tensor_model_parallel_size"]
    model_cfg.pipeline_model_parallel_size = megatron_cfg["pipeline_model_parallel_size"]
    model_cfg.num_layers_in_first_pipeline_stage = megatron_cfg["num_layers_in_first_pipeline_stage"]
    model_cfg.num_layers_in_last_pipeline_stage = megatron_cfg["num_layers_in_last_pipeline_stage"]
    model_cfg.sequence_parallel = megatron_cfg["sequence_parallel"]
    model_cfg.context_parallel_size = megatron_cfg["context_parallel_size"]  # not supported right now
    assert model_cfg.context_parallel_size == 1, (
        "Context parallel is not supported right now"
    )
    model_cfg.expert_tensor_parallel_size = megatron_cfg["expert_tensor_parallel_size"]
    model_cfg.expert_model_parallel_size = megatron_cfg["expert_model_parallel_size"]

    # Precision: self.dtype is a single torch dtype, so at most one of
    # bf16/fp16 is True and params_dtype can be taken from it directly
    # (torch.float32 falls through naturally).
    model_cfg.bf16 = self.dtype == torch.bfloat16
    model_cfg.fp16 = self.dtype == torch.float16
    model_cfg.params_dtype = self.dtype
    model_cfg.pipeline_dtype = self.dtype_map[megatron_cfg["pipeline_dtype"]]
    model_cfg.parallel_output = True

    # Setting moe_router_dtype to higher precision (e.g. fp64) can improve
    # numerical stability, especially when using many experts.
    model_cfg.moe_router_dtype = megatron_cfg["moe_router_dtype"]

    # The below two configs (and "freeze_moe_router") are used to stabilize
    # moe training by preventing updates to the moe router. We found that
    # this is helpful in reducing logprob error during training.

    # Set this to "none" to disable load balancing loss.
    model_cfg.moe_router_load_balancing_type = megatron_cfg["moe_router_load_balancing_type"]
    # Set this to 0.0 to disable updates to the moe router expert bias
    model_cfg.moe_router_bias_update_rate = megatron_cfg["moe_router_bias_update_rate"]

    if megatron_cfg["activation_checkpointing"]:
        model_cfg.activations_checkpoint_granularity = "full"
        model_cfg.activations_checkpoint_method = "uniform"
        model_cfg.activations_checkpoint_num_layers = 1

    if not model_cfg.gated_linear_unit:
        assert model_cfg.activation_func is not None, (
            "activation_func must be set if not using gated_linear_unit. This likely "
            "indicates an issue in configuration conversion (e.g. activation func was "
            "a lambda and couldn't be serialized). This is based on this check "
            "https://github.com/NVIDIA/Megatron-LM/blob/1ab876ddc4c1893c76f26d775226a8d1dcdfb3d2/megatron/core/transformer/mlp.py#L174."
        )
    model_cfg.apply_rope_fusion = megatron_cfg["apply_rope_fusion"]
    return model_cfg

def __init__(
self,
config: PolicyConfig,
Expand All @@ -356,12 +429,12 @@ def __init__(
**kwargs: Any,
):
self.cfg = config
dtype_map = {
self.dtype_map = {
"float32": torch.float32,
"bfloat16": torch.bfloat16,
"float16": torch.float16,
}
self.dtype = dtype_map[self.cfg["precision"]]
self.dtype = self.dtype_map[self.cfg["precision"]]

# cfg["model_name"] is allowed to be either an HF model name or a path to an HF checkpoint
# check if hf_model_name is a path
Expand Down Expand Up @@ -403,7 +476,6 @@ def __init__(
del os.environ[var]

import_model_from_hf_name(hf_model_name, pretrained_path)

# Restore environment
for var, val in env_backup.items():
os.environ[var] = val
Expand All @@ -420,85 +492,14 @@ def __init__(
pre_init_communication_queue.put(True)
destroy_parallel_state()

pretrained_run_config = os.path.join(
pretrained_run_config_path = os.path.join(
pretrained_path, "iter_0000000/run_config.yaml"
)

self.tokenizer = tokenizer
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token

cfg_from_pretrained = ConfigContainer.from_yaml(pretrained_run_config)
model_cfg = cfg_from_pretrained.model_config
cfg_from_pretrained.logger_config = LoggerConfig()

model_cfg.tensor_model_parallel_size = self.cfg["megatron_cfg"][
"tensor_model_parallel_size"
]
model_cfg.pipeline_model_parallel_size = self.cfg["megatron_cfg"][
"pipeline_model_parallel_size"
]
model_cfg.num_layers_in_first_pipeline_stage = self.cfg["megatron_cfg"][
"num_layers_in_first_pipeline_stage"
]
model_cfg.num_layers_in_last_pipeline_stage = self.cfg["megatron_cfg"][
"num_layers_in_last_pipeline_stage"
]
model_cfg.sequence_parallel = self.cfg["megatron_cfg"]["sequence_parallel"]
model_cfg.context_parallel_size = self.cfg["megatron_cfg"][
"context_parallel_size"
] # not supported right now
assert model_cfg.context_parallel_size == 1, (
"Context parallel is not supported right now"
)
model_cfg.expert_tensor_parallel_size = self.cfg["megatron_cfg"][
"expert_tensor_parallel_size"
]
model_cfg.expert_model_parallel_size = self.cfg["megatron_cfg"][
"expert_model_parallel_size"
]
model_cfg.sequence_parallel = self.cfg["megatron_cfg"]["sequence_parallel"]
model_cfg.bf16 = self.dtype == torch.bfloat16
model_cfg.fp16 = self.dtype == torch.float16
if model_cfg.fp16:
assert not model_cfg.bf16, "fp16 and bf16 cannot be used together"
model_cfg.params_dtype = torch.float16
elif model_cfg.bf16:
assert not model_cfg.fp16, "fp16 and bf16 cannot be used together"
model_cfg.params_dtype = torch.bfloat16
else:
model_cfg.params_dtype = torch.float32
model_cfg.pipeline_dtype = dtype_map[self.cfg["megatron_cfg"]["pipeline_dtype"]]
model_cfg.parallel_output = True
# Setting moe_router_dtype to higher precision (e.g. fp64) can improve numerical stability,
# especially when using many experts.
model_cfg.moe_router_dtype = self.cfg["megatron_cfg"]["moe_router_dtype"]

# The below two configs (and "freeze_moe_router") are used to stabilize moe training
# by preventing updates to the moe router. We found that this is helpful in reducing
# logprob error during training.

# Set this to "none" to disable load balancing loss.
model_cfg.moe_router_load_balancing_type = self.cfg["megatron_cfg"][
"moe_router_load_balancing_type"
]
# Set this to 0.0 to disable updates to the moe router expert bias
model_cfg.moe_router_bias_update_rate = self.cfg["megatron_cfg"][
"moe_router_bias_update_rate"
]
if self.cfg["megatron_cfg"]["activation_checkpointing"]:
model_cfg.activations_checkpoint_granularity = "full"
model_cfg.activations_checkpoint_method = "uniform"
model_cfg.activations_checkpoint_num_layers = 1
if not model_cfg.gated_linear_unit:
assert model_cfg.activation_func is not None, (
"activation_func must be set if not using gated_linear_unit. This likely "
"indicates an issue in configuration conversion (e.g. activation func was "
"a lambda and couldn't be serialized). This is based on this check "
"https://github.com/NVIDIA/Megatron-LM/blob/1ab876ddc4c1893c76f26d775226a8d1dcdfb3d2/megatron/core/transformer/mlp.py#L174."
)
model_cfg.apply_rope_fusion = self.cfg["megatron_cfg"]["apply_rope_fusion"]

checkpoint_config = CheckpointConfig(
save_interval=100,
save=weights_path,
Expand All @@ -517,7 +518,7 @@ def __init__(
load_rng=False,
)
self.megatron_cfg = ConfigContainer(
model_config=model_cfg,
model_config=self._get_model_cfg(pretrained_run_config_path),
checkpoint_config=checkpoint_config,
logger_config=LoggerConfig(logging_level=0),
train_config=TrainingConfig(
Expand Down Expand Up @@ -586,10 +587,13 @@ def __init__(
if init_reference_model:
self.model = self.move_model(self.model, "cpu")
ref_ckpt_context = _init_checkpointing_context(ref_checkpoint_config)
ref_model_cfg = self._get_model_cfg(pretrained_run_config_path)
if not ref_model_cfg.vocab_size:
ref_model_cfg.vocab_size = self.megatron_cfg.tokenizer_config.padded_vocab_size

# Create a separate megatron config for the reference model with the correct checkpoint config
ref_megatron_cfg = ConfigContainer(
model_config=self.megatron_cfg.model_config,
model_config=ref_model_cfg,
checkpoint_config=ref_checkpoint_config, # Use the reference checkpoint config
logger_config=self.megatron_cfg.logger_config,
train_config=self.megatron_cfg.train_config,
Expand All @@ -605,8 +609,8 @@ def __init__(
ref_state.cfg = ref_megatron_cfg

reference_model = get_model_from_config(
self.megatron_cfg.model_config,
self.megatron_cfg.ddp_config,
model_config=ref_model_cfg,
ddp_config=self.megatron_cfg.ddp_config,
use_torch_fsdp2=self.megatron_cfg.dist_config.use_torch_fsdp2,
overlap_param_gather_with_optimizer_step=self.megatron_cfg.optimizer_config.overlap_param_gather_with_optimizer_step,
data_parallel_random_init=self.megatron_cfg.rng_config.data_parallel_random_init,
Expand Down Expand Up @@ -1448,6 +1452,9 @@ def prepare_weights_for_ipc(self) -> tuple[list[tuple[str, int]], float]:
# more buckets seems to have better perf
total_available_bytes *= 0.1

# free any unused memory in preparation for weight refitting
torch.cuda.empty_cache()

return param_info, total_available_bytes

# Temporary fix, 'keys' is a kwarg due to some sort of ray bug
Expand Down
Loading