From 5a62ac88fd7c4337d68fd34e94b242c6e2563bdd Mon Sep 17 00:00:00 2001 From: root Date: Wed, 9 Jul 2025 11:24:15 -0700 Subject: [PATCH 1/2] add super Signed-off-by: root --- nemo_rl/models/megatron/community_import.py | 10 +++ .../models/policy/megatron_policy_worker.py | 68 ++++++++++++++++--- 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/nemo_rl/models/megatron/community_import.py b/nemo_rl/models/megatron/community_import.py index d5ef60c684..758ead7cd4 100644 --- a/nemo_rl/models/megatron/community_import.py +++ b/nemo_rl/models/megatron/community_import.py @@ -43,6 +43,16 @@ def import_model_from_hf_name(hf_model_name: str, output_path: str): hf_model_name, output_path=output_path, ) + elif hf_config.model_type in "nemotron-nas": + from nemo.tron.converter.llama_nemotron import HFLlamaNemotronImporter + + print(f" Importing model {hf_model_name} to {output_path}...") + importer = HFLlamaNemotronImporter( + hf_model_name, + output_path=output_path, + ) + elif "llama" in hf_model_name.lower(): + else: raise ValueError(f"Unknown model_type: {hf_config.model_type}") importer.apply() diff --git a/nemo_rl/models/policy/megatron_policy_worker.py b/nemo_rl/models/policy/megatron_policy_worker.py index ae1918c842..a1df162bc5 100644 --- a/nemo_rl/models/policy/megatron_policy_worker.py +++ b/nemo_rl/models/policy/megatron_policy_worker.py @@ -341,6 +341,55 @@ def __repr__(self): else: return f"{self.__class__.__qualname__}" + def _get_model_cfg(self, pretrained_run_config_path): + cfg_from_pretrained = ConfigContainer.from_yaml(pretrained_run_config_path) + model_cfg = cfg_from_pretrained.model_config + + model_cfg.tensor_model_parallel_size = self.cfg["megatron_cfg"][ + "tensor_model_parallel_size" + ] + model_cfg.pipeline_model_parallel_size = self.cfg["megatron_cfg"][ + "pipeline_model_parallel_size" + ] + model_cfg.num_layers_in_first_pipeline_stage = self.cfg["megatron_cfg"][ + "num_layers_in_first_pipeline_stage" + ] + model_cfg.num_layers_in_last_pipeline_stage = self.cfg["megatron_cfg"][ + "num_layers_in_last_pipeline_stage" + ] + model_cfg.sequence_parallel = self.cfg["megatron_cfg"]["sequence_parallel"] + model_cfg.context_parallel_size = self.cfg["megatron_cfg"][ + "context_parallel_size" + ] # not supported right now + assert model_cfg.context_parallel_size == 1, ( + "Context parallel is not supported right now" + ) + model_cfg.bf16 = self.dtype == torch.bfloat16 + model_cfg.fp16 = self.dtype == torch.float16 + if model_cfg.fp16: + assert not model_cfg.bf16, "fp16 and bf16 cannot be used together" + model_cfg.params_dtype = torch.float16 + elif model_cfg.bf16: + assert not model_cfg.fp16, "fp16 and bf16 cannot be used together" + model_cfg.params_dtype = torch.bfloat16 + else: + model_cfg.params_dtype = torch.float32 + model_cfg.pipeline_dtype = self.dtype_map[self.cfg["megatron_cfg"]["pipeline_dtype"]] + model_cfg.parallel_output = True + if self.cfg["megatron_cfg"]["activation_checkpointing"]: + model_cfg.activations_checkpoint_granularity = "full" + model_cfg.activations_checkpoint_method = "uniform" + model_cfg.activations_checkpoint_num_layers = 1 + if not model_cfg.gated_linear_unit: + assert model_cfg.activation_func is not None, ( + "activation_func must be set if not using gated_linear_unit. This likely " + "indicates an issue in configuration conversion (e.g. activation func was " + "a lambda and couldn't be serialized). 
This is based on this check " + "https://github.com/NVIDIA/Megatron-LM/blob/1ab876ddc4c1893c76f26d775226a8d1dcdfb3d2/megatron/core/transformer/mlp.py#L174." + ) + model_cfg.apply_rope_fusion = self.cfg["megatron_cfg"]["apply_rope_fusion"] + return model_cfg + def __init__( self, config: PolicyConfig, @@ -356,12 +405,12 @@ def __init__( **kwargs: Any, ): self.cfg = config - dtype_map = { + self.dtype_map = { "float32": torch.float32, "bfloat16": torch.bfloat16, "float16": torch.float16, } - self.dtype = dtype_map[self.cfg["precision"]] + self.dtype = self.dtype_map[self.cfg["precision"]] # cfg["model_name"] is allowed to be either an HF model name or a path to an HF checkpoint # check if hf_model_name is a path @@ -371,6 +420,7 @@ def __init__( if os.path.exists(hf_model_name): hf_model_subdir = f"model_{hf_model_subdir.replace('/', '_')}" + megatron_checkpoint_home = "/lustre/fsw/coreai_dlalgo_llm/jiemingz/rl/mcore_ckpts" if megatron_checkpoint_home is not None: pretrained_path = f"{megatron_checkpoint_home}/{hf_model_subdir}" else: @@ -403,7 +453,6 @@ def __init__( del os.environ[var] import_model_from_hf_name(hf_model_name, pretrained_path) - # Restore environment for var, val in env_backup.items(): os.environ[var] = val @@ -420,7 +469,7 @@ def __init__( pre_init_communication_queue.put(True) destroy_parallel_state() - pretrained_run_config = os.path.join( + pretrained_run_config_path = os.path.join( pretrained_path, "iter_0000000/run_config.yaml" ) @@ -517,7 +566,7 @@ def __init__( load_rng=False, ) self.megatron_cfg = ConfigContainer( - model_config=model_cfg, + model_config=self._get_model_cfg(pretrained_run_config_path), checkpoint_config=checkpoint_config, logger_config=LoggerConfig(logging_level=0), train_config=TrainingConfig( @@ -586,10 +635,13 @@ def __init__( if init_reference_model: self.model = self.move_model(self.model, "cpu") ref_ckpt_context = _init_checkpointing_context(ref_checkpoint_config) + ref_model_cfg = self._get_model_cfg(pretrained_run_config_path) + if not ref_model_cfg.vocab_size: + ref_model_cfg.vocab_size = self.megatron_cfg.tokenizer_config.padded_vocab_size # Create a separate megatron config for the reference model with the correct checkpoint config ref_megatron_cfg = ConfigContainer( - model_config=self.megatron_cfg.model_config, + model_config=ref_model_cfg, checkpoint_config=ref_checkpoint_config, # Use the reference checkpoint config logger_config=self.megatron_cfg.logger_config, train_config=self.megatron_cfg.train_config, @@ -605,8 +657,8 @@ def __init__( ref_state.cfg = ref_megatron_cfg reference_model = get_model_from_config( - self.megatron_cfg.model_config, - self.megatron_cfg.ddp_config, + model_config=ref_model_cfg, + ddp_config=self.megatron_cfg.ddp_config, use_torch_fsdp2=self.megatron_cfg.dist_config.use_torch_fsdp2, overlap_param_gather_with_optimizer_step=self.megatron_cfg.optimizer_config.overlap_param_gather_with_optimizer_step, data_parallel_random_init=self.megatron_cfg.rng_config.data_parallel_random_init, From 1f55dc3c6f7635fef2caaefd0196dd970d28bd0e Mon Sep 17 00:00:00 2001 From: Jimmy Zhang Date: Fri, 11 Jul 2025 09:49:03 -0700 Subject: [PATCH 2/2] mcore policy, update nemo Signed-off-by: Jimmy Zhang --- 3rdparty/NeMo-workspace/NeMo | 2 +- nemo_rl/algorithms/grpo.py | 3 + nemo_rl/models/megatron/community_import.py | 2 - nemo_rl/models/megatron/converters/common.py | 2 +- .../models/policy/megatron_policy_worker.py | 99 +++++-------------- 5 files changed, 32 insertions(+), 76 deletions(-) diff --git a/3rdparty/NeMo-workspace/NeMo 
b/3rdparty/NeMo-workspace/NeMo
index 0e0894300e..17b920cbea 160000
--- a/3rdparty/NeMo-workspace/NeMo
+++ b/3rdparty/NeMo-workspace/NeMo
@@ -1 +1 @@
-Subproject commit 0e0894300e09aca042bc07859f660f22858f0a9f
+Subproject commit 17b920cbea0b78db02e6460a2e2568638714a8c2
diff --git a/nemo_rl/algorithms/grpo.py b/nemo_rl/algorithms/grpo.py
index b07cf10cae..943ccb921b 100644
--- a/nemo_rl/algorithms/grpo.py
+++ b/nemo_rl/algorithms/grpo.py
@@ -405,6 +405,9 @@ def refit_policy_generation(
         If it is None, the buffer size will be computed by the remaining memory.
         This parameter is primarily used for testing.
     """
+    # Jimmy: This is a workaround given that the mcore policy's refit memory use is high.
+    # TODO: implement mcore -> HF in-place weight conversion
+    _refit_buffer_size_gb = 4
     if colocated_inference:
         policy.offload_before_refit()
         policy_generation.prepare_for_generation(tags=["weights"])
diff --git a/nemo_rl/models/megatron/community_import.py b/nemo_rl/models/megatron/community_import.py
index 758ead7cd4..0b94884e30 100644
--- a/nemo_rl/models/megatron/community_import.py
+++ b/nemo_rl/models/megatron/community_import.py
@@ -51,8 +51,6 @@ def import_model_from_hf_name(hf_model_name: str, output_path: str):
             hf_model_name,
             output_path=output_path,
         )
-    elif "llama" in hf_model_name.lower():
-
     else:
         raise ValueError(f"Unknown model_type: {hf_config.model_type}")
     importer.apply()
diff --git a/nemo_rl/models/megatron/converters/common.py b/nemo_rl/models/megatron/converters/common.py
index 80be5e5755..aaca41213f 100644
--- a/nemo_rl/models/megatron/converters/common.py
+++ b/nemo_rl/models/megatron/converters/common.py
@@ -280,7 +280,7 @@ def __init__(self, hf_model_name, megatron_model):
            self.get_source_fn = lambda source_state_dict, _: _ModelState(
                source_state_dict
            )
-        elif config.model_type == "llama":
+        elif config.model_type in ("llama", "nemotron-nas"):
            self.export_mapping = llama_converter.get_export_mapping()
            self.export_transforms = llama_converter.get_export_transforms(config)
            self.get_source_fn = lambda source_state_dict, _: _ModelState(
diff --git a/nemo_rl/models/policy/megatron_policy_worker.py b/nemo_rl/models/policy/megatron_policy_worker.py
index a1df162bc5..7da944beb4 100644
--- a/nemo_rl/models/policy/megatron_policy_worker.py
+++ b/nemo_rl/models/policy/megatron_policy_worker.py
@@ -344,6 +344,7 @@ def __repr__(self):
     def _get_model_cfg(self, pretrained_run_config_path):
         cfg_from_pretrained = ConfigContainer.from_yaml(pretrained_run_config_path)
         model_cfg = cfg_from_pretrained.model_config
+        cfg_from_pretrained.logger_config = LoggerConfig()

         model_cfg.tensor_model_parallel_size = self.cfg["megatron_cfg"][
             "tensor_model_parallel_size"
@@ -364,6 +365,13 @@ def _get_model_cfg(self, pretrained_run_config_path):
         assert model_cfg.context_parallel_size == 1, (
             "Context parallel is not supported right now"
         )
+        model_cfg.expert_tensor_parallel_size = self.cfg["megatron_cfg"][
+            "expert_tensor_parallel_size"
+        ]
+        model_cfg.expert_model_parallel_size = self.cfg["megatron_cfg"][
+            "expert_model_parallel_size"
+        ]
+        model_cfg.sequence_parallel = self.cfg["megatron_cfg"]["sequence_parallel"]
         model_cfg.bf16 = self.dtype == torch.bfloat16
         model_cfg.fp16 = self.dtype == torch.float16
         if model_cfg.fp16:
@@ -376,6 +384,22 @@ def _get_model_cfg(self, pretrained_run_config_path):
             model_cfg.params_dtype = torch.float32
         model_cfg.pipeline_dtype = self.dtype_map[self.cfg["megatron_cfg"]["pipeline_dtype"]]
         model_cfg.parallel_output = True
+        # Setting moe_router_dtype to higher precision (e.g.
fp64) can improve numerical stability, + # especially when using many experts. + model_cfg.moe_router_dtype = self.cfg["megatron_cfg"]["moe_router_dtype"] + + # The below two configs (and "freeze_moe_router") are used to stabilize moe training + # by preventing updates to the moe router. We found that this is helpful in reducing + # logprob error during training. + + # Set this to "none" to disable load balancing loss. + model_cfg.moe_router_load_balancing_type = self.cfg["megatron_cfg"][ + "moe_router_load_balancing_type" + ] + # Set this to 0.0 to disable updates to the moe router expert bias + model_cfg.moe_router_bias_update_rate = self.cfg["megatron_cfg"][ + "moe_router_bias_update_rate" + ] if self.cfg["megatron_cfg"]["activation_checkpointing"]: model_cfg.activations_checkpoint_granularity = "full" model_cfg.activations_checkpoint_method = "uniform" @@ -420,7 +444,6 @@ def __init__( if os.path.exists(hf_model_name): hf_model_subdir = f"model_{hf_model_subdir.replace('/', '_')}" - megatron_checkpoint_home = "/lustre/fsw/coreai_dlalgo_llm/jiemingz/rl/mcore_ckpts" if megatron_checkpoint_home is not None: pretrained_path = f"{megatron_checkpoint_home}/{hf_model_subdir}" else: @@ -477,77 +500,6 @@ def __init__( if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token - cfg_from_pretrained = ConfigContainer.from_yaml(pretrained_run_config) - model_cfg = cfg_from_pretrained.model_config - cfg_from_pretrained.logger_config = LoggerConfig() - - model_cfg.tensor_model_parallel_size = self.cfg["megatron_cfg"][ - "tensor_model_parallel_size" - ] - model_cfg.pipeline_model_parallel_size = self.cfg["megatron_cfg"][ - "pipeline_model_parallel_size" - ] - model_cfg.num_layers_in_first_pipeline_stage = self.cfg["megatron_cfg"][ - "num_layers_in_first_pipeline_stage" - ] - model_cfg.num_layers_in_last_pipeline_stage = self.cfg["megatron_cfg"][ - "num_layers_in_last_pipeline_stage" - ] - model_cfg.sequence_parallel = self.cfg["megatron_cfg"]["sequence_parallel"] - model_cfg.context_parallel_size = self.cfg["megatron_cfg"][ - "context_parallel_size" - ] # not supported right now - assert model_cfg.context_parallel_size == 1, ( - "Context parallel is not supported right now" - ) - model_cfg.expert_tensor_parallel_size = self.cfg["megatron_cfg"][ - "expert_tensor_parallel_size" - ] - model_cfg.expert_model_parallel_size = self.cfg["megatron_cfg"][ - "expert_model_parallel_size" - ] - model_cfg.sequence_parallel = self.cfg["megatron_cfg"]["sequence_parallel"] - model_cfg.bf16 = self.dtype == torch.bfloat16 - model_cfg.fp16 = self.dtype == torch.float16 - if model_cfg.fp16: - assert not model_cfg.bf16, "fp16 and bf16 cannot be used together" - model_cfg.params_dtype = torch.float16 - elif model_cfg.bf16: - assert not model_cfg.fp16, "fp16 and bf16 cannot be used together" - model_cfg.params_dtype = torch.bfloat16 - else: - model_cfg.params_dtype = torch.float32 - model_cfg.pipeline_dtype = dtype_map[self.cfg["megatron_cfg"]["pipeline_dtype"]] - model_cfg.parallel_output = True - # Setting moe_router_dtype to higher precision (e.g. fp64) can improve numerical stability, - # especially when using many experts. - model_cfg.moe_router_dtype = self.cfg["megatron_cfg"]["moe_router_dtype"] - - # The below two configs (and "freeze_moe_router") are used to stabilize moe training - # by preventing updates to the moe router. We found that this is helpful in reducing - # logprob error during training. - - # Set this to "none" to disable load balancing loss. 
- model_cfg.moe_router_load_balancing_type = self.cfg["megatron_cfg"][ - "moe_router_load_balancing_type" - ] - # Set this to 0.0 to disable updates to the moe router expert bias - model_cfg.moe_router_bias_update_rate = self.cfg["megatron_cfg"][ - "moe_router_bias_update_rate" - ] - if self.cfg["megatron_cfg"]["activation_checkpointing"]: - model_cfg.activations_checkpoint_granularity = "full" - model_cfg.activations_checkpoint_method = "uniform" - model_cfg.activations_checkpoint_num_layers = 1 - if not model_cfg.gated_linear_unit: - assert model_cfg.activation_func is not None, ( - "activation_func must be set if not using gated_linear_unit. This likely " - "indicates an issue in configuration conversion (e.g. activation func was " - "a lambda and couldn't be serialized). This is based on this check " - "https://github.com/NVIDIA/Megatron-LM/blob/1ab876ddc4c1893c76f26d775226a8d1dcdfb3d2/megatron/core/transformer/mlp.py#L174." - ) - model_cfg.apply_rope_fusion = self.cfg["megatron_cfg"]["apply_rope_fusion"] - checkpoint_config = CheckpointConfig( save_interval=100, save=weights_path, @@ -1500,6 +1452,9 @@ def prepare_weights_for_ipc(self) -> tuple[list[tuple[str, int]], float]: # more buckets seems to have better perf total_available_bytes *= 0.1 + # free any unused memory in preparation for weight refitting + torch.cuda.empty_cache() + return param_info, total_available_bytes # Temporary fix, 'keys' is a kwarg due to some sort of ray bug
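Reviewer note: the second patch routes all precision handling through the new `_get_model_cfg` helper (a `dtype_map` lookup plus mutually exclusive `bf16`/`fp16` flags and a matching `params_dtype`), which is what lets the reference-model setup in `__init__` reuse the same overrides via a second `_get_model_cfg` call instead of duplicating the block. Below is a minimal, self-contained sketch of that precision pattern for reference only; the `SimpleNamespace` stand-in, the `apply_precision` helper, and `DTYPE_MAP` are illustrative assumptions, not part of the NeMo-RL or Megatron API.

# Minimal sketch of the precision handling in _get_model_cfg (illustrative only).
from types import SimpleNamespace

import torch

# Hypothetical stand-in for the Megatron model config; the real object has many more fields.
model_cfg = SimpleNamespace(bf16=False, fp16=False, params_dtype=torch.float32)

DTYPE_MAP = {
    "float32": torch.float32,
    "bfloat16": torch.bfloat16,
    "float16": torch.float16,
}

def apply_precision(cfg, precision: str):
    # Resolve the precision string once, then keep the bf16/fp16 flags and
    # params_dtype consistent so the two half-precision modes stay mutually exclusive.
    dtype = DTYPE_MAP[precision]
    cfg.bf16 = dtype == torch.bfloat16
    cfg.fp16 = dtype == torch.float16
    assert not (cfg.bf16 and cfg.fp16), "fp16 and bf16 cannot be used together"
    cfg.params_dtype = dtype
    return cfg

apply_precision(model_cfg, "bfloat16")
print(model_cfg.bf16, model_cfg.fp16, model_cfg.params_dtype)  # True False torch.bfloat16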