From e9beff8e7ba8f89b96df7fc4b66f98575ef66d13 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Fri, 28 Jun 2024 13:36:43 +0000 Subject: [PATCH 1/7] fix cross-PP-stage position id length diff bug --- .../booster/plugin/hybrid_parallel_plugin.py | 1 + colossalai/shardformer/modeling/llama.py | 24 ++++++++++--- colossalai/shardformer/policies/llama.py | 8 ----- .../test_model/test_shard_llama.py | 35 +++++++++++-------- 4 files changed, 42 insertions(+), 26 deletions(-) diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index 3bd43f172cf8..acaf8a539dd6 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1206,6 +1206,7 @@ def configure( and self.enable_sequence_parallelism and self.sequence_parallelism_mode == "all_to_all" ) + # sync gradients across DP * SP ranks if self.enable_sequence_parallelism and self.sequence_parallelism_mode == "all_to_all": dp_group = self.pg_mesh.create_group_along_axis([self.dp_axis, self.sp_axis]) else: diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index bf5ce45a8342..cf7fd13c898b 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -86,13 +86,20 @@ def llama_model_forward( device = input_ids.device if input_ids is not None else inputs_embeds.device if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) - hidden_states = inputs_embeds else: input_shape = hidden_states.shape[:-1] batch_size, seq_length = input_shape device = hidden_states.device + # Support SP + PP + sp_mode = shard_config.sequence_parallelism_mode + sp_group = shard_config.sequence_parallel_process_group + sp_size = shard_config.sequence_parallel_size + if sp_mode == "all_to_all" and not stage_manager.is_first_stage(): + # For correct positions ids. 
The states will be gathered along the seq dim in the attention layer later. + seq_length *= sp_size + past_seen_tokens = 0 if use_cache: # kept for BC (cache positions) if not isinstance(past_key_values, StaticCache): @@ -101,7 +108,7 @@ def llama_model_forward( if cache_position is None: if isinstance(past_key_values, StaticCache): raise ValueError("cache_position is a required argument when using StaticCache.") - cache_position = torch.arange(past_seen_tokens, past_seen_tokens + hidden_states.shape[1], device=device) + cache_position = torch.arange(past_seen_tokens, past_seen_tokens + seq_length, device=device) seq_length_with_past = seq_length + past_seen_tokens @@ -118,7 +125,6 @@ def llama_model_forward( if position_ids is None: position_ids = cache_position.unsqueeze(0) - # embed positions, for the first stage, hidden_states is the input embeddings, # for the other stages, hidden_states is the output of the previous stage if shard_config.enable_flash_attention: @@ -134,6 +140,13 @@ def llama_model_forward( else: attention_mask = self._update_causal_mask(attention_mask, hidden_states, cache_position) + # Support SP + PP + if stage_manager.is_first_stage(): + if sp_mode in ["ring", "split_gather"]: + hidden_states = split_forward_gather_backward(hidden_states, 1, sp_group) + elif sp_mode == "all_to_all": + hidden_states = split_forward_gather_backward(hidden_states, 1, sp_group, 1 / sp_size) + if self.gradient_checkpointing and self.training and use_cache: if use_cache: logger.warning_once( @@ -196,6 +209,10 @@ def llama_model_forward( if stage_manager.is_last_stage(): hidden_states = self.norm(hidden_states) + if sp_mode == "ring" or sp_mode == "split_gather": + hidden_states = gather_forward_split_backward(hidden_states, 1, sp_group) + elif sp_mode == "all_to_all": + hidden_states = gather_forward_split_backward(hidden_states, 1, sp_group, grad_scale=sp_size) # add hidden states from the last decoder layer if output_hidden_states: @@ -529,7 +546,6 @@ def 
forward( ) kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx) - cos, sin = self.rotary_emb(value_states, position_ids) query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) diff --git a/colossalai/shardformer/policies/llama.py b/colossalai/shardformer/policies/llama.py index 85ec6717d0d1..36491b4b5522 100644 --- a/colossalai/shardformer/policies/llama.py +++ b/colossalai/shardformer/policies/llama.py @@ -1,4 +1,3 @@ -import warnings from functools import partial from typing import Callable, Dict, List, Union @@ -66,13 +65,6 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: else: norm_cls = RMSNorm - if self.pipeline_stage_manager is not None: - self.shard_config.enable_sequence_parallelism = False - self.shard_config.enable_sequence_overlap = False - self.shard_config.sequence_parallelism_mode = None - warnings.warn( - f"For llama, sequence parallelism is currently not compatible with pipeline parallelism, set to be False" - ) sp_mode = self.shard_config.sequence_parallelism_mode or None sp_size = self.shard_config.sequence_parallel_size or None sp_group = self.shard_config.sequence_parallel_process_group or None diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index 3a8a1357deb0..38bde68a4663 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -59,9 +59,12 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, if ( booster.plugin.zero_stage in [1, 2] and booster.plugin.shard_config.enable_sequence_parallelism + and booster.plugin.shard_config.pp_size == 1 and booster.plugin.shard_config.sequence_parallelism_mode == "all_to_all" ): - for p1, p2 in zip(llama_model.parameters(), sharded_optimizer._master_param_groups_of_current_rank[0]): + for (name, p1), p2 in zip( + llama_model.named_parameters(), 
sharded_optimizer._master_param_groups_of_current_rank[0] + ): working_p = sharded_optimizer._param_store.master_to_working_param[id(p2)] grads = sharded_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(working_p)) grad_index = ( @@ -69,7 +72,11 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, ) grad = grads[grad_index] sharded_grad = p1.grad.view(-1).chunk(dist.get_world_size())[dist.get_rank()] - assert_close(sharded_grad, grad[: sharded_grad.shape[0]], atol=5e-3, rtol=5e-3, check_dtype=False) + try: + assert_close(sharded_grad, grad[: sharded_grad.shape[0]], atol=5e-3, rtol=5e-3, check_dtype=False) + except Exception as e: + print(f"Failed param name: {name}") + raise e # Save gradient tensors for comparison between the original model and the sharded model before optimizer step. grads_to_check = {} @@ -144,29 +151,29 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, @parameterize( "test_config", [ - { # Test ring + Flash attention - "tp_size": 2, - "pp_size": 1, + { # Ulysess + Flash attention + "tp_size": 1, + "pp_size": 2, "sp_size": 2, - "num_microbatches": 1, + "num_microbatches": 2, "enable_sequence_parallelism": True, - "sequence_parallelism_mode": "ring", + "sequence_parallelism_mode": "all_to_all", "enable_flash_attention": True, "use_lazy_init": True, - "zero_stage": 2, + "zero_stage": 0, "precision": "fp16", "initial_scale": 1, }, - { # Ulysess + Flash attention - "tp_size": 1, - "pp_size": 2, + { # Test ring + Flash attention + "tp_size": 2, + "pp_size": 1, "sp_size": 2, - "num_microbatches": 2, + "num_microbatches": 1, "enable_sequence_parallelism": True, - "sequence_parallelism_mode": "all_to_all", + "sequence_parallelism_mode": "ring", "enable_flash_attention": True, "use_lazy_init": True, - "zero_stage": 1, + "zero_stage": 2, "precision": "fp16", "initial_scale": 1, }, From 2ac4574e19599251c10968f8d928b152f12303bd Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Sat, 
29 Jun 2024 02:34:57 +0000 Subject: [PATCH 2/7] fix typo --- tests/test_shardformer/test_model/test_shard_llama.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index 5e489457147c..717b278ac6a6 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -59,7 +59,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, if ( booster.plugin.zero_stage in [1, 2] and booster.plugin.shard_config.enable_sequence_parallelism - and booster.plugin.shard_config.pp_size == 1 + and booster.plugin.shard_config.pipeline_stage_manager is None and booster.plugin.shard_config.sequence_parallelism_mode == "all_to_all" ): for (name, p1), p2 in zip( From d300b8311cc03e6288ef44e39e264cb03c6cac38 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Sat, 29 Jun 2024 07:39:53 +0000 Subject: [PATCH 3/7] fix typo --- tests/test_shardformer/test_model/test_shard_llama.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index 717b278ac6a6..6a31fb400bb3 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -62,11 +62,12 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, and booster.plugin.shard_config.pipeline_stage_manager is None and booster.plugin.shard_config.sequence_parallelism_mode == "all_to_all" ): + master2working = sharded_optimizer.get_master_to_working_map() for (name, p1), p2 in zip( llama_model.named_parameters(), sharded_optimizer._master_param_groups_of_current_rank[0] ): - working_p = sharded_optimizer._param_store.master_to_working_param[id(p2)] - grads = 
sharded_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(working_p)) + working_p = master2working[id(p2)] + grads = sharded_optimizer.get_partitioned_gradients_by_param_id(0, id(working_p)) grad_index = ( 0 if sharded_optimizer._partition_grads From 333afb61044da3afd4dfad089faacfe517daf99e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 29 Jun 2024 07:40:57 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_shardformer/test_model/test_shard_llama.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index 6a31fb400bb3..b32eb0d31ea1 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -59,7 +59,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, if ( booster.plugin.zero_stage in [1, 2] and booster.plugin.shard_config.enable_sequence_parallelism - and booster.plugin.shard_config.pipeline_stage_manager is None + and booster.plugin.shard_config.pipeline_stage_manager is None and booster.plugin.shard_config.sequence_parallelism_mode == "all_to_all" ): master2working = sharded_optimizer.get_master_to_working_map() From ec5784e9c127d5391e7abc89c2836117508018a4 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Tue, 2 Jul 2024 09:35:42 +0000 Subject: [PATCH 5/7] unified cross entropy func for all shardformer models --- colossalai/shardformer/layer/__init__.py | 3 +- colossalai/shardformer/layer/loss.py | 43 +++++++++++++- colossalai/shardformer/modeling/bloom.py | 56 +++++------------- colossalai/shardformer/modeling/command.py | 55 ++++-------------- colossalai/shardformer/modeling/gpt2.py | 47 +++------------ colossalai/shardformer/modeling/llama.py | 49 
+++------------- colossalai/shardformer/modeling/mistral.py | 48 +++------------- colossalai/shardformer/modeling/opt.py | 57 ++++--------------- colossalai/shardformer/modeling/qwen2.py | 47 +++------------ examples/language/opt/opt_benchmark.py | 1 + examples/language/opt/opt_train_demo.py | 1 + .../test_model/test_shard_llama.py | 11 +--- 12 files changed, 113 insertions(+), 305 deletions(-) diff --git a/colossalai/shardformer/layer/__init__.py b/colossalai/shardformer/layer/__init__.py index f17fad1b6606..331e4972966c 100644 --- a/colossalai/shardformer/layer/__init__.py +++ b/colossalai/shardformer/layer/__init__.py @@ -3,7 +3,7 @@ from .dropout import DropoutForParallelInput, DropoutForReplicatedInput from .embedding import Embedding1D, PaddingEmbedding, VocabParallelEmbedding1D from .linear import Linear1D_Col, Linear1D_Row, PaddingLMHead, VocabParallelLMHead1D -from .loss import cross_entropy_1d +from .loss import cross_entropy_1d, dist_cross_entropy from .normalization import FusedLayerNorm, FusedRMSNorm, LayerNorm, RMSNorm from .parallel_module import ParallelModule from .qkv_fused_linear import FusedLinear1D_Col, GPT2FusedLinearConv1D_Col, GPT2FusedLinearConv1D_Row @@ -18,6 +18,7 @@ "DropoutForParallelInput", "DropoutForReplicatedInput", "cross_entropy_1d", + "dist_cross_entropy", "BaseLayerNorm", "LayerNorm", "RMSNorm", diff --git a/colossalai/shardformer/layer/loss.py b/colossalai/shardformer/layer/loss.py index a6d19edf5b53..946f6008eba4 100644 --- a/colossalai/shardformer/layer/loss.py +++ b/colossalai/shardformer/layer/loss.py @@ -2,8 +2,11 @@ import torch.distributed as dist from torch.autograd import Function from torch.distributed import ProcessGroup +from torch.nn import CrossEntropyLoss -__all__ = ["DistCrossEntropy", "cross_entropy_1d"] +from colossalai.shardformer.shard import ShardConfig + +__all__ = ["DistCrossEntropy", "cross_entropy_1d", "dist_cross_entropy"] class DistCrossEntropy(Function): @@ -132,3 +135,41 @@ def cross_entropy_1d( 
dtype: torch.dtype = None, ) -> torch.Tensor: return DistCrossEntropy.apply(vocab_logits, labels, ignore_index, process_group, vocab_size, dtype) + + +def dist_cross_entropy( + labels: torch.Tensor, + logits: torch.Tensor, + shard_config: ShardConfig, + out_features: int, + vocab_size: int, + dtype: torch.dtype, +) -> torch.Tensor: + """ + Helper to compute cross entropy loss for most shardformer models, + compatible with PP, TP and SP. + """ + if labels is not None: + # Shift so that tokens < n predict n + shift_logits = logits[..., :-1, :].contiguous() + shift_labels = labels[..., 1:].contiguous() + # Flatten the tokens + loss_fct = CrossEntropyLoss() + shift_labels = shift_labels.view(-1) + shift_labels = shift_labels.to(shift_logits.device) + if shard_config.enable_tensor_parallelism and shard_config.parallel_output: + # Cross entropy with all-reduce for TP + new_vocab_size = logits.shape[-1] + shift_logits = shift_logits.view(-1, new_vocab_size) + loss = cross_entropy_1d( + shift_logits, + shift_labels, + process_group=shard_config.tensor_parallel_process_group, + vocab_size=out_features, + dtype=dtype, + ) + else: + shift_logits = shift_logits.view(-1, vocab_size) + loss = loss_fct(shift_logits, shift_labels) + + return loss diff --git a/colossalai/shardformer/modeling/bloom.py b/colossalai/shardformer/modeling/bloom.py index 1541436264e9..26ffef6c5ee0 100644 --- a/colossalai/shardformer/modeling/bloom.py +++ b/colossalai/shardformer/modeling/bloom.py @@ -28,7 +28,7 @@ from colossalai.shardformer.layer._operation import gather_forward_split_backward, split_forward_gather_backward from colossalai.shardformer.shard import ShardConfig -from ..layer import cross_entropy_1d +from ..layer import dist_cross_entropy logger = logging.get_logger(__name__) @@ -359,30 +359,14 @@ def bloom_for_causal_lm_forward( hidden_states = transformer_outputs[0] lm_logits = self.lm_head(hidden_states).contiguous() - loss = None - if labels is not None: - # move labels to correct 
device to enable model parallelism - labels = labels.to(lm_logits.device) - # Shift so that tokens < n predict n - shift_logits = lm_logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - batch_size, seq_length, vocab_size = shift_logits.shape - # Flatten the tokens - if shard_config.enable_tensor_parallelism and shard_config.parallel_output: - new_vocab_size = lm_logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - shift_labels = shift_labels.view(-1) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.transformer.dtype, - ) - else: - loss_fct = CrossEntropyLoss() - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits, shift_labels.view(-1)) + loss = dist_cross_entropy( + labels, + lm_logits, + shard_config, + self.lm_head.out_features, + self.config.vocab_size, + self.transformer.dtype, + ) if not return_dict: output = (lm_logits,) + transformer_outputs[1:] @@ -1040,24 +1024,10 @@ def forward( hidden_states = transformer_outputs[0] lm_logits = self.lm_head(hidden_states) - loss = None - if labels is not None: - # move labels to correct device to enable model parallelism - labels = labels.to(lm_logits.device) - # Shift so that tokens < n predict n - shift_logits = lm_logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - new_vocab_size = lm_logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - shift_labels = shift_labels.view(-1) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.transformer.dtype, - ) + loss = dist_cross_entropy( + labels, lm_logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.transformer.dtype + ) + if not return_dict: output = 
(lm_logits,) + transformer_outputs[1:] return ((loss,) + output) if loss is not None else output diff --git a/colossalai/shardformer/modeling/command.py b/colossalai/shardformer/modeling/command.py index 07a7f6cbf8d3..72f705bc0a75 100644 --- a/colossalai/shardformer/modeling/command.py +++ b/colossalai/shardformer/modeling/command.py @@ -5,7 +5,6 @@ import torch import torch.utils.checkpoint from torch import nn -from torch.nn import CrossEntropyLoss from transformers.cache_utils import Cache, DynamicCache from transformers.modeling_outputs import BaseModelOutputWithPast, CausalLMOutputWithPast from transformers.models.cohere.modeling_cohere import ( @@ -25,7 +24,7 @@ ) from colossalai.shardformer.shard import ShardConfig -from ..layer import ColoAttention, cross_entropy_1d +from ..layer import ColoAttention, dist_cross_entropy class CommandPipelineForwards: @@ -300,29 +299,9 @@ def command_for_causal_lm_forward( logits = self.lm_head(hidden_states) logits = logits * self.logit_scale logits = logits.float() - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - loss_fct = CrossEntropyLoss() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - if shard_config.enable_tensor_parallelism and shard_config.parallel_output: - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) - else: - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits, shift_labels) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.model.dtype + ) if not 
return_dict: output = (logits,) + outputs[1:] @@ -658,24 +637,14 @@ def forward( logits = self.lm_head(hidden_states) logits = logits * self.logit_scale logits = logits.float() - - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) + loss = dist_cross_entropy( + labels, + logits, + shard_config, + self.lm_head.out_features, + self.config.vocab_size, + self.model.dtype, + ) if not return_dict: output = (logits,) + outputs[1:] diff --git a/colossalai/shardformer/modeling/gpt2.py b/colossalai/shardformer/modeling/gpt2.py index aa75bab115a7..6ecda91c4d35 100644 --- a/colossalai/shardformer/modeling/gpt2.py +++ b/colossalai/shardformer/modeling/gpt2.py @@ -25,7 +25,7 @@ from colossalai.shardformer.layer._operation import gather_forward_split_backward, split_forward_gather_backward from colossalai.shardformer.shard import ShardConfig -from ..layer import cross_entropy_1d +from ..layer import dist_cross_entropy logger = logging.get_logger(__name__) @@ -372,27 +372,9 @@ def gpt2_lmhead_model_forward( hidden_states = outputs[0] lm_logits = self.lm_head(hidden_states) - loss = None - if labels is not None: - # move labels to correct device to enable model parallelism - labels = labels.to(lm_logits.device) - # Shift so that tokens < n predict n - shift_logits = lm_logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - loss_fct = CrossEntropyLoss() - shift_logits = shift_logits.view(-1, shift_logits.size(-1)) - 
shift_labels = shift_labels.view(-1) - if shard_config.enable_tensor_parallelism and shard_config.parallel_output: - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.transformer.dtype, - ) - else: - loss = loss_fct(shift_logits, shift_labels) + loss = dist_cross_entropy( + labels, lm_logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.transformer.dtype + ) if not return_dict: output = (lm_logits,) + outputs[1:] @@ -1282,24 +1264,9 @@ def forward( hidden_states = transformer_outputs[0] lm_logits = self.lm_head(hidden_states) - - loss = None - if labels is not None: - # move labels to correct device to enable model parallelism - labels = labels.to(lm_logits.device) - # Shift so that tokens < n predict n - shift_logits = lm_logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - shift_logits = shift_logits.view(-1, shift_logits.size(-1)) - shift_labels = shift_labels.view(-1) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.transformer.dtype, - ) + loss = dist_cross_entropy( + labels, lm_logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.transformer.dtype + ) if not return_dict: output = (lm_logits,) + transformer_outputs[1:] diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index cf7fd13c898b..54ff8e321e06 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -31,7 +31,7 @@ ) from colossalai.shardformer.shard import ShardConfig -from ..layer import ColoAttention, cross_entropy_1d +from ..layer import ColoAttention, dist_cross_entropy class LlamaPipelineForwards: @@ -321,29 +321,9 @@ def llama_for_causal_lm_forward( if 
stage_manager.is_last_stage(): hidden_states = outputs[0] logits = self.lm_head(hidden_states) - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - loss_fct = CrossEntropyLoss() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - if shard_config.enable_tensor_parallelism and shard_config.parallel_output: - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) - else: - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits, shift_labels) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.model.dtype + ) if not return_dict: output = (logits,) + outputs[1:] @@ -820,24 +800,9 @@ def forward( logits = self.lm_head(hidden_states) logits = logits.float() - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) - + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.model.dtype + ) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss 
is not None else output diff --git a/colossalai/shardformer/modeling/mistral.py b/colossalai/shardformer/modeling/mistral.py index 310c2d8e233a..82e8ef5f9af7 100644 --- a/colossalai/shardformer/modeling/mistral.py +++ b/colossalai/shardformer/modeling/mistral.py @@ -19,7 +19,7 @@ from colossalai.pipeline.stage_manager import PipelineStageManager from colossalai.shardformer.shard import ShardConfig -from ..layer import ColoAttention, cross_entropy_1d +from ..layer import ColoAttention, dist_cross_entropy logger = logging.get_logger(__name__) @@ -275,29 +275,9 @@ def mistral_for_causal_lm_forward( logits = self.lm_head(hidden_states) logits = logits.float() - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - loss_fct = CrossEntropyLoss() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - if shard_config.enable_tensor_parallelism and shard_config.parallel_output: - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) - else: - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits, shift_labels) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.model.dtype + ) if not return_dict: output = (logits,) + outputs[1:] @@ -708,23 +688,9 @@ def forward( logits = self.lm_head(hidden_states) logits = logits.float() - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - shift_labels = shift_labels.view(-1) - # Enable 
model parallelism - shift_labels = shift_labels.to(shift_logits.device) - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.model.dtype + ) if not return_dict: output = (logits,) + outputs[1:] diff --git a/colossalai/shardformer/modeling/opt.py b/colossalai/shardformer/modeling/opt.py index f10860fef558..31bf481de8ec 100644 --- a/colossalai/shardformer/modeling/opt.py +++ b/colossalai/shardformer/modeling/opt.py @@ -22,7 +22,7 @@ from colossalai.shardformer.layer import ColoAttention from colossalai.shardformer.shard import ShardConfig -from ..layer import cross_entropy_1d +from ..layer import dist_cross_entropy logger = logging.get_logger(__name__) @@ -330,30 +330,14 @@ def opt_for_causal_lm_forward( ) if stage_manager.is_last_stage(): logits = self.lm_head(outputs[0]).contiguous() - loss = None - if labels is not None: - # move labels to correct device to enable model parallelism - labels = labels.to(logits.device) - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - - if shard_config.enable_tensor_parallelism and shard_config.parallel_output: - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - shift_labels = shift_labels.view(-1) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.decoder.dtype, - ) - else: - loss_fct = CrossEntropyLoss() - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits.view(-1, 
self.config.vocab_size), shift_labels.view(-1)) + loss = dist_cross_entropy( + labels, + logits, + shard_config, + self.lm_head.out_features, + self.config.vocab_size, + self.model.decoder.dtype, + ) if not return_dict: output = (logits,) + outputs[1:] @@ -971,26 +955,9 @@ def forward( ) logits = self.lm_head(outputs[0]).contiguous() - - loss = None - if labels is not None: - # move labels to correct device to enable model parallelism - labels = labels.to(logits.device) - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.decoder.dtype, - ) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, self.model.decoder.dtype + ) if not return_dict: output = (logits,) + outputs[1:] diff --git a/colossalai/shardformer/modeling/qwen2.py b/colossalai/shardformer/modeling/qwen2.py index e0aa5fba4a01..b7dba243a4c9 100644 --- a/colossalai/shardformer/modeling/qwen2.py +++ b/colossalai/shardformer/modeling/qwen2.py @@ -32,7 +32,7 @@ from colossalai.pipeline.stage_manager import PipelineStageManager from colossalai.shardformer.shard import ShardConfig -from ..layer import ColoAttention, cross_entropy_1d +from ..layer import ColoAttention, dist_cross_entropy class Qwen2PipelineForwards: @@ -304,25 +304,9 @@ def qwen2_for_causal_lm_forward( if stage_manager.is_last_stage(): hidden_states = outputs[0] logits = self.lm_head(hidden_states) - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() 
- shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - loss_fct = CrossEntropyLoss() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - if shard_config.enable_tensor_parallelism: - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, shift_labels, process_group=shard_config.tensor_parallel_process_group - ) - else: - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits, shift_labels) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, logits.dtype + ) if not return_dict: output = (logits,) + outputs[1:] @@ -724,26 +708,9 @@ def forward( hidden_states = outputs[0] logits = self.lm_head(hidden_states) logits = logits.float() - - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - # Flatten the tokens - loss_fct = CrossEntropyLoss() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - if shard_config.enable_tensor_parallelism: - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, shift_labels, process_group=shard_config.tensor_parallel_process_group - ) - else: - shift_logits = shift_logits.view(-1, self.config.vocab_size) - loss = loss_fct(shift_logits, shift_labels) + loss = dist_cross_entropy( + labels, logits, shard_config, self.lm_head.out_features, self.config.vocab_size, logits.dtype + ) if not return_dict: output = (logits,) + outputs[1:] diff --git a/examples/language/opt/opt_benchmark.py b/examples/language/opt/opt_benchmark.py index c2883d96c16e..53108cd74bf5 100755 --- a/examples/language/opt/opt_benchmark.py +++ 
b/examples/language/opt/opt_benchmark.py @@ -127,4 +127,5 @@ def main(): if __name__ == "__main__": + print("--------------------------------------") main() diff --git a/examples/language/opt/opt_train_demo.py b/examples/language/opt/opt_train_demo.py index b5b50305cc34..c1ea8dedbb81 100644 --- a/examples/language/opt/opt_train_demo.py +++ b/examples/language/opt/opt_train_demo.py @@ -85,6 +85,7 @@ def main(): # Enable gradient checkpointing model.gradient_checkpointing_enable() + print("model._gradient_checkpointing_func:", model._gradient_checkpointing_func) # Set plugin booster_kwargs = {} diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index b32eb0d31ea1..88e54176b9fd 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -63,9 +63,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, and booster.plugin.shard_config.sequence_parallelism_mode == "all_to_all" ): master2working = sharded_optimizer.get_master_to_working_map() - for (name, p1), p2 in zip( - llama_model.named_parameters(), sharded_optimizer._master_param_groups_of_current_rank[0] - ): + for p1, p2 in zip(llama_model.parameters(), sharded_optimizer._master_param_groups_of_current_rank[0]): working_p = master2working[id(p2)] grads = sharded_optimizer.get_partitioned_gradients_by_param_id(0, id(working_p)) grad_index = ( @@ -75,11 +73,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, ) grad = grads[grad_index] sharded_grad = p1.grad.view(-1).chunk(dist.get_world_size())[dist.get_rank()] - try: - assert_close(sharded_grad, grad[: sharded_grad.shape[0]], atol=5e-3, rtol=5e-3, check_dtype=False) - except Exception as e: - print(f"Failed param name: {name}") - raise e + assert_close(sharded_grad, grad[: sharded_grad.shape[0]], atol=5e-3, rtol=5e-3, check_dtype=False) # Save gradient tensors for 
comparison between the original model and the sharded model before optimizer step. grads_to_check = {} @@ -253,7 +247,6 @@ def run_llama_test(test_config): except Exception as e: print(f"Failed config: {test_config}") raise e - clear_layout_converter() Randomizer.reset_index() torch.cuda.empty_cache() From 2866a4e4d32e77d5b0b2de98f5b065bffe5b1a70 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Tue, 2 Jul 2024 09:46:41 +0000 Subject: [PATCH 6/7] add comments --- colossalai/shardformer/layer/loss.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/colossalai/shardformer/layer/loss.py b/colossalai/shardformer/layer/loss.py index 946f6008eba4..cea2da03fb58 100644 --- a/colossalai/shardformer/layer/loss.py +++ b/colossalai/shardformer/layer/loss.py @@ -169,6 +169,8 @@ def dist_cross_entropy( dtype=dtype, ) else: + # NOTE if use TP and not parallel_output, the output is gathered. + # see VocabParallelLMHead1D shift_logits = shift_logits.view(-1, vocab_size) loss = loss_fct(shift_logits, shift_labels) From 3f83b1c5b869270b3d26d564a90833e2d51b37ef Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Tue, 2 Jul 2024 11:10:15 +0000 Subject: [PATCH 7/7] remove redundant lines --- examples/language/opt/opt_benchmark.py | 1 - examples/language/opt/opt_train_demo.py | 1 - 2 files changed, 2 deletions(-) diff --git a/examples/language/opt/opt_benchmark.py b/examples/language/opt/opt_benchmark.py index 53108cd74bf5..c2883d96c16e 100755 --- a/examples/language/opt/opt_benchmark.py +++ b/examples/language/opt/opt_benchmark.py @@ -127,5 +127,4 @@ def main(): if __name__ == "__main__": - print("--------------------------------------") main() diff --git a/examples/language/opt/opt_train_demo.py b/examples/language/opt/opt_train_demo.py index c1ea8dedbb81..b5b50305cc34 100644 --- a/examples/language/opt/opt_train_demo.py +++ b/examples/language/opt/opt_train_demo.py @@ -85,7 +85,6 @@ def main(): # Enable gradient checkpointing model.gradient_checkpointing_enable() - 
print("model._gradient_checkpointing_func:", model._gradient_checkpointing_func) # Set plugin booster_kwargs = {}