diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py
index e999c27d672f..9750cc6c0582 100644
--- a/colossalai/booster/plugin/hybrid_parallel_plugin.py
+++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py
@@ -392,7 +392,7 @@ def __init__(
             self.pg_mesh,
             pipeline_axis=PP_AXIS,
             enable_interleave=pp_style == "interleaved",
-            num_model_chunks=num_model_chunks
+            num_model_chunks=num_model_chunks,
         )
 
         if pp_style == "interleaved":
@@ -405,9 +405,7 @@ def __init__(
             )
         elif pp_style == "1f1b":
             self.schedule = OneForwardOneBackwardSchedule(
-                self.stage_manager,
-                num_microbatches=num_microbatches,
-                microbatch_size=microbatch_size
+                self.stage_manager, num_microbatches=num_microbatches, microbatch_size=microbatch_size
             )
         else:
             raise NotImplementedError()
diff --git a/colossalai/pipeline/schedule/interleaved_pp.py b/colossalai/pipeline/schedule/interleaved_pp.py
index 5471419958e1..f4d05e941839 100644
--- a/colossalai/pipeline/schedule/interleaved_pp.py
+++ b/colossalai/pipeline/schedule/interleaved_pp.py
@@ -50,16 +50,13 @@ def load_batch(self, data_iter: Iterable, device: Optional[torch.device] = None)
         self.batch_size = get_batch_size(batch)
         self.microbatch_offset = [0 for _ in range(self.num_model_chunks)]
         if self.num_microbatch is not None:
-            assert (
-                self.batch_size % self.num_microbatch == 0
-            ), "Batch size should divided by the number of microbatch"
+            assert self.batch_size % self.num_microbatch == 0, "Batch size should be divisible by the number of microbatches"
             self.microbatch_size = self.batch_size // self.num_microbatch
         elif self.microbatch_size is not None:
             assert self.batch_size % self.microbatch_size == 0, "Batch size should divided by the microbatch size"
             self.num_microbatch = self.batch_size // self.microbatch_size
         else:
-            raise ValueError(
-                "Either num_microbatch or microbatch_size should be provided")
+            raise ValueError("Either num_microbatch or microbatch_size should be provided")
 
         assert (
             self.num_microbatch % self.num_model_chunks == 0
@@ -323,10 +320,9 @@ def forward_backward_step(
             output_objs[model_chunk_id].append(output_obj)
             self.send_forward(model_chunk_id, output_obj)
 
-            if num_microbatch_remaining == 0 \
-                and i + 1 == num_warmup_microbatch:
+            if num_microbatch_remaining == 0 and i + 1 == num_warmup_microbatch:
                 break
-
+
             model_chunk_id = self.get_model_chunk_id(i + 1, is_forward=True)
             input_obj = self.recv_forward(model_chunk_id)
diff --git a/colossalai/pipeline/stage_manager.py b/colossalai/pipeline/stage_manager.py
index ed61bfcba0c7..367b11211b88 100644
--- a/colossalai/pipeline/stage_manager.py
+++ b/colossalai/pipeline/stage_manager.py
@@ -70,7 +70,7 @@ def __init__(
     def is_first_stage(self, model_chunk_id: Optional[int] = None) -> bool:
         """Is the current stage the first stage.
 
-        NOTE: 
+        NOTE:
            1. if using interleaved pipeline parallel, the first stage is the first chunk of the first device.
            2. invoke is_first_stage() with model_chunk_id < 0 is equivalent to invoke is_first_device()
@@ -79,8 +79,9 @@ def is_first_stage(self, model_chunk_id: Optional[int] = None) -> bool:
         """
         if self.is_interleave and model_chunk_id is None:
             model_chunk_id = self.model_chunk_id
-        assert self.is_interleave ^ (model_chunk_id is None), \
-            "model_chunk_id must be specified when using interleaved pipeline"
+        assert self.is_interleave ^ (
+            model_chunk_id is None
+        ), "model_chunk_id must be specified when using interleaved pipeline"
         if not self.is_interleave or model_chunk_id < 0:
             return self.stage == 0
         else:
@@ -89,7 +90,7 @@ def is_first_stage(self, model_chunk_id: Optional[int] = None) -> bool:
     def is_last_stage(self, model_chunk_id: Optional[int] = None) -> bool:
         """Is the current stage the last stage.
 
-        NOTE: 
+        NOTE:
            1. if using interleaved pipeline parallel, the last stage is the last chunk of the last device.
            2. invoke is_last_stage() with model_chunk_id < 0 is equivalent to invoke is_last_device()
@@ -98,8 +99,9 @@ def is_last_stage(self, model_chunk_id: Optional[int] = None) -> bool:
         """
         if self.is_interleave and model_chunk_id is None:
             model_chunk_id = self.model_chunk_id
-        assert self.is_interleave ^ (model_chunk_id is None), \
-            "model_chunk_id must be specified when using interleaved pipeline"
+        assert self.is_interleave ^ (
+            model_chunk_id is None
+        ), "model_chunk_id must be specified when using interleaved pipeline"
         if not self.is_interleave or model_chunk_id < 0:
             return self.stage == self.num_stages - 1
         else:
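The three files above form one chain: the plugin's new pp_style and num_model_chunks arguments select InterleavedSchedule, which drives the chunk-aware PipelineStageManager. A minimal configuration sketch, assuming a 2-stage pipeline; the concrete sizes here are illustrative assumptions, not part of the patch:

    # Hedged sketch: turning on the interleaved schedule through the plugin.
    from colossalai.booster import Booster
    from colossalai.booster.plugin import HybridParallelPlugin

    plugin = HybridParallelPlugin(
        tp_size=1,
        pp_size=2,
        pp_style="interleaved",  # "1f1b" keeps OneForwardOneBackwardSchedule
        num_model_chunks=2,      # model chunks held by each pipeline stage
        num_microbatches=4,      # must divide the global batch size evenly
    )
    booster = Booster(plugin=plugin)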
diff --git a/colossalai/shardformer/modeling/falcon.py b/colossalai/shardformer/modeling/falcon.py
index a72391372d54..4e271dfe0fa2 100644
--- a/colossalai/shardformer/modeling/falcon.py
+++ b/colossalai/shardformer/modeling/falcon.py
@@ -1,16 +1,9 @@
-
-import warnings
 from typing import List, Optional, Tuple, Union
 
 import torch
 import torch.distributed as dist
-from colossalai.pipeline.stage_manager import PipelineStageManager
 from torch.distributed import ProcessGroup
 from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
-from torch.nn import functional as F
-
-from transformers.utils import logging
-
 from transformers.modeling_outputs import (
     BaseModelOutputWithPastAndCrossAttentions,
     CausalLMOutputWithCrossAttentions,
@@ -18,18 +11,20 @@
     SequenceClassifierOutputWithPast,
     TokenClassifierOutput,
 )
-
 from transformers.models.falcon.modeling_falcon import (
     FalconForCausalLM,
     FalconForQuestionAnswering,
     FalconForSequenceClassification,
     FalconForTokenClassification,
     FalconModel,
+    build_alibi_tensor,
 )
-from transformers.models.falcon.modeling_falcon import build_alibi_tensor
+from transformers.utils import logging
 
+from colossalai.pipeline.stage_manager import PipelineStageManager
 from colossalai.shardformer.shard import ShardConfig
 
+
 def build_falcon_alibi_tensor_fn(process_group: ProcessGroup) -> torch.Tensor:
     def build_falcon_alibi_tensor(
         self, attention_mask: torch.Tensor, num_heads: int, dtype: torch.dtype
@@ -98,7 +93,7 @@ def build_falcon_alibi_tensor(
 
 def get_tp_falcon_decoder_layer_forward():
     from transformers.models.falcon.modeling_falcon import FalconDecoderLayer, dropout_add
-    
+
     def forward(
         self: FalconDecoderLayer,
         hidden_states: torch.Tensor,
@@ -155,16 +150,16 @@ def forward(
             outputs = (output,) + outputs[1:]
 
         return outputs  # hidden_states, present, attentions
-    
+
     return forward
 
+
 def get_falcon_flash_attention_forward():
     try:
         from xformers.ops import memory_efficient_attention as me_attention
     except:
         raise ImportError("Error: xformers module is not installed. Please install it to use flash attention.")
     from transformers.models.falcon.modeling_falcon import FalconAttention
-
     from colossalai.kernel.cuda_native import AttnMaskType, ColoAttention
 
     def forward(
         self: FalconAttention,
@@ -191,11 +186,9 @@ def forward(
         )
         value_layer = value_layer.transpose(1, 2).reshape(batch_size * num_kv_heads, query_length, self.head_dim)
-
         past_kv_length = 0 if layer_past is None else layer_past[0].shape[1]
         query_layer, key_layer = self.maybe_rotary(query_layer, key_layer, past_kv_length)
-
         if layer_past is not None:
             past_key, past_value = layer_past
             # concatenate along seq_length dimension:
@@ -220,12 +213,10 @@ def forward(
             attention_mask_float = (
                 attention_mask_float + alibi.view(batch_size, self.num_heads, 1, kv_length) * self.beta
             )
-        
+
         batch_size, src_len = query_layer_.size()[0], query_layer_.size()[1]
         tgt_len = key_layer_.size()[1]
-        attention_mask_float = attention_mask_float.expand(
-            batch_size, self.num_heads, src_len, tgt_len
-        ).contiguous()
+        attention_mask_float = attention_mask_float.expand(batch_size, self.num_heads, src_len, tgt_len).contiguous()
         context_layer = me_attention(
             query_layer_,
             key_layer_,
@@ -236,7 +227,7 @@ def forward(
         )
         batch_size, seq_length, _, _ = context_layer.shape
         context_layer = context_layer.reshape(batch_size, seq_length, -1)
-        
+
         output_tensor = self.dense(context_layer)
 
         return output_tensor, present
@@ -280,7 +271,7 @@ def falcon_model_forward(
         if past_key_values is not None:
             logger.warning_once("past_key_values is not supported for pipeline models at the moment.")
             past_key_values = None
-        
+
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict
 
         if past_key_values is None:
@@ -394,10 +385,11 @@ def custom_forward(*inputs):
 
         if presents is not None:
             presents = self._convert_cache_to_standard_format(presents, batch_size)
-
         if stage_manager.is_last_stage():
             if not return_dict:
-                return tuple(v for v in [hidden_states, presents, all_hidden_states, all_self_attentions] if v is not None)
+                return tuple(
+                    v for v in [hidden_states, presents, all_hidden_states, all_self_attentions] if v is not None
+                )
             return BaseModelOutputWithPastAndCrossAttentions(
                 last_hidden_state=hidden_states,
                 past_key_values=presents,
@@ -407,7 +399,6 @@ def custom_forward(*inputs):
         else:
             # always return dict for imediate stage
             return {"hidden_states": hidden_states}
-
     @staticmethod
     def falcon_for_causal_lm_forward(
@@ -434,7 +425,7 @@ def falcon_for_causal_lm_forward(
             are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
         """
         logger = logging.get_logger(__name__)
-        
+
         return_dict = return_dict if return_dict is not None else self.config.use_return_dict
 
         if output_attentions:
@@ -489,11 +480,10 @@ def falcon_for_causal_lm_forward(
                 hidden_states=transformer_outputs.hidden_states,
                 attentions=transformer_outputs.attentions,
             )
-        
+
         else:
             hidden_states = transformer_outputs.get("hidden_states")
             return {"hidden_states": hidden_states}
-
     @staticmethod
     def falcon_for_sequence_classification_forward(
@@ -552,7 +542,7 @@ def falcon_for_sequence_classification_forward(
             batch_size = hidden_states.shape[0]
             hidden_states = transformer_outputs[0]
             logits = self.score(hidden_states)
-            
+
             if self.config.pad_token_id is None and batch_size != 1:
                 raise ValueError("Cannot handle batch sizes > 1 if no padding token is defined.")
             if self.config.pad_token_id is None:
@@ -605,7 +595,6 @@ def falcon_for_sequence_classification_forward(
         else:
             hidden_states = transformer_outputs.get("hidden_states")
             return {"hidden_states": hidden_states}
-
     @staticmethod
     def falcon_for_token_classification_forward(
@@ -684,11 +673,11 @@ def falcon_for_token_classification_forward(
                 hidden_states=transformer_outputs.hidden_states,
                 attentions=transformer_outputs.attentions,
             )
-        
+
         else:
             hidden_states = transformer_outputs.get("hidden_states")
             return {"hidden_states": hidden_states}
-    
+
     @staticmethod
     def falcon_for_question_answering_forward(
         self: FalconForQuestionAnswering,
@@ -780,4 +769,4 @@ def falcon_for_question_answering_forward(
             )
         else:
             hidden_states = outputs.get("hidden_states")
-            return {"hidden_states": hidden_states}
\ No newline at end of file
+            return {"hidden_states": hidden_states}
diff --git a/colossalai/shardformer/policies/bert.py b/colossalai/shardformer/policies/bert.py
index 39fd437521c4..dc7f6e9df5e0 100644
--- a/colossalai/shardformer/policies/bert.py
+++ b/colossalai/shardformer/policies/bert.py
@@ -257,21 +257,16 @@ def set_pipeline_forward(self, model_cls: nn.Module, new_forward: Callable, poli
 
         if stage_manager.is_interleave:
             layers_per_stage = self.distribute_layers(
-                len(module.encoder.layer),
-                stage_manager.num_stages * stage_manager.num_model_chunks
+                len(module.encoder.layer), stage_manager.num_stages * stage_manager.num_model_chunks
             )
             stage_manager.stage_indices = Policy.get_stage_index(
                 layers_per_stage,
                 stage_manager.stage,
                 num_model_chunks=stage_manager.num_model_chunks,
-                num_stages=stage_manager.num_stages
+                num_stages=stage_manager.num_stages,
             )
             method_replacement = {
-                "forward": partial(
-                    new_forward,
-                    stage_manager=stage_manager,
-                    shard_config=self.shard_config
-                )
+                "forward": partial(new_forward, stage_manager=stage_manager, shard_config=self.shard_config)
             }
 
         else:
@@ -279,16 +274,11 @@ def set_pipeline_forward(self, model_cls: nn.Module, new_forward: Callable, poli
             stage_index = Policy.get_stage_index(layers_per_stage, stage_manager.stage)
             method_replacement = {
                 "forward": partial(
-                    new_forward,
-                    stage_manager=stage_manager,
-                    stage_index=stage_index,
-                    shard_config=self.shard_config
+                    new_forward, stage_manager=stage_manager, stage_index=stage_index, shard_config=self.shard_config
                 )
             }
 
-        self.append_or_create_method_replacement(
-            description=method_replacement, policy=policy, target_key=model_cls
-        )
+        self.append_or_create_method_replacement(description=method_replacement, policy=policy, target_key=model_cls)
 
     def get_held_layers(self) -> List[Module]:
         """Get pipeline layers for current stage."""
@@ -304,14 +294,13 @@ def get_held_layers(self) -> List[Module]:
         if stage_manager.is_interleave:
             assert stage_manager.num_model_chunks is not None
             layers_per_stage = self.distribute_layers(
-                len(module.encoder.layer),
-                stage_manager.num_stages * stage_manager.num_model_chunks
+                len(module.encoder.layer), stage_manager.num_stages * stage_manager.num_model_chunks
             )
             stage_indices = Policy.get_stage_index(
                 layers_per_stage,
                 stage_manager.stage,
                 num_model_chunks=stage_manager.num_model_chunks,
-                num_stages=stage_manager.num_stages
+                num_stages=stage_manager.num_stages,
             )
             if stage_manager.is_first_stage(-1):
                 held_layers.append(module.embeddings)
@@ -518,8 +507,7 @@ def get_held_layers(self) -> List[Module]:
         """
         held_layers = super().get_held_layers()
         stage_manager = self.pipeline_stage_manager
-        if stage_manager.is_last_stage(
-            None if not stage_manager.is_interleave else -1):
+        if stage_manager.is_last_stage(None if not stage_manager.is_interleave else -1):
             held_layers.append(self.model.dropout)
             held_layers.append(self.model.classifier)
         return held_layers
diff --git a/colossalai/shardformer/policies/falcon.py b/colossalai/shardformer/policies/falcon.py
index 387c01b670ea..0c0c6ed6d68f 100644
--- a/colossalai/shardformer/policies/falcon.py
+++ b/colossalai/shardformer/policies/falcon.py
@@ -7,16 +7,16 @@
 
 import colossalai.shardformer.layer as col_nn
 
-from .base_policy import ModulePolicyDescription, Policy, SubModuleReplacementDescription
 from ..modeling.falcon import (
     FalconPipelineForwards,
-    build_falcon_alibi_tensor_fn, 
+    build_falcon_alibi_tensor_fn,
+    get_falcon_flash_attention_forward,
     get_tp_falcon_decoder_layer_forward,
-    get_falcon_flash_attention_forward
 )
-__all__ = [
-    "FalconPolicy"
-]
+from .base_policy import ModulePolicyDescription, Policy, SubModuleReplacementDescription
+
+__all__ = ["FalconPolicy"]
+
 
 class FalconPolicy(Policy):
     def config_sanity_check(self):
@@ -34,35 +34,37 @@ def preprocess(self):
             new_vocab_size = vocab_size + world_size - vocab_size % world_size
             self.model.resize_token_embeddings(new_vocab_size)
         return self.model
-    
+
     def module_policy(self):
-        from transformers.models.falcon.modeling_falcon import FalconModel, FalconDecoderLayer, FalconAttention
+        from transformers.models.falcon.modeling_falcon import FalconAttention, FalconDecoderLayer, FalconModel
 
         if not self.model.config.new_decoder_architecture and self.model.config.multi_query:
-            warnings.warn("Falcon dosen't support tensor parallelism when (not new_decoder_architecture and multi_query) is True, will ignore the tensor parallelism flag.")
+            warnings.warn(
+                "Falcon doesn't support tensor parallelism when (not new_decoder_architecture and multi_query) is True, will ignore the tensor parallelism flag."
+            )
             self.shard_config.enable_tensor_parallelism = False
 
+        if self.shard_config.enable_sequence_parallelism:
+            self.shard_config.enable_sequence_parallelism = False
+            warnings.warn("Falcon doesn't support sequence parallelism now, will ignore the sequence parallelism flag.")
+
         policy = {}
         if self.shard_config.enable_tensor_parallelism:
-            attn_attribute_replacement={
-                "self_attention.hidden_size": self.model.config.hidden_size
-                // self.shard_config.tensor_parallel_size,
-                "self_attention.split_size": self.model.config.hidden_size
-                // self.shard_config.tensor_parallel_size,
-                "self_attention.num_heads": self.model.config.num_attention_heads // self.shard_config.tensor_parallel_size,
+            attn_attribute_replacement = {
+                "self_attention.hidden_size": self.model.config.hidden_size // self.shard_config.tensor_parallel_size,
+                "self_attention.split_size": self.model.config.hidden_size // self.shard_config.tensor_parallel_size,
+                "self_attention.num_heads": self.model.config.num_attention_heads
+                // self.shard_config.tensor_parallel_size,
                 "self_attention.num_kv_heads": self.model.config.num_kv_heads // self.shard_config.tensor_parallel_size,
             }
-            
+
             policy[FalconDecoderLayer] = ModulePolicyDescription(
                 attribute_replacement=attn_attribute_replacement,
-                method_replacement={
-                    "forward": get_tp_falcon_decoder_layer_forward()
-                },
+                method_replacement={"forward": get_tp_falcon_decoder_layer_forward()},
                 sub_module_replacement=[
                     SubModuleReplacementDescription(
                         suffix="self_attention.query_key_value",
                         target_module=col_nn.Linear1D_Col,
-
                     ),
                     SubModuleReplacementDescription(
                         suffix="self_attention.dense",
@@ -76,13 +78,9 @@ def module_policy(self):
                         suffix="mlp.dense_h_to_4h",
                         target_module=col_nn.Linear1D_Col,
                     ),
-                    SubModuleReplacementDescription(
-                        suffix="mlp.dense_4h_to_h",
-                        target_module=col_nn.Linear1D_Row
-                    ),
-                ]
+                    SubModuleReplacementDescription(suffix="mlp.dense_4h_to_h", target_module=col_nn.Linear1D_Row),
+                ],
             )
-
             policy[FalconModel] = ModulePolicyDescription(
                 attribute_replacement={
@@ -117,24 +115,16 @@ def module_policy(self):
             self.append_or_create_submodule_replacement(
                 description=[
                     SubModuleReplacementDescription(
-                        suffix="ln_attn",
-                        target_module=col_nn.FusedLayerNorm,
-                        ignore_if_not_exist=True
+                        suffix="ln_attn", target_module=col_nn.FusedLayerNorm, ignore_if_not_exist=True
                     ),
                     SubModuleReplacementDescription(
-                        suffix="ln_mlp",
-                        target_module=col_nn.FusedLayerNorm,
-                        ignore_if_not_exist=True
+                        suffix="ln_mlp", target_module=col_nn.FusedLayerNorm, ignore_if_not_exist=True
                     ),
                     SubModuleReplacementDescription(
-                        suffix="input_layernorm",
-                        target_module=col_nn.FusedLayerNorm,
-                        ignore_if_not_exist=True
+                        suffix="input_layernorm", target_module=col_nn.FusedLayerNorm, ignore_if_not_exist=True
                     ),
                     SubModuleReplacementDescription(
-                        suffix="post_attention_layernorm",
-                        target_module=col_nn.FusedLayerNorm,
-                        ignore_if_not_exist=True
+                        suffix="post_attention_layernorm", target_module=col_nn.FusedLayerNorm, ignore_if_not_exist=True
                     ),
                 ],
                 policy=policy,
@@ -143,9 +133,7 @@ def module_policy(self):
 
         if self.shard_config.enable_flash_attention:
             self.append_or_create_method_replacement(
-                description={
-                    "forward": get_falcon_flash_attention_forward()
-                },
+                description={"forward": get_falcon_flash_attention_forward()},
                 policy=policy,
                 target_key=FalconAttention,
             )
@@ -154,7 +142,6 @@ def module_policy(self):
 
     def postprocess(self):
         return self.model
-
     def set_pipeline_forward(self, model_cls: nn.Module, new_forward: Callable, policy: Dict) -> None:
         """If under pipeline parallel setting, replacing the original forward method of huggingface
         to customized forward method, and add this changing to policy."""
@@ -204,6 +191,7 @@ def module_policy(self):
         policy = super().module_policy()
         from transformers.models.falcon.modeling_falcon import FalconModel
+
         if self.pipeline_stage_manager:
             self.set_pipeline_forward(
                 model_cls=FalconModel, new_forward=FalconPipelineForwards.falcon_model_forward, policy=policy
@@ -220,7 +208,8 @@ def get_held_layers(self) -> List[Module]:
     def get_shared_params(self) -> List[Dict[int, Tensor]]:
         """no shared params in falcon model"""
         return []
-    
+
+
 class FalconForCausalLMPolicy(FalconPolicy):
     def __init__(self) -> None:
         super().__init__()
@@ -241,10 +230,12 @@ def module_policy(self):
         )
         if self.pipeline_stage_manager:
             self.set_pipeline_forward(
-                model_cls=FalconForCausalLM, new_forward=FalconPipelineForwards.falcon_for_causal_lm_forward, policy=policy
+                model_cls=FalconForCausalLM,
+                new_forward=FalconPipelineForwards.falcon_for_causal_lm_forward,
+                policy=policy,
             )
         return policy
-    
+
     def get_held_layers(self) -> List[Module]:
         """Get pipeline layers for current stage."""
         stage_manager = self.pipeline_stage_manager
@@ -252,7 +243,7 @@ def get_held_layers(self) -> List[Module]:
         if stage_manager.is_last_stage():
             held_layers.append(self.model.lm_head)
         return held_layers
-    
+
     def get_shared_params(self) -> List[Dict[int, Tensor]]:
         falcon_model = self.model
         if self.pipeline_stage_manager and self.pipeline_stage_manager.num_stages > 1:
@@ -265,7 +256,8 @@ def get_shared_params(self) -> List[Dict[int, Tensor]]:
             }
         ]
         return []
-    
+
+
 class FalconForSequenceClassificationPolicy(FalconPolicy):
     def __init__(self) -> None:
         super().__init__()
@@ -292,7 +284,7 @@ def module_policy(self):
             policy=policy,
         )
         return policy
-    
+
     def get_held_layers(self) -> List[Module]:
         """Get pipeline layers for current stage."""
         stage_manager = self.pipeline_stage_manager
@@ -300,11 +292,12 @@ def get_held_layers(self) -> List[Module]:
         if stage_manager.is_last_stage():
             held_layers.append(self.model.score)
         return held_layers
-    
+
     def get_shared_params(self) -> List[Dict[int, Tensor]]:
         """No shared params in falcon for sequence classification model"""
         return []
-    
+
+
 class FalconForTokenClassificationPolicy(FalconPolicy):
     def __init__(self) -> None:
         super().__init__()
@@ -319,12 +312,12 @@ def module_policy(self):
         self.append_or_create_submodule_replacement(
             description=[
                 SubModuleReplacementDescription(
-                    suffix="classifier", target_module=col_nn.Linear1D_Col, kwargs=dict(gather_output=True)
+                    suffix="classifier", target_module=col_nn.Linear1D_Col, kwargs=dict(gather_output=True)
                 ),
                 SubModuleReplacementDescription(
                     suffix="dropout",
                     target_module=col_nn.DropoutForReplicatedInput,
-                )
+                ),
             ],
             policy=policy,
             target_key=FalconForTokenClassification,
@@ -336,7 +329,7 @@ def module_policy(self):
             policy=policy,
         )
         return policy
-    
+
     def get_held_layers(self) -> List[Module]:
         """Get pipeline layers for current stage."""
         stage_manager = self.pipeline_stage_manager
@@ -345,11 +338,12 @@ def get_held_layers(self) -> List[Module]:
             held_layers.append(self.model.dropout)
             held_layers.append(self.model.classifier)
         return held_layers
-    
+
     def get_shared_params(self) -> List[Dict[int, Tensor]]:
         """No shared params in falcon for token classification model"""
         return []
-    
+
+
 class FalconForQuestionAnsweringPolicy(FalconPolicy):
     def __init__(self) -> None:
         super().__init__()
@@ -375,7 +369,7 @@ def module_policy(self):
             policy=policy,
         )
         return policy
-    
+
     def get_held_layers(self) -> List[Module]:
         """Get pipeline layers for current stage."""
         held_layers = super().get_held_layers()
@@ -383,7 +377,7 @@ def get_held_layers(self) -> List[Module]:
         if stage_manager.is_last_stage():
             held_layers.append(self.model.qa_outputs)
         return held_layers
-    
+
     def get_shared_params(self) -> List[Dict[int, Tensor]]:
         """No shared params in falcon for question answering model"""
-        return []
\ No newline at end of file
+        return []
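The policies above call is_last_stage() (and bert.py passes -1 when interleaving), so the new calling convention from stage_manager.py is worth spelling out. A usage sketch; the ProcessGroupMesh import path and the 4-process setup are assumptions that mirror the schedule test later in this diff:

    # Hedged sketch of the chunk-aware stage checks (assumes colossalai.launch
    # has already set up a 4-process pipeline-parallel run).
    from colossalai.cluster import ProcessGroupMesh
    from colossalai.pipeline.stage_manager import PipelineStageManager

    pg_mesh = ProcessGroupMesh(4)
    sm = PipelineStageManager(pg_mesh, pipeline_axis=0, enable_interleave=True, num_model_chunks=2)

    sm.is_first_stage(0)   # True only for chunk 0 on the first device
    sm.is_last_stage(1)    # True only for the last chunk on the last device
    sm.is_first_stage(-1)  # negative chunk id asks "is this the first device?"
    # Without interleaving, passing a chunk id trips the XOR assert; with
    # interleaving, omitting it falls back to the manager's current chunk id.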
diff --git a/colossalai/shardformer/policies/gptj.py b/colossalai/shardformer/policies/gptj.py
index 343df4e09777..fe52b00fbebe 100644
--- a/colossalai/shardformer/policies/gptj.py
+++ b/colossalai/shardformer/policies/gptj.py
@@ -1,3 +1,4 @@
+import warnings
 from functools import partial
 from typing import Callable, Dict, List
 
@@ -5,7 +6,7 @@
 
 import colossalai.shardformer.layer as col_nn
 
-from ..modeling.gptj import GPTJPipelineForwards, get_gptj_flash_attention_forward, gptj_sequence_parallel_forward_fn
+from ..modeling.gptj import GPTJPipelineForwards, get_gptj_flash_attention_forward
 from .base_policy import ModulePolicyDescription, Policy, SubModuleReplacementDescription
 
 __all__ = [
@@ -40,7 +41,11 @@ def module_policy(self):
         from transformers.models.gptj.modeling_gptj import GPTJAttention, GPTJBlock, GPTJModel
 
         policy = {}
+        if self.shard_config.enable_sequence_parallelism:
+            self.shard_config.enable_sequence_parallelism = False
+            warnings.warn("GPTJ doesn't support sequence parallelism now, will ignore the sequence parallelism flag.")
         use_sequence_parallel = self.shard_config.enable_sequence_parallelism
+
         overlap = self.shard_config.enable_sequence_overlap
         if self.shard_config.enable_tensor_parallelism:
             policy[GPTJModel] = ModulePolicyDescription(
@@ -139,9 +144,6 @@ def module_policy(self):
             target_key=GPTJAttention,
         )
 
-        if self.shard_config.enable_sequence_parallelism:
-            policy[GPTJModel].method_replacement = {"forward": gptj_sequence_parallel_forward_fn(self.shard_config)}
-
         return policy
 
     def postprocess(self):
diff --git a/examples/language/bert/data.py b/examples/language/bert/data.py
index af87029d0986..31c6937ee16c 100644
--- a/examples/language/bert/data.py
+++ b/examples/language/bert/data.py
@@ -91,7 +91,9 @@ def val_dataloader(self):
         # TODO: drop_last is set to True for now to avoid error when using PP
         # as the last batch may not be divisible by the number of microbatches
         if len(self.eval_splits) == 1:
-            return self.plugin.prepare_dataloader(self.dataset["validation"], batch_size=self.eval_batch_size, drop_last=True)
+            return self.plugin.prepare_dataloader(
+                self.dataset["validation"], batch_size=self.eval_batch_size, drop_last=True
+            )
         elif len(self.eval_splits) > 1:
             return [
                 self.plugin.prepare_dataloader(self.dataset[x], batch_size=self.eval_batch_size, drop_last=True)
diff --git a/examples/language/bert/finetune.py b/examples/language/bert/finetune.py
index 48b294dc3f7b..b349d7edfdd8 100644
--- a/examples/language/bert/finetune.py
+++ b/examples/language/bert/finetune.py
@@ -58,7 +58,8 @@ def evaluate_model(
     def evaluate_subset(dataloader: DataLoader):
         use_pipeline = isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1
         is_pp_last_device = use_pipeline and booster.plugin.stage_manager.is_last_stage(
-            None if not booster.plugin.stage_manager.is_interleave else -1)
+            None if not booster.plugin.stage_manager.is_interleave else -1
+        )
 
         accum_loss = torch.zeros(1, device=get_current_device())
         for batch in dataloader:
@@ -136,7 +137,8 @@ def train_epoch(
 ):
     use_pipeline = isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1
     is_pp_last_device = use_pipeline and booster.plugin.stage_manager.is_last_stage(
-        None if not booster.plugin.stage_manager.is_interleave else -1)
+        None if not booster.plugin.stage_manager.is_interleave else -1
+    )
     print_flag = (not use_pipeline and coordinator.is_master()) or (use_pipeline and is_pp_last_device)
     total_step = len(train_dataloader)
diff --git a/tests/kit/model_zoo/transformers/__init__.py b/tests/kit/model_zoo/transformers/__init__.py
index 87c6294501b2..d8d2c14b65d2 100644
--- a/tests/kit/model_zoo/transformers/__init__.py
+++ b/tests/kit/model_zoo/transformers/__init__.py
@@ -3,6 +3,7 @@
 from .blip2 import *
 from .bloom import *
 from .chatglm2 import *
+from .falcon import *
 from .gpt import *
 from .gptj import *
 from .llama import *
@@ -11,4 +12,3 @@
 from .t5 import *
 from .vit import *
 from .whisper import *
-from .falcon import *
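The drop_last=True in data.py and the asserts in interleaved_pp.py guard the same invariant: every batch entering the pipeline must split evenly into microbatches, and the microbatches must split evenly across model chunks. A small sketch of the constraint, in plain Python mirroring the load_batch asserts shown earlier; the numbers are illustrative:

    # Hedged sketch of the batch-splitting invariants from
    # InterleavedSchedule.load_batch.
    batch_size, num_microbatch, num_model_chunks = 16, 4, 2

    assert batch_size % num_microbatch == 0        # microbatch_size = 4
    assert num_microbatch % num_model_chunks == 0  # 2 microbatches per chunk per round

    microbatch_size = batch_size // num_microbatch
    # A ragged final batch (e.g. 15 samples) violates the first assert,
    # which is why the example dataloaders pass drop_last=True.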
diff --git a/tests/kit/model_zoo/transformers/falcon.py b/tests/kit/model_zoo/transformers/falcon.py
index 64e4ffe07004..d28d44634cd2 100644
--- a/tests/kit/model_zoo/transformers/falcon.py
+++ b/tests/kit/model_zoo/transformers/falcon.py
@@ -16,7 +16,7 @@ def data_gen():
     # tokenized_input = tokenizer(input, return_tensors='pt')
     # input_ids = tokenized_input['input_ids']
     # attention_mask = tokenized_input['attention_mask']
-    input_ids = torch.tensor([[15496, 11, 616, 3290, 318, 13779, 318, 13779]], dtype=torch.int64)    
+    input_ids = torch.tensor([[15496, 11, 616, 3290, 318, 13779, 318, 13779]], dtype=torch.int64)
     attention_mask = torch.tensor([[1, 1, 1, 1, 1, 1, 1, 1]], dtype=torch.int64)
     return dict(input_ids=input_ids, attention_mask=attention_mask)
@@ -36,6 +36,7 @@ def data_gen_for_token_classification():
     data["labels"] = torch.tensor([[0, 0, 0, 0, 0, 0, 0, 0]], dtype=torch.int64)
     return data
 
+
 def data_gen_for_sequence_classification():
     # sequence classification data gen
     data = data_gen()
@@ -44,7 +45,6 @@ def data_gen_for_sequence_classification():
 
 def data_gen_for_question_answering():
-
     input_ids = torch.tensor(
         [[57647, 1620, 23967, 620, 107373, 34, 91514, 620, 107373, 1620, 267, 35378, 48946, 18161, 48946, 18161]],
         dtype=torch.int64,
@@ -56,6 +56,7 @@ def data_gen_for_question_answering():
         input_ids=input_ids, attention_mask=attention_mask, start_positions=start_positions, end_positions=end_positions
     )
 
+
 # define output transform function
 output_transform_fn = lambda x: x
 
@@ -68,15 +69,15 @@ def data_gen_for_question_answering():
 loss_fn_for_question_answering = lambda x: x.loss
 
 config = transformers.FalconConfig(
-    num_hidden_layers=2, 
-    num_attention_heads=4, 
-    vocab_size=250880, 
-    hidden_dropout=0, 
-    attention_dropout=0, 
-    hidden_size=64, 
+    num_hidden_layers=2,
+    num_attention_heads=4,
+    vocab_size=250880,
+    hidden_dropout=0,
+    attention_dropout=0,
+    hidden_size=64,
     multi_query=False,
-    new_decoder_architecture=True, 
-    pad_token_id = -1
+    new_decoder_architecture=True,
+    pad_token_id=-1,
 )
 
 model_zoo.register(
diff --git a/tests/test_booster/test_plugin/test_gemini_plugin.py b/tests/test_booster/test_plugin/test_gemini_plugin.py
index 618d3d7c9a54..d26c09cf70b7 100644
--- a/tests/test_booster/test_plugin/test_gemini_plugin.py
+++ b/tests/test_booster/test_plugin/test_gemini_plugin.py
@@ -96,11 +96,11 @@ def check_gemini_plugin(subset: str, init_method: str = "none", early_stop: bool
             "transformers_sam",
             "transformers_vit",
             "transformers_gpt_double_heads",  # TODO check why does the model fail to run using Gemini
-            "transformers_falcon", #TODO check why falcon fails to run Gemini
+            "transformers_falcon",  # TODO check why falcon fails to run Gemini
             "transformers_falcon_for_causal_lm",
             "transformers_falcon_for_sequence_classification",
             "transformers_falcon_for_token_classification",
-            "transformers_falcon_for_question_answering"
+            "transformers_falcon_for_question_answering",
         ]:
             continue
diff --git a/tests/test_pipeline/test_schedule/test_interleaved.py b/tests/test_pipeline/test_schedule/test_interleaved.py
index 5034335ec9e6..4de50245feeb 100644
--- a/tests/test_pipeline/test_schedule/test_interleaved.py
+++ b/tests/test_pipeline/test_schedule/test_interleaved.py
@@ -22,9 +22,7 @@
 class MlpModel(nn.Module):
     def __init__(self):
         super().__init__()
-        self.layers = nn.ModuleList(
-            [nn.Linear(DIM, DIM) for _ in range(NUM_LAYER)]
-        )
+        self.layers = nn.ModuleList([nn.Linear(DIM, DIM) for _ in range(NUM_LAYER)])
 
     def forward(self, x):
         for layer in self.layers:
@@ -59,13 +57,7 @@ def run_pp(
     This test is to examine the correctness of interleaved 1F1B, compared with torch.
     Be aware it contains some hardcodes.
     """
-    colossalai.launch(
-        config=dict(),
-        rank=rank,
-        world_size=world_size,
-        port=port,
-        host="localhost"
-    )
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host="localhost")
 
     # create model
     seed_all(1453)
@@ -74,10 +66,7 @@ def run_pp(
 
     pg_mesh = ProcessGroupMesh(world_size)
     stage_manager = PipelineStageManager(
-        pg_mesh,
-        pipeline_axis=0,
-        enable_interleave=True,
-        num_model_chunks=num_model_chunk
+        pg_mesh, pipeline_axis=0, enable_interleave=True, num_model_chunks=num_model_chunk
     )
     schedule = InterleavedSchedule(
         stage_manager=stage_manager,
@@ -90,11 +79,7 @@ def run_pp(
         if idx % world_size == rank:
             sub_model._forward = sub_model.forward
             sub_model.forward = MethodType(
-                partial(
-                    pp_linear_fwd,
-                    stage_mgr=stage_manager,
-                    model_chunk_id=len(sharded_model)
-                ),
+                partial(pp_linear_fwd, stage_mgr=stage_manager, model_chunk_id=len(sharded_model)),
                 sub_model._forward,
             )
             sharded_model.append(sub_model.cuda())
@@ -109,7 +94,8 @@ def run_pp(
     input_list = [torch.rand(batch_size, DIM).cuda()]
     dist.all_reduce(input_list[0])
 
-    def criterion(x, *args, **kwargs): return (x * x).mean()
+    def criterion(x, *args, **kwargs):
+        return (x * x).mean()
 
     # forward and backward
     torch_output = torch_model(input_list[0])
@@ -117,12 +103,7 @@ def criterion(x, *args, **kwargs): return (x * x).mean()
     torch_loss.backward()
 
     pp_ret = schedule.forward_backward_step(
-        sharded_model,
-        iter(input_list),
-        criterion,
-        pp_optimizer,
-        return_loss=True,
-        return_outputs=True
+        sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
     )
 
     # check loss
@@ -132,14 +113,8 @@ def criterion(x, *args, **kwargs): return (x * x).mean()
     # check gradients
     for i in range(num_model_chunk):
         idx = world_size * i + rank
-        assert torch.allclose(
-            torch_model.layers[idx].weight.grad,
-            sharded_model[i].weight.grad
-        )
-        assert torch.allclose(
-            torch_model.layers[idx].bias.grad,
-            sharded_model[i].bias.grad
-        )
+        assert torch.allclose(torch_model.layers[idx].weight.grad, sharded_model[i].weight.grad)
+        assert torch.allclose(torch_model.layers[idx].bias.grad, sharded_model[i].bias.grad)
 
     # step
     torch_optimizer.step()
@@ -148,14 +123,8 @@ def criterion(x, *args, **kwargs): return (x * x).mean()
     # check updated param
     for i in range(num_model_chunk):
         idx = world_size * i + rank
-        assert torch.allclose(
-            torch_model.layers[idx].weight,
-            sharded_model[i].weight
-        )
-        assert torch.allclose(
-            torch_model.layers[idx].bias,
-            sharded_model[i].bias
-        )
+        assert torch.allclose(torch_model.layers[idx].weight, sharded_model[i].weight)
+        assert torch.allclose(torch_model.layers[idx].bias, sharded_model[i].bias)
 
 
 @pytest.mark.dist
@@ -170,7 +139,7 @@ def test_pp(num_microbatch: int, batch_size: int, num_model_chunk: int):
         nprocs=NUM_LAYER // num_model_chunk,
         num_microbatch=num_microbatch,
         batch_size=batch_size,
-        num_model_chunk=num_model_chunk
+        num_model_chunk=num_model_chunk,
    )
diff --git a/tests/test_shardformer/test_model/test_shard_falcon.py b/tests/test_shardformer/test_model/test_shard_falcon.py
index 39a0aa72a61e..9630451799c0 100644
--- a/tests/test_shardformer/test_model/test_shard_falcon.py
+++ b/tests/test_shardformer/test_model/test_shard_falcon.py
@@ -18,6 +18,7 @@
     unwrap_model,
 )
 
+
 def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, test_config):
     org_model, org_optimizer, sharded_model, sharded_optimizer, criterion, booster = build_model_from_hybrid_plugin(
         model_fn, loss_fn, test_config
@@ -80,6 +81,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
 
     torch.cuda.empty_cache()
 
+
 @parameterize(
     "test_config",
     [
@@ -100,20 +102,8 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn,
             "use_lazy_init": False,
             "precision": "fp32",
         },
-        {
-            "tp_size": 4,
-            "pp_size": 1,
-            "enable_all_optimization": True,
-            "use_lazy_init": False,
-            "precision": "fp32"
-        },
-        {
-            "tp_size": 2,
-            "pp_size": 1,
-            "enable_all_optimization": True,
-            "use_lazy_init": False,
-            "precision": "fp32"
-        },
+        {"tp_size": 4, "pp_size": 1, "enable_all_optimization": True, "use_lazy_init": False, "precision": "fp32"},
+        {"tp_size": 2, "pp_size": 1, "enable_all_optimization": True, "use_lazy_init": False, "precision": "fp32"},
         {
             "tp_size": 2,
             "pp_size": 1,
@@ -145,6 +135,7 @@ def run_falcon_test(test_config):
     Randomizer.reset_index()
     torch.cuda.empty_cache()
 
+
 @parameterize(
     "test_config",
     [
@@ -169,7 +160,6 @@ def run_falcon_test(test_config):
         },
     ],
 )
-
 def run_falcon_3d_test(test_config):
     sub_model_zoo = model_zoo.get_sub_registry("transformers_falcon")
 
@@ -186,6 +176,7 @@ def check_falcon(rank, world_size, port):
     colossalai.launch(config={}, rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl")
     run_falcon_test()
 
+
 def check_falcon_3d(rank, world_size, port):
     disable_existing_loggers()
     colossalai.launch(config={}, rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl")
@@ -205,6 +196,7 @@ def test_falcon():
 def test_falcon_3d():
     spawn(check_falcon_3d, 8)
 
+
 if __name__ == "__main__":
     test_falcon()
     test_falcon_3d()
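Taken together, these changes let a Falcon model run under the hybrid plugin the same way the tests above exercise it. A hedged end-to-end sketch: the tiny config mirrors the model-zoo entry, the optimizer and criterion choices are illustrative assumptions, and a prior colossalai.launch on 4 processes is assumed:

    # Hedged sketch: boosting a small Falcon model with the hybrid plugin.
    import torch
    import transformers
    from colossalai.booster import Booster
    from colossalai.booster.plugin import HybridParallelPlugin

    config = transformers.FalconConfig(
        num_hidden_layers=2, num_attention_heads=4, hidden_size=64, new_decoder_architecture=True
    )
    model = transformers.FalconForCausalLM(config)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    criterion = lambda outputs: outputs.loss  # mirrors loss_fn in the model zoo

    plugin = HybridParallelPlugin(tp_size=2, pp_size=2, num_microbatches=4)
    booster = Booster(plugin=plugin)
    # boost returns (model, optimizer, criterion, dataloader, lr_scheduler)
    model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion)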