From 62d059e6c5059f755186905b93c80eb0a61aa117 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 23 Aug 2022 11:53:34 -0700 Subject: [PATCH 01/14] Avoid storing extra copy of params in dist Adam optimizer If params are bf16, dist Adam will only store 16-bit remainder needed to reconstruct fp32 params. Signed-off-by: Tim Moon --- Dockerfile | 6 +++--- .../nlp/models/language_modeling/megatron_gpt_model.py | 7 +++++++ nemo/core/optim/optimizers.py | 1 - 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index d692d4a6177c..384aae8704cb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,10 +34,10 @@ RUN apt-get update && \ # FIXME a workaround to update apex. Remove when base image is updated WORKDIR /tmp/ -RUN git clone https://github.com/ericharper/apex.git && \ +RUN git clone https://github.com/timmoon10/apex.git && \ cd apex && \ - git checkout 19e4f55eb402452f74dead19f68b65d6291cfdb2 && \ - pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" ./ + git checkout 2d5ebb793ca212773bc0af4b15afaee2cea5e82f && \ + pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ # uninstall stuff from base container RUN pip uninstall -y sacrebleu torchtext diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 201a0c6293ae..2792f67e4831 100755 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -187,6 +187,13 @@ def setup_optimization( optim_kwargs['process_group'] = parallel_state.get_data_parallel_group() optim_kwargs['param_sync_dtype'] = self.autocast_dtype optim_kwargs['contiguous_grad_buffer'] = True + if self.autocast_dtype == torch.float: + optim_kwargs['store_params'] = False + elif self.autocast_dtype == torch.float16: + optim_kwargs['store_params'] = True + elif self.autocast_dtype == torch.bfloat16: + optim_kwargs['store_params'] = False + optim_kwargs['store_param_remainders'] = True return super().setup_optimization(optim_config=optim_config, optim_kwargs=optim_kwargs) def forward(self, tokens, text_position_ids, attention_mask, labels): diff --git a/nemo/core/optim/optimizers.py b/nemo/core/optim/optimizers.py index 2c1fc4a20fca..f6b0bd4239fc 100644 --- a/nemo/core/optim/optimizers.py +++ b/nemo/core/optim/optimizers.py @@ -60,7 +60,6 @@ # Try importing Apex distributed Adam optimizer from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam - import fused_adam_cuda, distributed_adam_cuda # Required kernels HAVE_APEX_DISTRIBUTED_ADAM = True From a62a368d1cfb50748a961a4bb639389eb6d3f88b Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 23 Aug 2022 16:48:54 -0700 Subject: [PATCH 02/14] Add support for dist Adam in GPT-3 without O2-level AMP Signed-off-by: Tim Moon --- .../language_modeling/megatron_gpt_model.py | 39 ++++++++++--------- nemo/core/optim/optimizers.py | 9 ++++- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 2792f67e4831..974ab8499e88 100755 --- 
a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -90,11 +90,6 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): self.megatron_amp_o2 = cfg.get('megatron_amp_O2', False) self.with_distributed_adam = cfg.optim.get('name') == 'distributed_fused_adam' - if self.with_distributed_adam and not self.megatron_amp_o2: - raise ValueError( - "Distributed optimizers require O2. Please set megatron_amp_O2 to True in the model config." - ) - if self.megatron_amp_o2: if not self.with_distributed_adam: @@ -183,10 +178,9 @@ def setup_optimization( self, optim_config: Optional[Union[DictConfig, Dict]] = None, optim_kwargs: Optional[Dict[str, Any]] = None, ): optim_kwargs = {} if optim_kwargs is None else optim_kwargs.copy() - if self.with_distributed_adam: - optim_kwargs['process_group'] = parallel_state.get_data_parallel_group() + if self.with_distributed_adam and self.megatron_amp_o2: optim_kwargs['param_sync_dtype'] = self.autocast_dtype - optim_kwargs['contiguous_grad_buffer'] = True + optim_kwargs['contiguous_grad_buffer'] = True # Needed to allocate main grads if self.autocast_dtype == torch.float: optim_kwargs['store_params'] = False elif self.autocast_dtype == torch.float16: @@ -234,15 +228,23 @@ def training_step(self, batch, batch_idx): sequence_parallel_enabled=self.cfg.get('sequence_parallel', False), ) else: - # no pipeline parallelism so we reduce grads asynchronously if not using sequence parallelism - if self.megatron_amp_o2 and not self.cfg.get('sequence_parallel', False): - if self.with_distributed_adam: + # no pipeline parallelism so we reduce grads + # asynchronously + if self.with_distributed_adam: + if self.megatron_amp_o2: + # copy grads to main grad custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=True) else: - custom_sync_context_handler = self._optimizer.no_sync + # keep grad tensors around + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) else: - # TODO: enable async grad all reduce for O1/autocast mixed precision training - custom_sync_context_handler = None + if self.megatron_amp_o2 and not self.cfg.get('sequence_parallel', False): + custom_sync_context_handler = self._optimizer.no_sync + else: + # TODO: enable async grad all reduce for + # O1/autocast mixed precision training and + # sequence parallelism + custom_sync_context_handler = None losses_reduced_per_micro_batch = forward_backward_no_pipelining( forward_step_func=self.get_forward_output_and_loss_func(), batch=batch_for_pipeline, @@ -649,16 +651,15 @@ def configure_optimizers(self): if self.with_distributed_adam: - # Initialize params in reverse order - # Note: Estimate order in which grads are generated in - # backward pass - self._optimizer.init_params(reversed(list(self.parameters()))) - # Overlapped communication interferes with grad reductions # for pipeline parallelism and sequence parallelism if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): self._optimizer.overlap_grad_sync = False + if self.megatron_amp_o2: + # Initialize params so that main grads are available + self._optimizer.init_params(reversed(list(self.parameters()))) + return retval def compute_consumed_samples(self, steps_since_resume=0): diff --git a/nemo/core/optim/optimizers.py b/nemo/core/optim/optimizers.py index f6b0bd4239fc..5eff59f2b4ec 100644 --- a/nemo/core/optim/optimizers.py +++ b/nemo/core/optim/optimizers.py @@ -45,6 
+45,7 @@ try: from apex.optimizers import FusedLAMB from apex.optimizers import FusedAdam + from apex.transformer import parallel_state HAVE_APEX = True @@ -66,9 +67,15 @@ # Wrapper class that supports main_grad buffer # Note: main_grad buffer is used for O2-style optimizations class MegatronDistributedFusedAdam(DistributedFusedAdam): + def __init__(self, *args, **kwargs): + if ('process_group' not in kwargs + and not parallel_state.is_unitialized()): + kwargs['process_group'] = parallel_state.get_data_parallel_group() + super().__init__(*args, **kwargs) def _init_param_state(self, param, param_group_id, param_id): super()._init_param_state(param, param_group_id, param_id) - param.main_grad = self.grad_buffer_view(param) + if self.contiguous_grad_buffer: + param.main_grad = self.grad_buffer_view(param) AVAILABLE_OPTIMIZERS['distributed_fused_adam'] = MegatronDistributedFusedAdam From 59a7859caa0503bbcc19f8e705ae93342399bea7 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 25 Aug 2022 15:14:58 -0700 Subject: [PATCH 03/14] Add support for dist Adam in Megatron-LM models Signed-off-by: Tim Moon --- Dockerfile | 5 +- Jenkinsfile | 4 +- .../language_modeling/megatron_base_model.py | 50 +++++++++++++++---- .../language_modeling/megatron_gpt_model.py | 35 +------------ .../megatron_lm_encoder_decoder_model.py | 39 ++++++++++----- .../megatron_retrieval_model.py | 20 +++++--- nemo/collections/nlp/parts/nlp_overrides.py | 6 ++- nemo/core/optim/optimizers.py | 4 +- 8 files changed, 90 insertions(+), 73 deletions(-) diff --git a/Dockerfile b/Dockerfile index 384aae8704cb..b706d697fb2a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,11 +32,10 @@ RUN apt-get update && \ python-dev ffmpeg && \ rm -rf /var/lib/apt/lists/* -# FIXME a workaround to update apex. Remove when base image is updated WORKDIR /tmp/ -RUN git clone https://github.com/timmoon10/apex.git && \ +RUN git clone https://github.com/NVIDIA/apex.git && \ cd apex && \ - git checkout 2d5ebb793ca212773bc0af4b15afaee2cea5e82f && \ + git checkout 6f7d8ac3dfb279b314ac4ea23b9c2175b066f7ff && \ pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ # uninstall stuff from base container diff --git a/Jenkinsfile b/Jenkinsfile index ba19180bcb5f..51065959de2f 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -2,7 +2,7 @@ pipeline { agent { docker { //image 'nvcr.io/nvidia/pytorch:22.05-py3' - image 'gitlab-master.nvidia.com:5005/eharper/nemo_containers:megatron_gpt_v16' + image 'gitlab-master.nvidia.com:5005/tmoon/containers:nemo_dist_adam' args '--device=/dev/nvidia0 --gpus all -e TRANSFORMERS_OFFLINE=1 --user 0:128 -v /home/TestData:/home/TestData -v $HOME/.cache:/root/.cache --shm-size=8g' } } @@ -465,7 +465,7 @@ pipeline { sh 'rm -rf examples/speaker_tasks/diarization/speaker_diarization_results' } } - + stage('Multispeaker ASR Data Simulation') { steps { sh 'python tools/speech_data_simulator/multispeaker_simulator.py \ diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index d5ef8323c497..609d3b53c017 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -14,7 +14,7 @@ import os -from typing import Optional +from typing import Any, Dict, Optional, Union import torch 
from omegaconf import open_dict @@ -51,9 +51,13 @@ class MegatronBaseModel(NLPModel): 1. Initialize the model parallel for nemo given the model parallel parameters. 2. Turn on all the nvidia optimizations. 3. If `cfg.tokenizer` is available, it loads the tokenizer and pad the vocab to the correct size for tensor model parallelism. - 4. It help to run `configure_gradient_clipping`, if `grad_clip_pl_default` is set True, it uses the pytorch lightning default - gradient clipping. Or if `megatron_amp_o2` is set True, it uses the parameters from optimizer to clip the gradients. - Otherwise, it uses the parameters calculated in the `setup_optimizer_param_groups` method. + 4. If using distributed optimizer, configure to be compatible with + O2-level optimizations and/or model parallelism. + 5. Perform gradient clipping: `grad_clip_pl_default` triggers the + PyTorch Lightning default implementation, `with_distributed_adam` + triggers the distributed optimizer's implementation, + `megatron_amp_o2` triggers gradient clipping on the main grads, + and otherwise gradient clipping is performed on the model grads. """ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True): @@ -71,6 +75,8 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True): self._validate_config() + self.with_distributed_adam = cfg.optim.get('name') == 'distributed_fused_adam' + # used in NVIDIA NGC PyTorch containers self._enable_nvidia_optimizations() @@ -213,7 +219,7 @@ def configure_gradient_clipping(self, *args, **kwargs): # use the default behavior return super().configure_gradient_clipping(*args, **kwargs) - if hasattr(self, 'with_distributed_adam') and self.with_distributed_adam: + if self.with_distributed_adam: grad_norm = clip_grad_norm_distributed_optimizer(self._optimizer, clip_val) else: if self.megatron_amp_o2: @@ -287,15 +293,28 @@ def on_train_batch_end(self, outputs, batch, batch_idx: int, unused: Optional[in # accumulated gradient updates. 
grad_scaler.optimizer_update_skipped = None + def setup_optimization( + self, optim_config: Optional[Union[DictConfig, Dict]] = None, optim_kwargs: Optional[Dict[str, Any]] = None, + ): + optim_kwargs = {} if optim_kwargs is None else optim_kwargs.copy() + if self.with_distributed_adam and self.megatron_amp_o2: + optim_kwargs['contiguous_grad_buffer'] = True # Needed to allocate main grads + if hasattr(self, 'autocast_dtype'): + optim_kwargs['param_sync_dtype'] = self.autocast_dtype + if self.autocast_dtype == torch.float: + optim_kwargs['store_params'] = False + elif self.autocast_dtype == torch.float16: + optim_kwargs['store_params'] = True + elif self.autocast_dtype == torch.bfloat16: + optim_kwargs['store_params'] = False + optim_kwargs['store_param_remainders'] = True + return super().setup_optimization(optim_config=optim_config, optim_kwargs=optim_kwargs) + def configure_optimizers(self): self.setup_optimization() # Wrap the baseline optimizer with the optimizer class with master parameters - if ( - self.megatron_amp_o2 - and not (hasattr(self, 'with_distributed_adam') and self.with_distributed_adam) - and self._optimizer is not None - ): + if self.megatron_amp_o2 and not self.with_distributed_adam and self._optimizer is not None: if self.cfg.precision == 'bf16': fp32_grad_accum = True contiguous_grad_bucket = True @@ -340,6 +359,17 @@ def configure_optimizers(self): optimizer=self._optimizer, scheduler_config=sched_config, train_dataloader=self._train_dl ) + # Configure distributed optimizer + if self.with_distributed_adam: + # Overlapped communication interferes with grad reductions + # for pipeline parallelism and sequence parallelism + if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): + self._optimizer.overlap_grad_sync = False + + if self.megatron_amp_o2: + # Initialize params so that main grads are available + self._optimizer.init_params(reversed(list(self.parameters()))) + if self._scheduler is None: return self._optimizer else: diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 974ab8499e88..9553ffd7398c 100755 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -13,7 +13,7 @@ # limitations under the License. 
import re -from typing import Any, Dict, List, Optional, Union +from typing import Any, List, Optional, Union import torch from omegaconf.dictconfig import DictConfig @@ -88,7 +88,6 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): # self.setup_optimizer_param_groups() self.megatron_amp_o2 = cfg.get('megatron_amp_O2', False) - self.with_distributed_adam = cfg.optim.get('name') == 'distributed_fused_adam' if self.megatron_amp_o2: @@ -174,22 +173,6 @@ def setup_optimizer_param_groups(self): else: self._optimizer_param_groups = get_params_for_weight_decay_optimization([self.model]) - def setup_optimization( - self, optim_config: Optional[Union[DictConfig, Dict]] = None, optim_kwargs: Optional[Dict[str, Any]] = None, - ): - optim_kwargs = {} if optim_kwargs is None else optim_kwargs.copy() - if self.with_distributed_adam and self.megatron_amp_o2: - optim_kwargs['param_sync_dtype'] = self.autocast_dtype - optim_kwargs['contiguous_grad_buffer'] = True # Needed to allocate main grads - if self.autocast_dtype == torch.float: - optim_kwargs['store_params'] = False - elif self.autocast_dtype == torch.float16: - optim_kwargs['store_params'] = True - elif self.autocast_dtype == torch.bfloat16: - optim_kwargs['store_params'] = False - optim_kwargs['store_param_remainders'] = True - return super().setup_optimization(optim_config=optim_config, optim_kwargs=optim_kwargs) - def forward(self, tokens, text_position_ids, attention_mask, labels): output_tensor = self.model(tokens, text_position_ids, attention_mask, labels=labels) return output_tensor @@ -646,22 +629,6 @@ def setup_test_data(self, cfg): ) self._test_dl = self.build_pretraining_data_loader(self._test_ds, consumed_samples) - def configure_optimizers(self): - retval = super().configure_optimizers() - - if self.with_distributed_adam: - - # Overlapped communication interferes with grad reductions - # for pipeline parallelism and sequence parallelism - if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): - self._optimizer.overlap_grad_sync = False - - if self.megatron_amp_o2: - # Initialize params so that main grads are available - self._optimizer.init_params(reversed(list(self.parameters()))) - - return retval - def compute_consumed_samples(self, steps_since_resume=0): app_state = AppState() consumed_samples = ( diff --git a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py index eee709d8be6b..9572aa928aad 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py @@ -106,8 +106,9 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): if self.megatron_amp_o2: - # Pre-allocate the model on GPU to have master parameters allocated on the same device with matching data type - self.enc_dec_model.cuda(torch.cuda.current_device()) + if not self.with_distributed_adam: + # Pre-allocate the model on GPU to have master parameters allocated on the same device with matching data type + self.enc_dec_model.cuda(torch.cuda.current_device()) # Model wrapper to convert both model and inputs to half precision self.enc_dec_model = Float16Module(module=self.enc_dec_model, precision=cfg.precision) @@ -263,12 +264,23 @@ def training_step(self, batch, batch_idx): grad_scaler=self.trainer.precision_plugin.scaler if self.cfg.precision == 16 else None, ) else: - # no pipeline 
parallelism so we reduce grads asynchronously - if self.megatron_amp_o2: - custom_sync_context_handler = self._optimizer.no_sync + # no pipeline parallelism so we reduce grads + # asynchronously + if self.with_distributed_adam: + if self.megatron_amp_o2: + # copy grads to main grad + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=True) + else: + # keep grad tensors around + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) else: - # TODO: enable async grad all reduce for O1/autocast mixed precision training - custom_sync_context_handler = None + if self.megatron_amp_o2: + custom_sync_context_handler = self._optimizer.no_sync + else: + # TODO: enable async grad all reduce for + # O1/autocast mixed precision training and + # sequence parallelism + custom_sync_context_handler = None losses_reduced_per_micro_batch = forward_backward_no_pipelining( forward_step_func=self.get_forward_output_and_loss_func(), batch=batch_for_pipeline, @@ -290,7 +302,10 @@ def training_step(self, batch, batch_idx): else: loss_mean = torch.tensor(0.0).cuda() - if self.megatron_amp_o2: + if self.with_distributed_adam: + # gradients are reduced internally in distributed optimizer + pass + elif self.megatron_amp_o2: # when using pipeline parallelism grads must be reduced after the pipeline (not asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1: # main grads are stored in the MainParamsOptimizer wrapper @@ -427,7 +442,7 @@ def _kwargs_to_arg_idx(self): """ Returns a dict {kwarg name: arg index} to be used when mapping kwargs into a list of args. - + Computed on first call, and then cached. """ # build mapping of kwargs to arg index at first run @@ -439,7 +454,7 @@ def _kwargs_to_arg_idx(self): def _build_forward_args_from_kwargs(self, args_name, args, **kwargs): """ A helper method that converts arguments into positional arguments (by name) - + args - a list of arguments to pass to self.enc_dec_model (tensors from batch) args_name - a list of argument name (to be matched against allowed kwargs) kwargs - a dict {arg name: arg value} (used for non-tensor values) @@ -846,7 +861,7 @@ def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: Optional[int] def encode(self, tokens_enc, enc_mask, encoder_input=None, reconfigure_microbatch=True): """ - tokens_enc - encoder input tokens + tokens_enc - encoder input tokens enc_mask - corresponding mask encoder_input - encoder input (bypass tokens), if given tokens_enc can be None. """ @@ -1168,7 +1183,7 @@ def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_ """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device When using pipeline parallelism, we need the global batch to remain on the CPU, since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + Microbatches are transferred from CPU to GPU inside the pipeline. 
""" return batch diff --git a/nemo/collections/nlp/models/language_modeling/megatron_retrieval_model.py b/nemo/collections/nlp/models/language_modeling/megatron_retrieval_model.py index a2b6bf630af0..880a59efc00c 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_retrieval_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_retrieval_model.py @@ -70,8 +70,9 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): if self.megatron_amp_o2: - # Pre-allocate the model on GPU to have master parameters allocated on the same device with matching data type - self.model.cuda(torch.cuda.current_device()) + if not self.with_distributed_adam: + # Pre-allocate the model on GPU to have master parameters allocated on the same device with matching data type + self.model.cuda(torch.cuda.current_device()) # Model wrapper to convert both model and inputs to half precision self.model = Float16Module(module=self.model, precision=self.cfg.precision) @@ -200,13 +201,16 @@ def training_step(self, batch, batch_idx): loss_scale = self.trainer.precision_plugin.scaler._scale if loss_scale is not None: self.log('loss_scale', loss_scale) - # while async grad allreduce is enabled, bprop will keep moving forward without waiting for - # the finish of async grad AR works. Hence, to guarantee the correctness of grads reduction, - # we cannot start weight update until all async grad AR works are done. - if self.megatron_amp_o2 and self.cfg.get('pipeline_model_parallel_size', 1) == 1: - torch.cuda.synchronize() - if self.megatron_amp_o2: + if self.with_distributed_adam: + # gradients are reduced internally in distributed optimizer + pass + elif self.megatron_amp_o2: + # while async grad allreduce is enabled, bprop will keep moving forward without waiting for + # the finish of async grad AR works. Hence, to guarantee the correctness of grads reduction, + # we cannot start weight update until all async grad AR works are done. + if self.cfg.get('pipeline_model_parallel_size', 1) == 1: + torch.cuda.synchronize() # when using pipeline parallelism grads must be reduced after the pipeline (not asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1: # main grads are stored in the MainParamsOptimizer wrapper diff --git a/nemo/collections/nlp/parts/nlp_overrides.py b/nemo/collections/nlp/parts/nlp_overrides.py index 2738041ecf85..b699649a2b23 100644 --- a/nemo/collections/nlp/parts/nlp_overrides.py +++ b/nemo/collections/nlp/parts/nlp_overrides.py @@ -94,8 +94,10 @@ def configure_ddp(self): Sets find_unused_parameters to False to use activation-checkpoint-recomputation. 
""" - if hasattr(self.model, 'megatron_amp_o2') and self.model.megatron_amp_o2: - # do not use DDP if using megatron amp O2 + if (hasattr(self.model, 'megatron_amp_o2') and self.model.megatron_amp_o2) or ( + hasattr(self.model, 'with_distributed_adam') and self.model.with_distributed_adam + ): + # do not use DDP if using megatron amp O2 or distributed optimizer self._model = LightningDistributedModule(self.model) else: app_state = AppState() diff --git a/nemo/core/optim/optimizers.py b/nemo/core/optim/optimizers.py index 5eff59f2b4ec..bdd36a8731df 100644 --- a/nemo/core/optim/optimizers.py +++ b/nemo/core/optim/optimizers.py @@ -68,10 +68,10 @@ # Note: main_grad buffer is used for O2-style optimizations class MegatronDistributedFusedAdam(DistributedFusedAdam): def __init__(self, *args, **kwargs): - if ('process_group' not in kwargs - and not parallel_state.is_unitialized()): + if 'process_group' not in kwargs and not parallel_state.is_unitialized(): kwargs['process_group'] = parallel_state.get_data_parallel_group() super().__init__(*args, **kwargs) + def _init_param_state(self, param, param_group_id, param_id): super()._init_param_state(param, param_group_id, param_id) if self.contiguous_grad_buffer: From 36109c1fb58cbd9f6318c7c085d4350cc544e5ba Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Fri, 26 Aug 2022 16:17:29 -0700 Subject: [PATCH 04/14] Debug dist Adam support without Megatron AMP O2 Signed-off-by: Tim Moon --- .../language_modeling/megatron_base_model.py | 37 ++++++++++++------- nemo/core/optim/optimizers.py | 9 +++-- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 609d3b53c017..cc7aa82b72c4 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -297,17 +297,26 @@ def setup_optimization( self, optim_config: Optional[Union[DictConfig, Dict]] = None, optim_kwargs: Optional[Dict[str, Any]] = None, ): optim_kwargs = {} if optim_kwargs is None else optim_kwargs.copy() - if self.with_distributed_adam and self.megatron_amp_o2: - optim_kwargs['contiguous_grad_buffer'] = True # Needed to allocate main grads - if hasattr(self, 'autocast_dtype'): - optim_kwargs['param_sync_dtype'] = self.autocast_dtype - if self.autocast_dtype == torch.float: - optim_kwargs['store_params'] = False - elif self.autocast_dtype == torch.float16: - optim_kwargs['store_params'] = True - elif self.autocast_dtype == torch.bfloat16: - optim_kwargs['store_params'] = False - optim_kwargs['store_param_remainders'] = True + if self.with_distributed_adam: + + # Allocate grads since we are storing between microbatches + optim_kwargs['contiguous_grad_buffer'] = True + + if self.megatron_amp_o2: + # Match param allgather with model dtype + if hasattr(self, 'autocast_dtype'): + optim_kwargs['param_sync_dtype'] = self.autocast_dtype + if self.autocast_dtype == torch.float: + optim_kwargs['store_params'] = False + elif self.autocast_dtype == torch.float16: + optim_kwargs['store_params'] = True + elif self.autocast_dtype == torch.bfloat16: + optim_kwargs['store_params'] = False + optim_kwargs['store_param_remainders'] = True + else: + # Assume FP32 params, so no need to store main params + optim_kwargs['store_params'] = False + return super().setup_optimization(optim_config=optim_config, optim_kwargs=optim_kwargs) def configure_optimizers(self): @@ -365,10 +374,10 @@ 
def configure_optimizers(self): # for pipeline parallelism and sequence parallelism if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): self._optimizer.overlap_grad_sync = False + self._optimizer.greedy_grad_copy = self.megatron_amp_o2 - if self.megatron_amp_o2: - # Initialize params so that main grads are available - self._optimizer.init_params(reversed(list(self.parameters()))) + # Initialize params so that main grads are available + self._optimizer.init_params(reversed(list(self.parameters()))) if self._scheduler is None: return self._optimizer diff --git a/nemo/core/optim/optimizers.py b/nemo/core/optim/optimizers.py index bdd36a8731df..dd9d924f4230 100644 --- a/nemo/core/optim/optimizers.py +++ b/nemo/core/optim/optimizers.py @@ -72,10 +72,13 @@ def __init__(self, *args, **kwargs): kwargs['process_group'] = parallel_state.get_data_parallel_group() super().__init__(*args, **kwargs) - def _init_param_state(self, param, param_group_id, param_id): - super()._init_param_state(param, param_group_id, param_id) + def zero_grad(self, *args, **kwargs): + super().zero_grad(*args, **kwargs) if self.contiguous_grad_buffer: - param.main_grad = self.grad_buffer_view(param) + for param in self.parameters(): + param.main_grad = self.grad_buffer_view(param) + if param.dtype == param.main_grad.dtype and param.is_cuda: + param.grad = param.main_grad AVAILABLE_OPTIMIZERS['distributed_fused_adam'] = MegatronDistributedFusedAdam From 065a89b686c1d2a494ec0ec19559fb221cd55835 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Wed, 7 Sep 2022 15:47:34 -0700 Subject: [PATCH 05/14] Add support for overlapped grad sync with pipeline parallelism in GPT-3 Requires dist Adam optimizer. Signed-off-by: Tim Moon --- .../language_modeling/megatron_base_model.py | 29 ++++++++-- .../language_modeling/megatron_gpt_model.py | 51 ++++++++++------- nemo/core/optim/distributed_adam.py | 55 +++++++++++++++++++ nemo/core/optim/optimizers.py | 22 +------- 4 files changed, 112 insertions(+), 45 deletions(-) create mode 100644 nemo/core/optim/distributed_adam.py diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index cc7aa82b72c4..207a3a45464a 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -370,14 +370,31 @@ def configure_optimizers(self): # Configure distributed optimizer if self.with_distributed_adam: - # Overlapped communication interferes with grad reductions - # for pipeline parallelism and sequence parallelism - if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): - self._optimizer.overlap_grad_sync = False - self._optimizer.greedy_grad_copy = self.megatron_amp_o2 + # Disable async grad reductions for params that are + # synchronized for pipeline parallelism and sequence + # parallelism + if parallel_state.get_pipeline_model_parallel_world_size() > 1 and ( + parallel_state.is_pipeline_first_stage() or parallel_state.is_pipeline_last_stage() + ): + if self.model.share_token_embeddings: + param = self.model.word_embeddings_weight() + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True + # Initialize embedding grad first to minimize + # number of buckets needed to hold + self._optimizer.init_params([param]) + for param in self.model.parameters(): + if getattr(param, 
'sequence_parallel_enabled', False): + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True # Initialize params so that main grads are available - self._optimizer.init_params(reversed(list(self.parameters()))) + # Note: Consolidate grads without overlap + self._optimizer.init_params( + p for p in self.parameters() + if getattr(p, '_disable_overlap_grad_sync', False) + ) + self._optimizer.init_params(self.parameters()) if self._scheduler is None: return self._optimizer diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 9553ffd7398c..c9d159f1c977 100755 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -199,6 +199,25 @@ def training_step(self, batch, batch_idx): batch_for_pipeline = self.process_global_batch(batch) tensor_shape = [self.cfg.encoder_seq_length, self.cfg.micro_batch_size, self.cfg.hidden_size] + # handle asynchronous grad reduction + if self.with_distributed_adam: + if self.megatron_amp_o2: + # copy grads to main grad + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=True) + else: + # keep grad tensors around + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) + else: + if (self.megatron_amp_o2 + and self.cfg.get('pipeline_model_parallel_size', 1) == 1 + and not self.cfg.get('sequence_parallel', False)): + custom_sync_context_handler = self._optimizer.no_sync + else: + # TODO: enable async grad all reduce with O1/autocast + # mixed precision training, with pipeline parallelism, + # or with sequence parallelism + custom_sync_context_handler = None + if self.cfg.get('pipeline_model_parallel_size', 1) > 1: losses_reduced_per_micro_batch = forward_backward_pipelining_without_interleaving( forward_step_func=self.get_forward_output_and_loss_func(), @@ -209,25 +228,9 @@ def training_step(self, batch, batch_idx): dtype=self.autocast_dtype, grad_scaler=self.trainer.precision_plugin.scaler if self.cfg.precision == 16 else None, sequence_parallel_enabled=self.cfg.get('sequence_parallel', False), + custom_sync_context_handler=custom_sync_context_handler, ) else: - # no pipeline parallelism so we reduce grads - # asynchronously - if self.with_distributed_adam: - if self.megatron_amp_o2: - # copy grads to main grad - custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=True) - else: - # keep grad tensors around - custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) - else: - if self.megatron_amp_o2 and not self.cfg.get('sequence_parallel', False): - custom_sync_context_handler = self._optimizer.no_sync - else: - # TODO: enable async grad all reduce for - # O1/autocast mixed precision training and - # sequence parallelism - custom_sync_context_handler = None losses_reduced_per_micro_batch = forward_backward_no_pipelining( forward_step_func=self.get_forward_output_and_loss_func(), batch=batch_for_pipeline, @@ -253,8 +256,18 @@ def training_step(self, batch, batch_idx): self.allreduce_sequence_parallel_gradients() if self.with_distributed_adam: - # gradients are reduced internally in distributed optimizer - pass + # launch grad reductions + if not parallel_state.is_pipeline_first_stage(): + # first pipeline stage overlaps backward pass with + # grad reductions + self._optimizer.try_grad_sync( + p for p in 
self.parameters() + if not getattr(p, '_disable_overlap_grad_sync', False) + ) + self._optimizer.try_grad_sync( + p for p in self.parameters() + if getattr(p, 'sequence_parallel_enabled', False) + ) elif self.megatron_amp_o2: # when using pipeline parallelism grads must be all-reduced after the pipeline (not asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py new file mode 100644 index 000000000000..4e0b4607ec9f --- /dev/null +++ b/nemo/core/optim/distributed_adam.py @@ -0,0 +1,55 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from apex.transformer import parallel_state +from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam + +# Wrapper class that supports main_grad buffer +# Note: main_grad buffer is used for O2-style optimizations +class MegatronDistributedFusedAdam(DistributedFusedAdam): + def __init__(self, *args, **kwargs): + if 'process_group' not in kwargs and not parallel_state.is_unitialized(): + kwargs['process_group'] = parallel_state.get_data_parallel_group() + super().__init__(*args, **kwargs) + + def _make_post_backward_hook(self, param, param_group_id, param_id): + def hook(*unused): + with self._lock: + need_to_initialize = 'fragments' not in self.state[param] + if need_to_initialize: + self._init_param_state(param, param_group_id, param_id) + if (self.greedy_grad_copy + and not getattr(param, '_disable_greedy_grad_copy', False)): + self._grad_copy(param) + if (self.overlap_grad_sync + and not getattr(param, '_disable_overlap_grad_sync', False)): + self._try_start_bucket_grad_sync( + params=[param], + ignore_last_bucket=need_to_initialize, + ) + return hook + + def try_grad_sync(self, params): + params = list(params) + for p in params: + self._grad_copy(p) + self._try_start_bucket_grad_sync(params=params) + + def zero_grad(self, *args, **kwargs): + super().zero_grad(*args, **kwargs) + if self.contiguous_grad_buffer: + for param in self.parameters(): + param.main_grad = self.grad_buffer_view(param) + if param.dtype == param.main_grad.dtype and param.is_cuda: + param.grad = param.main_grad diff --git a/nemo/core/optim/optimizers.py b/nemo/core/optim/optimizers.py index dd9d924f4230..12677655695b 100644 --- a/nemo/core/optim/optimizers.py +++ b/nemo/core/optim/optimizers.py @@ -58,30 +58,12 @@ HAVE_APEX_DISTRIBUTED_ADAM = False if HAVE_APEX: try: - - # Try importing Apex distributed Adam optimizer - from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam + # Try importing wrapper for Apex distributed Adam optimizer + from nemo.core.optim.distributed_adam import MegatronDistributedFusedAdam HAVE_APEX_DISTRIBUTED_ADAM = True - # Wrapper class that supports main_grad buffer - # Note: main_grad buffer is used for O2-style optimizations - class MegatronDistributedFusedAdam(DistributedFusedAdam): - def __init__(self, 
*args, **kwargs): - if 'process_group' not in kwargs and not parallel_state.is_unitialized(): - kwargs['process_group'] = parallel_state.get_data_parallel_group() - super().__init__(*args, **kwargs) - - def zero_grad(self, *args, **kwargs): - super().zero_grad(*args, **kwargs) - if self.contiguous_grad_buffer: - for param in self.parameters(): - param.main_grad = self.grad_buffer_view(param) - if param.dtype == param.main_grad.dtype and param.is_cuda: - param.grad = param.main_grad - AVAILABLE_OPTIMIZERS['distributed_fused_adam'] = MegatronDistributedFusedAdam - except (ImportError, ModuleNotFoundError): logging.warning("Could not import distributed_fused_adam optimizer from Apex") From 1ec40b1f07f274236ee493ee2ca60c2f32a09bb4 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 8 Sep 2022 16:12:11 -0700 Subject: [PATCH 06/14] Debug dist Adam support for T5 Signed-off-by: Tim Moon --- examples/nlp/language_modeling/megatron_t5_pretraining.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/nlp/language_modeling/megatron_t5_pretraining.py b/examples/nlp/language_modeling/megatron_t5_pretraining.py index 462cc62d28eb..4f044cb3c34d 100644 --- a/examples/nlp/language_modeling/megatron_t5_pretraining.py +++ b/examples/nlp/language_modeling/megatron_t5_pretraining.py @@ -38,6 +38,7 @@ def main(cfg) -> None: logging.info(f'\n{OmegaConf.to_yaml(cfg)}') megatron_amp_o2 = cfg.model.get('megatron_amp_O2', False) + with_distributed_adam = cfg.model.optim.get('name') == 'distributed_fused_adam' plugins = [] strategy = NLPDDPStrategy( no_ddp_communication_hook=True, # we don't use DDP for async grad allreduce @@ -52,7 +53,7 @@ def main(cfg) -> None: growth_interval=cfg.model.get('native_amp_growth_interval', 1000), hysteresis=cfg.model.get('hysteresis', 2), ) - if megatron_amp_o2: + if megatron_amp_o2 and not with_distributed_adam: plugins.append(MegatronHalfPrecisionPlugin(precision=cfg.trainer.precision, device='cuda', scaler=scaler)) else: plugins.append(PipelineMixedPrecisionPlugin(precision=cfg.trainer.precision, device='cuda', scaler=scaler)) From e06d34a0919aa624b7782cf0b7c43f68ebee9d6f Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 8 Sep 2022 17:33:58 -0700 Subject: [PATCH 07/14] Add support for overlapped grad sync with pipeline parallelism in T5 Requires dist Adam optimizer Signed-off-by: Tim Moon --- .../language_modeling/megatron_base_model.py | 20 +--- .../language_modeling/megatron_gpt_model.py | 40 +++++-- .../megatron_lm_encoder_decoder_model.py | 100 ++++++++++++++---- nemo/core/optim/distributed_adam.py | 13 ++- nemo/core/optim/optimizers.py | 1 - 5 files changed, 118 insertions(+), 56 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 207a3a45464a..a7fafc20d1bb 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -370,29 +370,11 @@ def configure_optimizers(self): # Configure distributed optimizer if self.with_distributed_adam: - # Disable async grad reductions for params that are - # synchronized for pipeline parallelism and sequence - # parallelism - if parallel_state.get_pipeline_model_parallel_world_size() > 1 and ( - parallel_state.is_pipeline_first_stage() or parallel_state.is_pipeline_last_stage() - ): - if self.model.share_token_embeddings: - param = self.model.word_embeddings_weight() - param._disable_greedy_grad_copy = not 
self.megatron_amp_o2 - param._disable_overlap_grad_sync = True - # Initialize embedding grad first to minimize - # number of buckets needed to hold - self._optimizer.init_params([param]) - for param in self.model.parameters(): - if getattr(param, 'sequence_parallel_enabled', False): - param._disable_greedy_grad_copy = not self.megatron_amp_o2 - param._disable_overlap_grad_sync = True # Initialize params so that main grads are available # Note: Consolidate grads without overlap self._optimizer.init_params( - p for p in self.parameters() - if getattr(p, '_disable_overlap_grad_sync', False) + p for p in self.parameters() if getattr(p, '_disable_overlap_grad_sync', False) ) self._optimizer.init_params(self.parameters()) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index ed2d72af8446..0e9e785af9c0 100755 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -172,6 +172,29 @@ def setup_optimizer_param_groups(self): else: self._optimizer_param_groups = get_params_for_weight_decay_optimization([self.model]) + def configure_optimizers(self): + + if self.with_distributed_adam: + # Disable async grad reductions for params that are + # synchronized for pipeline parallelism and sequence + # parallelism + if ( + parallel_state.get_pipeline_model_parallel_world_size() > 1 + and (parallel_state.is_pipeline_first_stage() or parallel_state.is_pipeline_last_stage()) + and self.model.share_token_embeddings + ): + # Disable overlapped grad sync for embedding grad when + # pipeline parallelism is enabled + param = self.model.word_embeddings_weight() + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True + for param in self.parameters(): + if getattr(param, 'sequence_parallel_enabled', False): + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True + + return super().configure_optimizers() + def forward(self, tokens, text_position_ids, attention_mask, labels): output_tensor = self.model(tokens, text_position_ids, attention_mask, labels=labels) return output_tensor @@ -207,9 +230,11 @@ def training_step(self, batch, batch_idx): # keep grad tensors around custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) else: - if (self.megatron_amp_o2 + if ( + self.megatron_amp_o2 and self.cfg.get('pipeline_model_parallel_size', 1) == 1 - and not self.cfg.get('sequence_parallel', False)): + and not self.cfg.get('sequence_parallel', False) + ): custom_sync_context_handler = self._optimizer.no_sync else: # TODO: enable async grad all reduce with O1/autocast @@ -256,17 +281,12 @@ def training_step(self, batch, batch_idx): if self.with_distributed_adam: # launch grad reductions + # Note: grads in first pipeline stage have already been + # reduced if not parallel_state.is_pipeline_first_stage(): - # first pipeline stage overlaps backward pass with - # grad reductions self._optimizer.try_grad_sync( - p for p in self.parameters() - if not getattr(p, '_disable_overlap_grad_sync', False) + p for p in self.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) ) - self._optimizer.try_grad_sync( - p for p in self.parameters() - if getattr(p, 'sequence_parallel_enabled', False) - ) elif self.megatron_amp_o2: # when using pipeline parallelism grads must be all-reduced after the pipeline (not 
asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): diff --git a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py index da9a432520ff..4fba8bf6f7aa 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py @@ -124,6 +124,58 @@ def setup_optimizer_param_groups(self): """ModelPT override. Optimizer will get self._optimizer_param_groups""" self._optimizer_param_groups = get_params_for_weight_decay_optimization([self.enc_dec_model]) + def configure_optimizers(self): + + if self.with_distributed_adam: + + # Identify params that require grad reductions between + # pipeline stages + # See: allreduce_word_and_position_embeddings + model_parallel_params = [] + if parallel_state.get_pipeline_model_parallel_world_size() > 1 and ( + parallel_state.is_rank_in_embedding_group() + ): + if self.cfg.get('share_token_embeddings', True) and self.cfg.get( + 'share_decoder_tokens_head_embeddings', True + ): + model_parallel_params.append(self.enc_dec_model.word_embeddings_weight()) + if ( + parallel_state.is_rank_in_position_embedding_group() + and parallel_state.get_pipeline_model_parallel_world_size() > 1 + and parallel_state.get_pipeline_model_parallel_split_rank() is not None + and self.cfg.encoder.get('position_embedding_type') == 'learned_absolute' + and self.cfg.decoder.get('position_embedding_type') == 'learned_absolute' + ): + if self.cfg.get('share_token_embeddings', True): + model_parallel_params.append(self.enc_dec_model.position_embeddings_weight()) + if ( + parallel_state.get_pipeline_model_parallel_world_size() > 2 + and parallel_state.get_pipeline_model_parallel_split_rank() is not None + ): + if ( + self.cfg.encoder.get('position_embedding_type') == 'relative' + and parallel_state.is_rank_in_encoder_relative_position_embedding_group() + and parallel_state.get_pipeline_model_parallel_split_rank() > 1 + ): + model_parallel_params.append(self.enc_dec_model.encoder_relative_position_embeddings_weight()) + if ( + self.cfg.decoder.get('position_embedding_type') == 'relative' + and parallel_state.is_rank_in_decoder_relative_position_embedding_group() + ): + model_parallel_params.append(self.enc_dec_model.decoder_relative_position_embeddings_weight()) + if not self.cfg.decoder.get('relative_position_bias_self_attention_only', True): + model_parallel_params.append( + self.enc_dec_model.decoder_cross_attention_relative_position_embeddings_weight() + ) + + # Disable async grad reductions for params that are + # synchronized for pipeline parallelism + for param in model_parallel_params: + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True + + return super().configure_optimizers() + def _handle_bias_activation_fusion_args(self, cfg): # For oldest models, we don't have the option to turn on/off bias activation fusion. It is always on. 
if not hasattr(cfg, 'bias_gelu_fusion') and not hasattr(cfg, 'bias_activation_fusion'): @@ -248,6 +300,27 @@ def training_step(self, batch, batch_idx): decoder_seq_length = batch_for_pipeline[1].size(1) tensor_shape = [encoder_seq_length, get_micro_batch_size(), self.cfg.encoder.hidden_size] + # handle asynchronous grad reduction + if self.with_distributed_adam: + if self.megatron_amp_o2: + # copy grads to main grad + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=True) + else: + # keep grad tensors around + custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) + else: + if ( + self.megatron_amp_o2 + and self.cfg.get('pipeline_model_parallel_size', 1) == 1 + and not self.cfg.get('sequence_parallel', False) + ): + custom_sync_context_handler = self._optimizer.no_sync + else: + # TODO: enable async grad all reduce with O1/autocast + # mixed precision training, with pipeline parallelism, + # or with sequence parallelism + custom_sync_context_handler = None + if self.cfg.get('pipeline_model_parallel_size', 1) > 1: losses_reduced_per_micro_batch = forward_backward_pipelining_without_interleaving( forward_step_func=self.get_forward_output_and_loss_func(), @@ -258,25 +331,9 @@ def training_step(self, batch, batch_idx): decoder_sequence_length=decoder_seq_length, dtype=self.autocast_dtype, grad_scaler=self.trainer.precision_plugin.scaler if self.cfg.precision == 16 else None, + custom_sync_context_handler=custom_sync_context_handler, ) else: - # no pipeline parallelism so we reduce grads - # asynchronously - if self.with_distributed_adam: - if self.megatron_amp_o2: - # copy grads to main grad - custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=True) - else: - # keep grad tensors around - custom_sync_context_handler = lambda: self._optimizer.no_sync(greedy_grad_copy=False) - else: - if self.megatron_amp_o2: - custom_sync_context_handler = self._optimizer.no_sync - else: - # TODO: enable async grad all reduce for - # O1/autocast mixed precision training and - # sequence parallelism - custom_sync_context_handler = None losses_reduced_per_micro_batch = forward_backward_no_pipelining( forward_step_func=self.get_forward_output_and_loss_func(), batch=batch_for_pipeline, @@ -299,8 +356,13 @@ def training_step(self, batch, batch_idx): loss_mean = torch.tensor(0.0).cuda() if self.with_distributed_adam: - # gradients are reduced internally in distributed optimizer - pass + # launch grad reductions + # Note: grads in first pipeline stage have already been + # reduced + if not parallel_state.is_pipeline_first_stage(): + self._optimizer.try_grad_sync( + p for p in self.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) + ) elif self.megatron_amp_o2: # when using pipeline parallelism grads must be reduced after the pipeline (not asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1: diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 4e0b4607ec9f..4098c55c52df 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from apex.transformer import parallel_state from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam +from apex.transformer import parallel_state + # Wrapper class that supports main_grad buffer # Note: main_grad buffer is used for O2-style optimizations @@ -29,15 +30,13 @@ def hook(*unused): need_to_initialize = 'fragments' not in self.state[param] if need_to_initialize: self._init_param_state(param, param_group_id, param_id) - if (self.greedy_grad_copy - and not getattr(param, '_disable_greedy_grad_copy', False)): + if self.greedy_grad_copy and not getattr(param, '_disable_greedy_grad_copy', False): self._grad_copy(param) - if (self.overlap_grad_sync - and not getattr(param, '_disable_overlap_grad_sync', False)): + if self.overlap_grad_sync and not getattr(param, '_disable_overlap_grad_sync', False): self._try_start_bucket_grad_sync( - params=[param], - ignore_last_bucket=need_to_initialize, + params=[param], ignore_last_bucket=need_to_initialize, ) + return hook def try_grad_sync(self, params): diff --git a/nemo/core/optim/optimizers.py b/nemo/core/optim/optimizers.py index 041012b826b6..fd9f1b4672ea 100644 --- a/nemo/core/optim/optimizers.py +++ b/nemo/core/optim/optimizers.py @@ -45,7 +45,6 @@ try: from apex.optimizers import FusedLAMB from apex.optimizers import FusedAdam - from apex.transformer import parallel_state HAVE_APEX = True From 811b59cf0b2ec14fd65d7cc79193808c34b24231 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 20 Sep 2022 10:25:57 -0700 Subject: [PATCH 08/14] Update Apex commits in Dockerfile and Jenkinsfile Signed-off-by: Tim Moon --- Dockerfile | 2 +- Jenkinsfile | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index ea1c276d85da..2ca5194ea311 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,7 +36,7 @@ RUN apt-get update && \ WORKDIR /tmp/ RUN git clone https://github.com/NVIDIA/apex.git && \ cd apex && \ - git checkout 6f7d8ac3dfb279b314ac4ea23b9c2175b066f7ff && \ + git checkout 2b0e8371113fe70758f1964c40bf7dbe304fd9e6 && \ pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ # uninstall stuff from base container diff --git a/Jenkinsfile b/Jenkinsfile index c5cf88b8734e..e6f118351453 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,8 +1,8 @@ pipeline { agent { docker { - //image 'gitlab-master.nvidia.com:5005/eharper/nemo_containers:nemo_ci_pytorch_22.07_apex_3c19f1061879394f28272a99a7ea26d58f72dace' - image 'nvcr.io/nvidia/pytorch:22.08-py3' + // image 'nvcr.io/nvidia/pytorch:22.08-py3' + image 'gitlab-master.nvidia.com:5005/tmoon/containers/bignlp:dist_adam_overlap_pipeline' args '--device=/dev/nvidia0 --gpus all -e TRANSFORMERS_OFFLINE=1 --user 0:128 -v /home/TestData:/home/TestData -v $HOME/.cache:/root/.cache --shm-size=8g' } } @@ -406,7 +406,7 @@ pipeline { sh 'rm -rf examples/speaker_tasks/diarization/speaker_diarization_results' } } - + stage('Multispeaker ASR Data Simulation') { steps { sh 'python tools/speech_data_simulator/multispeaker_simulator.py \ @@ -1969,7 +1969,7 @@ pipeline { } } } - + stage('L2: Parallel Pretraining BERT pretraining from Text/Preprocessed') { when { anyOf { @@ -3183,8 +3183,8 @@ assert_frame_equal(training_curve, gt_curve, rtol=1e-3, atol=1e-3)"''' } } } - - + + // TODO: Add this test back. 
Test was failing on CI machines due to HW error // stage('L2: Megatron GPT Convert from Megatron-LM checkpoing and Eval') { From 8da1ac5ced35d81caacd56f08a7a6062d479aae5 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Fri, 14 Oct 2022 00:59:26 -0700 Subject: [PATCH 09/14] Support distributed Adam in Megatron grad scaler class. Signed-off-by: Tim Moon --- nemo/collections/nlp/parts/nlp_overrides.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/nemo/collections/nlp/parts/nlp_overrides.py b/nemo/collections/nlp/parts/nlp_overrides.py index 01af0383514e..cf814bf0d8e6 100644 --- a/nemo/collections/nlp/parts/nlp_overrides.py +++ b/nemo/collections/nlp/parts/nlp_overrides.py @@ -51,6 +51,16 @@ HAVE_APEX = False +HAVE_APEX_DISTRIBUTED_ADAM = False +if HAVE_APEX: + try: + from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam + + HAVE_APEX_DISTRIBUTED_ADAM = True + + except (ImportError, ModuleNotFoundError): + pass + class NLPDDPStrategy(DDPStrategy): """ DDP plugin for Pytorch Lightning. Needed to customize DDP for model parallel models. @@ -420,6 +430,12 @@ def __init__( self.hysteresis = hysteresis self._hysteresis_tracker = self.hysteresis + def unscale_(self, optimizer): + if HAVE_APEX_DISTRIBUTED_ADAM and isinstance(optimizer, DistributedFusedAdam) and self._enabled: + assert self._scale is not None + optimizer._grad_scale /= self._scale.view([]) + super().unscale_(optimizer) + def _maybe_opt_step(self, optimizer, optimizer_state, *args, **kwargs): retval = None found_inf = torch.cuda.FloatTensor([sum(v.item() for v in optimizer_state["found_inf_per_device"].values())]) From 5da9e4298de116edc330603fc1f91543778c7f22 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Wed, 19 Oct 2022 16:08:04 -0700 Subject: [PATCH 10/14] Update dist Adam to accommodate changes in GPT model Changes were made to support pipeline parallelism with interleaved pipeline parallelism. Distributed Adam does not support this currently. Signed-off-by: Tim Moon --- .../language_modeling/megatron_base_model.py | 12 +++- .../language_modeling/megatron_gpt_model.py | 59 ++++++++++++++----- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 4f85c04f7140..b48995d0a269 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import itertools import os import re from typing import Any, Dict, Optional, Union @@ -380,10 +381,17 @@ def configure_optimizers(self): # Initialize params so that main grads are available # Note: Consolidate grads without overlap + if isinstance(self.model, list): + params = list(itertools.chain.from_iterable( + module.parameters() for module in self.model + )) + else: + params = list(self.parameters()) self._optimizer.init_params( - p for p in self.parameters() if getattr(p, '_disable_overlap_grad_sync', False) + p for p in params + if getattr(p, '_disable_overlap_grad_sync', False) ) - self._optimizer.init_params(self.parameters()) + self._optimizer.init_params(params) if self._scheduler is None: return self._optimizer diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 09ae56f23411..d79538cb3706 100755 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import itertools from typing import Any, List, Optional, Union import torch @@ -182,20 +183,38 @@ def setup_optimizer_param_groups(self): def configure_optimizers(self): if self.with_distributed_adam: - # Disable async grad reductions for params that are - # synchronized for pipeline parallelism and sequence - # parallelism - if ( - parallel_state.get_pipeline_model_parallel_world_size() > 1 - and (parallel_state.is_pipeline_first_stage() or parallel_state.is_pipeline_last_stage()) - and self.model.share_token_embeddings - ): - # Disable overlapped grad sync for embedding grad when - # pipeline parallelism is enabled - param = self.model.word_embeddings_weight() - param._disable_greedy_grad_copy = not self.megatron_amp_o2 - param._disable_overlap_grad_sync = True - for param in self.parameters(): + + # Disable overlapped grad sync for embedding grad when + # pipeline parallelism is enabled + if parallel_state.get_pipeline_model_parallel_world_size() > 1: + if parallel_state.is_pipeline_first_stage(ignore_virtual=True): + if isinstance(self.model, list): + module = self.model[0] # only the first virtual rank has the embeddings + else: + module = self.model + if module.share_token_embeddings: + param = module.word_embeddings_weight() + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True + if parallel_state.is_pipeline_last_stage(ignore_virtual=True): + if isinstance(self.model, list): + module = self.model[-1] # only the last virtual rank has the embeddings + else: + module = self.model + if module.share_token_embeddings: + param = module.word_embeddings_weight() + param._disable_greedy_grad_copy = not self.megatron_amp_o2 + param._disable_overlap_grad_sync = True + + # Disable overlapped grad sync for layer norm grads when + # sequence parallelism is enabled + if isinstance(self.model, list): + params = list(itertools.chain.from_iterable( + module.parameters() for module in self.model + )) + else: + params = list(self.parameters()) + for param in params: if getattr(param, 'sequence_parallel_enabled', False): param._disable_greedy_grad_copy = not self.megatron_amp_o2 param._disable_overlap_grad_sync = True @@ -257,6 +276,15 @@ def training_step(self, batch, batch_idx): # TODO: enable async grad all reduce for O1/autocast mixed precision training 
custom_sync_context_handler = None + # TODO: support distributed Adam with pipeline parallelism + # with interleaved schedule + if self.with_distributed_adam: + if self.cfg.get('virtual_pipeline_model_parallel_size', None) is not None: + raise ValueError( + 'Distributed Adam optimizer does ' + 'not support pipeline parallelism with interleaving' + ) + # run forward and backwards passes for an entire global batch # we do this inside training_step to support pipeline parallelism fwd_bwd_function = self._get_fwd_bwd_function() @@ -292,7 +320,8 @@ def training_step(self, batch, batch_idx): # reduced if not parallel_state.is_pipeline_first_stage(): self._optimizer.try_grad_sync( - p for p in self.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) + p for p in self._optimizer.parameters() + if not getattr(p, '_disable_overlap_grad_sync', False) ) elif self.megatron_amp_o2: # when using pipeline parallelism grads must be all-reduced after the pipeline (not asynchronously) From a0883042ce051583f619eb4f46dc474f524d8636 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 19 Oct 2022 23:11:15 +0000 Subject: [PATCH 11/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../models/language_modeling/megatron_base_model.py | 9 ++------- .../nlp/models/language_modeling/megatron_gpt_model.py | 10 +++------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index b48995d0a269..b7c5b47fe75d 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -382,15 +382,10 @@ def configure_optimizers(self): # Initialize params so that main grads are available # Note: Consolidate grads without overlap if isinstance(self.model, list): - params = list(itertools.chain.from_iterable( - module.parameters() for module in self.model - )) + params = list(itertools.chain.from_iterable(module.parameters() for module in self.model)) else: params = list(self.parameters()) - self._optimizer.init_params( - p for p in params - if getattr(p, '_disable_overlap_grad_sync', False) - ) + self._optimizer.init_params(p for p in params if getattr(p, '_disable_overlap_grad_sync', False)) self._optimizer.init_params(params) if self._scheduler is None: diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index bec5d36e8bbf..ff8eead01153 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -210,9 +210,7 @@ def configure_optimizers(self): # Disable overlapped grad sync for layer norm grads when # sequence parallelism is enabled if isinstance(self.model, list): - params = list(itertools.chain.from_iterable( - module.parameters() for module in self.model - )) + params = list(itertools.chain.from_iterable(module.parameters() for module in self.model)) else: params = list(self.parameters()) for param in params: @@ -282,8 +280,7 @@ def training_step(self, batch, batch_idx): if self.with_distributed_adam: if self.cfg.get('virtual_pipeline_model_parallel_size', None) is not None: raise ValueError( - 'Distributed Adam optimizer does ' - 'not support pipeline 
parallelism with interleaving' + 'Distributed Adam optimizer does ' 'not support pipeline parallelism with interleaving' ) # run forward and backwards passes for an entire global batch @@ -321,8 +318,7 @@ def training_step(self, batch, batch_idx): # reduced if not parallel_state.is_pipeline_first_stage(): self._optimizer.try_grad_sync( - p for p in self._optimizer.parameters() - if not getattr(p, '_disable_overlap_grad_sync', False) + p for p in self._optimizer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) ) elif self.megatron_amp_o2: # when using pipeline parallelism grads must be all-reduced after the pipeline (not asynchronously) From 6706b091b294a5af0b55fe26fc1b801ef830f838 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Wed, 19 Oct 2022 21:38:02 -0700 Subject: [PATCH 12/14] Minor tweaks to dist Adam integration Review suggestions from @ericharper and @crcrpar. Signed-off-by: Tim Moon --- .../language_modeling/megatron_base_model.py | 23 +++++++++++++------ .../language_modeling/megatron_gpt_model.py | 16 ++++++------- .../megatron_lm_encoder_decoder_model.py | 4 +--- nemo/collections/nlp/parts/nlp_overrides.py | 20 ++++------------ nemo/core/optim/distributed_adam.py | 2 +- 5 files changed, 31 insertions(+), 34 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index b7c5b47fe75d..054c9b6d8685 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import itertools import os import re from typing import Any, Dict, Optional, Union @@ -263,6 +262,20 @@ def allreduce_gradients(self): for buf, synced in zip(grads, torch._utils._unflatten_dense_tensors(coalesced, grads)): buf.copy_(synced) + def reduce_overlap_gradients(self): + """Reduce grads if overlapped grad sync is enabled + + Used for pipeline parallelism with the distributed Adam + optimizer. In the first pipeline stage, the grad sync is + overlapped with the final backward pass. In other pipeline + stages, the grad sync is deferred until the bubble overhead. 
+ + """ + if self.with_distributed_adam: + self._optimizer.try_grad_sync( + p for p in self._optimizer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) + ) + def on_train_batch_end(self, outputs, batch, batch_idx: int, unused: Optional[int] = 0) -> None: super().on_train_batch_end(outputs, batch, batch_idx) @@ -381,12 +394,8 @@ def configure_optimizers(self): # Initialize params so that main grads are available # Note: Consolidate grads without overlap - if isinstance(self.model, list): - params = list(itertools.chain.from_iterable(module.parameters() for module in self.model)) - else: - params = list(self.parameters()) - self._optimizer.init_params(p for p in params if getattr(p, '_disable_overlap_grad_sync', False)) - self._optimizer.init_params(params) + self._optimizer.init_params(p for p in self.parameters() if getattr(p, '_disable_overlap_grad_sync', False)) + self._optimizer.init_params(self.parameters()) if self._scheduler is None: return self._optimizer diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index ff8eead01153..4edb8b9489f3 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -209,11 +209,7 @@ def configure_optimizers(self): # Disable overlapped grad sync for layer norm grads when # sequence parallelism is enabled - if isinstance(self.model, list): - params = list(itertools.chain.from_iterable(module.parameters() for module in self.model)) - else: - params = list(self.parameters()) - for param in params: + for param in self.parameters(): if getattr(param, 'sequence_parallel_enabled', False): param._disable_greedy_grad_copy = not self.megatron_amp_o2 param._disable_overlap_grad_sync = True @@ -317,9 +313,7 @@ def training_step(self, batch, batch_idx): # Note: grads in first pipeline stage have already been # reduced if not parallel_state.is_pipeline_first_stage(): - self._optimizer.try_grad_sync( - p for p in self._optimizer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False) - ) + self.reduce_overlap_gradients() elif self.megatron_amp_o2: # when using pipeline parallelism grads must be all-reduced after the pipeline (not asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1 or self.cfg.get('sequence_parallel', False): @@ -902,3 +896,9 @@ def on_load_checkpoint(self, checkpoint) -> None: parallel_state.set_virtual_pipeline_model_parallel_rank(i) self.model[i].module.load_state_dict(checkpoint[f'model{i}'], strict=True) parallel_state.set_virtual_pipeline_model_parallel_rank(0) + + def parameters(self): + if isinstance(self.model, list): + return itertools.chain.from_iterable(module.parameters() for module in self.model) + else: + return self.model.parameters() diff --git a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py index 86b3cc433e23..b6d70dfb649e 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py @@ -367,9 +367,7 @@ def training_step(self, batch, batch_idx): # Note: grads in first pipeline stage have already been # reduced if not parallel_state.is_pipeline_first_stage(): - self._optimizer.try_grad_sync( - p for p in self.parameters() if not getattr(p, 
'_disable_overlap_grad_sync', False) - ) + self.reduce_overlap_gradients() elif self.megatron_amp_o2: # when using pipeline parallelism grads must be reduced after the pipeline (not asynchronously) if self.cfg.get('pipeline_model_parallel_size', 1) > 1: diff --git a/nemo/collections/nlp/parts/nlp_overrides.py b/nemo/collections/nlp/parts/nlp_overrides.py index e78917350d87..2e52be81ce34 100644 --- a/nemo/collections/nlp/parts/nlp_overrides.py +++ b/nemo/collections/nlp/parts/nlp_overrides.py @@ -51,16 +51,6 @@ HAVE_APEX = False -HAVE_APEX_DISTRIBUTED_ADAM = False -if HAVE_APEX: - try: - from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam - - HAVE_APEX_DISTRIBUTED_ADAM = True - - except (ImportError, ModuleNotFoundError): - pass - class NLPDDPStrategy(DDPStrategy): """ DDP plugin for Pytorch Lightning. Needed to customize DDP for model parallel models. @@ -431,11 +421,11 @@ def __init__( self.hysteresis = hysteresis self._hysteresis_tracker = self.hysteresis - def unscale_(self, optimizer): - if HAVE_APEX_DISTRIBUTED_ADAM and isinstance(optimizer, DistributedFusedAdam) and self._enabled: - assert self._scale is not None - optimizer._grad_scale /= self._scale.view([]) - super().unscale_(optimizer) + def _unscale_grads_(self, optimizer, *args): + if getattr(optimizer, "_custom_amp_unscale_grads", False): + return optimizer.unscale_grads(*args) + else: + return super()._unscale_grads_(optimizer, *args) def _maybe_opt_step(self, optimizer, optimizer_state, *args, **kwargs): retval = None diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 4098c55c52df..cba3985c6fcb 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
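[Note for reviewers] The grad-scaler change above moves the override from unscale_() to the internal _unscale_grads_ hook and lets the optimizer unscale its own grads whenever it advertises _custom_amp_unscale_grads. Below is a minimal standalone sketch of that dispatch, not the NeMo implementation: ToyDistributedOptimizer is a hypothetical stand-in for an optimizer such as Apex's DistributedFusedAdam that keeps grads in its own sharded buckets, and the unscale_grads argument names are assumed to follow torch.cuda.amp.GradScaler's internal hook.

import torch


class ToyDistributedOptimizer(torch.optim.SGD):
    # Advertise to the grad scaler that this optimizer unscales its own grads.
    _custom_amp_unscale_grads = True

    def unscale_grads(self, inv_scale, found_inf, allow_fp16):
        # A real distributed optimizer would walk its internal grad buckets;
        # this toy just unscales the per-parameter grads in place.
        for group in self.param_groups:
            for p in group["params"]:
                if p.grad is not None:
                    p.grad.mul_(inv_scale.to(p.grad.device))
        return {found_inf.device: found_inf}


class SketchGradScaler(torch.cuda.amp.GradScaler):
    def _unscale_grads_(self, optimizer, *args):
        # Same dispatch as the patch: defer to the optimizer when it knows how
        # to unscale its own grads, otherwise fall back to the stock path.
        if getattr(optimizer, "_custom_amp_unscale_grads", False):
            return optimizer.unscale_grads(*args)
        return super()._unscale_grads_(optimizer, *args)

Compared with the earlier unscale_() override in this series, which divided the optimizer's private _grad_scale by the scaler's scale, routing through _unscale_grads_ keeps the scaler's found-inf bookkeeping on its standard path.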
From 6ed59e8a8e92fe426f403c5194b2f2e19a75e801 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 20 Oct 2022 04:39:29 +0000 Subject: [PATCH 13/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../nlp/models/language_modeling/megatron_base_model.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 054c9b6d8685..e3bb9ad9e675 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -394,7 +394,9 @@ def configure_optimizers(self): # Initialize params so that main grads are available # Note: Consolidate grads without overlap - self._optimizer.init_params(p for p in self.parameters() if getattr(p, '_disable_overlap_grad_sync', False)) + self._optimizer.init_params( + p for p in self.parameters() if getattr(p, '_disable_overlap_grad_sync', False) + ) self._optimizer.init_params(self.parameters()) if self._scheduler is None: From 0bfb2db8df53e91b257f9da6e7e502e6655748f0 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 20 Oct 2022 16:22:57 -0700 Subject: [PATCH 14/14] Remove error when dist Adam and interleaved pipeline parallelism are enabled Signed-off-by: Tim Moon --- .../nlp/models/language_modeling/megatron_gpt_model.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 4edb8b9489f3..bb8bd73f996e 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -271,14 +271,6 @@ def training_step(self, batch, batch_idx): # TODO: enable async grad all reduce for O1/autocast mixed precision training custom_sync_context_handler = None - # TODO: support distributed Adam with pipeline parallelism - # with interleaved schedule - if self.with_distributed_adam: - if self.cfg.get('virtual_pipeline_model_parallel_size', None) is not None: - raise ValueError( - 'Distributed Adam optimizer does ' 'not support pipeline parallelism with interleaving' - ) - # run forward and backwards passes for an entire global batch # we do this inside training_step to support pipeline parallelism fwd_bwd_function = self._get_fwd_bwd_function()
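[Note for reviewers] Across patches 10-14 the integration settles on one convention: the model exposes a single flat parameters() view even when it is a list of virtual-pipeline chunks, and individual tensors opt out of overlapped grad sync or greedy grad copy via the _disable_overlap_grad_sync and _disable_greedy_grad_copy attributes that the optimizer hooks and reduce_overlap_gradients() consult. The following is a minimal standalone sketch of that convention, not the NeMo code: ToyChunk is a hypothetical stand-in for a Megatron module, and the tagging rule (layer norms only) is purely illustrative.

import itertools

import torch


class ToyChunk(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_norm = torch.nn.LayerNorm(8)
        self.linear = torch.nn.Linear(8, 8)


def flat_parameters(model):
    # Mirrors the parameters() override: a model may be a single module or a
    # list of virtual-pipeline chunks; either way return one flat iterator.
    if isinstance(model, list):
        return itertools.chain.from_iterable(m.parameters() for m in model)
    return model.parameters()


model = [ToyChunk(), ToyChunk()]  # e.g. two virtual pipeline stages

# Tag params whose grads must not be reduced asynchronously.
for chunk in model:
    for name, p in chunk.named_parameters():
        if "layer_norm" in name:
            p._disable_overlap_grad_sync = True

# reduce_overlap_gradients() then syncs only the params that did not opt out.
overlap_params = [
    p for p in flat_parameters(model)
    if not getattr(p, "_disable_overlap_grad_sync", False)
]
total = len(list(flat_parameters(model)))
print(f"{len(overlap_params)} of {total} params use overlapped grad sync")

In the real integration the tagged tensors are the shared word-embedding weight on the first and last pipeline stages and, under sequence parallelism, any param with sequence_parallel_enabled set; everything else keeps overlapped grad sync enabled.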