From 95d0a274786c67030096b9f0884ddeacc773c1d0 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Thu, 6 Jun 2024 10:08:50 +0000 Subject: [PATCH 01/10] support tp + sp + pp --- .../booster/plugin/hybrid_parallel_plugin.py | 56 +- colossalai/shardformer/modeling/llama.py | 503 +++++++++--------- colossalai/shardformer/policies/llama.py | 89 +--- examples/language/llama/benchmark.py | 3 + .../test_model/test_shard_llama.py | 18 +- 5 files changed, 340 insertions(+), 329 deletions(-) diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index 45fe03003b5a..678dc7f706b4 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -999,34 +999,37 @@ def __init__( ), f"World size {dist.get_world_size()} is not divisible by tp_size {tp_size} * pp_size {pp_size}" if enable_sequence_parallelism: - self.sequence_parallelism_mode = sequence_parallelism_mode if sequence_parallelism_mode is not None else "1" + self.sequence_parallelism_mode = ( + sequence_parallelism_mode if sequence_parallelism_mode is not None else "all_to_all" + ) assert ( self.sequence_parallelism_mode in SUPPORT_SP_MODE ), f"Sequence parallelism mode {self.sequence_parallelism_mode} is not in the supported list {SUPPORT_SP_MODE}" if self.sequence_parallelism_mode in ["split_gather", "ring"]: - assert ( - tp_size > 1 - ), f"Sequence parallelism mode {self.sequence_parallelism_mode} must be enabled when using tensor parallelism" - if sp_size != 1: - warnings.warn( - f"The sp_size will be the same as tp_size in sequence parallelism mode {self.sequence_parallelism_mode}, will ignore the given sequence parallelism size." - ) - self.sp_size = 1 - self.dp_size = dist.get_world_size() // (tp_size * pp_size) + # assert ( + # tp_size > 1 + # ), f"Sequence parallelism mode {self.sequence_parallelism_mode} must be enabled when using tensor parallelism" + # if sp_size != 1: + # warnings.warn( + # f"The sp_size will be the same as tp_size in sequence parallelism mode {self.sequence_parallelism_mode}, will ignore the given sequence parallelism size." + # ) + # self.sp_size = 1 + self.sp_size = sp_size if sp_size is not None else 1 + self.dp_size = dist.get_world_size() // (tp_size * pp_size * self.sp_size) elif self.sequence_parallelism_mode in ["all_to_all"]: - assert ( - tp_size == 1 - ), f"Sequence parallelism mode {self.sequence_parallelism_mode} cannot be used with tensor parallelism" - assert ( - pp_size == 1 - ), f"Sequence parallelism mode {self.sequence_parallelism_mode} cannot be used with pipeline parallelism" - self.sp_size = dist.get_world_size() if sp_size is None else sp_size - self.dp_size = dist.get_world_size() // (self.sp_size * pp_size) + # assert ( + # tp_size == 1 + # ), f"Sequence parallelism mode {self.sequence_parallelism_mode} cannot be used with tensor parallelism" + # assert ( + # pp_size == 1 + # ), f"Sequence parallelism mode {self.sequence_parallelism_mode} cannot be used with pipeline parallelism" + self.sp_size = dist.get_world_size() // pp_size if sp_size is None else sp_size + self.dp_size = dist.get_world_size() // (self.sp_size * pp_size * tp_size) else: self.dp_size = dist.get_world_size() // (tp_size * pp_size) assert ( sp_size == 1 or sp_size is None - ), f"sp_size can only be set to a >1 number when enable_sequence_parallelism is True" + ), f"You should not set sp_size when sequence parallelism is not enabled." 
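Note on the sizing logic in the hunk above: once sequence parallelism is decoupled from tensor parallelism, the data-parallel degree is simply whatever remains of the world size after tp, pp and sp are carved out. A minimal sketch of that bookkeeping (the helper name and its assert are illustrative, not part of the patch):

    import torch.distributed as dist

    def derive_dp_size(tp_size: int, pp_size: int, sp_size: int) -> int:
        # dp is the residual degree: world = dp * pp * tp * sp
        world_size = dist.get_world_size()
        denom = tp_size * pp_size * sp_size
        assert world_size % denom == 0, (
            f"world size {world_size} must be divisible by "
            f"tp {tp_size} * pp {pp_size} * sp {sp_size}"
        )
        return world_size // denom

    # e.g. 16 GPUs with tp=2, pp=2, sp=2 -> dp=2, matching the new
    # dp_size = world_size // (tp_size * pp_size * self.sp_size) above.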
self.sp_size = 1 self.tp_size = tp_size @@ -1040,11 +1043,22 @@ def __init__( self.enable_jit_fused = enable_jit_fused self.enable_sequence_parallelism = enable_sequence_parallelism if dp_outside: - self.dp_axis, self.pp_axis, self.tp_axis, self.sp_axis = 0, 1, 2, 3 + ( + self.dp_axis, + self.pp_axis, + self.sp_axis, + self.tp_axis, + ) = ( + 0, + 1, + 2, + 3, + ) self.pg_mesh = ProcessGroupMesh(self.dp_size, self.pp_size, self.tp_size, self.sp_size) else: - self.pp_axis, self.dp_axis, self.tp_axis, self.sp_axis = 0, 1, 2, 3 + self.pp_axis, self.dp_axis, self.sp_axis, self.tp_axis = 0, 1, 2, 3 self.pg_mesh = ProcessGroupMesh(self.pp_size, self.dp_size, self.tp_size, self.sp_size) + self.stage_manager = None self.schedule = None self.custom_policy = custom_policy diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index d6f10ffafec7..77f86ab0d25b 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -1,5 +1,4 @@ import math -import warnings from typing import List, Optional, Tuple, Union import torch @@ -7,7 +6,6 @@ import torch.utils.checkpoint from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss -from transformers.cache_utils import Cache from transformers.modeling_outputs import ( BaseModelOutputWithPast, CausalLMOutputWithPast, @@ -468,220 +466,225 @@ def llama_for_sequence_classification_forward( return {"hidden_states": hidden_states} -def get_llama_flash_attention_forward(shard_config, sp_mode, sp_group, sp_size): - from transformers.models.llama.modeling_llama import LlamaAttention, apply_rotary_pos_emb - - try: - from transformers.models.llama.modeling_llama import repeat_kv - except: - warnings.warn("using llamav1, llamav1 hasn't repeat_kv function") - - def forward( - self: LlamaAttention, - hidden_states: torch.Tensor, - attention_mask: Optional[dict] = None, - position_ids: Optional[torch.LongTensor] = None, - past_key_value: Optional[Cache] = None, - output_attentions: bool = False, - use_cache: bool = False, - **kwargs, - ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: - if "padding_mask" in kwargs: - warnings.warn( - "Passing `padding_mask` is deprecated and will be removed in v4.37. Please make sure use `attention_mask` instead.`" - ) - bsz, q_len, _ = hidden_states.size() - - if sp_mode in ["split_gather", "ring"]: - q_len *= sp_size - assert q_len % 4 == 0, "Flash Attention Error: The sequence length should be a multiple of 4." - - query_states = self.q_proj(hidden_states) - key_states = self.k_proj(hidden_states) - value_states = self.v_proj(hidden_states) - - # sp: all-to-all comminucation when introducing sequence parallel - if sp_mode == "all_to_all": - query_states = all_to_all_comm(query_states, sp_group) - key_states = all_to_all_comm(key_states, sp_group) - value_states = all_to_all_comm(value_states, sp_group) - bsz, q_len, _ = query_states.size() - - query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2) - key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) - value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) - - kv_seq_len = key_states.shape[-2] - if past_key_value is not None: - if self.layer_idx is None: - raise ValueError( - f"The cache structure has changed since version v4.36. 
If you are using {self.__class__.__name__} " - "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class " - "with a layer index." - ) - kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx) - - cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len) - query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids) - - if past_key_value is not None: - cache_kwargs = {"sin": sin, "cos": cos} # Specific to RoPE models - key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs) - - key_states = repeat_kv(key_states, self.num_key_value_groups) - value_states = repeat_kv(value_states, self.num_key_value_groups) - - assert isinstance(attention_mask, dict), "Flash Attention Error: attention_mask should be a dict." - attn_output = ColoAttention.attention(query_states, key_states, value_states, **attention_mask) - attn_output = attn_output.transpose(1, 2).contiguous() - attn_output = attn_output.reshape(bsz, q_len, self.hidden_size) - - # sp: all-to-all comminucation when introducing sequence parallel - if sp_mode == "all_to_all": - attn_output = all_to_all_comm(attn_output, sp_group, scatter_dim=1, gather_dim=2) - attn_output = self.o_proj(attn_output) - - return attn_output, None, past_key_value - - return forward - - -def get_llama_model_forward_for_flash_attn(shard_config: ShardConfig): - logger = logging.get_logger(__name__) - assert shard_config.enable_flash_attention, "Flash Attention is not enabled." - - def forward( - self: LlamaModel, - input_ids: torch.LongTensor = None, - attention_mask: Optional[torch.Tensor] = None, - position_ids: Optional[torch.LongTensor] = None, - past_key_values: Optional[List[torch.FloatTensor]] = None, - inputs_embeds: Optional[torch.FloatTensor] = None, - use_cache: Optional[bool] = None, - output_attentions: Optional[bool] = None, - output_hidden_states: Optional[bool] = None, - return_dict: Optional[bool] = None, - ) -> Union[Tuple, BaseModelOutputWithPast]: - output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions - output_hidden_states = ( - output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states - ) - use_cache = use_cache if use_cache is not None else self.config.use_cache - - return_dict = return_dict if return_dict is not None else self.config.use_return_dict - - # retrieve input_ids and inputs_embeds - if input_ids is not None and inputs_embeds is not None: - raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time") - elif input_ids is not None: - batch_size, seq_length = input_ids.shape - elif inputs_embeds is not None: - batch_size, seq_length, _ = inputs_embeds.shape - else: - raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds") - - seq_length_with_past = seq_length - past_key_values_length = 0 - - if past_key_values is not None: - past_key_values_length = past_key_values[0][0].shape[2] - seq_length_with_past = seq_length_with_past + past_key_values_length - - if position_ids is None: - device = input_ids.device if input_ids is not None else inputs_embeds.device - position_ids = torch.arange( - past_key_values_length, - seq_length + past_key_values_length, - dtype=torch.long, - device=device, - ) - position_ids = position_ids.unsqueeze(0).view(-1, seq_length) - else: - position_ids = position_ids.view(-1, seq_length).long() - - if 
inputs_embeds is None: - inputs_embeds = self.embed_tokens(input_ids) - # embed positions - hidden_states = inputs_embeds - - # in this case, attention_mask is a dict rather than a tensor - mask_shape = (batch_size, 1, seq_length_with_past, seq_length_with_past) - attention_mask = ColoAttention.prepare_attn_kwargs( - mask_shape, - hidden_states.dtype, - hidden_states.device, - q_padding_mask=attention_mask, - is_causal=True, - ) - - if self.gradient_checkpointing and self.training: - if use_cache: - logger.warning_once( - "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." - ) - use_cache = False - - # decoder layers - all_hidden_states = () if output_hidden_states else None - all_self_attns = () if output_attentions else None - next_decoder_cache = () if use_cache else None - - for idx, decoder_layer in enumerate(self.layers): - if output_hidden_states: - all_hidden_states += (hidden_states,) - - past_key_value = past_key_values[idx] if past_key_values is not None else None - - if self.gradient_checkpointing and self.training: - - def create_custom_forward(module): - def custom_forward(*inputs): - # None for past_key_value - return module(*inputs, past_key_value, output_attentions) - - return custom_forward - - layer_outputs = torch.utils.checkpoint.checkpoint( - create_custom_forward(decoder_layer), - hidden_states, - attention_mask, - position_ids, - ) - else: - layer_outputs = decoder_layer( - hidden_states, - attention_mask=attention_mask, - position_ids=position_ids, - past_key_value=past_key_value, - output_attentions=output_attentions, - use_cache=use_cache, - ) - - hidden_states = layer_outputs[0] - - if use_cache: - next_decoder_cache += (layer_outputs[2 if output_attentions else 1],) - - if output_attentions: - all_self_attns += (layer_outputs[1],) - - hidden_states = self.norm(hidden_states) - - # add hidden states from the last decoder layer - if output_hidden_states: - all_hidden_states += (hidden_states,) - - next_cache = next_decoder_cache if use_cache else None - if not return_dict: - return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None) - return BaseModelOutputWithPast( - last_hidden_state=hidden_states, - past_key_values=next_cache, - hidden_states=all_hidden_states, - attentions=all_self_attns, - ) - - return forward +# def get_llama_flash_attention_forward(sp_mode, sp_group, sp_size): +# from transformers.models.llama.modeling_llama import LlamaAttention, apply_rotary_pos_emb + +# try: +# from transformers.models.llama.modeling_llama import repeat_kv +# except: +# warnings.warn("using llamav1, llamav1 hasn't repeat_kv function") + +# def forward( +# self: LlamaAttention, +# hidden_states: torch.Tensor, +# attention_mask: Optional[dict] = None, +# position_ids: Optional[torch.LongTensor] = None, +# past_key_value: Optional[Cache] = None, +# output_attentions: bool = False, +# use_cache: bool = False, +# **kwargs, +# ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: +# if "padding_mask" in kwargs: +# warnings.warn( +# "Passing `padding_mask` is deprecated and will be removed in v4.37. Please make sure use `attention_mask` instead.`" +# ) +# bsz, q_len, _ = hidden_states.size() + +# if sp_mode in ["split_gather", "ring"]: +# q_len *= sp_size +# # assert q_len % 4 == 0, "Flash Attention Error: The sequence length should be a multiple of 4." 
+ +# query_states = self.q_proj(hidden_states) +# key_states = self.k_proj(hidden_states) +# value_states = self.v_proj(hidden_states) + +# # sp: all-to-all comminucation when introducing sequence parallel +# if sp_mode == "all_to_all": +# query_states = all_to_all_comm(query_states, sp_group) +# key_states = all_to_all_comm(key_states, sp_group) +# value_states = all_to_all_comm(value_states, sp_group) +# bsz, q_len, _ = query_states.size() + +# query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2) +# key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) +# value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) + +# kv_seq_len = key_states.shape[-2] +# if past_key_value is not None: +# if self.layer_idx is None: +# raise ValueError( +# f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} " +# "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class " +# "with a layer index." +# ) +# kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx) + +# cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len) +# query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids) + +# if past_key_value is not None: +# cache_kwargs = {"sin": sin, "cos": cos} # Specific to RoPE models +# key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs) + +# key_states = repeat_kv(key_states, self.num_key_value_groups) +# value_states = repeat_kv(value_states, self.num_key_value_groups) + +# assert isinstance(attention_mask, dict), "Flash Attention Error: attention_mask should be a dict." +# attn_output = ColoAttention.attention(query_states, key_states, value_states, **attention_mask) +# attn_output = attn_output.transpose(1, 2).contiguous() +# attn_output = attn_output.reshape(bsz, q_len, self.hidden_size) + +# # sp: all-to-all comminucation when introducing sequence parallel +# if sp_mode == "all_to_all": +# attn_output = all_to_all_comm(attn_output, sp_group, scatter_dim=1, gather_dim=2) +# attn_output = self.o_proj(attn_output) + +# return attn_output, None, past_key_value + +# return forward + + +# def get_llama_model_forward_for_flash_attn(shard_config: ShardConfig, sp_mode=None, sp_size=None, sp_group=None): +# logger = logging.get_logger(__name__) +# assert shard_config.enable_flash_attention, "Flash Attention is not enabled." 
+ +# def forward( +# self: LlamaModel, +# input_ids: torch.LongTensor = None, +# attention_mask: Optional[torch.Tensor] = None, +# position_ids: Optional[torch.LongTensor] = None, +# past_key_values: Optional[List[torch.FloatTensor]] = None, +# inputs_embeds: Optional[torch.FloatTensor] = None, +# use_cache: Optional[bool] = None, +# output_attentions: Optional[bool] = None, +# output_hidden_states: Optional[bool] = None, +# return_dict: Optional[bool] = None, +# ) -> Union[Tuple, BaseModelOutputWithPast]: +# output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions +# output_hidden_states = ( +# output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states +# ) +# use_cache = use_cache if use_cache is not None else self.config.use_cache + +# return_dict = return_dict if return_dict is not None else self.config.use_return_dict + +# # retrieve input_ids and inputs_embeds +# if input_ids is not None and inputs_embeds is not None: +# raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time") +# elif input_ids is not None: +# batch_size, seq_length = input_ids.shape +# elif inputs_embeds is not None: +# batch_size, seq_length, _ = inputs_embeds.shape +# else: +# raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds") + +# seq_length_with_past = seq_length +# past_key_values_length = 0 + +# if past_key_values is not None: +# past_key_values_length = past_key_values[0][0].shape[2] +# seq_length_with_past = seq_length_with_past + past_key_values_length + +# if position_ids is None: +# device = input_ids.device if input_ids is not None else inputs_embeds.device +# position_ids = torch.arange( +# past_key_values_length, +# seq_length + past_key_values_length, +# dtype=torch.long, +# device=device, +# ) +# position_ids = position_ids.unsqueeze(0).view(-1, seq_length) +# else: +# position_ids = position_ids.view(-1, seq_length).long() + +# if inputs_embeds is None: +# inputs_embeds = self.embed_tokens(input_ids) + +# if sp_mode in ["ring", "split_gather"]: +# inputs_embeds = split_forward_gather_backward(inputs_embeds, 1, sp_group) +# elif sp_mode == "all_to_all": +# inputs_embeds = split_forward_gather_backward(inputs_embeds, 1, sp_group, 1 / sp_size) +# # embed positions +# hidden_states = inputs_embeds + +# # in this case, attention_mask is a dict rather than a tensor +# mask_shape = (batch_size, 1, seq_length_with_past, seq_length_with_past) +# attention_mask = ColoAttention.prepare_attn_kwargs( +# mask_shape, +# hidden_states.dtype, +# hidden_states.device, +# q_padding_mask=attention_mask, +# is_causal=True, +# ) + +# if self.gradient_checkpointing and self.training: +# if use_cache: +# logger.warning_once( +# "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." 
+# ) +# use_cache = False + +# # decoder layers +# all_hidden_states = () if output_hidden_states else None +# all_self_attns = () if output_attentions else None +# next_decoder_cache = () if use_cache else None + +# for idx, decoder_layer in enumerate(self.layers): +# if output_hidden_states: +# all_hidden_states += (hidden_states,) + +# past_key_value = past_key_values[idx] if past_key_values is not None else None + +# if self.gradient_checkpointing and self.training: + +# def create_custom_forward(module): +# def custom_forward(*inputs): +# # None for past_key_value +# return module(*inputs, past_key_value, output_attentions) + +# return custom_forward + +# layer_outputs = torch.utils.checkpoint.checkpoint( +# create_custom_forward(decoder_layer), +# hidden_states, +# attention_mask, +# position_ids, +# ) +# else: +# layer_outputs = decoder_layer( +# hidden_states, +# attention_mask=attention_mask, +# position_ids=position_ids, +# past_key_value=past_key_value, +# output_attentions=output_attentions, +# use_cache=use_cache, +# ) + +# hidden_states = layer_outputs[0] + +# if use_cache: +# next_decoder_cache += (layer_outputs[2 if output_attentions else 1],) + +# if output_attentions: +# all_self_attns += (layer_outputs[1],) + +# hidden_states = self.norm(hidden_states) + +# # add hidden states from the last decoder layer +# if output_hidden_states: +# all_hidden_states += (hidden_states,) + +# next_cache = next_decoder_cache if use_cache else None +# if not return_dict: +# return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None) +# return BaseModelOutputWithPast( +# last_hidden_state=hidden_states, +# past_key_values=next_cache, +# hidden_states=all_hidden_states, +# attentions=all_self_attns, +# ) + +# return forward def get_lm_forward_with_dist_cross_entropy(shard_config: ShardConfig): @@ -787,7 +790,7 @@ def forward( return forward -def get_llama_seq_parallel_attention_forward(sp_mode, sp_size, sp_group): +def get_llama_flash_attention_forward(shard_config, sp_mode=None, sp_size=None, sp_group=None): def forward( self, hidden_states: torch.Tensor, @@ -797,6 +800,12 @@ def forward( output_attentions: bool = False, use_cache: bool = False, ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: + if sp_mode is not None: + assert sp_mode in ["all_to_all", "split_gather", "ring"], "Invalid sp_mode" + assert (sp_size is not None) and ( + sp_group is not None + ), "Must specify sp_size and sp_group for sequence parallel" + bsz, q_len, _ = hidden_states.size() # sp: modify sp_len when sequence parallel mode is ring if sp_mode in ["split_gather", "ring"]: @@ -851,30 +860,34 @@ def forward( key_states = repeat_kv(key_states, self.num_key_value_groups) value_states = repeat_kv(value_states, self.num_key_value_groups) - attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim) - - if attn_weights.size() != (bsz, self.num_heads, q_len, kv_seq_len): - raise ValueError( - f"Attention weights should be of size {(bsz, self.num_heads, q_len, kv_seq_len)}, but is" - f" {attn_weights.size()}" - ) + if shard_config.enable_flash_attention: + assert isinstance(attention_mask, dict), "Flash Attention Error: attention_mask should be a dict." 
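Background for the all_to_all ("Ulysses"-style) mode supported by the attention forward patched below: before attention, each rank holds every head but only 1/sp of the tokens; an all-to-all regroups the tensors so each rank holds all tokens for num_heads/sp heads (which is why the policy later divides num_heads by sp_size), attention runs locally, and a second all-to-all restores the sequence sharding. A toy sketch of that exchange follows; it only shows the forward communication pattern and is not the patch's autograd-aware all_to_all_comm implementation:

    import torch
    import torch.distributed as dist

    def ulysses_all_to_all(x: torch.Tensor, sp_group, scatter_dim: int, gather_dim: int) -> torch.Tensor:
        # Split x along scatter_dim (one chunk per sp rank), exchange chunks,
        # then stitch the received chunks back together along gather_dim.
        sp_size = dist.get_world_size(sp_group)
        inputs = [t.contiguous() for t in x.chunk(sp_size, dim=scatter_dim)]
        outputs = [torch.empty_like(inputs[0]) for _ in range(sp_size)]
        dist.all_to_all(outputs, inputs, group=sp_group)
        return torch.cat(outputs, dim=gather_dim)

    # q/k/v before attention: [bsz, seq/sp, heads*dim] -> all tokens, heads/sp
    #   q = ulysses_all_to_all(q, sp_group, scatter_dim=2, gather_dim=1)
    # attention output afterwards: scatter the sequence again, gather the heads
    #   out = ulysses_all_to_all(out, sp_group, scatter_dim=1, gather_dim=2)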
+ attn_output = ColoAttention.attention(query_states, key_states, value_states, **attention_mask) + else: + attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim) - if attention_mask is not None: - if attention_mask.size() != (bsz, 1, q_len, kv_seq_len): + if attn_weights.size() != (bsz, self.num_heads, q_len, kv_seq_len): raise ValueError( - f"Attention mask should be of size {(bsz, 1, q_len, kv_seq_len)}, but is {attention_mask.size()}" + f"Attention weights should be of size {(bsz, self.num_heads, q_len, kv_seq_len)}, but is" + f" {attn_weights.size()}" ) - attn_weights = attn_weights + attention_mask - # upcast attention to fp32 - attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype) - attn_output = torch.matmul(attn_weights, value_states) + if attention_mask is not None: + if attention_mask.size() != (bsz, 1, q_len, kv_seq_len): + raise ValueError( + f"Attention mask should be of size {(bsz, 1, q_len, kv_seq_len)}, but is {attention_mask.size()}" + ) + attn_weights = attn_weights + attention_mask - if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim): - raise ValueError( - f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is" - f" {attn_output.size()}" - ) + # upcast attention to fp32 + attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype) + attn_output = torch.matmul(attn_weights, value_states) + + if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim): + raise ValueError( + f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is" + f" {attn_output.size()}" + ) attn_output = attn_output.transpose(1, 2).contiguous() # sp: all-to-all comminucation when introducing sequence parallel @@ -898,7 +911,7 @@ def forward( return forward -def get_llama_seq_parallel_model_forward(sp_mode, sp_size, sp_group): +def get_llama_flash_attention_model_forward(shard_config, sp_mode=None, sp_size=None, sp_group=None): logger = logging.get_logger(__name__) def forward( @@ -967,9 +980,19 @@ def forward( device=inputs_embeds.device, ) - attention_mask = _prepare_4d_causal_attention_mask( - attention_mask, attention_mask.shape, inputs_embeds, past_key_values_length - ) + if shard_config.enable_flash_attention: + mask_shape = (batch_size, 1, seq_length_with_past, seq_length_with_past) + attention_mask = ColoAttention.prepare_attn_kwargs( + mask_shape, + inputs_embeds.dtype, + inputs_embeds.device, + q_padding_mask=attention_mask, + is_causal=True, + ) + else: + attention_mask = _prepare_4d_causal_attention_mask( + attention_mask, attention_mask.shape, inputs_embeds, past_key_values_length + ) hidden_states = inputs_embeds diff --git a/colossalai/shardformer/policies/llama.py b/colossalai/shardformer/policies/llama.py index a9c982231825..dd36605a81bf 100644 --- a/colossalai/shardformer/policies/llama.py +++ b/colossalai/shardformer/policies/llama.py @@ -20,9 +20,7 @@ from ..modeling.llama import ( LlamaPipelineForwards, get_llama_flash_attention_forward, - get_llama_model_forward_for_flash_attn, - get_llama_seq_parallel_attention_forward, - get_llama_seq_parallel_model_forward, + get_llama_flash_attention_model_forward, get_lm_forward_with_dist_cross_entropy, ) from .base_policy import ModulePolicyDescription, Policy, SubModuleReplacementDescription @@ -82,33 +80,9 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: ) sp_partial_derived = sp_mode 
in ["split_gather", "ring"] - use_flash_attention = self.shard_config.enable_flash_attention - # Currently sp cannot to be used with flashattention - if sp_mode in ["split_gather", "ring", "all_to_all"]: - if use_flash_attention: - warnings.warn( - f"Sequence parallelism mode {sp_mode} need to be used with FlashAttention, will disable FlashAttention automatically." - ) - use_flash_attention = False + self.shard_config.enable_flash_attention - if sp_mode in ["split_gather", "ring"]: - self.append_or_create_method_replacement( - description={ - "forward": get_llama_seq_parallel_model_forward( - sp_mode=sp_mode, sp_size=sp_size, sp_group=sp_group - ), - }, - policy=policy, - target_key=LlamaModel, - ) - self.append_or_create_method_replacement( - description={ - "forward": get_llama_seq_parallel_attention_forward(sp_mode, sp_size, sp_group), - }, - policy=policy, - target_key=attn_cls, - ) - elif sp_mode == "all_to_all": + if sp_mode == "all_to_all": decoder_attribute_replacement = { "num_heads": self.model.config.num_attention_heads // sp_size, } @@ -118,24 +92,26 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: policy[attn_cls] = ModulePolicyDescription( attribute_replacement=decoder_attribute_replacement, ) - self.append_or_create_method_replacement( - description={ - "forward": get_llama_seq_parallel_attention_forward(sp_mode, sp_size, sp_group), - }, - policy=policy, - target_key=attn_cls, - ) - self.append_or_create_method_replacement( - description={ - "forward": get_llama_seq_parallel_model_forward( - sp_mode=sp_mode, - sp_size=sp_size, - sp_group=sp_group, - ), - }, - policy=policy, - target_key=LlamaModel, - ) + + self.append_or_create_method_replacement( + description={ + "forward": get_llama_flash_attention_forward(self.shard_config, sp_mode, sp_size, sp_group), + }, + policy=policy, + target_key=attn_cls, + ) + self.append_or_create_method_replacement( + description={ + "forward": get_llama_flash_attention_model_forward( + self.shard_config, + sp_mode=sp_mode, + sp_size=sp_size, + sp_group=sp_group, + ), + }, + policy=policy, + target_key=LlamaModel, + ) if self.shard_config.enable_tensor_parallelism: assert ( @@ -235,25 +211,6 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: target_key=LlamaModel, ) - # use flash attention - if use_flash_attention: - self.append_or_create_method_replacement( - description={ - "forward": get_llama_flash_attention_forward(self.shard_config, sp_mode, sp_group, sp_size), - }, - policy=policy, - target_key=attn_cls, - ) - if self.pipeline_stage_manager is None: - # replace llama model forward method - self.append_or_create_method_replacement( - description={ - "forward": get_llama_model_forward_for_flash_attn(self.shard_config), - }, - policy=policy, - target_key=LlamaModel, - ) - return policy def postprocess(self): diff --git a/examples/language/llama/benchmark.py b/examples/language/llama/benchmark.py index 8d4dae314d78..f6c975305f75 100644 --- a/examples/language/llama/benchmark.py +++ b/examples/language/llama/benchmark.py @@ -72,6 +72,7 @@ def main(): parser.add_argument("--offload_optim_frac", type=float, default=0.0, help="Offload optim fraction. Only for gemini") parser.add_argument("--offload_param_frac", type=float, default=0.0, help="Offload param fraction. 
Only for gemini") parser.add_argument("--tp", type=int, default=1, help="Tensor parallel size") + parser.add_argument("--sp", type=int, default=1, help="Sequence parallel size") parser.add_argument("--extra_dp", type=int, default=1, help="Extra data parallel size, used for Gemini") parser.add_argument("--pp", type=int, default=1, help="Pipeline parallel size") parser.add_argument("--mbs", type=int, default=1, help="Micro batch size of pipeline parallel") @@ -174,6 +175,8 @@ def empty_init(): tp_size=args.tp, pp_size=args.pp, zero_stage=args.zero, + sp_size=args.sp, + enable_sequence_parallelism=args.sp > 1, enable_fused_normalization=torch.cuda.is_available(), enable_flash_attention=args.xformers, microbatch_size=args.mbs, diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index c38570f8599c..e2eb6356fd12 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -133,9 +133,10 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, @parameterize( "test_config", [ - { + { # Test ring + Flash attention "tp_size": 2, "pp_size": 1, + "sp_size": 2, "num_microbatches": 1, "enable_sequence_parallelism": True, "sequence_parallelism_mode": "ring", @@ -145,6 +146,19 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "precision": "fp16", "initial_scale": 1, }, + { # Ulysess + Flash attention + "tp_size": 1, + "pp_size": 2, + "sp_size": 2, + "num_microbatches": 2, + "enable_sequence_parallelism": True, + "sequence_parallelism_mode": "all_to_all", + "enable_flash_attention": True, + "use_lazy_init": True, + "zero_stage": 1, + "precision": "fp16", + "initial_scale": 1, + }, { "tp_size": 4, "pp_size": 1, @@ -347,4 +361,4 @@ def test_llama_3d(): if __name__ == "__main__": test_llama() - test_llama_3d() + # test_llama_3d() From d2aeb2eb2b058f3b1604f75d3aad81329ce15024 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Thu, 6 Jun 2024 11:48:53 +0000 Subject: [PATCH 02/10] remove comments --- colossalai/shardformer/modeling/llama.py | 221 ------------------ .../test_model/test_shard_llama.py | 2 +- 2 files changed, 1 insertion(+), 222 deletions(-) diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index 77f86ab0d25b..05b4e9037102 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -466,227 +466,6 @@ def llama_for_sequence_classification_forward( return {"hidden_states": hidden_states} -# def get_llama_flash_attention_forward(sp_mode, sp_group, sp_size): -# from transformers.models.llama.modeling_llama import LlamaAttention, apply_rotary_pos_emb - -# try: -# from transformers.models.llama.modeling_llama import repeat_kv -# except: -# warnings.warn("using llamav1, llamav1 hasn't repeat_kv function") - -# def forward( -# self: LlamaAttention, -# hidden_states: torch.Tensor, -# attention_mask: Optional[dict] = None, -# position_ids: Optional[torch.LongTensor] = None, -# past_key_value: Optional[Cache] = None, -# output_attentions: bool = False, -# use_cache: bool = False, -# **kwargs, -# ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: -# if "padding_mask" in kwargs: -# warnings.warn( -# "Passing `padding_mask` is deprecated and will be removed in v4.37. 
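With the new --sp flag wired into the benchmark above, a HybridParallelPlugin combining tp + sp + pp is configured roughly as follows; the keyword names come from the diff, while the concrete values are only illustrative:

    from colossalai.booster.plugin import HybridParallelPlugin

    plugin = HybridParallelPlugin(
        tp_size=2,
        pp_size=2,
        sp_size=2,                             # new --sp argument
        enable_sequence_parallelism=True,      # required whenever sp_size > 1
        sequence_parallelism_mode="all_to_all",
        zero_stage=1,
        enable_flash_attention=True,
        microbatch_size=1,
        precision="fp16",
    )
    # On 16 GPUs this leaves dp_size = 16 // (2 * 2 * 2) = 2.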
Please make sure use `attention_mask` instead.`" -# ) -# bsz, q_len, _ = hidden_states.size() - -# if sp_mode in ["split_gather", "ring"]: -# q_len *= sp_size -# # assert q_len % 4 == 0, "Flash Attention Error: The sequence length should be a multiple of 4." - -# query_states = self.q_proj(hidden_states) -# key_states = self.k_proj(hidden_states) -# value_states = self.v_proj(hidden_states) - -# # sp: all-to-all comminucation when introducing sequence parallel -# if sp_mode == "all_to_all": -# query_states = all_to_all_comm(query_states, sp_group) -# key_states = all_to_all_comm(key_states, sp_group) -# value_states = all_to_all_comm(value_states, sp_group) -# bsz, q_len, _ = query_states.size() - -# query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2) -# key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) -# value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) - -# kv_seq_len = key_states.shape[-2] -# if past_key_value is not None: -# if self.layer_idx is None: -# raise ValueError( -# f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} " -# "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class " -# "with a layer index." -# ) -# kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx) - -# cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len) -# query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids) - -# if past_key_value is not None: -# cache_kwargs = {"sin": sin, "cos": cos} # Specific to RoPE models -# key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs) - -# key_states = repeat_kv(key_states, self.num_key_value_groups) -# value_states = repeat_kv(value_states, self.num_key_value_groups) - -# assert isinstance(attention_mask, dict), "Flash Attention Error: attention_mask should be a dict." -# attn_output = ColoAttention.attention(query_states, key_states, value_states, **attention_mask) -# attn_output = attn_output.transpose(1, 2).contiguous() -# attn_output = attn_output.reshape(bsz, q_len, self.hidden_size) - -# # sp: all-to-all comminucation when introducing sequence parallel -# if sp_mode == "all_to_all": -# attn_output = all_to_all_comm(attn_output, sp_group, scatter_dim=1, gather_dim=2) -# attn_output = self.o_proj(attn_output) - -# return attn_output, None, past_key_value - -# return forward - - -# def get_llama_model_forward_for_flash_attn(shard_config: ShardConfig, sp_mode=None, sp_size=None, sp_group=None): -# logger = logging.get_logger(__name__) -# assert shard_config.enable_flash_attention, "Flash Attention is not enabled." 
- -# def forward( -# self: LlamaModel, -# input_ids: torch.LongTensor = None, -# attention_mask: Optional[torch.Tensor] = None, -# position_ids: Optional[torch.LongTensor] = None, -# past_key_values: Optional[List[torch.FloatTensor]] = None, -# inputs_embeds: Optional[torch.FloatTensor] = None, -# use_cache: Optional[bool] = None, -# output_attentions: Optional[bool] = None, -# output_hidden_states: Optional[bool] = None, -# return_dict: Optional[bool] = None, -# ) -> Union[Tuple, BaseModelOutputWithPast]: -# output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions -# output_hidden_states = ( -# output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states -# ) -# use_cache = use_cache if use_cache is not None else self.config.use_cache - -# return_dict = return_dict if return_dict is not None else self.config.use_return_dict - -# # retrieve input_ids and inputs_embeds -# if input_ids is not None and inputs_embeds is not None: -# raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time") -# elif input_ids is not None: -# batch_size, seq_length = input_ids.shape -# elif inputs_embeds is not None: -# batch_size, seq_length, _ = inputs_embeds.shape -# else: -# raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds") - -# seq_length_with_past = seq_length -# past_key_values_length = 0 - -# if past_key_values is not None: -# past_key_values_length = past_key_values[0][0].shape[2] -# seq_length_with_past = seq_length_with_past + past_key_values_length - -# if position_ids is None: -# device = input_ids.device if input_ids is not None else inputs_embeds.device -# position_ids = torch.arange( -# past_key_values_length, -# seq_length + past_key_values_length, -# dtype=torch.long, -# device=device, -# ) -# position_ids = position_ids.unsqueeze(0).view(-1, seq_length) -# else: -# position_ids = position_ids.view(-1, seq_length).long() - -# if inputs_embeds is None: -# inputs_embeds = self.embed_tokens(input_ids) - -# if sp_mode in ["ring", "split_gather"]: -# inputs_embeds = split_forward_gather_backward(inputs_embeds, 1, sp_group) -# elif sp_mode == "all_to_all": -# inputs_embeds = split_forward_gather_backward(inputs_embeds, 1, sp_group, 1 / sp_size) -# # embed positions -# hidden_states = inputs_embeds - -# # in this case, attention_mask is a dict rather than a tensor -# mask_shape = (batch_size, 1, seq_length_with_past, seq_length_with_past) -# attention_mask = ColoAttention.prepare_attn_kwargs( -# mask_shape, -# hidden_states.dtype, -# hidden_states.device, -# q_padding_mask=attention_mask, -# is_causal=True, -# ) - -# if self.gradient_checkpointing and self.training: -# if use_cache: -# logger.warning_once( -# "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..." 
-# ) -# use_cache = False - -# # decoder layers -# all_hidden_states = () if output_hidden_states else None -# all_self_attns = () if output_attentions else None -# next_decoder_cache = () if use_cache else None - -# for idx, decoder_layer in enumerate(self.layers): -# if output_hidden_states: -# all_hidden_states += (hidden_states,) - -# past_key_value = past_key_values[idx] if past_key_values is not None else None - -# if self.gradient_checkpointing and self.training: - -# def create_custom_forward(module): -# def custom_forward(*inputs): -# # None for past_key_value -# return module(*inputs, past_key_value, output_attentions) - -# return custom_forward - -# layer_outputs = torch.utils.checkpoint.checkpoint( -# create_custom_forward(decoder_layer), -# hidden_states, -# attention_mask, -# position_ids, -# ) -# else: -# layer_outputs = decoder_layer( -# hidden_states, -# attention_mask=attention_mask, -# position_ids=position_ids, -# past_key_value=past_key_value, -# output_attentions=output_attentions, -# use_cache=use_cache, -# ) - -# hidden_states = layer_outputs[0] - -# if use_cache: -# next_decoder_cache += (layer_outputs[2 if output_attentions else 1],) - -# if output_attentions: -# all_self_attns += (layer_outputs[1],) - -# hidden_states = self.norm(hidden_states) - -# # add hidden states from the last decoder layer -# if output_hidden_states: -# all_hidden_states += (hidden_states,) - -# next_cache = next_decoder_cache if use_cache else None -# if not return_dict: -# return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None) -# return BaseModelOutputWithPast( -# last_hidden_state=hidden_states, -# past_key_values=next_cache, -# hidden_states=all_hidden_states, -# attentions=all_self_attns, -# ) - -# return forward - - def get_lm_forward_with_dist_cross_entropy(shard_config: ShardConfig): from transformers import LlamaForCausalLM diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index e2eb6356fd12..d9081c053c57 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -361,4 +361,4 @@ def test_llama_3d(): if __name__ == "__main__": test_llama() - # test_llama_3d() + test_llama_3d() From 932753fa8065e4ecfd63106660fc7edef04d94a0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 6 Jun 2024 11:55:31 +0000 Subject: [PATCH 03/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- colossalai/shardformer/modeling/llama.py | 1 - 1 file changed, 1 deletion(-) diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index 82facd176aef..da1643f795db 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -6,7 +6,6 @@ import torch.utils.checkpoint from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss -from transformers.cache_utils import Cache from transformers.modeling_attn_mask_utils import ( _prepare_4d_causal_attention_mask, _prepare_4d_causal_attention_mask_for_sdpa, From c92951d4eee62032e79e172cc53d720686808856 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Thu, 13 Jun 2024 11:25:12 +0000 Subject: [PATCH 04/10] fix sp group reversed bug --- .../booster/plugin/hybrid_parallel_plugin.py | 4 +-- colossalai/shardformer/modeling/llama.py | 25 +++++++++++++---- 
colossalai/shardformer/policies/llama.py | 2 -- .../test_model/test_shard_llama.py | 28 ++++++++++++------- 4 files changed, 39 insertions(+), 20 deletions(-) diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index 678dc7f706b4..6ccd1412ce2c 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1046,8 +1046,8 @@ def __init__( ( self.dp_axis, self.pp_axis, - self.sp_axis, self.tp_axis, + self.sp_axis, ) = ( 0, 1, @@ -1056,7 +1056,7 @@ def __init__( ) self.pg_mesh = ProcessGroupMesh(self.dp_size, self.pp_size, self.tp_size, self.sp_size) else: - self.pp_axis, self.dp_axis, self.sp_axis, self.tp_axis = 0, 1, 2, 3 + self.pp_axis, self.dp_axis, self.tp_axis, self.sp_axis = 0, 1, 2, 3 self.pg_mesh = ProcessGroupMesh(self.pp_size, self.dp_size, self.tp_size, self.sp_size) self.stage_manager = None diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index da1643f795db..0cb62a7bf114 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -6,6 +6,7 @@ import torch.utils.checkpoint from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss +from transformers.cache_utils import Cache from transformers.modeling_attn_mask_utils import ( _prepare_4d_causal_attention_mask, _prepare_4d_causal_attention_mask_for_sdpa, @@ -577,10 +578,10 @@ def forward( hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, - past_key_value: Optional[Tuple[torch.Tensor]] = None, + past_key_value: Optional[Cache] = None, output_attentions: bool = False, use_cache: bool = False, - ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: + ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Cache]]: if sp_mode is not None: assert sp_mode in ["all_to_all", "split_gather", "ring"], "Invalid sp_mode" assert (sp_size is not None) and ( @@ -625,15 +626,27 @@ def forward( value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) kv_seq_len = key_states.shape[-2] + # if past_key_value is not None: + # kv_seq_len += past_key_value[0].shape[-2] if past_key_value is not None: - kv_seq_len += past_key_value[0].shape[-2] + if self.layer_idx is None: + raise ValueError( + f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} " + "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class " + "with a layer index." 
+ ) + kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx) + cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len) query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids) + # if past_key_value is not None: + # # reuse k, v, self_attention + # key_states = torch.cat([past_key_value[0], key_states], dim=2) + # value_states = torch.cat([past_key_value[1], value_states], dim=2) if past_key_value is not None: - # reuse k, v, self_attention - key_states = torch.cat([past_key_value[0], key_states], dim=2) - value_states = torch.cat([past_key_value[1], value_states], dim=2) + cache_kwargs = {"sin": sin, "cos": cos} # Specific to RoPE models + key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs) past_key_value = (key_states, value_states) if use_cache else None diff --git a/colossalai/shardformer/policies/llama.py b/colossalai/shardformer/policies/llama.py index dd36605a81bf..f4ac79ba2713 100644 --- a/colossalai/shardformer/policies/llama.py +++ b/colossalai/shardformer/policies/llama.py @@ -80,8 +80,6 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: ) sp_partial_derived = sp_mode in ["split_gather", "ring"] - self.shard_config.enable_flash_attention - if sp_mode == "all_to_all": decoder_attribute_replacement = { "num_heads": self.model.config.num_attention_heads // sp_size, diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index a2f7d0d10c02..2c4f91fb8d58 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -160,25 +160,25 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "initial_scale": 1, }, { - "tp_size": 4, + "tp_size": 1, "pp_size": 1, - "num_microbatches": 1, + "sp_size": 2, + "num_microbatches": 2, "enable_sequence_parallelism": True, - "sequence_parallelism_mode": "split_gather", - "enable_flash_attention": False, + "sequence_parallelism_mode": "all_to_all", "use_lazy_init": True, + "zero_stage": 1, "precision": "fp16", "initial_scale": 1, }, { - "tp_size": 1, + "tp_size": 4, "pp_size": 1, - "sp_size": 2, "num_microbatches": 1, "enable_sequence_parallelism": True, - "sequence_parallelism_mode": "all_to_all", + "sequence_parallelism_mode": "split_gather", + "enable_flash_attention": False, "use_lazy_init": True, - "zero_stage": 2, "precision": "fp16", "initial_scale": 1, }, @@ -227,7 +227,11 @@ def run_llama_test(test_config): sub_model_zoo = model_zoo.get_sub_registry("transformers_llama") for name, (model_fn, data_gen_fn, output_transform_fn, loss_fn, _) in sub_model_zoo.items(): - check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, test_config) + try: + check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, test_config) + except Exception as e: + print(f"Failed config: {test_config}") + raise e clear_layout_converter() Randomizer.reset_index() @@ -277,7 +281,11 @@ def run_llama_3d_test(test_config): sub_model_zoo = model_zoo.get_sub_registry("transformers_llama") for name, (model_fn, data_gen_fn, output_transform_fn, loss_fn, _) in sub_model_zoo.items(): - check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, test_config) + try: + check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, test_config) + except Exception as e: + print(f"Failed config: {test_config}") + 
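The k/v-cache handling rewritten in this patch switches from legacy (key, value) tuples to the transformers Cache API introduced in v4.36. A self-contained illustration of that usage pattern, with toy shapes rather than the model's real dimensions:

    import torch
    from transformers.cache_utils import DynamicCache

    cache = DynamicCache.from_legacy_cache(None)      # empty cache
    key = torch.randn(1, 8, 16, 64)                   # [bsz, kv_heads, seq, head_dim]
    value = torch.randn(1, 8, 16, 64)

    # the same calls the patched attention forward makes
    kv_seq_len = key.shape[-2] + cache.get_usable_length(key.shape[-2], layer_idx=0)
    key, value = cache.update(key, value, layer_idx=0)   # returns past + new k/v
    assert key.shape[-2] == kv_seq_len                   # 16 on the first step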
raise e clear_layout_converter() Randomizer.reset_index() From 6d6eb37d9b0a1765dd9ec749b57ec33320193995 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Thu, 13 Jun 2024 11:46:00 +0000 Subject: [PATCH 05/10] remove model forward replacement with pp --- colossalai/shardformer/policies/llama.py | 25 ++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/colossalai/shardformer/policies/llama.py b/colossalai/shardformer/policies/llama.py index f4ac79ba2713..7146608f304b 100644 --- a/colossalai/shardformer/policies/llama.py +++ b/colossalai/shardformer/policies/llama.py @@ -98,18 +98,19 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: policy=policy, target_key=attn_cls, ) - self.append_or_create_method_replacement( - description={ - "forward": get_llama_flash_attention_model_forward( - self.shard_config, - sp_mode=sp_mode, - sp_size=sp_size, - sp_group=sp_group, - ), - }, - policy=policy, - target_key=LlamaModel, - ) + if self.pipeline_stage_manager is None: + self.append_or_create_method_replacement( + description={ + "forward": get_llama_flash_attention_model_forward( + self.shard_config, + sp_mode=sp_mode, + sp_size=sp_size, + sp_group=sp_group, + ), + }, + policy=policy, + target_key=LlamaModel, + ) if self.shard_config.enable_tensor_parallelism: assert ( From 90485ce718d36e8bb9d78259c096c1eb6633a41c Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Fri, 14 Jun 2024 07:01:22 +0000 Subject: [PATCH 06/10] debug --- .../booster/plugin/hybrid_parallel_plugin.py | 14 -- colossalai/shardformer/modeling/llama.py | 206 +++++++++--------- .../test_model/test_shard_llama.py | 17 +- 3 files changed, 117 insertions(+), 120 deletions(-) diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index 6ccd1412ce2c..3874c8f1f2c2 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1006,23 +1006,9 @@ def __init__( self.sequence_parallelism_mode in SUPPORT_SP_MODE ), f"Sequence parallelism mode {self.sequence_parallelism_mode} is not in the supported list {SUPPORT_SP_MODE}" if self.sequence_parallelism_mode in ["split_gather", "ring"]: - # assert ( - # tp_size > 1 - # ), f"Sequence parallelism mode {self.sequence_parallelism_mode} must be enabled when using tensor parallelism" - # if sp_size != 1: - # warnings.warn( - # f"The sp_size will be the same as tp_size in sequence parallelism mode {self.sequence_parallelism_mode}, will ignore the given sequence parallelism size." 
- # ) - # self.sp_size = 1 self.sp_size = sp_size if sp_size is not None else 1 self.dp_size = dist.get_world_size() // (tp_size * pp_size * self.sp_size) elif self.sequence_parallelism_mode in ["all_to_all"]: - # assert ( - # tp_size == 1 - # ), f"Sequence parallelism mode {self.sequence_parallelism_mode} cannot be used with tensor parallelism" - # assert ( - # pp_size == 1 - # ), f"Sequence parallelism mode {self.sequence_parallelism_mode} cannot be used with pipeline parallelism" self.sp_size = dist.get_world_size() // pp_size if sp_size is None else sp_size self.dp_size = dist.get_world_size() // (self.sp_size * pp_size * tp_size) else: diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index 0cb62a7bf114..95a75e294553 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -469,109 +469,6 @@ def llama_for_sequence_classification_forward( return {"hidden_states": hidden_states} -def get_lm_forward_with_dist_cross_entropy(shard_config: ShardConfig): - from transformers import LlamaForCausalLM - - def forward( - self: LlamaForCausalLM, - input_ids: torch.LongTensor = None, - attention_mask: Optional[torch.Tensor] = None, - position_ids: Optional[torch.LongTensor] = None, - past_key_values: Optional[List[torch.FloatTensor]] = None, - inputs_embeds: Optional[torch.FloatTensor] = None, - labels: Optional[torch.LongTensor] = None, - use_cache: Optional[bool] = None, - output_attentions: Optional[bool] = None, - output_hidden_states: Optional[bool] = None, - return_dict: Optional[bool] = None, - ) -> Union[Tuple, CausalLMOutputWithPast]: - r""" - Args: - labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): - Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., - config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored - (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. - - Returns: - - Example: - - ```python - >>> from transformers import AutoTokenizer, LlamaForCausalLM - - >>> model = LlamaForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS) - >>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER) - - >>> prompt = "Hey, are you conscious? Can you talk to me?" - >>> inputs = tokenizer(prompt, return_tensors="pt") - - >>> # Generate - >>> generate_ids = model.generate(inputs.input_ids, max_length=30) - >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] - "Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you." 
- ```""" - - output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions - output_hidden_states = ( - output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states - ) - return_dict = return_dict if return_dict is not None else self.config.use_return_dict - - # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn) - outputs = self.model( - input_ids=input_ids, - attention_mask=attention_mask, - position_ids=position_ids, - past_key_values=past_key_values, - inputs_embeds=inputs_embeds, - use_cache=use_cache, - output_attentions=output_attentions, - output_hidden_states=output_hidden_states, - return_dict=return_dict, - ) - - hidden_states = outputs[0] - if self.config.pretraining_tp > 1: - lm_head_slices = self.lm_head.weight.split(self.vocab_size // self.config.pretraining_tp, dim=0) - logits = [F.linear(hidden_states, lm_head_slices[i]) for i in range(self.config.pretraining_tp)] - logits = torch.cat(logits, dim=-1) - else: - logits = self.lm_head(hidden_states) - logits = logits.float() - - loss = None - if labels is not None: - # Shift so that tokens < n predict n - shift_logits = logits[..., :-1, :].contiguous() - shift_labels = labels[..., 1:].contiguous() - shift_labels = shift_labels.view(-1) - # Enable model parallelism - shift_labels = shift_labels.to(shift_logits.device) - new_vocab_size = logits.shape[-1] - shift_logits = shift_logits.view(-1, new_vocab_size) - loss = cross_entropy_1d( - shift_logits, - shift_labels, - process_group=shard_config.tensor_parallel_process_group, - vocab_size=self.lm_head.out_features, - dtype=self.model.dtype, - ) - - if not return_dict: - output = (logits,) + outputs[1:] - return (loss,) + output if loss is not None else output - - return CausalLMOutputWithPast( - loss=loss, - logits=logits, - past_key_values=outputs.past_key_values, - hidden_states=outputs.hidden_states, - attentions=outputs.attentions, - ) - - return forward - - def get_llama_flash_attention_forward(shard_config, sp_mode=None, sp_size=None, sp_group=None): def forward( self, @@ -865,3 +762,106 @@ def custom_forward(*inputs): ) return forward + + +def get_lm_forward_with_dist_cross_entropy(shard_config: ShardConfig): + from transformers import LlamaForCausalLM + + def forward( + self: LlamaForCausalLM, + input_ids: torch.LongTensor = None, + attention_mask: Optional[torch.Tensor] = None, + position_ids: Optional[torch.LongTensor] = None, + past_key_values: Optional[List[torch.FloatTensor]] = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + labels: Optional[torch.LongTensor] = None, + use_cache: Optional[bool] = None, + output_attentions: Optional[bool] = None, + output_hidden_states: Optional[bool] = None, + return_dict: Optional[bool] = None, + ) -> Union[Tuple, CausalLMOutputWithPast]: + r""" + Args: + labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): + Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., + config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored + (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. 
+ + Returns: + + Example: + + ```python + >>> from transformers import AutoTokenizer, LlamaForCausalLM + + >>> model = LlamaForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS) + >>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER) + + >>> prompt = "Hey, are you conscious? Can you talk to me?" + >>> inputs = tokenizer(prompt, return_tensors="pt") + + >>> # Generate + >>> generate_ids = model.generate(inputs.input_ids, max_length=30) + >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0] + "Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you." + ```""" + + output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions + output_hidden_states = ( + output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states + ) + return_dict = return_dict if return_dict is not None else self.config.use_return_dict + + # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn) + outputs = self.model( + input_ids=input_ids, + attention_mask=attention_mask, + position_ids=position_ids, + past_key_values=past_key_values, + inputs_embeds=inputs_embeds, + use_cache=use_cache, + output_attentions=output_attentions, + output_hidden_states=output_hidden_states, + return_dict=return_dict, + ) + + hidden_states = outputs[0] + if self.config.pretraining_tp > 1: + lm_head_slices = self.lm_head.weight.split(self.vocab_size // self.config.pretraining_tp, dim=0) + logits = [F.linear(hidden_states, lm_head_slices[i]) for i in range(self.config.pretraining_tp)] + logits = torch.cat(logits, dim=-1) + else: + logits = self.lm_head(hidden_states) + logits = logits.float() + + loss = None + if labels is not None: + # Shift so that tokens < n predict n + shift_logits = logits[..., :-1, :].contiguous() + shift_labels = labels[..., 1:].contiguous() + shift_labels = shift_labels.view(-1) + # Enable model parallelism + shift_labels = shift_labels.to(shift_logits.device) + new_vocab_size = logits.shape[-1] + shift_logits = shift_logits.view(-1, new_vocab_size) + loss = cross_entropy_1d( + shift_logits, + shift_labels, + process_group=shard_config.tensor_parallel_process_group, + vocab_size=self.lm_head.out_features, + dtype=self.model.dtype, + ) + + if not return_dict: + output = (logits,) + outputs[1:] + return (loss,) + output if loss is not None else output + + return CausalLMOutputWithPast( + loss=loss, + logits=logits, + past_key_values=outputs.past_key_values, + hidden_states=outputs.hidden_states, + attentions=outputs.attentions, + ) + + return forward diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index 2c4f91fb8d58..4cae302e6e8f 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -120,9 +120,20 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, atol, rtol = 1e-4, 1e-3 else: atol, rtol = 5e-3, 5e-3 - check_weight( - llama_model, shard_llama_model, col_layer_for_check, tp_group, atol=atol, rtol=rtol, dim=1, verbose=False - ) + try: + check_weight( + llama_model, + shard_llama_model, + col_layer_for_check, + tp_group, + atol=atol, + rtol=rtol, + dim=1, + verbose=False, + ) + except Exception as e: + print(f"Failed config: {test_config}") + raise e # check grads check_all_grad_tensors(grads_to_check) From 
996944d73bb87d0f0b2ba4f60762af40cc2c6480 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Sun, 16 Jun 2024 07:11:53 +0000 Subject: [PATCH 07/10] fix attention mask bug --- colossalai/shardformer/modeling/llama.py | 34 +++++++++++------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index 51e4ae3cba4e..c214a16783a6 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -24,7 +24,11 @@ from transformers.utils import logging from colossalai.pipeline.stage_manager import PipelineStageManager -from colossalai.shardformer.layer._operation import all_to_all_comm, gather_forward_split_backward +from colossalai.shardformer.layer._operation import ( + all_to_all_comm, + gather_forward_split_backward, + split_forward_gather_backward, +) from colossalai.shardformer.shard import ShardConfig from ..layer import ColoAttention, cross_entropy_1d @@ -528,16 +532,6 @@ def forward( kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx) cos, sin = self.rotary_emb(value_states, position_ids) - print( - "position_ids.shape", - position_ids.shape, - "key_states.shape", - key_states.shape, - "cos.shape", - cos.shape, - "sin.shape", - sin.shape, - ) query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) if past_key_value is not None: @@ -639,6 +633,7 @@ def forward( inputs_embeds = self.embed_tokens(input_ids) past_seen_tokens = 0 + seq_len = inputs_embeds.shape[1] if use_cache: # kept for BC (cache positions) if not isinstance(past_key_values, StaticCache): past_key_values = DynamicCache.from_legacy_cache(past_key_values) @@ -646,19 +641,14 @@ def forward( if cache_position is None: if isinstance(past_key_values, StaticCache): raise ValueError("cache_position is a required argument when using StaticCache.") - cache_position = torch.arange( - past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device - ) + cache_position = torch.arange(past_seen_tokens, past_seen_tokens + seq_len, device=inputs_embeds.device) if position_ids is None: position_ids = cache_position.unsqueeze(0) - # embed positions - hidden_states = inputs_embeds - # in this case, attention_mask is a dict rather than a tensor if shard_config.enable_flash_attention: - mask_shape = (hidden_states.shape[0], 1, past_seen_tokens, past_seen_tokens) + mask_shape = (inputs_embeds.shape[0], 1, past_seen_tokens + seq_len, past_seen_tokens + seq_len) attention_mask = ColoAttention.prepare_attn_kwargs( mask_shape, inputs_embeds.dtype, @@ -667,7 +657,13 @@ def forward( is_causal=True, ) else: - attention_mask = self._update_causal_mask(attention_mask, hidden_states, cache_position) + attention_mask = self._update_causal_mask(attention_mask, inputs_embeds, cache_position) + + if sp_mode in ["ring", "split_gather"]: + inputs_embeds = split_forward_gather_backward(inputs_embeds, 1, sp_group) + elif sp_mode == "all_to_all": + inputs_embeds = split_forward_gather_backward(inputs_embeds, 1, sp_group, 1 / sp_size) + hidden_states = inputs_embeds # decoder layers all_hidden_states = () if output_hidden_states else None From 20059ae7baab72094390ea6739fe39b977ee5be1 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Mon, 17 Jun 2024 03:36:22 +0000 Subject: [PATCH 08/10] fix sp_size bug --- .../booster/plugin/hybrid_parallel_plugin.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git 
a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index 3874c8f1f2c2..3c05a8c9e20c 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1006,10 +1006,18 @@ def __init__( self.sequence_parallelism_mode in SUPPORT_SP_MODE ), f"Sequence parallelism mode {self.sequence_parallelism_mode} is not in the supported list {SUPPORT_SP_MODE}" if self.sequence_parallelism_mode in ["split_gather", "ring"]: - self.sp_size = sp_size if sp_size is not None else 1 - self.dp_size = dist.get_world_size() // (tp_size * pp_size * self.sp_size) + assert ( + tp_size > 1 + ), f"Sequence parallelism mode {self.sequence_parallelism_mode} must be enabled when using tensor parallelism" + if sp_size != 1: + warnings.warn( + f"The sp_size will be the same as tp_size in sequence parallelism mode {self.sequence_parallelism_mode}, will ignore the given sequence parallelism size." + ) + self.sp_size = 1 + self.dp_size = dist.get_world_size() // (tp_size * pp_size) + elif self.sequence_parallelism_mode in ["all_to_all"]: - self.sp_size = dist.get_world_size() // pp_size if sp_size is None else sp_size + self.sp_size = 1 if sp_size is None else sp_size self.dp_size = dist.get_world_size() // (self.sp_size * pp_size * tp_size) else: self.dp_size = dist.get_world_size() // (tp_size * pp_size) From a2e07c9ff5be94b6ee4b307a1369f872b3cc8dcf Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Mon, 17 Jun 2024 06:28:29 +0000 Subject: [PATCH 09/10] remove comments --- .../booster/plugin/hybrid_parallel_plugin.py | 1 - colossalai/shardformer/modeling/llama.py | 13 ++++++------- colossalai/shardformer/policies/llama.py | 16 ++++++++-------- .../test_model/test_shard_llama.py | 2 +- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index 3c05a8c9e20c..fa3c3646a592 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1015,7 +1015,6 @@ def __init__( ) self.sp_size = 1 self.dp_size = dist.get_world_size() // (tp_size * pp_size) - elif self.sequence_parallelism_mode in ["all_to_all"]: self.sp_size = 1 if sp_size is None else sp_size self.dp_size = dist.get_world_size() // (self.sp_size * pp_size * tp_size) diff --git a/colossalai/shardformer/modeling/llama.py b/colossalai/shardformer/modeling/llama.py index c214a16783a6..bf5ce45a8342 100644 --- a/colossalai/shardformer/modeling/llama.py +++ b/colossalai/shardformer/modeling/llama.py @@ -468,9 +468,10 @@ def forward( position_ids: Optional[torch.LongTensor] = None, past_key_value: Optional[Cache] = None, output_attentions: bool = False, + use_cache: bool = False, cache_position: Optional[torch.LongTensor] = None, **kwargs, - ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: + ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Cache]]: if sp_mode is not None: assert sp_mode in ["all_to_all", "split_gather", "ring"], "Invalid sp_mode" assert (sp_size is not None) and ( @@ -519,8 +520,6 @@ def forward( value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) kv_seq_len = key_states.shape[-2] - # if past_key_value is not None: - # kv_seq_len += past_key_value[0].shape[-2] if past_key_value is not None: if self.layer_idx is None: raise ValueError( @@ -538,8 +537,6 @@ def forward( cache_kwargs = {"sin": 
sin, "cos": cos, "cache_position": cache_position} key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs) - # past_key_value = (key_states, value_states) if use_cache else None - # repeat k/v heads if n_kv_heads < n_heads key_states = repeat_kv(key_states, self.num_key_value_groups) value_states = repeat_kv(value_states, self.num_key_value_groups) @@ -619,8 +616,10 @@ def forward( return_dict = return_dict if return_dict is not None else self.config.use_return_dict # retrieve input_ids and inputs_embeds - if input_ids is not None and inputs_embeds is not None: - raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time") + if (input_ids is None) ^ (inputs_embeds is not None): + raise ValueError( + "You cannot specify both input_ids and inputs_embeds at the same time, and must specify either one" + ) if (self.gradient_checkpointing or sp_mode in ["ring", "all_to_all"]) and self.training: if use_cache: diff --git a/colossalai/shardformer/policies/llama.py b/colossalai/shardformer/policies/llama.py index 7146608f304b..77ea3f421aa6 100644 --- a/colossalai/shardformer/policies/llama.py +++ b/colossalai/shardformer/policies/llama.py @@ -90,14 +90,14 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: policy[attn_cls] = ModulePolicyDescription( attribute_replacement=decoder_attribute_replacement, ) - - self.append_or_create_method_replacement( - description={ - "forward": get_llama_flash_attention_forward(self.shard_config, sp_mode, sp_size, sp_group), - }, - policy=policy, - target_key=attn_cls, - ) + if self.shard_config.enable_flash_attention or self.shard_config.enable_sequence_parallelism: + self.append_or_create_method_replacement( + description={ + "forward": get_llama_flash_attention_forward(self.shard_config, sp_mode, sp_size, sp_group), + }, + policy=policy, + target_key=attn_cls, + ) if self.pipeline_stage_manager is None: self.append_or_create_method_replacement( description={ diff --git a/tests/test_shardformer/test_model/test_shard_llama.py b/tests/test_shardformer/test_model/test_shard_llama.py index 4cae302e6e8f..3a8a1357deb0 100644 --- a/tests/test_shardformer/test_model/test_shard_llama.py +++ b/tests/test_shardformer/test_model/test_shard_llama.py @@ -174,7 +174,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "tp_size": 1, "pp_size": 1, "sp_size": 2, - "num_microbatches": 2, + "num_microbatches": 1, "enable_sequence_parallelism": True, "sequence_parallelism_mode": "all_to_all", "use_lazy_init": True, From 07849f6f41dbf1d3162cbd6cf1c31dafe6493764 Mon Sep 17 00:00:00 2001 From: Edenzzzz Date: Mon, 17 Jun 2024 08:26:41 +0000 Subject: [PATCH 10/10] add tab --- colossalai/shardformer/policies/llama.py | 26 ++++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/colossalai/shardformer/policies/llama.py b/colossalai/shardformer/policies/llama.py index 77ea3f421aa6..5852713c2b49 100644 --- a/colossalai/shardformer/policies/llama.py +++ b/colossalai/shardformer/policies/llama.py @@ -98,19 +98,19 @@ def module_policy(self) -> Dict[Union[str, nn.Module], ModulePolicyDescription]: policy=policy, target_key=attn_cls, ) - if self.pipeline_stage_manager is None: - self.append_or_create_method_replacement( - description={ - "forward": get_llama_flash_attention_model_forward( - self.shard_config, - sp_mode=sp_mode, - sp_size=sp_size, - sp_group=sp_group, - ), - }, - policy=policy, - 
target_key=LlamaModel, - ) + if self.pipeline_stage_manager is None: + self.append_or_create_method_replacement( + description={ + "forward": get_llama_flash_attention_model_forward( + self.shard_config, + sp_mode=sp_mode, + sp_size=sp_size, + sp_group=sp_group, + ), + }, + policy=policy, + target_key=LlamaModel, + ) if self.shard_config.enable_tensor_parallelism: assert (