From d42f1b353a256bf3b582685a421ed37ff947bfca Mon Sep 17 00:00:00 2001 From: Tim Moon <4406448+timmoon10@users.noreply.github.com> Date: Tue, 25 Jul 2023 12:12:16 -0700 Subject: [PATCH 01/23] Disable distopt contiguous param buffer by default (#7095) Signed-off-by: Tim Moon --- .../nlp/models/language_modeling/megatron_base_model.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 74a0201968a5..76cde4f6b032 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -66,7 +66,7 @@ class MegatronBaseModel(NLPModel): - Initialize the model parallel world for nemo. - Turn on all of the nvidia optimizations. - - If `cfg.tokenizer` is available, it loads the tokenizer and pad the vocab to the + - If `cfg.tokenizer` is available, it loads the tokenizer and pad the vocab to the correct size for tensor model parallelism. - If using distributed optimizer, configure to be compatible with O2 level optimizations and/or model parallelism. @@ -407,9 +407,8 @@ def setup_optimization( optim_kwargs = {} if optim_kwargs is None else optim_kwargs.copy() if self.with_distributed_adam: - # Allocate contiguous buffers to avoid extra copies + # Allocate contiguous buffer to avoid extra copies optim_kwargs['contiguous_grad_buffer'] = True - optim_kwargs['contiguous_param_buffer'] = True # Make sure optimizer state is in FP32 optim_dtype = torch.float32 @@ -509,7 +508,8 @@ def configure_optimizers(self): self._optimizer.init_params(reversed(no_overlap_params)) # Initialize contiguous parameter buffer - self._optimizer.init_param_buffer() + if self._optimizer.contiguous_param_buffer: + self._optimizer.init_param_buffer() if self._scheduler is None: return self._optimizer From 295df4c0b74a91ac6ce94d0408981ab0dcfce90a Mon Sep 17 00:00:00 2001 From: Tim Moon <4406448+timmoon10@users.noreply.github.com> Date: Thu, 7 Sep 2023 13:57:45 -0700 Subject: [PATCH 02/23] Use distributed optimizer support for multiple dtypes (#7359) * Update distopt wrapper with multiple dtype support Remove manual handling of separate FP32 optimizer. Signed-off-by: Tim Moon * Use distopt support for contiguous buffers with multiple dtypes Signed-off-by: Tim Moon * Fix typo Signed-off-by: Tim Moon * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Separate distopt buckets for first GPT layer and non-overlapped params Signed-off-by: Tim Moon * Add distopt logic for int dtypes Signed-off-by: Tim Moon * Update Apex commit Signed-off-by: Tim Moon * Remove unused variables Signed-off-by: Tim Moon * Update Apex commit in README and Jenkensfile Signed-off-by: Tim Moon * Debug Dockerfile and Jenkinsfile Signed-off-by: Tim Moon --------- Signed-off-by: Tim Moon Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Eric Harper --- Dockerfile | 20 +- README.rst | 46 +-- .../nlp/modules/common/megatron/clip_grads.py | 2 +- nemo/core/optim/distributed_adam.py | 287 ++++++------------ 4 files changed, 127 insertions(+), 228 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2e6b617087bc..e935d3bd810c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-ARG BASE_IMAGE=nvcr.io/nvidia/pytorch:23.06-py3 +ARG BASE_IMAGE=nvcr.io/nvidia/pytorch:23.08-py3 # build an image that includes only the nemo dependencies, ensures that dependencies # are included first for optimal caching, and useful for building a development @@ -45,12 +45,18 @@ RUN apt-get update && \ WORKDIR /workspace/ WORKDIR /tmp/ -# TODO: Remove once this Apex commit (5/12/23) is included in PyTorch -# container + +# Distributed Adam support for multiple dtypes RUN git clone https://github.com/NVIDIA/apex.git && \ cd apex && \ - git checkout 8b7a1ff183741dd8f9b87e7bafd04cfde99cea28 && \ - pip3 install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ + git checkout 52e18c894223800cb611682dce27d88050edf1de && \ + pip3 install -v --no-build-isolation --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ + +# install megatron core, this can be removed once 0.3 pip package is released +RUN git clone https://github.com/NVIDIA/Megatron-LM.git && \ + cd Megatron-LM && \ + git checkout 01c8704453af7e26134441224c8a351746ca0349 && \ + pip install -e . # uninstall stuff from base container RUN pip3 uninstall -y sacrebleu torchtext @@ -76,6 +82,8 @@ RUN for f in $(ls requirements*.txt); do pip3 install --disable-pip-version-chec RUN pip install flash-attn # pinned triton version for flash-attention https://github.com/HazyResearch/flash-attention/blob/main/flash_attn/flash_attn_triton.py#L3 RUN pip install triton==2.0.0.dev20221202 +# install numba for latest containers +RUN pip install numba>=0.57.1 # install k2, skip if installation fails COPY scripts /tmp/nemo/scripts/ @@ -94,7 +102,7 @@ COPY . . # start building the final container FROM nemo-deps as nemo -ARG NEMO_VERSION=1.20.0 +ARG NEMO_VERSION=1.21.0 # Check that NEMO_VERSION is set. Build will fail without this. Expose NEMO and base container # version information as runtime environment variable for introspection purposes diff --git a/README.rst b/README.rst index 6fbe9047d0c4..6dc491523a99 100644 --- a/README.rst +++ b/README.rst @@ -41,14 +41,14 @@ Introduction ------------ -NVIDIA NeMo is a conversational AI toolkit built for researchers working on automatic speech recognition (ASR), -text-to-speech synthesis (TTS), large language models (LLMs), and +NVIDIA NeMo is a conversational AI toolkit built for researchers working on automatic speech recognition (ASR), +text-to-speech synthesis (TTS), large language models (LLMs), and natural language processing (NLP). -The primary objective of NeMo is to help researchers from industry and academia to reuse prior work (code and pretrained models) +The primary objective of NeMo is to help researchers from industry and academia to reuse prior work (code and pretrained models) and make it easier to create new `conversational AI models `_. -All NeMo models are trained with `Lightning `_ and -training is automatically scalable to 1000s of GPUs. +All NeMo models are trained with `Lightning `_ and +training is automatically scalable to 1000s of GPUs. Additionally, NeMo Megatron LLM models can be trained up to 1 trillion parameters using tensor and pipeline model parallelism. 
NeMo models can be optimized for inference and deployed for production use-cases with `NVIDIA Riva `_. @@ -57,14 +57,14 @@ State of the Art pretrained NeMo models are freely available on `HuggingFace Hub `NVIDIA NGC `_. These models can be used to transcribe audio, synthesize speech, or translate text in just a few lines of code. -We have extensive `tutorials `_ that +We have extensive `tutorials `_ that can all be run on `Google Colab `_. -For advanced users that want to train NeMo models from scratch or finetune existing NeMo models +For advanced users that want to train NeMo models from scratch or finetune existing NeMo models we have a full suite of `example scripts `_ that support multi-GPU/multi-node training. For scaling NeMo LLM training on Slurm clusters or public clouds, please see the `NVIDIA NeMo Megatron Launcher `_. -The NM launcher has extensive recipes, scripts, utilities, and documentation for training NeMo LLMs and also has an `Autoconfigurator `_ +The NM launcher has extensive recipes, scripts, utilities, and documentation for training NeMo LLMs and also has an `Autoconfigurator `_ which can be used to find the optimal model parallel configuration for training on a specific cluster. Also see our `introductory video `_ for a high level overview of NeMo. @@ -115,13 +115,15 @@ Key Features * `Prompt Learning `_ * `NGC collection of pre-trained NLP models. `_ * `Synthetic Tabular Data Generation `_ -* `Speech synthesis (TTS) `_ - * Spectrogram generation: Tacotron2, GlowTTS, TalkNet, FastPitch, FastSpeech2, Mixer-TTS, Mixer-TTS-X - * Vocoders: WaveGlow, SqueezeWave, UniGlow, MelGAN, HiFiGAN, UnivNet - * End-to-end speech generation: FastPitch_HifiGan_E2E, FastSpeech2_HifiGan_E2E, VITS - * `NGC collection of pre-trained TTS models. `_ +* Text-to-Speech Synthesis (TTS): + * `Documentation `_ + * Mel-Spectrogram generators: FastPitch, SSL FastPitch, Mixer-TTS/Mixer-TTS-X, RAD-TTS, Tacotron2 + * Vocoders: HiFiGAN, UnivNet, WaveGlow + * End-to-End Models: VITS + * `Pre-trained Model Checkpoints in NVIDIA GPU Cloud (NGC) `_ * `Tools `_ * `Text Processing (text normalization and inverse text normalization) `_ + * `NeMo Forced Aligner `_ * `CTC-Segmentation tool `_ * `Speech Data Explorer `_: a dash-based tool for interactive exploration of ASR/TTS datasets * `Speech Data Processor `_ @@ -132,7 +134,7 @@ Built for speed, NeMo can utilize NVIDIA's Tensor Cores and scale out training t Requirements ------------ -1) Python 3.9 or above +1) Python 3.10 or above 2) Pytorch 1.13.1 or above 3) NVIDIA GPU for training @@ -176,7 +178,7 @@ We recommend installing NeMo in a fresh Conda environment. .. code-block:: bash - conda create --name nemo python==3.8.10 + conda create --name nemo python==3.10.12 conda activate nemo Install PyTorch using their `configurator `_. 
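As a quick reference, the fresh-environment setup described in the hunk above amounts to the following shell session. This is a minimal sketch: the PyTorch line is left as a placeholder for the exact command produced by the PyTorch configurator, and the final ``nemo_toolkit`` install line is an assumption about the usual pip package name rather than something stated in this patch.

.. code-block:: bash

    # Create and activate a fresh Conda environment (Python 3.10, per the updated requirement).
    conda create --name nemo python==3.10.12
    conda activate nemo

    # Install PyTorch using the command generated by the PyTorch configurator
    # for your CUDA version (placeholder -- copy the exact command from pytorch.org).
    # pip install torch ...

    # Install NeMo itself (assumed package name; adjust extras as needed).
    pip install "nemo_toolkit[all]"

The Apex and Transformer Engine steps that follow in this diff are then applied on top of this environment.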
@@ -245,8 +247,8 @@ To install Apex, run git clone https://github.com/NVIDIA/apex.git cd apex - git checkout 57057e2fcf1c084c0fcc818f55c0ff6ea1b24ae2 - pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ + git checkout 52e18c894223800cb611682dce27d88050edf1de + pip install -v --no-build-isolation --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ It is highly recommended to use the NVIDIA PyTorch or NeMo container if having issues installing Apex or any other dependencies. @@ -265,6 +267,8 @@ packaging is also needed: pip install packaging +With the latest versions of Apex, the `pyproject.toml` file in Apex may need to be deleted in order to install locally. + Transformer Engine ~~~~~~~~~~~~~~~~~~ @@ -283,7 +287,7 @@ Transformer Engine requires PyTorch to be built with CUDA 11.8. Flash Attention ~~~~~~~~~~~~~~~~~~~~ -Transformer Engine already supports Flash Attention for GPT models. If you want to use Flash Attention for non-causal models or use with attention bias (introduced from position encoding, e.g. Alibi), please install `flash-attn `_. +Transformer Engine already supports Flash Attention for GPT models. If you want to use Flash Attention for non-causal models or use with attention bias (introduced from position encoding, e.g. Alibi), please install `flash-attn `_. .. code-block:: bash @@ -292,7 +296,7 @@ Transformer Engine already supports Flash Attention for GPT models. If you want NLP inference UI ~~~~~~~~~~~~~~~~~~~~ -To launch the inference web UI server, please install the gradio `gradio `_. +To launch the inference web UI server, please install the gradio `gradio `_. .. code-block:: bash @@ -304,13 +308,13 @@ NeMo Text Processing, specifically (Inverse) Text Normalization, is now a separa Docker containers: ~~~~~~~~~~~~~~~~~~ -We release NeMo containers alongside NeMo releases. For example, NeMo ``r1.19.0`` comes with container ``nemo:23.04``, you may find more details about released containers in `releases page `_. +We release NeMo containers alongside NeMo releases. For example, NeMo ``r1.20.0`` comes with container ``nemo:23.06``, you may find more details about released containers in `releases page `_. To use built container, please run .. 
code-block:: bash - docker pull nvcr.io/nvidia/nemo:23.04 + docker pull nvcr.io/nvidia/nemo:23.06 To build a nemo container with Dockerfile from a branch, please run diff --git a/nemo/collections/nlp/modules/common/megatron/clip_grads.py b/nemo/collections/nlp/modules/common/megatron/clip_grads.py index a1620931a695..4c38fdd1ef8c 100644 --- a/nemo/collections/nlp/modules/common/megatron/clip_grads.py +++ b/nemo/collections/nlp/modules/common/megatron/clip_grads.py @@ -200,7 +200,7 @@ def clip_grad_norm_distributed_optimizer(optimizer, max_norm, norm_type=2): # - parameter should not be shared # - should not be a replica due to tensor model parallelism params_for_norm = [] - for param in optimizer.parameters(with_fp32_optim_params=True): + for param in optimizer.parameters(): is_not_shared = param_is_not_shared(param) is_not_tp_duplicate = param_is_not_tensor_parallel_duplicate(param) if is_not_shared and is_not_tp_duplicate: diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 706bc48774e3..62bba769f652 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -14,31 +14,50 @@ import collections import itertools +from typing import Callable, Iterable, Optional, Union import torch -from apex.contrib.optimizers.distributed_fused_adam import ( - DistributedFusedAdam, - _coalescing_manager, - _coalescing_manager_append_work, - _disable_pre_forward_hook, -) +from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam, _disable_pre_forward_hook from megatron.core import parallel_state +from megatron.core.dist_checkpointing.dict_utils import dict_list_map_inplace +from megatron.core.dist_checkpointing.mapping import ShardedTensor +from megatron.core.dist_checkpointing.optimizer import ( + get_param_id_to_sharded_param_map, + make_sharded_optimizer_tensor, + optim_state_to_sharding_state, +) -def _str_to_dtype(dtype): +def _str_to_dtype(dtype: Union[str, torch.dtype]) -> torch.dtype: if isinstance(dtype, torch.dtype): return dtype name = str(dtype).strip().lower() - if name in ('', 'none'): - return torch.float32 - elif name in ('torch.float32', 'float32', 'float', 'fp32', '32'): - return torch.float32 - elif name in ('torch.float16', 'float16', 'half', 'fp16', '16'): - return torch.float16 - elif name in ('torch.bfloat16', 'bfloat16', 'bf16'): - return torch.bfloat16 - else: - raise ValueError(f'unsupported dtype ({dtype})') + if name.startswith("torch."): + name = name.replace("torch.", "", 1) + if name.startswith("fp"): + name = name.replace("fp", "float", 1) + dtype = dict( + float32=torch.float32, + float=torch.float32, + float64=torch.float64, + double=torch.float64, + float16=torch.float16, + half=torch.float16, + bfloat16=torch.bfloat16, + bf16=torch.bfloat16, + uint8=torch.uint8, + byte=torch.uint8, + int8=torch.int8, + char=torch.int8, + int16=torch.int16, + short=torch.int16, + int32=torch.int32, + int=torch.int32, + int64=torch.int64, + long=torch.int64, + bool=torch.bool, + )[name] + return dtype class MegatronDistributedFusedAdam(DistributedFusedAdam): @@ -49,7 +68,12 @@ class MegatronDistributedFusedAdam(DistributedFusedAdam): """ - def __init__(self, params, disable_distributed_parameters=False, **kwargs): + def __init__( + self, + params: Union[Iterable[torch.nn.Parameter], Iterable[dict]], + disable_distributed_parameters: bool = False, + **kwargs, + ): # Initialize process groups if 'process_group' not in kwargs and not parallel_state.is_unitialized(): @@ -72,78 +96,25 @@ def 
__init__(self, params, disable_distributed_parameters=False, **kwargs): if not isinstance(param_groups[0], dict): param_groups = [{'params': param_groups}] - # Check if explicit FP32 optimizer is needed - self._fp32_optim = None - distopt_param_groups = param_groups - dtype = kwargs['dtype'] if 'dtype' in kwargs else torch.float32 - grad_sync_dtype = kwargs['grad_sync_dtype'] if 'grad_sync_dtype' in kwargs else dtype - needs_fp32_optimizer = dtype != torch.float32 or grad_sync_dtype != torch.float32 - if needs_fp32_optimizer: - needs_fp32_optimizer = any( - any(getattr(param, '_with_fp32_optimizer', False) for param in param_group['params']) - for param_group in param_groups - ) - if needs_fp32_optimizer: - - # Find params that require explicit FP32 optimizer - distopt_param_groups = [] - fp32_param_groups = [] - self._fp32_optim_main_params = collections.OrderedDict() - for param_group in param_groups: - distopt_param_group = param_group.copy() - distopt_param_group['params'] = [] - fp32_param_group = param_group.copy() - fp32_param_group['params'] = [] - for model_param in param_group['params']: - if getattr(model_param, '_with_fp32_optimizer', False): - main_param = model_param.detach().clone().float() - model_param.main_grad = main_param.grad - fp32_param_group['params'].append(main_param) - self._fp32_optim_main_params[model_param] = main_param - else: - distopt_param_group['params'].append(model_param) - distopt_param_groups.append(distopt_param_group) - fp32_param_groups.append(fp32_param_group) - - # Add callback hook so grads accumulate into FP32 buffer - self._fp32_register_post_backward_hooks() - - # Construct explicit FP32 optimizer - adamw_kwargs = {} - for name in ('lr', 'betas', 'eps', 'weight_decay', 'amsgrad'): - if name in kwargs: - adamw_kwargs[name] = kwargs[name] - self._fp32_optim = torch.optim.AdamW(fp32_param_groups, **adamw_kwargs) - self._fp32_optim_grad_sync_needed = True - # Construct distributed optimizer - super().__init__(distopt_param_groups, **kwargs) + super().__init__(param_groups, **kwargs) - def _fp32_register_post_backward_hooks(self): - """Attach hooks for FP32 gradients""" - - # Helper function to avoid issues with late binding closures - def make_post_backward_hook(param): - def post_backward_hook(*unused): - self._fp32_optim_grad_sync_needed = True - if hasattr(param, 'main_grad'): - with torch.no_grad(): - if param.grad is not None: - param.main_grad += param.grad - param.grad = None - - return post_backward_hook - - # Construct hooks and register with params - self._fp32_grad_accs = [] - for param in self._fp32_optim_main_params.keys(): - param_tmp = param.expand_as(param) - grad_acc = param_tmp.grad_fn.next_functions[0][0] - hook = make_post_backward_hook(param) - grad_acc.register_hook(hook) - self._fp32_grad_accs.append(grad_acc) + # Initialize weights that require FP32 grads + if self.dtype != torch.float32 or self.grad_sync_dtype != torch.float32: + fp32_params = [] + for param_group in param_groups: + fp32_params.extend( + filter(lambda param: getattr(param, '_with_fp32_optimizer', False), param_group['params'],) + ) + if fp32_params: + assert self.dtype == torch.float32, ( + 'Param requires FP32 state, ' f'but optimizer is initialized with {dtype}' + ) + self.init_params_bucket( + fp32_params, grad_sync_dtype=torch.float32, + ) - def _make_post_backward_hook(self, param, param_group_id, param_id): + def _make_post_backward_hook(self, param: torch.nn.Parameter, param_group_id: int, param_id: int,) -> Callable: def hook(*unused): if 
getattr(param, '_pre_forward_hook_is_enabled', False): raise RuntimeError( @@ -166,76 +137,29 @@ def hook(*unused): return hook - def _filter_distopt_params(self, params): - if self._fp32_optim is None: - return params - if params is None: - return None - if isinstance(params, torch.Tensor): - params = [params] - return filter(lambda param: param not in self._fp32_optim_main_params, params) - - def parameters(self, with_fp32_optim_params=False): - if with_fp32_optim_params and self._fp32_optim is not None: - return itertools.chain(super().parameters(), self._fp32_optim_main_params.keys()) - else: - return super().parameters() - - def init_params(self, params=None): - super().init_params(self._filter_distopt_params(params)) - - def init_params_bucket(self, params): - super().init_params_bucket(self._filter_distopt_params(params)) + def try_grad_sync(self, params: Iterable[torch.nn.Parameter]) -> None: + def is_grad_copy_enabled(param: torch.nn.Parameter) -> bool: + return not getattr(param, '_disable_greedy_grad_copy', False) and not getattr( + param, '_disable_overlap_grad_sync', False + ) - def try_grad_sync(self, params): - params = self._filter_distopt_params(params) - params = [p for p in params if not getattr(p, '_disable_greedy_grad_copy', False)] - params = [p for p in params if not getattr(p, '_disable_overlap_grad_sync', False)] + params = list(filter(is_grad_copy_enabled, params)) for p in params: self._grad_copy(p) self._try_start_bucket_grad_sync(params=params) - def _try_start_bucket_param_sync(self, params=None): - super()._try_start_bucket_param_sync(self._filter_distopt_params(params)) - - def _fp32_optim_grad_sync(self): - if self._fp32_optim is None or not self._fp32_optim_grad_sync_needed: - return - for model_param, main_param in self._fp32_optim_main_params.items(): - if model_param.grad is not None: - main_param.grad += model_param.grad.detach() - with _coalescing_manager(self.process_group, self.device, async_ops=True) as cm: - for main_param in self._fp32_optim_main_params.values(): - _coalescing_manager_append_work( - cm, - torch.distributed.all_reduce( - main_param.grad, op=torch.distributed.ReduceOp.AVG, group=self.process_group, async_op=True, - ), - ) - cm.wait() - self._fp32_optim_grad_sync_needed = False - - def zero_grad(self, *args, **kwargs): + def zero_grad(self, *args, **kwargs) -> None: super().zero_grad(*args, **kwargs) - # Reset grads for explicit FP32 optimizer - if self._fp32_optim is not None: - self._fp32_optim_grad_sync_needed = True - self._fp32_optim.zero_grad(set_to_none=False) - for model_param, main_param in self._fp32_optim_main_params.items(): - if main_param.grad is None: - main_param.grad = torch.zeros_like(main_param) - if model_param.grad is not None: - model_param.grad.zero_() - model_param.main_grad = main_param.grad - # Reset main grads if self.contiguous_grad_buffer: for param in self.parameters(): with _disable_pre_forward_hook(param): param.main_grad = self.grad_buffer_view(param) - def grad_norm(self, parameters=None, norm_type=2.0, force=False): + def grad_norm( + self, parameters: Optional[Iterable[torch.nn.Parameter]] = None, norm_type: float = 2.0, force: bool = False, + ) -> torch.Tensor: assert norm_type == 2 if parameters is not None: @@ -246,24 +170,10 @@ def grad_norm(self, parameters=None, norm_type=2.0, force=False): if force or self._grad_norm is None: # Compute norm of local gradients for distributed optimizer - grad_norm_sq = self._local_grad_norm( - parameters=self._filter_distopt_params(parameters), 
norm_type=norm_type, - ) + grad_norm_sq = self._local_grad_norm(parameters=parameters, norm_type=norm_type,) if self.redundant_size > 1: grad_norm_sq /= self.redundant_size - # Compute norm of local gradients for explicit FP32 optimizer - if self._fp32_optim is not None: - self._fp32_optim_grad_sync() - if parameters is None: - for main_param in self._fp32_optim_main_params.values(): - grad_norm_sq += torch.linalg.norm(main_param.grad) ** 2 / self.process_group_size - else: - for model_param in parameters: - if model_param in self._fp32_optim_main_params: - main_param = self._fp32_optim_main_params[model_param] - grad_norm_sq += torch.linalg.norm(main_param.grad) ** 2 / self.process_group_size - # Sum over all procs to get grad norm torch.distributed.all_reduce( grad_norm_sq, op=torch.distributed.ReduceOp.SUM, @@ -273,48 +183,25 @@ def grad_norm(self, parameters=None, norm_type=2.0, force=False): # Use cached grad norm return super().grad_norm() - def step(self, closure=None, *, grad_scaler=None): - - # Apply distributed optimizer - loss = super().step(closure=closure, grad_scaler=grad_scaler) - - if self._fp32_optim is not None: - - # Handle grad scaling - if grad_scaler is not None: - scaler_state = grad_scaler._per_optimizer_states[id(self)] - for _, found_inf in scaler_state['found_inf_per_device'].items(): - if found_inf.item(): - return loss - - # Update learning rate - for distopt_group, fp32_optim_group in zip(self.param_groups, self._fp32_optim.param_groups): - fp32_optim_group['lr'] = distopt_group['lr'] + def sharded_state_dict(self, model_sharded_state_dict): + optimizer_state_dict = self.state_dict() - # Apply explicit FP32 optimizer - self._fp32_optim_grad_sync() - for main_param in self._fp32_optim_main_params.values(): - main_param.grad *= self._grad_scale - self._fp32_optim.step() - for model_param, main_param in self._fp32_optim_main_params.items(): - model_param.detach().copy_(main_param.detach()) + id_to_sharded_param_map = get_param_id_to_sharded_param_map( + model_sharded_state_dict=model_sharded_state_dict, optim_params_iter=self.parameters(), + ) + # Convert state + step = optimizer_state_dict['state'].pop('step') + state_dict_format = optimizer_state_dict.pop('format', None) + optim_state_to_sharding_state(optimizer_state_dict, id_to_sharded_param_map) + optimizer_state_dict['state']['step'] = step + if state_dict_format is not None: + optimizer_state_dict['format'] = state_dict_format - return loss + def rename_fp32_params(x): + if isinstance(x, ShardedTensor) and x.key.startswith('optimizer.state.param'): + x.key = x.key.replace('optimizer.state.param', 'optimizer.state.fp32_param') + return x - def state_dict(self, *args, **kwargs): - state_dict = super().state_dict(*args, **kwargs) - if self._fp32_optim is not None and state_dict is not None: - state_dict['fp32_optim'] = self._fp32_optim.state_dict() - state_dict['fp32_optim_fp32_params'] = list(self._fp32_optim_main_params.values()) - return state_dict + dict_list_map_inplace(rename_fp32_params, optimizer_state_dict) - def load_state_dict(self, state_dict): - if self._fp32_optim is not None and 'fp32_optim' in state_dict: - self._fp32_optim.load_state_dict(state_dict['fp32_optim']) - del state_dict['fp32_optim'] - for old_main_param, new_main_param in zip( - self._fp32_optim_main_params.values(), state_dict['fp32_optim_fp32_params'] - ): - old_main_param.copy_(new_main_param.detach()) - del state_dict['fp32_optim_fp32_params'] - return super().load_state_dict(state_dict) + return optimizer_state_dict From 
9f3218244e8c4647c71149e7d029096c69766f43 Mon Sep 17 00:00:00 2001 From: Sudhakar Singh Date: Thu, 7 Sep 2023 16:11:30 -0700 Subject: [PATCH 03/23] fp8 poc usage with megatron-core Signed-off-by: Tim Moon --- nemo/core/optim/optimizer_with_main_params.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/core/optim/optimizer_with_main_params.py b/nemo/core/optim/optimizer_with_main_params.py index 44d54a0e63ff..34467275df56 100644 --- a/nemo/core/optim/optimizer_with_main_params.py +++ b/nemo/core/optim/optimizer_with_main_params.py @@ -162,7 +162,7 @@ class MainParamsOptimizerWrapper(torch.optim.Optimizer): Arguments: optimizer: base optimizer such as Adam or SGD. fp32_grad_accum: to enable the use of fp32 in gradient accumulation and allreduce. - contiguous_grad_bucket: to enable allocating the master gradients in the + contiguous_grad_bucket: to enable allocating the master gradients in the contiguous memory space to reduce memory fragmentation. async_grad_allreduce: enable asynchronous gradient allreduce that is executed along with the training step backprop. From 264a012fa2af755645cfd7c5a74e0de00d338c1b Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Fri, 8 Sep 2023 17:42:20 -0700 Subject: [PATCH 04/23] Add FP8 support to distopt Signed-off-by: Tim Moon --- .../language_modeling/megatron_base_model.py | 4 +- .../language_modeling/megatron_gpt_model.py | 4 - nemo/core/optim/distributed_adam.py | 307 +++++++++++++++++- 3 files changed, 308 insertions(+), 7 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 76cde4f6b032..7b1c032e4e42 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -489,9 +489,11 @@ def configure_optimizers(self): if self.with_distributed_adam: # Initialize param buckets if explicitly provided - if hasattr(self, 'distributed_adam_buckets'): + if getattr(self, 'distributed_adam_buckets', None): for bucket in self.distributed_adam_buckets: self._optimizer.init_params_bucket(bucket) + self._optimizer.init_params_bucket(self.parameters()) + if hasattr(self, 'distributed_adam_buckets'): del self.distributed_adam_buckets # Make sure all params are initialized so main grads are diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 7874b35dd83b..b71dd68d6941 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -434,10 +434,6 @@ def configure_optimizers(self): [p for p in layer.parameters() if not getattr(p, '_disable_overlap_grad_sync', False)] ) buckets.reverse() - used_params = set() - for bucket in buckets: - used_params.update(bucket) - buckets[-1].extend(p for p in self.parameters() if p not in used_params) self.distributed_adam_buckets = buckets return super().configure_optimizers() diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 62bba769f652..a1e28c894062 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -14,10 +14,14 @@ import collections import itertools -from typing import Callable, Iterable, Optional, Union +from typing import Callable, Dict, Iterable, Optional, Union import torch -from apex.contrib.optimizers.distributed_fused_adam 
import DistributedFusedAdam, _disable_pre_forward_hook +from apex.contrib.optimizers.distributed_fused_adam import ( + DistributedFusedAdam, + _disable_pre_forward_hook, + _multi_tensor_copy, +) from megatron.core import parallel_state from megatron.core.dist_checkpointing.dict_utils import dict_list_map_inplace from megatron.core.dist_checkpointing.mapping import ShardedTensor @@ -27,6 +31,14 @@ optim_state_to_sharding_state, ) +# Check if Transformer Engine has FP8 tensor class +HAVE_TE_FP8TENSOR = False +try: + from transformer_engine.pytorch import Float8Tensor + HAVE_TE_FP8TENSOR = True +except (ImportError, ModuleNotFoundError): + pass + def _str_to_dtype(dtype: Union[str, torch.dtype]) -> torch.dtype: if isinstance(dtype, torch.dtype): @@ -60,6 +72,10 @@ def _str_to_dtype(dtype: Union[str, torch.dtype]) -> torch.dtype: return dtype +def _is_fp8_tensor(tensor: torch.Tensor) -> bool: + return HAVE_TE_FP8TENSOR and isinstance(tensor, Float8Tensor) + + class MegatronDistributedFusedAdam(DistributedFusedAdam): """Wrapper class that supports NeMo-Megatron optimizations @@ -114,6 +130,10 @@ def __init__( fp32_params, grad_sync_dtype=torch.float32, ) + def _broadcast_params(self) -> None: + # Assume params have already been synchronized + pass + def _make_post_backward_hook(self, param: torch.nn.Parameter, param_group_id: int, param_id: int,) -> Callable: def hook(*unused): if getattr(param, '_pre_forward_hook_is_enabled', False): @@ -137,6 +157,122 @@ def hook(*unused): return hook + def init_params( + self, + params: Optional[Iterable[torch.nn.Parameter]] = None, + param_sync_dtype: Optional[torch.dtype] = None, + **kwargs, + ) -> None: + """Initialize optimizer state for parameters + + Initializes FP8 and non-FP8 params separately. + + """ + + # Default cases + if params is None: + params = self.parameters() + elif isinstance(params, torch.Tensor): + params = [params] + + # Ignore parameters that have already been initialized + params = [param for param in params if "fragments" not in self.state[param]] + if not params: + return + + # Initialize FP8 and non-FP8 tensors separately + if any(_is_fp8_tensor(param) for param in params): + super().init_params( + filter(_is_fp8_tensor, params), + param_sync_dtype=torch.uint8, + **kwargs, + ) + super().init_params( + params, + param_sync_dtype=param_sync_dtype, + **kwargs, + ) + + def init_params_bucket( + self, + params: Iterable[torch.nn.Parameter], + param_sync_dtype: Optional[torch.dtype] = None, + **kwargs, + ) -> None: + """Initialize optimizer state for parameters in one effective bucket + + If any FP8 params are detected, all non-FP8 params are removed + from the bucket and their overlapped grad syncs are disabled. + This assumes that weight matrices are FP8 params and that + non-FP8 params are small (e.g. biases and layer norm params). 
+ + """ + + # Ignore parameters that have already been initialized + if isinstance(params, torch.Tensor): + params = [params] + params = [param for param in params if "fragments" not in self.state[param]] + if not params: + return + + # Ignore non-FP8 params if there are any FP8 params + if any(_is_fp8_tensor(param) for param in params): + for param in params: + if not _is_fp8_tensor(param): + param._disable_overlap_grad_sync = True + params = filter(_is_fp8_tensor, params) + param_sync_dtype = torch.uint8 + + # Initialize parameter buckets + super().init_params_bucket( + params, + param_sync_dtype=param_sync_dtype, + **kwargs, + ) + + def _init_param_state( + self, + param: torch.nn.Parameter, + param_group_id: int, + param_id: int, + param_sync_dtype: Optional[torch.dtype] = None, + **kwargs, + ) -> None: + """Initialize optimizer state for a parameter + + Initializing the master weights requires slicing a flattened + view of the param. FP8 tensors do not handle these operations + gracefully, so we hack around it by explicitly casting to + FP32. + + """ + + # Initialize non-FP8 params as usual + if not _is_fp8_tensor(param): + super()._init_param_state( + param, + param_group_id, + param_id, + param_sync_dtype=param_sync_dtype, + **kwargs, + ) + + # Return immediately if already initialized + if "fragments" in self.state[param]: + return + + # Initialize with FP32 copy of param + fp32_param = param.float() + super()._init_param_state( + fp32_param, + param_group_id, + param_id, + param_sync_dtype=torch.uint8, + **kwargs, + ) + self.state[param].update(self.state[fp32_param]) + del self.state[fp32_param] + def try_grad_sync(self, params: Iterable[torch.nn.Parameter]) -> None: def is_grad_copy_enabled(param: torch.nn.Parameter) -> bool: return not getattr(param, '_disable_greedy_grad_copy', False) and not getattr( @@ -183,6 +319,173 @@ def grad_norm( # Use cached grad norm return super().grad_norm() + @torch.no_grad() + def _param_copy_fragments( + self, + fragments: Iterable[DistributedFusedAdam.ParameterFragment], + ) -> None: + """Update parameter fragments with values from parameter buckets + + For FP8 params, values are copied directly into the FP8 data + buffer. 
+ + """ + + # Figure out corresponding positions in param buckets and params + buffers_in = [] + buffers_out = [] + for fragment in fragments: + + # Check if fragment needs to be updated + bucket_id = fragment.bucket_id + bucket_start, bucket_end = fragment.bucket_range + param_start, param_end = fragment.param_range + if param_end <= param_start or bucket_id not in self._params_buckets: + continue + + # Corresponding positions in bucket and param + state_bucket = self.state["buckets"][bucket_id] + param_bucket = self._params_buckets[bucket_id] + param = self.parameter(fragment) + buffer_in = param_bucket.params_bucket[bucket_start:bucket_end] + if _is_fp8_tensor(param): + # Copy into FP8 params's data buffer + assert ( + param_bucket.params_bucket.dtype == torch.uint8 + ), "Expected FP8 params to perform param sync in UINT8" + buffer_out = param._data.view(-1)[param_start:param_end] + buffers_in.append(buffer_in) + buffers_out.append(buffer_out) + elif ( + torch.is_floating_point(buffer_in) + and torch.is_floating_point(param) + ): + # Cast between floating-point dtypes + buffer_out = param.detach().view(-1)[param_start:param_end] + buffers_in.append(buffer_in) + buffers_out.append(buffer_out) + else: + # Copy most significant bytes for non-floating-point + # dtypes + # Note: Assume dtypes are little-endian + buffer_out = param.detach().view(-1)[param_start:param_end] + in_bytes = buffer_in.unsqueeze(-1).view(torch.uint8) + out_bytes = buffer_out.unsqueeze(-1).view(torch.uint8) + copy_size = min(in_bytes.size(-1), out_bytes.size(-1)) + buffers_in.append(in_bytes[..., -copy_size:]) + buffers_out.append(out_bytes[..., -copy_size:]) + if copy_size < out_bytes.size(-1): + out_bytes[..., :-copy_size].zero_() + + # Copy data from parameter buckets to parameters + _multi_tensor_copy( + buffers_in, + buffers_out, + dummy_overflow_buf=self._dummy_overflow_buf, + ) + + @torch.no_grad() + def _check_params_shard_dtypes( + self, + params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket], + ) -> None: + """Make sure local shards of parameters are in expected datatypes + + For FP8 params, FP32 values are cast into FP8 using per-param + scaling factors and per-param amaxes are computed and reduced. 
+ + """ + + # Peform FP8 casts if needed + num_fp8_params = sum( + 1 for param in self.parameters() if _is_fp8_tensor(param) + ) + if num_fp8_params > 0: + + # Packed buffer for amax reductions + amaxes = torch.zeros( + num_fp8_params, + dtype=torch.float32, + device=self.device, + ) + amax_pos = -1 + + # Loop through FP8 tensors + fp8_params_shards = dict() + for param in self.parameters(): + if not _is_fp8_tensor(param): + continue + amax_pos += 1 + + # Loop through fragments with local data + for fragment in self.state[param]["fragments"]: + if not fragment.in_local_shard: + continue + shard_start, shard_end = fragment.shard_range + if shard_end <= shard_start: + continue + shard_range = slice(shard_start, shard_end) + + # Get bucket containing fragment + bucket_id = fragment.bucket_id + if bucket_id not in params_buckets: + continue + state_bucket = self.state["buckets"][bucket_id] + param_bucket = params_buckets[bucket_id] + if state_bucket.param_sync_dtype != torch.uint8: + continue + + # Allocate FP8 buffer if needed + if bucket_id not in fp8_params_shards: + fp8_params_shards[bucket_id] = torch.empty_like( + param_bucket.params_shard, + dtype=torch.uint8, + ) + + # FP8 cast and amax + ### TODO Multi-tensor cast-amax + fp32_fragment = param_bucket.params_shard[shard_range] + fp8_fragment = Float8Tensor.from_float32( + param_bucket.params_shard[shard_range], + param._scale, + param._flavor, + ) + fp8_params_shards[bucket_id][shard_range].copy_( + fp8_fragment._data, + ) + amaxes[amax_pos:amax_pos+1].copy_(fp32_fragment.amax()) + + # Update param shards with FP8 buffers + for bucket_id, params_shard in fp8_params_shards.items(): + params_buckets[bucket_id].params_shard = params_shard + + # Reduce amaxes + torch.distributed.all_reduce( + amaxes, + op=torch.distributed.ReduceOp.MAX, + group=self.distributed_process_group, + ) + + # Unpack amaxes + ### TODO Handle + # buffers_in = [] + # buffers_out = [] + # pos = -1 + # for param in self.parameters(): + # if not _is_fp8_tensor(param): + # continue + # pos += 1 + # buffers_in.append(amaxes[pos:pos+1]) + # buffers_out.append(param._amax) + # _multi_tensor_copy( + # buffers_in, + # buffers_out, + # dummy_overflow_buf=self._dummy_overflow_buf, + # ) + + # Handle any remaining dtype conversions + super()._check_params_shard_dtypes(params_buckets) + def sharded_state_dict(self, model_sharded_state_dict): optimizer_state_dict = self.state_dict() From a8545c8ab7ffec1af698be4ab1b64004f6894745 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Mon, 11 Sep 2023 10:57:31 -0700 Subject: [PATCH 05/23] Correctly accumulate amax when param is split across buckets Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index a1e28c894062..fa4d91e49aec 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -444,6 +444,7 @@ def _check_params_shard_dtypes( # FP8 cast and amax ### TODO Multi-tensor cast-amax + ### TODO Use updated scale fp32_fragment = param_bucket.params_shard[shard_range] fp8_fragment = Float8Tensor.from_float32( param_bucket.params_shard[shard_range], @@ -453,7 +454,8 @@ def _check_params_shard_dtypes( fp8_params_shards[bucket_id][shard_range].copy_( fp8_fragment._data, ) - amaxes[amax_pos:amax_pos+1].copy_(fp32_fragment.amax()) + amax = torch.maximum(amaxes[amax_pos:amax_pos+1], fp32_fragment.amax()) + amaxes[amax_pos:amax_pos+1].copy_(amax) # Update 
param shards with FP8 buffers for bucket_id, params_shard in fp8_params_shards.items(): From 023bfe50915b382a1a8771da96f98e42e7045890 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Mon, 18 Sep 2023 15:21:54 -0700 Subject: [PATCH 06/23] Debug FP8 casts in distopt Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 166 +++++++++++++++------------- 1 file changed, 90 insertions(+), 76 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index fa4d91e49aec..ae21a00eddfb 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -35,6 +35,8 @@ HAVE_TE_FP8TENSOR = False try: from transformer_engine.pytorch import Float8Tensor + from transformer_engine.pytorch.fp8 import get_fp8_te_dtype + from transformer_engine.pytorch.cpp_extensions import cast_to_fp8 HAVE_TE_FP8TENSOR = True except (ImportError, ModuleNotFoundError): pass @@ -384,6 +386,17 @@ def _param_copy_fragments( dummy_overflow_buf=self._dummy_overflow_buf, ) + # Precompute transposes + ### TODO Optimized transpose kernel + for fragment in fragments: + param = self.parameter(fragment) + if _is_fp8_tensor(param): + param._transpose = None + for fragment in fragments: + param = self.parameter(fragment) + if _is_fp8_tensor(param): + param.transpose() + @torch.no_grad() def _check_params_shard_dtypes( self, @@ -396,94 +409,95 @@ def _check_params_shard_dtypes( """ - # Peform FP8 casts if needed + # Just call base class function if there are no FP8 tensors num_fp8_params = sum( 1 for param in self.parameters() if _is_fp8_tensor(param) ) - if num_fp8_params > 0: + if num_fp8_params == 0: + super()._check_params_shard_dtypes(params_buckets) + return - # Packed buffer for amax reductions - amaxes = torch.zeros( - num_fp8_params, - dtype=torch.float32, - device=self.device, + # Iterate through FP8 tensors + fp8_params_shards = dict() + amaxes = [] + for param in self.parameters(): + if not _is_fp8_tensor(param): + continue + + # FP8 scaling factors + fp8_meta = param.fp8_meta_view["scaling_fwd"] + fp8_meta_index = param.gemm_index + fp8_dtype = get_fp8_te_dtype( + param.fp8_meta_view["recipe"], + fprop_tensor=True, ) - amax_pos = -1 + fp8_meta.scale_inv[fp8_meta_index] = 1 / fp8_meta.scale[fp8_meta_index] + param._scale_inv_cache = fp8_meta.scale_inv[fp8_meta_index] + amaxes.append(fp8_meta.amax_history[0][fp8_meta_index].view(1)) - # Loop through FP8 tensors - fp8_params_shards = dict() - for param in self.parameters(): - if not _is_fp8_tensor(param): + # Iterate through fragments with local data + for fragment in self.state[param]["fragments"]: + if not fragment.in_local_shard: continue - amax_pos += 1 - - # Loop through fragments with local data - for fragment in self.state[param]["fragments"]: - if not fragment.in_local_shard: - continue - shard_start, shard_end = fragment.shard_range - if shard_end <= shard_start: - continue - shard_range = slice(shard_start, shard_end) - - # Get bucket containing fragment - bucket_id = fragment.bucket_id - if bucket_id not in params_buckets: - continue - state_bucket = self.state["buckets"][bucket_id] - param_bucket = params_buckets[bucket_id] - if state_bucket.param_sync_dtype != torch.uint8: - continue - - # Allocate FP8 buffer if needed - if bucket_id not in fp8_params_shards: - fp8_params_shards[bucket_id] = torch.empty_like( - param_bucket.params_shard, - dtype=torch.uint8, - ) + shard_start, shard_end = fragment.shard_range + if shard_end <= shard_start: + continue + shard_range = slice(shard_start, 
shard_end) - # FP8 cast and amax - ### TODO Multi-tensor cast-amax - ### TODO Use updated scale - fp32_fragment = param_bucket.params_shard[shard_range] - fp8_fragment = Float8Tensor.from_float32( - param_bucket.params_shard[shard_range], - param._scale, - param._flavor, - ) - fp8_params_shards[bucket_id][shard_range].copy_( - fp8_fragment._data, + # Get bucket containing fragment + bucket_id = fragment.bucket_id + if bucket_id not in params_buckets: + continue + state_bucket = self.state["buckets"][bucket_id] + param_bucket = params_buckets[bucket_id] + if state_bucket.param_sync_dtype != torch.uint8: + continue + + # Allocate FP8 buffer if needed + if bucket_id not in fp8_params_shards: + fp8_params_shards[bucket_id] = torch.empty_like( + param_bucket.params_shard, + dtype=torch.uint8, ) - amax = torch.maximum(amaxes[amax_pos:amax_pos+1], fp32_fragment.amax()) - amaxes[amax_pos:amax_pos+1].copy_(amax) - # Update param shards with FP8 buffers - for bucket_id, params_shard in fp8_params_shards.items(): - params_buckets[bucket_id].params_shard = params_shard + # FP8 cast and amax + ### TODO Multi-tensor cast-amax + fp32_fragment = param_bucket.params_shard[shard_range].view(1, -1) + fp8_fragment = fp8_params_shards[bucket_id][shard_range].view(1, -1) + cast_to_fp8( + fp32_fragment, + fp8_meta, + fp8_meta_index, + fp8_dtype, + out=fp8_fragment, + ) - # Reduce amaxes - torch.distributed.all_reduce( - amaxes, - op=torch.distributed.ReduceOp.MAX, - group=self.distributed_process_group, - ) + # Update param shards with FP8 buffers + for bucket_id, params_shard in fp8_params_shards.items(): + params_buckets[bucket_id].params_shard = params_shard - # Unpack amaxes - ### TODO Handle - # buffers_in = [] - # buffers_out = [] - # pos = -1 - # for param in self.parameters(): - # if not _is_fp8_tensor(param): - # continue - # pos += 1 - # buffers_in.append(amaxes[pos:pos+1]) - # buffers_out.append(param._amax) - # _multi_tensor_copy( - # buffers_in, - # buffers_out, - # dummy_overflow_buf=self._dummy_overflow_buf, - # ) + # Reduce amaxes + packed_amaxes = torch.zeros( + num_fp8_params, + dtype=torch.float32, + device=self.device, + ) + packed_amax_views = [packed_amaxes[i].view(1) for i in range(len(amaxes))] + _multi_tensor_copy( + amaxes, + packed_amax_views, + dummy_overflow_buf=self._dummy_overflow_buf, + ) + torch.distributed.all_reduce( + packed_amaxes, + op=torch.distributed.ReduceOp.MAX, + group=self.distributed_process_group, + ) + _multi_tensor_copy( + packed_amax_views, + amaxes, + dummy_overflow_buf=self._dummy_overflow_buf, + ) # Handle any remaining dtype conversions super()._check_params_shard_dtypes(params_buckets) From 4d15023eec18f688f339784eee2f5022f0c84798 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 19 Sep 2023 21:16:04 -0700 Subject: [PATCH 07/23] Optimize distopt handling of FP8 scaling factors Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 43 ++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index ae21a00eddfb..c2f14eacd2b6 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -336,6 +336,7 @@ def _param_copy_fragments( # Figure out corresponding positions in param buckets and params buffers_in = [] buffers_out = [] + fragments = list(fragments) for fragment in fragments: # Check if fragment needs to be updated @@ -387,7 +388,6 @@ def _param_copy_fragments( ) # Precompute transposes - ### TODO 
Optimized transpose kernel for fragment in fragments: param = self.parameter(fragment) if _is_fp8_tensor(param): @@ -417,23 +417,47 @@ def _check_params_shard_dtypes( super()._check_params_shard_dtypes(params_buckets) return - # Iterate through FP8 tensors - fp8_params_shards = dict() + # FP8 scaling factors amaxes = [] + scales = [] + scale_invs = torch.empty( + num_fp8_params, + dtype=torch.float32, + device=self.device, + ) + i = -1 + for param in self.parameters(): + if not _is_fp8_tensor(param): + continue + i += 1 + fp8_meta = param.fp8_meta_view["scaling_fwd"] + fp8_meta_index = param.gemm_index + amaxes.append(fp8_meta.amax_history[0][fp8_meta_index].view(1)) + scales.append(fp8_meta.scale[fp8_meta_index].view(1)) + param._scale_inv_cache = scale_invs[i] + + # Update cached scale-inverses + scale_inv_views = [scale_invs[i].view(1) for i in range(num_fp8_params)] + _multi_tensor_copy( + scales, + scale_inv_views, + dummy_overflow_buf=self._dummy_overflow_buf, + ) + torch.reciprocal(scale_invs, out=scale_invs) + + # Cast local data to FP8 + fp8_params_shards = dict() for param in self.parameters(): if not _is_fp8_tensor(param): continue - # FP8 scaling factors + # FP8 metadata fp8_meta = param.fp8_meta_view["scaling_fwd"] fp8_meta_index = param.gemm_index fp8_dtype = get_fp8_te_dtype( param.fp8_meta_view["recipe"], fprop_tensor=True, ) - fp8_meta.scale_inv[fp8_meta_index] = 1 / fp8_meta.scale[fp8_meta_index] - param._scale_inv_cache = fp8_meta.scale_inv[fp8_meta_index] - amaxes.append(fp8_meta.amax_history[0][fp8_meta_index].view(1)) # Iterate through fragments with local data for fragment in self.state[param]["fragments"]: @@ -461,7 +485,6 @@ def _check_params_shard_dtypes( ) # FP8 cast and amax - ### TODO Multi-tensor cast-amax fp32_fragment = param_bucket.params_shard[shard_range].view(1, -1) fp8_fragment = fp8_params_shards[bucket_id][shard_range].view(1, -1) cast_to_fp8( @@ -477,12 +500,12 @@ def _check_params_shard_dtypes( params_buckets[bucket_id].params_shard = params_shard # Reduce amaxes - packed_amaxes = torch.zeros( + packed_amaxes = torch.empty( num_fp8_params, dtype=torch.float32, device=self.device, ) - packed_amax_views = [packed_amaxes[i].view(1) for i in range(len(amaxes))] + packed_amax_views = [packed_amaxes[i].view(1) for i in range(num_fp8_params)] _multi_tensor_copy( amaxes, packed_amax_views, From 97f6dd0c27974b0f7ce92355177821b0983cd066 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 20 Sep 2023 20:03:17 +0000 Subject: [PATCH 08/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../language_modeling/megatron_gpt_model.py | 17 ++- nemo/core/optim/distributed_adam.py | 101 ++++-------------- 2 files changed, 35 insertions(+), 83 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index b71dd68d6941..3e7126f0d006 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -225,11 +225,18 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): virtual_pipeline_model_parallel_size=self.cfg.get('virtual_pipeline_model_parallel_size', None), ) else: - self.model = build_model( - model_provider_func=self.model_provider_func, - wrap_with_ddp=False, - 
virtual_pipeline_model_parallel_size=self.cfg.get('virtual_pipeline_model_parallel_size', None), - ) + fp8_enabled = cfg.get('fp8', False) + fp8_recipe = None + if fp8_enabled and HAVE_TE: + fp8_recipe = transformer_engine.common.recipe.DelayedScaling( + margin=0, interval=1, fp8_format=transformer_engine.common.recipe.Format.E4M3 + ) + with transformer_engine.pytorch.fp8_autocast(enabled=fp8_enabled, fp8_recipe=fp8_recipe): + self.model = build_model( + model_provider_func=self.model_provider_func, + wrap_with_ddp=False, + virtual_pipeline_model_parallel_size=self.cfg.get('virtual_pipeline_model_parallel_size', None), + ) # if we're not using interleaved, then self.model is a module. if self.cfg.get('virtual_pipeline_model_parallel_size', None) is None: diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index c2f14eacd2b6..5c05010da7e3 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -35,8 +35,9 @@ HAVE_TE_FP8TENSOR = False try: from transformer_engine.pytorch import Float8Tensor - from transformer_engine.pytorch.fp8 import get_fp8_te_dtype from transformer_engine.pytorch.cpp_extensions import cast_to_fp8 + from transformer_engine.pytorch.fp8 import get_fp8_te_dtype + HAVE_TE_FP8TENSOR = True except (ImportError, ModuleNotFoundError): pass @@ -185,21 +186,14 @@ def init_params( # Initialize FP8 and non-FP8 tensors separately if any(_is_fp8_tensor(param) for param in params): super().init_params( - filter(_is_fp8_tensor, params), - param_sync_dtype=torch.uint8, - **kwargs, + filter(_is_fp8_tensor, params), param_sync_dtype=torch.uint8, **kwargs, ) super().init_params( - params, - param_sync_dtype=param_sync_dtype, - **kwargs, + params, param_sync_dtype=param_sync_dtype, **kwargs, ) def init_params_bucket( - self, - params: Iterable[torch.nn.Parameter], - param_sync_dtype: Optional[torch.dtype] = None, - **kwargs, + self, params: Iterable[torch.nn.Parameter], param_sync_dtype: Optional[torch.dtype] = None, **kwargs, ) -> None: """Initialize optimizer state for parameters in one effective bucket @@ -227,9 +221,7 @@ def init_params_bucket( # Initialize parameter buckets super().init_params_bucket( - params, - param_sync_dtype=param_sync_dtype, - **kwargs, + params, param_sync_dtype=param_sync_dtype, **kwargs, ) def _init_param_state( @@ -252,11 +244,7 @@ def _init_param_state( # Initialize non-FP8 params as usual if not _is_fp8_tensor(param): super()._init_param_state( - param, - param_group_id, - param_id, - param_sync_dtype=param_sync_dtype, - **kwargs, + param, param_group_id, param_id, param_sync_dtype=param_sync_dtype, **kwargs, ) # Return immediately if already initialized @@ -266,11 +254,7 @@ def _init_param_state( # Initialize with FP32 copy of param fp32_param = param.float() super()._init_param_state( - fp32_param, - param_group_id, - param_id, - param_sync_dtype=torch.uint8, - **kwargs, + fp32_param, param_group_id, param_id, param_sync_dtype=torch.uint8, **kwargs, ) self.state[param].update(self.state[fp32_param]) del self.state[fp32_param] @@ -322,10 +306,7 @@ def grad_norm( return super().grad_norm() @torch.no_grad() - def _param_copy_fragments( - self, - fragments: Iterable[DistributedFusedAdam.ParameterFragment], - ) -> None: + def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.ParameterFragment],) -> None: """Update parameter fragments with values from parameter buckets For FP8 params, values are copied directly into the FP8 data @@ -355,14 +336,11 @@ def 
_param_copy_fragments( # Copy into FP8 params's data buffer assert ( param_bucket.params_bucket.dtype == torch.uint8 - ), "Expected FP8 params to perform param sync in UINT8" + ), "Expected FP8 params to perform param sync in UINT8" buffer_out = param._data.view(-1)[param_start:param_end] buffers_in.append(buffer_in) buffers_out.append(buffer_out) - elif ( - torch.is_floating_point(buffer_in) - and torch.is_floating_point(param) - ): + elif torch.is_floating_point(buffer_in) and torch.is_floating_point(param): # Cast between floating-point dtypes buffer_out = param.detach().view(-1)[param_start:param_end] buffers_in.append(buffer_in) @@ -382,9 +360,7 @@ def _param_copy_fragments( # Copy data from parameter buckets to parameters _multi_tensor_copy( - buffers_in, - buffers_out, - dummy_overflow_buf=self._dummy_overflow_buf, + buffers_in, buffers_out, dummy_overflow_buf=self._dummy_overflow_buf, ) # Precompute transposes @@ -398,10 +374,7 @@ def _param_copy_fragments( param.transpose() @torch.no_grad() - def _check_params_shard_dtypes( - self, - params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket], - ) -> None: + def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket],) -> None: """Make sure local shards of parameters are in expected datatypes For FP8 params, FP32 values are cast into FP8 using per-param @@ -410,9 +383,7 @@ def _check_params_shard_dtypes( """ # Just call base class function if there are no FP8 tensors - num_fp8_params = sum( - 1 for param in self.parameters() if _is_fp8_tensor(param) - ) + num_fp8_params = sum(1 for param in self.parameters() if _is_fp8_tensor(param)) if num_fp8_params == 0: super()._check_params_shard_dtypes(params_buckets) return @@ -420,11 +391,7 @@ def _check_params_shard_dtypes( # FP8 scaling factors amaxes = [] scales = [] - scale_invs = torch.empty( - num_fp8_params, - dtype=torch.float32, - device=self.device, - ) + scale_invs = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device,) i = -1 for param in self.parameters(): if not _is_fp8_tensor(param): @@ -439,9 +406,7 @@ def _check_params_shard_dtypes( # Update cached scale-inverses scale_inv_views = [scale_invs[i].view(1) for i in range(num_fp8_params)] _multi_tensor_copy( - scales, - scale_inv_views, - dummy_overflow_buf=self._dummy_overflow_buf, + scales, scale_inv_views, dummy_overflow_buf=self._dummy_overflow_buf, ) torch.reciprocal(scale_invs, out=scale_invs) @@ -454,10 +419,7 @@ def _check_params_shard_dtypes( # FP8 metadata fp8_meta = param.fp8_meta_view["scaling_fwd"] fp8_meta_index = param.gemm_index - fp8_dtype = get_fp8_te_dtype( - param.fp8_meta_view["recipe"], - fprop_tensor=True, - ) + fp8_dtype = get_fp8_te_dtype(param.fp8_meta_view["recipe"], fprop_tensor=True,) # Iterate through fragments with local data for fragment in self.state[param]["fragments"]: @@ -479,20 +441,13 @@ def _check_params_shard_dtypes( # Allocate FP8 buffer if needed if bucket_id not in fp8_params_shards: - fp8_params_shards[bucket_id] = torch.empty_like( - param_bucket.params_shard, - dtype=torch.uint8, - ) + fp8_params_shards[bucket_id] = torch.empty_like(param_bucket.params_shard, dtype=torch.uint8,) # FP8 cast and amax fp32_fragment = param_bucket.params_shard[shard_range].view(1, -1) fp8_fragment = fp8_params_shards[bucket_id][shard_range].view(1, -1) cast_to_fp8( - fp32_fragment, - fp8_meta, - fp8_meta_index, - fp8_dtype, - out=fp8_fragment, + fp32_fragment, fp8_meta, fp8_meta_index, fp8_dtype, out=fp8_fragment, ) # Update param shards with 
FP8 buffers @@ -500,26 +455,16 @@ def _check_params_shard_dtypes( params_buckets[bucket_id].params_shard = params_shard # Reduce amaxes - packed_amaxes = torch.empty( - num_fp8_params, - dtype=torch.float32, - device=self.device, - ) + packed_amaxes = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device,) packed_amax_views = [packed_amaxes[i].view(1) for i in range(num_fp8_params)] _multi_tensor_copy( - amaxes, - packed_amax_views, - dummy_overflow_buf=self._dummy_overflow_buf, + amaxes, packed_amax_views, dummy_overflow_buf=self._dummy_overflow_buf, ) torch.distributed.all_reduce( - packed_amaxes, - op=torch.distributed.ReduceOp.MAX, - group=self.distributed_process_group, + packed_amaxes, op=torch.distributed.ReduceOp.MAX, group=self.distributed_process_group, ) _multi_tensor_copy( - packed_amax_views, - amaxes, - dummy_overflow_buf=self._dummy_overflow_buf, + packed_amax_views, amaxes, dummy_overflow_buf=self._dummy_overflow_buf, ) # Handle any remaining dtype conversions From a2763a592e9cdc7d4d02074e04e8efc14e385d06 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 28 Sep 2023 13:19:42 -0700 Subject: [PATCH 09/23] Debug rebase errors Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 30 ----------------------------- 1 file changed, 30 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 5c05010da7e3..76475214e436 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -23,13 +23,6 @@ _multi_tensor_copy, ) from megatron.core import parallel_state -from megatron.core.dist_checkpointing.dict_utils import dict_list_map_inplace -from megatron.core.dist_checkpointing.mapping import ShardedTensor -from megatron.core.dist_checkpointing.optimizer import ( - get_param_id_to_sharded_param_map, - make_sharded_optimizer_tensor, - optim_state_to_sharding_state, -) # Check if Transformer Engine has FP8 tensor class HAVE_TE_FP8TENSOR = False @@ -469,26 +462,3 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA # Handle any remaining dtype conversions super()._check_params_shard_dtypes(params_buckets) - - def sharded_state_dict(self, model_sharded_state_dict): - optimizer_state_dict = self.state_dict() - - id_to_sharded_param_map = get_param_id_to_sharded_param_map( - model_sharded_state_dict=model_sharded_state_dict, optim_params_iter=self.parameters(), - ) - # Convert state - step = optimizer_state_dict['state'].pop('step') - state_dict_format = optimizer_state_dict.pop('format', None) - optim_state_to_sharding_state(optimizer_state_dict, id_to_sharded_param_map) - optimizer_state_dict['state']['step'] = step - if state_dict_format is not None: - optimizer_state_dict['format'] = state_dict_format - - def rename_fp32_params(x): - if isinstance(x, ShardedTensor) and x.key.startswith('optimizer.state.param'): - x.key = x.key.replace('optimizer.state.param', 'optimizer.state.fp32_param') - return x - - dict_list_map_inplace(rename_fp32_params, optimizer_state_dict) - - return optimizer_state_dict From c1c050d6b528d6917f17e7d544f5efee4bb8e2a8 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 28 Sep 2023 13:20:49 -0700 Subject: [PATCH 10/23] Fix bug when constructing GPT without TE Signed-off-by: Tim Moon --- .../models/language_modeling/megatron_gpt_model.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py 
b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 3e7126f0d006..a58464f710c4 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from contextlib import nullcontext import itertools import queue import warnings @@ -226,12 +227,19 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): ) else: fp8_enabled = cfg.get('fp8', False) - fp8_recipe = None + make_fp8_autocast = nullcontext if fp8_enabled and HAVE_TE: fp8_recipe = transformer_engine.common.recipe.DelayedScaling( - margin=0, interval=1, fp8_format=transformer_engine.common.recipe.Format.E4M3 + margin=0, + interval=1, + fp8_format=transformer_engine.common.recipe.Format.E4M3, ) - with transformer_engine.pytorch.fp8_autocast(enabled=fp8_enabled, fp8_recipe=fp8_recipe): + make_fp8_autocast = partial( + transformer_engine.pytorch.fp8_autocast, + enabled=fp8_enabled, + fp8_recipe=fp8_recipe, + ) + with make_fp8_autocast(): self.model = build_model( model_provider_func=self.model_provider_func, wrap_with_ddp=False, From 8d6bdb346577f8641d7ea483f20da633ee740995 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 Sep 2023 20:44:09 +0000 Subject: [PATCH 11/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../nlp/models/language_modeling/megatron_gpt_model.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index a58464f710c4..d5b2359e39e1 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
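# The fp8_autocast change above follows the "optional context manager" pattern: the
# context factory defaults to contextlib.nullcontext, so the same `with` block works
# whether or not Transformer Engine is installed. A minimal standalone sketch of that
# pattern, not the NeMo code itself; `build_fn`, `have_te`, `fp8_enabled`,
# `fp8_autocast`, and `fp8_recipe` are hypothetical stand-ins:
from contextlib import nullcontext
from functools import partial

def build_with_optional_fp8(build_fn, fp8_enabled=False, have_te=False,
                            fp8_autocast=None, fp8_recipe=None):
    make_ctx = nullcontext  # no-op context manager when FP8 or TE is unavailable
    if fp8_enabled and have_te:
        make_ctx = partial(fp8_autocast, enabled=True, fp8_recipe=fp8_recipe)
    with make_ctx():
        return build_fn()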
-from contextlib import nullcontext import itertools import queue import warnings +from contextlib import nullcontext from functools import partial from typing import Any, Dict, Iterator, List, Optional, Union @@ -230,14 +230,10 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): make_fp8_autocast = nullcontext if fp8_enabled and HAVE_TE: fp8_recipe = transformer_engine.common.recipe.DelayedScaling( - margin=0, - interval=1, - fp8_format=transformer_engine.common.recipe.Format.E4M3, + margin=0, interval=1, fp8_format=transformer_engine.common.recipe.Format.E4M3, ) make_fp8_autocast = partial( - transformer_engine.pytorch.fp8_autocast, - enabled=fp8_enabled, - fp8_recipe=fp8_recipe, + transformer_engine.pytorch.fp8_autocast, enabled=fp8_enabled, fp8_recipe=fp8_recipe, ) with make_fp8_autocast(): self.model = build_model( From 1edb8b527aeb667b3c23fcb67ffd3f1216254083 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Thu, 28 Sep 2023 13:50:50 -0700 Subject: [PATCH 12/23] Remove unrelated changes from main branch Signed-off-by: Tim Moon --- Dockerfile | 12 +---- README.rst | 46 +++++++++---------- nemo/core/optim/optimizer_with_main_params.py | 2 +- 3 files changed, 24 insertions(+), 36 deletions(-) diff --git a/Dockerfile b/Dockerfile index e935d3bd810c..5e7c01f733a8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -ARG BASE_IMAGE=nvcr.io/nvidia/pytorch:23.08-py3 +ARG BASE_IMAGE=nvcr.io/nvidia/pytorch:23.06-py3 # build an image that includes only the nemo dependencies, ensures that dependencies # are included first for optimal caching, and useful for building a development @@ -52,12 +52,6 @@ RUN git clone https://github.com/NVIDIA/apex.git && \ git checkout 52e18c894223800cb611682dce27d88050edf1de && \ pip3 install -v --no-build-isolation --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ -# install megatron core, this can be removed once 0.3 pip package is released -RUN git clone https://github.com/NVIDIA/Megatron-LM.git && \ - cd Megatron-LM && \ - git checkout 01c8704453af7e26134441224c8a351746ca0349 && \ - pip install -e . - # uninstall stuff from base container RUN pip3 uninstall -y sacrebleu torchtext @@ -82,8 +76,6 @@ RUN for f in $(ls requirements*.txt); do pip3 install --disable-pip-version-chec RUN pip install flash-attn # pinned triton version for flash-attention https://github.com/HazyResearch/flash-attention/blob/main/flash_attn/flash_attn_triton.py#L3 RUN pip install triton==2.0.0.dev20221202 -# install numba for latest containers -RUN pip install numba>=0.57.1 # install k2, skip if installation fails COPY scripts /tmp/nemo/scripts/ @@ -102,7 +94,7 @@ COPY . . # start building the final container FROM nemo-deps as nemo -ARG NEMO_VERSION=1.21.0 +ARG NEMO_VERSION=1.20.0 # Check that NEMO_VERSION is set. Build will fail without this. 
Expose NEMO and base container # version information as runtime environment variable for introspection purposes diff --git a/README.rst b/README.rst index 6dc491523a99..6fbe9047d0c4 100644 --- a/README.rst +++ b/README.rst @@ -41,14 +41,14 @@ Introduction ------------ -NVIDIA NeMo is a conversational AI toolkit built for researchers working on automatic speech recognition (ASR), -text-to-speech synthesis (TTS), large language models (LLMs), and +NVIDIA NeMo is a conversational AI toolkit built for researchers working on automatic speech recognition (ASR), +text-to-speech synthesis (TTS), large language models (LLMs), and natural language processing (NLP). -The primary objective of NeMo is to help researchers from industry and academia to reuse prior work (code and pretrained models) +The primary objective of NeMo is to help researchers from industry and academia to reuse prior work (code and pretrained models) and make it easier to create new `conversational AI models `_. -All NeMo models are trained with `Lightning `_ and -training is automatically scalable to 1000s of GPUs. +All NeMo models are trained with `Lightning `_ and +training is automatically scalable to 1000s of GPUs. Additionally, NeMo Megatron LLM models can be trained up to 1 trillion parameters using tensor and pipeline model parallelism. NeMo models can be optimized for inference and deployed for production use-cases with `NVIDIA Riva `_. @@ -57,14 +57,14 @@ State of the Art pretrained NeMo models are freely available on `HuggingFace Hub `NVIDIA NGC `_. These models can be used to transcribe audio, synthesize speech, or translate text in just a few lines of code. -We have extensive `tutorials `_ that +We have extensive `tutorials `_ that can all be run on `Google Colab `_. -For advanced users that want to train NeMo models from scratch or finetune existing NeMo models +For advanced users that want to train NeMo models from scratch or finetune existing NeMo models we have a full suite of `example scripts `_ that support multi-GPU/multi-node training. For scaling NeMo LLM training on Slurm clusters or public clouds, please see the `NVIDIA NeMo Megatron Launcher `_. -The NM launcher has extensive recipes, scripts, utilities, and documentation for training NeMo LLMs and also has an `Autoconfigurator `_ +The NM launcher has extensive recipes, scripts, utilities, and documentation for training NeMo LLMs and also has an `Autoconfigurator `_ which can be used to find the optimal model parallel configuration for training on a specific cluster. Also see our `introductory video `_ for a high level overview of NeMo. @@ -115,15 +115,13 @@ Key Features * `Prompt Learning `_ * `NGC collection of pre-trained NLP models. `_ * `Synthetic Tabular Data Generation `_ -* Text-to-Speech Synthesis (TTS): - * `Documentation `_ - * Mel-Spectrogram generators: FastPitch, SSL FastPitch, Mixer-TTS/Mixer-TTS-X, RAD-TTS, Tacotron2 - * Vocoders: HiFiGAN, UnivNet, WaveGlow - * End-to-End Models: VITS - * `Pre-trained Model Checkpoints in NVIDIA GPU Cloud (NGC) `_ +* `Speech synthesis (TTS) `_ + * Spectrogram generation: Tacotron2, GlowTTS, TalkNet, FastPitch, FastSpeech2, Mixer-TTS, Mixer-TTS-X + * Vocoders: WaveGlow, SqueezeWave, UniGlow, MelGAN, HiFiGAN, UnivNet + * End-to-end speech generation: FastPitch_HifiGan_E2E, FastSpeech2_HifiGan_E2E, VITS + * `NGC collection of pre-trained TTS models. 
`_ * `Tools `_ * `Text Processing (text normalization and inverse text normalization) `_ - * `NeMo Forced Aligner `_ * `CTC-Segmentation tool `_ * `Speech Data Explorer `_: a dash-based tool for interactive exploration of ASR/TTS datasets * `Speech Data Processor `_ @@ -134,7 +132,7 @@ Built for speed, NeMo can utilize NVIDIA's Tensor Cores and scale out training t Requirements ------------ -1) Python 3.10 or above +1) Python 3.9 or above 2) Pytorch 1.13.1 or above 3) NVIDIA GPU for training @@ -178,7 +176,7 @@ We recommend installing NeMo in a fresh Conda environment. .. code-block:: bash - conda create --name nemo python==3.10.12 + conda create --name nemo python==3.8.10 conda activate nemo Install PyTorch using their `configurator `_. @@ -247,8 +245,8 @@ To install Apex, run git clone https://github.com/NVIDIA/apex.git cd apex - git checkout 52e18c894223800cb611682dce27d88050edf1de - pip install -v --no-build-isolation --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ + git checkout 57057e2fcf1c084c0fcc818f55c0ff6ea1b24ae2 + pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ It is highly recommended to use the NVIDIA PyTorch or NeMo container if having issues installing Apex or any other dependencies. @@ -267,8 +265,6 @@ packaging is also needed: pip install packaging -With the latest versions of Apex, the `pyproject.toml` file in Apex may need to be deleted in order to install locally. - Transformer Engine ~~~~~~~~~~~~~~~~~~ @@ -287,7 +283,7 @@ Transformer Engine requires PyTorch to be built with CUDA 11.8. Flash Attention ~~~~~~~~~~~~~~~~~~~~ -Transformer Engine already supports Flash Attention for GPT models. If you want to use Flash Attention for non-causal models or use with attention bias (introduced from position encoding, e.g. Alibi), please install `flash-attn `_. +Transformer Engine already supports Flash Attention for GPT models. If you want to use Flash Attention for non-causal models or use with attention bias (introduced from position encoding, e.g. Alibi), please install `flash-attn `_. .. code-block:: bash @@ -296,7 +292,7 @@ Transformer Engine already supports Flash Attention for GPT models. If you want NLP inference UI ~~~~~~~~~~~~~~~~~~~~ -To launch the inference web UI server, please install the gradio `gradio `_. +To launch the inference web UI server, please install the gradio `gradio `_. .. code-block:: bash @@ -308,13 +304,13 @@ NeMo Text Processing, specifically (Inverse) Text Normalization, is now a separa Docker containers: ~~~~~~~~~~~~~~~~~~ -We release NeMo containers alongside NeMo releases. For example, NeMo ``r1.20.0`` comes with container ``nemo:23.06``, you may find more details about released containers in `releases page `_. +We release NeMo containers alongside NeMo releases. For example, NeMo ``r1.19.0`` comes with container ``nemo:23.04``, you may find more details about released containers in `releases page `_. To use built container, please run .. 
code-block:: bash - docker pull nvcr.io/nvidia/nemo:23.06 + docker pull nvcr.io/nvidia/nemo:23.04 To build a nemo container with Dockerfile from a branch, please run diff --git a/nemo/core/optim/optimizer_with_main_params.py b/nemo/core/optim/optimizer_with_main_params.py index 34467275df56..44d54a0e63ff 100644 --- a/nemo/core/optim/optimizer_with_main_params.py +++ b/nemo/core/optim/optimizer_with_main_params.py @@ -162,7 +162,7 @@ class MainParamsOptimizerWrapper(torch.optim.Optimizer): Arguments: optimizer: base optimizer such as Adam or SGD. fp32_grad_accum: to enable the use of fp32 in gradient accumulation and allreduce. - contiguous_grad_bucket: to enable allocating the master gradients in the + contiguous_grad_bucket: to enable allocating the master gradients in the contiguous memory space to reduce memory fragmentation. async_grad_allreduce: enable asynchronous gradient allreduce that is executed along with the training step backprop. From a99ba3a4e29540e2daacb1b076905101bb392180 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Fri, 29 Sep 2023 05:57:17 -0700 Subject: [PATCH 13/23] Update with refactored Float8Tensor class Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 76475214e436..874f22ead39a 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -360,7 +360,7 @@ def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.Paramet for fragment in fragments: param = self.parameter(fragment) if _is_fp8_tensor(param): - param._transpose = None + param._reset_caches() for fragment in fragments: param = self.parameter(fragment) if _is_fp8_tensor(param): @@ -390,11 +390,11 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA if not _is_fp8_tensor(param): continue i += 1 - fp8_meta = param.fp8_meta_view["scaling_fwd"] - fp8_meta_index = param.gemm_index + fp8_meta = param._fp8_meta["scaling_fwd"] + fp8_meta_index = param._fp8_meta_index amaxes.append(fp8_meta.amax_history[0][fp8_meta_index].view(1)) scales.append(fp8_meta.scale[fp8_meta_index].view(1)) - param._scale_inv_cache = scale_invs[i] + param._scale_inv = scale_invs[i] # Update cached scale-inverses scale_inv_views = [scale_invs[i].view(1) for i in range(num_fp8_params)] @@ -410,9 +410,9 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA continue # FP8 metadata - fp8_meta = param.fp8_meta_view["scaling_fwd"] - fp8_meta_index = param.gemm_index - fp8_dtype = get_fp8_te_dtype(param.fp8_meta_view["recipe"], fprop_tensor=True,) + fp8_meta = param._fp8_meta["scaling_fwd"] + fp8_meta_index = param._fp8_meta_index + fp8_dtype = param._fp8_dtype # Iterate through fragments with local data for fragment in self.state[param]["fragments"]: @@ -448,6 +448,7 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA params_buckets[bucket_id].params_shard = params_shard # Reduce amaxes + # Note: Assume each param has a separate amax packed_amaxes = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device,) packed_amax_views = [packed_amaxes[i].view(1) for i in range(num_fp8_params)] _multi_tensor_copy( From 7f6adc8d2aa53d0656597e6ce8b52f863b83cf8b Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Sun, 1 Oct 2023 14:57:29 -0700 Subject: [PATCH 14/23] Update FP8 tensor scale factors in-place Better behavior for CUDA graphs 
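In-place updates matter here because CUDA graph replay reuses captured tensor addresses, so the optimizer has to refresh the existing `_scale_inv` tensors rather than rebind them to freshly allocated storage. A rough standalone sketch of the pack-reciprocal-scatter idea used in the diff below, with plain `copy_` standing in for the fused `_multi_tensor_copy` and `scales`/`scale_invs` as hypothetical lists of one-element tensors:

    import torch

    def update_scale_invs_inplace(scales, scale_invs):
        # Pack the per-param scales into one contiguous buffer, invert there, then
        # copy the results back into the existing scale_inv tensors in place so
        # their storage pointers never change.
        packed = torch.stack([s.reshape(()) for s in scales])
        torch.reciprocal(packed, out=packed)
        for dst, src in zip(scale_invs, packed):
            dst.copy_(src)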
Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 874f22ead39a..d78c141d7f51 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -116,7 +116,7 @@ def __init__( fp32_params = [] for param_group in param_groups: fp32_params.extend( - filter(lambda param: getattr(param, '_with_fp32_optimizer', False), param_group['params'],) + filter(lambda param: getattr(param, '_with_fp32_optimizer', False), param_group['params']) ) if fp32_params: assert self.dtype == torch.float32, ( @@ -130,7 +130,7 @@ def _broadcast_params(self) -> None: # Assume params have already been synchronized pass - def _make_post_backward_hook(self, param: torch.nn.Parameter, param_group_id: int, param_id: int,) -> Callable: + def _make_post_backward_hook(self, param: torch.nn.Parameter, param_group_id: int, param_id: int) -> Callable: def hook(*unused): if getattr(param, '_pre_forward_hook_is_enabled', False): raise RuntimeError( @@ -285,7 +285,7 @@ def grad_norm( if force or self._grad_norm is None: # Compute norm of local gradients for distributed optimizer - grad_norm_sq = self._local_grad_norm(parameters=parameters, norm_type=norm_type,) + grad_norm_sq = self._local_grad_norm(parameters=parameters, norm_type=norm_type) if self.redundant_size > 1: grad_norm_sq /= self.redundant_size @@ -299,7 +299,7 @@ def grad_norm( return super().grad_norm() @torch.no_grad() - def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.ParameterFragment],) -> None: + def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.ParameterFragment]) -> None: """Update parameter fragments with values from parameter buckets For FP8 params, values are copied directly into the FP8 data @@ -367,7 +367,7 @@ def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.Paramet param.transpose() @torch.no_grad() - def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket],) -> None: + def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket]) -> None: """Make sure local shards of parameters are in expected datatypes For FP8 params, FP32 values are cast into FP8 using per-param @@ -384,7 +384,7 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA # FP8 scaling factors amaxes = [] scales = [] - scale_invs = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device,) + scale_invs = [] i = -1 for param in self.parameters(): if not _is_fp8_tensor(param): @@ -394,14 +394,18 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA fp8_meta_index = param._fp8_meta_index amaxes.append(fp8_meta.amax_history[0][fp8_meta_index].view(1)) scales.append(fp8_meta.scale[fp8_meta_index].view(1)) - param._scale_inv = scale_invs[i] + scale_invs.append(param._scale_inv.view(1)) # Update cached scale-inverses - scale_inv_views = [scale_invs[i].view(1) for i in range(num_fp8_params)] + packed_scales = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device) + packed_scale_views = [packed_scales[i].view(1) for i in range(num_fp8_params)] _multi_tensor_copy( - scales, scale_inv_views, dummy_overflow_buf=self._dummy_overflow_buf, + scales, packed_scale_views, dummy_overflow_buf=self._dummy_overflow_buf, + ) + torch.reciprocal(packed_scales, 
out=packed_scales) + _multi_tensor_copy( + packed_scale_views, scale_invs, dummy_overflow_buf=self._dummy_overflow_buf, ) - torch.reciprocal(scale_invs, out=scale_invs) # Cast local data to FP8 fp8_params_shards = dict() @@ -434,7 +438,7 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA # Allocate FP8 buffer if needed if bucket_id not in fp8_params_shards: - fp8_params_shards[bucket_id] = torch.empty_like(param_bucket.params_shard, dtype=torch.uint8,) + fp8_params_shards[bucket_id] = torch.empty_like(param_bucket.params_shard, dtype=torch.uint8) # FP8 cast and amax fp32_fragment = param_bucket.params_shard[shard_range].view(1, -1) @@ -449,7 +453,7 @@ def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedA # Reduce amaxes # Note: Assume each param has a separate amax - packed_amaxes = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device,) + packed_amaxes = torch.empty(num_fp8_params, dtype=torch.float32, device=self.device) packed_amax_views = [packed_amaxes[i].view(1) for i in range(num_fp8_params)] _multi_tensor_copy( amaxes, packed_amax_views, dummy_overflow_buf=self._dummy_overflow_buf, From 9e3c78870838705217df61d21447ebd64dc816d2 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 3 Oct 2023 15:24:32 -0700 Subject: [PATCH 15/23] Use updated initialization of FP8 models Signed-off-by: Tim Moon --- Dockerfile | 6 +++-- README.rst | 24 +++++++++---------- .../language_modeling/megatron_gpt_model.py | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5e7c01f733a8..ef97b070b24e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -49,8 +49,10 @@ WORKDIR /tmp/ # Distributed Adam support for multiple dtypes RUN git clone https://github.com/NVIDIA/apex.git && \ cd apex && \ - git checkout 52e18c894223800cb611682dce27d88050edf1de && \ - pip3 install -v --no-build-isolation --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ + git checkout 2386a912164b0c5cfcd8be7a2b890fbac5607c82 && \ + pip3 install -v --no-build-isolation --config-settings --build-option="--cpp_ext --cuda_ext --fast_layer_norm --distributed_adam --deprecated_fused_adam" . + +RUN pip3 install git+https://github.com/timmoon10/TransformerEngine.git@float8tensor_experiments # uninstall stuff from base container RUN pip3 uninstall -y sacrebleu torchtext diff --git a/README.rst b/README.rst index 6fbe9047d0c4..8dc820ed83a1 100644 --- a/README.rst +++ b/README.rst @@ -41,14 +41,14 @@ Introduction ------------ -NVIDIA NeMo is a conversational AI toolkit built for researchers working on automatic speech recognition (ASR), -text-to-speech synthesis (TTS), large language models (LLMs), and +NVIDIA NeMo is a conversational AI toolkit built for researchers working on automatic speech recognition (ASR), +text-to-speech synthesis (TTS), large language models (LLMs), and natural language processing (NLP). -The primary objective of NeMo is to help researchers from industry and academia to reuse prior work (code and pretrained models) +The primary objective of NeMo is to help researchers from industry and academia to reuse prior work (code and pretrained models) and make it easier to create new `conversational AI models `_. -All NeMo models are trained with `Lightning `_ and -training is automatically scalable to 1000s of GPUs. 
+All NeMo models are trained with `Lightning `_ and +training is automatically scalable to 1000s of GPUs. Additionally, NeMo Megatron LLM models can be trained up to 1 trillion parameters using tensor and pipeline model parallelism. NeMo models can be optimized for inference and deployed for production use-cases with `NVIDIA Riva `_. @@ -57,14 +57,14 @@ State of the Art pretrained NeMo models are freely available on `HuggingFace Hub `NVIDIA NGC `_. These models can be used to transcribe audio, synthesize speech, or translate text in just a few lines of code. -We have extensive `tutorials `_ that +We have extensive `tutorials `_ that can all be run on `Google Colab `_. -For advanced users that want to train NeMo models from scratch or finetune existing NeMo models +For advanced users that want to train NeMo models from scratch or finetune existing NeMo models we have a full suite of `example scripts `_ that support multi-GPU/multi-node training. For scaling NeMo LLM training on Slurm clusters or public clouds, please see the `NVIDIA NeMo Megatron Launcher `_. -The NM launcher has extensive recipes, scripts, utilities, and documentation for training NeMo LLMs and also has an `Autoconfigurator `_ +The NM launcher has extensive recipes, scripts, utilities, and documentation for training NeMo LLMs and also has an `Autoconfigurator `_ which can be used to find the optimal model parallel configuration for training on a specific cluster. Also see our `introductory video `_ for a high level overview of NeMo. @@ -245,8 +245,8 @@ To install Apex, run git clone https://github.com/NVIDIA/apex.git cd apex - git checkout 57057e2fcf1c084c0fcc818f55c0ff6ea1b24ae2 - pip install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./ + git checkout 2386a912164b0c5cfcd8be7a2b890fbac5607c82 + pip3 install -v --no-build-isolation --config-settings --build-option="--cpp_ext --cuda_ext --fast_layer_norm --distributed_adam --deprecated_fused_adam" . It is highly recommended to use the NVIDIA PyTorch or NeMo container if having issues installing Apex or any other dependencies. @@ -283,7 +283,7 @@ Transformer Engine requires PyTorch to be built with CUDA 11.8. Flash Attention ~~~~~~~~~~~~~~~~~~~~ -Transformer Engine already supports Flash Attention for GPT models. If you want to use Flash Attention for non-causal models or use with attention bias (introduced from position encoding, e.g. Alibi), please install `flash-attn `_. +Transformer Engine already supports Flash Attention for GPT models. If you want to use Flash Attention for non-causal models or use with attention bias (introduced from position encoding, e.g. Alibi), please install `flash-attn `_. .. code-block:: bash @@ -292,7 +292,7 @@ Transformer Engine already supports Flash Attention for GPT models. If you want NLP inference UI ~~~~~~~~~~~~~~~~~~~~ -To launch the inference web UI server, please install the gradio `gradio `_. +To launch the inference web UI server, please install the gradio `gradio `_. .. 
code-block:: bash diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index d5b2359e39e1..8c806c49e675 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -233,7 +233,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): margin=0, interval=1, fp8_format=transformer_engine.common.recipe.Format.E4M3, ) make_fp8_autocast = partial( - transformer_engine.pytorch.fp8_autocast, enabled=fp8_enabled, fp8_recipe=fp8_recipe, + transformer_engine.pytorch.fp8_autocast, enabled=fp8_enabled, fp8_recipe=fp8_recipe, fp8_parameters=True, ) with make_fp8_autocast(): self.model = build_model( From ecd883d9398e00142226665d98afca2c391e467c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 3 Oct 2023 22:27:59 +0000 Subject: [PATCH 16/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../nlp/models/language_modeling/megatron_gpt_model.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 8c806c49e675..44f5e1024b23 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -233,7 +233,10 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): margin=0, interval=1, fp8_format=transformer_engine.common.recipe.Format.E4M3, ) make_fp8_autocast = partial( - transformer_engine.pytorch.fp8_autocast, enabled=fp8_enabled, fp8_recipe=fp8_recipe, fp8_parameters=True, + transformer_engine.pytorch.fp8_autocast, + enabled=fp8_enabled, + fp8_recipe=fp8_recipe, + fp8_parameters=True, ) with make_fp8_autocast(): self.model = build_model( From c9a8179615325286e4754af2c1a224a56889089c Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Wed, 25 Oct 2023 14:47:41 -0700 Subject: [PATCH 17/23] Update TE FP8 tensor support Signed-off-by: Tim Moon --- .../models/language_modeling/megatron_gpt_model.py | 11 +++-------- .../nlp/modules/common/megatron/transformer.py | 2 -- nemo/core/optim/distributed_adam.py | 2 +- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 6b352b604260..dbcdcf864b05 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -228,18 +228,13 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): ) else: fp8_enabled = cfg.get('fp8', False) - make_fp8_autocast = nullcontext + make_model_context = nullcontext if fp8_enabled and HAVE_TE: fp8_recipe = transformer_engine.common.recipe.DelayedScaling( margin=0, interval=1, fp8_format=transformer_engine.common.recipe.Format.E4M3, ) - make_fp8_autocast = partial( - transformer_engine.pytorch.fp8_autocast, - enabled=fp8_enabled, - fp8_recipe=fp8_recipe, - fp8_parameters=True, - ) - with make_fp8_autocast(): + make_model_context = partial(transformer_engine.pytorch.fp8_model_init, enabled=True) + with make_model_context(): self.model = build_model( model_provider_func=self.model_provider_func, wrap_with_ddp=False, diff --git 
a/nemo/collections/nlp/modules/common/megatron/transformer.py b/nemo/collections/nlp/modules/common/megatron/transformer.py index 98dba5423009..2ad44357dc4e 100644 --- a/nemo/collections/nlp/modules/common/megatron/transformer.py +++ b/nemo/collections/nlp/modules/common/megatron/transformer.py @@ -835,8 +835,6 @@ def __init__( params_dtype=params_dtype, get_rng_state_tracker=get_rng_state_tracker, fuse_wgrad_accumulation=fuse_wgrad_accumulation, - apply_query_key_layer_scaling=apply_query_key_layer_scaling, - attention_softmax_in_fp32=attention_softmax_in_fp32, seq_length=seq_length, micro_batch_size=micro_batch_size, sequence_parallel=sequence_parallel, diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index d78c141d7f51..c29b6a27c389 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -364,7 +364,7 @@ def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.Paramet for fragment in fragments: param = self.parameter(fragment) if _is_fp8_tensor(param): - param.transpose() + param.transpose(cache=True) @torch.no_grad() def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket]) -> None: From 7c390c2d547723830006e6875845c6f84c0ce058 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Fri, 27 Oct 2023 16:39:42 -0700 Subject: [PATCH 18/23] Do not precompute FP8 transposes in distopt Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index c29b6a27c389..0e798281339e 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -356,16 +356,6 @@ def _param_copy_fragments(self, fragments: Iterable[DistributedFusedAdam.Paramet buffers_in, buffers_out, dummy_overflow_buf=self._dummy_overflow_buf, ) - # Precompute transposes - for fragment in fragments: - param = self.parameter(fragment) - if _is_fp8_tensor(param): - param._reset_caches() - for fragment in fragments: - param = self.parameter(fragment) - if _is_fp8_tensor(param): - param.transpose(cache=True) - @torch.no_grad() def _check_params_shard_dtypes(self, params_buckets: Dict[int, DistributedFusedAdam.ParameterBucket]) -> None: """Make sure local shards of parameters are in expected datatypes From e4b6751177a13fc1259b3b9aafd444884f035d2e Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 31 Oct 2023 10:48:20 -0700 Subject: [PATCH 19/23] Fix import error Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 0e798281339e..d4063e6026aa 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -27,9 +27,8 @@ # Check if Transformer Engine has FP8 tensor class HAVE_TE_FP8TENSOR = False try: - from transformer_engine.pytorch import Float8Tensor + from transformer_engine.pytorch.float8_tensor import Float8Tensor from transformer_engine.pytorch.cpp_extensions import cast_to_fp8 - from transformer_engine.pytorch.fp8 import get_fp8_te_dtype HAVE_TE_FP8TENSOR = True except (ImportError, ModuleNotFoundError): From e924d731d315cd1be4efb3ec3b7edc6cb40686a5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 31 Oct 2023 17:51:13 +0000 Subject: [PATCH 20/23] [pre-commit.ci] auto fixes from 
pre-commit.com hooks for more information, see https://pre-commit.ci --- nemo/core/optim/distributed_adam.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index d4063e6026aa..080e54375d6d 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -27,8 +27,8 @@ # Check if Transformer Engine has FP8 tensor class HAVE_TE_FP8TENSOR = False try: - from transformer_engine.pytorch.float8_tensor import Float8Tensor from transformer_engine.pytorch.cpp_extensions import cast_to_fp8 + from transformer_engine.pytorch.float8_tensor import Float8Tensor HAVE_TE_FP8TENSOR = True except (ImportError, ModuleNotFoundError): From 7e4810178cbd382c5e5dc2076e8b5b5b7173fcbc Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Tue, 31 Oct 2023 16:25:16 -0700 Subject: [PATCH 21/23] Support distopt contiguous param buffer with FP8 params Signed-off-by: Tim Moon --- nemo/core/optim/distributed_adam.py | 74 +++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index 080e54375d6d..a1b30c58de71 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -251,6 +251,80 @@ def _init_param_state( self.state[param].update(self.state[fp32_param]) del self.state[fp32_param] + @torch.no_grad() + def init_param_buffer(self) -> None: + """Allocate contiguous buffers for param buckets + + For FP8 params, the FP8 data buffer is made a view into a + contiguous buffer. + + """ + + # Make sure all params are initialized + self.contiguous_param_buffer = True + self.init_params() + + # Construct param buffers + buffer_sizes = collections.defaultdict(lambda: 0) + for bucket in self.state["buckets"]: + dtypes = bucket.dtypes() + buffer_sizes[dtypes] = max( + bucket.contiguous_buffer_offset + bucket.bucket_size, + buffer_sizes[dtypes], + ) + for dtypes, buffer_size in buffer_sizes.items(): + _, _, param_sync_dtype = dtypes + self._param_buffers[dtypes] = torch.zeros( + [buffer_size], + dtype=param_sync_dtype, + device=self.device, + ) + + # Figure out corresponding positions in params and param buffer + params = list(self.parameters()) + param_flat_views = [] + param_buffer_views = [] + for i, param in enumerate(params): + fragment = self.state[param]["fragments"][0] + bucket_id = fragment.bucket_id + bucket = self.state["buckets"][bucket_id] + param_size = param.numel() + bucket_start, _ = fragment.bucket_range + buffer_offset = bucket.contiguous_buffer_offset + buffer_start = buffer_offset + bucket_start + buffer_end = buffer_start + param_size + param_buffer = self._param_buffers[bucket.dtypes()] + param_buffer_view = param_buffer[buffer_start:buffer_end].detach() + if param_buffer_view.device != param.device: + raise RuntimeError( + "Attempted to change a parameter with device={param.device} " + f"into a buffer view with device={param_buffer_view.device}" + ) + if _is_fp8_tensor(param): + param_flat_views.append(param._data.detach().view(-1)) + else: + if param_buffer_view.dtype != param.dtype: + raise RuntimeError( + f"Attempted to change a parameter with dtype={param.dtype} " + f"into a buffer view with dtype={param_buffer_view.dtype}" + ) + param_flat_views.append(param.detach().view(-1)) + param_buffer_views.append(param_buffer_view) + + # Copy values into param buffer + _multi_tensor_copy( + param_flat_views, + param_buffer_views, + dummy_overflow_buf=self._dummy_overflow_buf, + ) 
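# The method above packs every parameter into one contiguous buffer per dtype group
# and then, in the lines that follow, rebinds each parameter's storage to a view of
# that buffer. A minimal standalone sketch of the same idea for ordinary (non-FP8)
# parameters; `params` is a hypothetical list of torch.nn.Parameter on one device:
import torch

def pack_params_into_buffer(params):
    total = sum(p.numel() for p in params)
    buf = torch.zeros(total, dtype=params[0].dtype, device=params[0].device)
    offset = 0
    for p in params:
        n = p.numel()
        view = buf[offset:offset + n]
        view.copy_(p.detach().reshape(-1))  # copy the current values into the buffer
        p.data = view.view(p.shape)         # the param now aliases the contiguous buffer
        offset += n
    return buf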
+ + # Make all params a view into the param buffer + for param, buffer_view in zip(params, param_buffer_views): + if _is_fp8_tensor(param): + param._data = buffer_view.view(param.size()) + else: + param.data = buffer_view.view(param.size()) + def try_grad_sync(self, params: Iterable[torch.nn.Parameter]) -> None: def is_grad_copy_enabled(param: torch.nn.Parameter) -> bool: return not getattr(param, '_disable_greedy_grad_copy', False) and not getattr( From 48dc4a5f7bac31b76e98c06f52dc706c705bfb17 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 31 Oct 2023 23:26:31 +0000 Subject: [PATCH 22/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- nemo/core/optim/distributed_adam.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/nemo/core/optim/distributed_adam.py b/nemo/core/optim/distributed_adam.py index a1b30c58de71..6f4714323e8e 100644 --- a/nemo/core/optim/distributed_adam.py +++ b/nemo/core/optim/distributed_adam.py @@ -268,17 +268,10 @@ def init_param_buffer(self) -> None: buffer_sizes = collections.defaultdict(lambda: 0) for bucket in self.state["buckets"]: dtypes = bucket.dtypes() - buffer_sizes[dtypes] = max( - bucket.contiguous_buffer_offset + bucket.bucket_size, - buffer_sizes[dtypes], - ) + buffer_sizes[dtypes] = max(bucket.contiguous_buffer_offset + bucket.bucket_size, buffer_sizes[dtypes],) for dtypes, buffer_size in buffer_sizes.items(): _, _, param_sync_dtype = dtypes - self._param_buffers[dtypes] = torch.zeros( - [buffer_size], - dtype=param_sync_dtype, - device=self.device, - ) + self._param_buffers[dtypes] = torch.zeros([buffer_size], dtype=param_sync_dtype, device=self.device,) # Figure out corresponding positions in params and param buffer params = list(self.parameters()) @@ -313,9 +306,7 @@ def init_param_buffer(self) -> None: # Copy values into param buffer _multi_tensor_copy( - param_flat_views, - param_buffer_views, - dummy_overflow_buf=self._dummy_overflow_buf, + param_flat_views, param_buffer_views, dummy_overflow_buf=self._dummy_overflow_buf, ) # Make all params a view into the param buffer From 1a8c3558c833a8462819d6af0f033051d09a4ba1 Mon Sep 17 00:00:00 2001 From: Tim Moon Date: Fri, 3 Nov 2023 17:46:22 -0700 Subject: [PATCH 23/23] Add envvar to configure FP8 params Signed-off-by: Tim Moon --- .../nlp/models/language_modeling/megatron_gpt_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index dbcdcf864b05..b6058ee3f002 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -227,7 +227,7 @@ def __init__(self, cfg: DictConfig, trainer: Trainer): virtual_pipeline_model_parallel_size=self.cfg.get('virtual_pipeline_model_parallel_size', None), ) else: - fp8_enabled = cfg.get('fp8', False) + fp8_enabled = cfg.get('fp8', False) and int(os.getenv("NEMO_WITH_FP8_PARAMS", "1")) make_model_context = nullcontext if fp8_enabled and HAVE_TE: fp8_recipe = transformer_engine.common.recipe.DelayedScaling(
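# The final patch above gates FP8 parameters behind both the model config and the
# NEMO_WITH_FP8_PARAMS environment variable. A small sketch of that kind of opt-out
# switch, assuming a plain dict `cfg` as a stand-in for the Hydra/OmegaConf config:
import os

def fp8_params_enabled(cfg: dict) -> bool:
    # Use FP8 params only if the config asks for FP8 and the environment
    # override (default "1") has not been set to "0".
    return bool(cfg.get("fp8", False)) and bool(int(os.getenv("NEMO_WITH_FP8_PARAMS", "1")))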