From 59c13a362b546fabdfe904de83f076c35d9eb745 Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Fri, 11 Aug 2023 14:16:20 +0800
Subject: [PATCH 01/17] add pp stage manager as circle stage

---
 colossalai/pipeline/stage_manager.py | 42 ++++++++++++++++++++++------
 1 file changed, 34 insertions(+), 8 deletions(-)

diff --git a/colossalai/pipeline/stage_manager.py b/colossalai/pipeline/stage_manager.py
index fe228e2270dd..82bb559c9f8e 100644
--- a/colossalai/pipeline/stage_manager.py
+++ b/colossalai/pipeline/stage_manager.py
@@ -13,6 +13,7 @@ class PipelineStageManager:
     Args:
         pg_mesh (ProcessGroupMesh): Process group mesh.
         pipeline_axis (int): The axis along which the pipeline is constructed.
+        circle_p2p (bool): Whether to use circle p2p communication, it will make the first and last stage communicate with each other.

     Attributes:
         num_stages (int): Number of stages in the pipeline.
@@ -21,14 +22,36 @@ class PipelineStageManager:
         virtual_stage (int): The current virtual stage.
     """

-    def __init__(self, pg_mesh: ProcessGroupMesh, pipeline_axis: int) -> None:
+    def __init__(self, pg_mesh: ProcessGroupMesh, pipeline_axis: int, circle_stage: bool = False) -> None:
         self.pg_mesh = pg_mesh
         self.pipeline_axis = pipeline_axis
+        self.circle_stage = circle_stage
         self.num_virtual_stages: Optional[int] = None
         self.virtual_stage: Optional[int] = None
         self.prev_rank: Optional[Tuple[int, ...]] = None
         self.next_rank: Optional[Tuple[int, ...]] = None
         self.p2p_groups: Dict[Tuple[int, int], ProcessGroup] = {}
+
+        # init prev and next coord
+        if self.circle_stage:
+            self._circle_coord_init()
+        else:
+            self._none_circle_coord_init()
+
+        # init p2p process groups
+        stages = list(range(self.num_stages))
+        for prev, cur in zip(stages[:-1], stages[1:]):
+            group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [prev, cur])
+            if self.stage in [prev, cur]:
+                ranks_in_group = self.pg_mesh.get_ranks_in_group(group)
+                self.p2p_groups[tuple(ranks_in_group)] = group
+
+        if circle_stage and self.stage in [stages[0], stages[-1]]:
+            group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [stages[0], stages[-1]])
+            ranks_in_group = self.pg_mesh.get_ranks_in_group(group)
+            self.p2p_groups[tuple(ranks_in_group)] = group
+
+    def _none_circle_coord_init(self):
         # init prev and next coord
         coord = self.pg_mesh.coordinate()
         if self.stage > 0:
@@ -40,13 +63,16 @@ def __init__(self, pg_mesh: ProcessGroupMesh, pipeline_axis: int) -> None:
                 (coord[self.pipeline_axis] + 1,) + coord[self.pipeline_axis + 1:]
             self.next_rank = self.pg_mesh.ravel(next_coord, self.pg_mesh.shape)

-        # init p2p process groups
-        stages = list(range(self.num_stages))
-        for prev, cur in zip(stages[:-1], stages[1:]):
-            group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [prev, cur])
-            if self.stage in [prev, cur]:
-                ranks_in_group = self.pg_mesh.get_ranks_in_group(group)
-                self.p2p_groups[tuple(ranks_in_group)] = group
+    def _circle_coord_init(self):
+        # init prev and next coord cyclically
+        coord = self.pg_mesh.coordinate()
+        prev_coord = coord[: self.pipeline_axis] + \
+            ((coord[self.pipeline_axis] + self.num_stages - 1 )% self.num_stages,) + coord[self.pipeline_axis + 1:]
+        self.prev_rank = self.pg_mesh.ravel(prev_coord, self.pg_mesh.shape)
+
+        next_coord = coord[: self.pipeline_axis] + \
+            ((coord[self.pipeline_axis] + self.num_stages + 1 )% self.num_stages,) + coord[self.pipeline_axis + 1:]
+        self.next_rank = self.pg_mesh.ravel(next_coord, self.pg_mesh.shape)

     def is_first_stage(self, virtual: bool = False) -> bool:
         """Is the current stage the first stage.

From 56cabee68e8b9dbc56de22efc9478e8869613004 Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Fri, 11 Aug 2023 16:24:26 +0800
Subject: [PATCH 02/17] fix a bug when creating process group

---
 colossalai/pipeline/stage_manager.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/colossalai/pipeline/stage_manager.py b/colossalai/pipeline/stage_manager.py
index 82bb559c9f8e..6a4e97006b43 100644
--- a/colossalai/pipeline/stage_manager.py
+++ b/colossalai/pipeline/stage_manager.py
@@ -46,10 +46,11 @@ def __init__(self, pg_mesh: ProcessGroupMesh, pipeline_axis: int, circle_stage:
                 ranks_in_group = self.pg_mesh.get_ranks_in_group(group)
                 self.p2p_groups[tuple(ranks_in_group)] = group

-        if circle_stage and self.stage in [stages[0], stages[-1]]:
+        if self.circle_stage:
             group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [stages[0], stages[-1]])
-            ranks_in_group = self.pg_mesh.get_ranks_in_group(group)
-            self.p2p_groups[tuple(ranks_in_group)] = group
+            if self.stage in [stages[0], stages[-1]]:
+                ranks_in_group = self.pg_mesh.get_ranks_in_group(group)
+                self.p2p_groups[tuple(ranks_in_group)] = group

     def _none_circle_coord_init(self):
         # init prev and next coord
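The arithmetic behind `_circle_coord_init` is easiest to see in isolation. A minimal sketch, detached from `ProcessGroupMesh` and assuming four pipeline stages, shows how the modular offsets wire the last stage back to the first:

```python
# Cyclic neighbours for a 4-stage pipeline, mirroring _circle_coord_init.
num_stages = 4
for stage in range(num_stages):
    prev_stage = (stage + num_stages - 1) % num_stages
    next_stage = (stage + num_stages + 1) % num_stages
    print(stage, prev_stage, next_stage)
# 0 3 1 / 1 0 2 / 2 1 3 / 3 2 0 -- stage 0's prev is 3 and stage 3's next is 0,
# which is the extra link the generate schedule later uses to send new tokens
# from the last stage back to the first.
```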
From 110cf723f3d575b33e4671de6c6b0db136c7347b Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Fri, 11 Aug 2023 18:07:21 +0800
Subject: [PATCH 03/17] add ppinfer basic framework

---
 colossalai/ppinference/__init__.py         |  4 ++
 colossalai/ppinference/engine.py           | 64 ++++++++++++++++++++++
 colossalai/ppinference/inference_config.py | 31 +++++++++++
 colossalai/ppinference/utils.py            | 35 ++++++++++++
 4 files changed, 134 insertions(+)
 create mode 100644 colossalai/ppinference/__init__.py
 create mode 100644 colossalai/ppinference/engine.py
 create mode 100644 colossalai/ppinference/inference_config.py
 create mode 100644 colossalai/ppinference/utils.py

diff --git a/colossalai/ppinference/__init__.py b/colossalai/ppinference/__init__.py
new file mode 100644
index 000000000000..95adf4865b81
--- /dev/null
+++ b/colossalai/ppinference/__init__.py
@@ -0,0 +1,4 @@
+from .engine import PPInferEngine
+from .inference_config import InferenceConfig
+
+__all__ = ['PPInferEngine', 'InferenceConfig']
diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py
new file mode 100644
index 000000000000..6af82871f3e3
--- /dev/null
+++ b/colossalai/ppinference/engine.py
@@ -0,0 +1,64 @@
+from typing import List, Optional, Set
+
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+
+from colossalai.cluster import ProcessGroupMesh
+from colossalai.pipeline.stage_manager import PipelineStageManager
+from colossalai.shardformer._utils import getattr_
+
+from .inference_config import InferenceConfig
+from .utils import get_suffix_name, set_tensors_to_none
+
+
+class PPInferEngine:
+
+    def __init__(
+        self,
+        gerneration_config: InferenceConfig,
+        model: nn.Module,
+    ) -> None:
+        self.gerneration_config = gerneration_config
+        self.model = model
+        self.pg_mesh = ProcessGroupMesh(gerneration_config.pp_size)
+        self.stage_manager = PipelineStageManager(self.pg_mesh, 0, True)
+        self._stage_to_module = {}
+
+        self._partion_model()
+
+    def _partion_model(self):
+        # get module list
+        self.module_list = self._recursive_partion(self.model, [], '')
+
+        # allocate module to each stage
+        module_num = len(self.module_list)
+        stage_size = self.gerneration_config.pp_size
+        for stage in range(stage_size):
+            start = stage * (module_num // stage_size) + min(stage, module_num % stage_size)
+            end = start + (module_num // stage_size) + (1 if stage < module_num % stage_size else 0)
+            self._stage_to_module[stage] = self.module_list[start:end]
+
+        # release layers dose not belong to current stage
+        self._release_unheld_layers()
+
+        # load model to cuda
+        print(dist.get_rank(), torch.cuda.memory_allocated())
+        self.model = self.model.cuda()
+        print(dist.get_rank(), torch.cuda.memory_allocated())
+
+    def _recursive_partion(self, module: nn.Module, module_list: List[str], suffix: str):
+        for name, child in module.named_children():
+            suffix_name = get_suffix_name(suffix, name)
+            if child.__class__.__name__ in self.gerneration_config.stage_unit:
+                module_list.append(suffix_name)
+            else:
+                self._recursive_partion(child, module_list, suffix_name)
+        return module_list
+
+    def _release_unheld_layers(self):
+        r"""
+        Release the unheld layers in the model
+        """
+        held_layers = self._stage_to_module[self.stage_manager.stage]
+        set_tensors_to_none(self.model, include=set(self.module_list) - set(held_layers))
diff --git a/colossalai/ppinference/inference_config.py b/colossalai/ppinference/inference_config.py
new file mode 100644
index 000000000000..c8ebe7dbc5c8
--- /dev/null
+++ b/colossalai/ppinference/inference_config.py
@@ -0,0 +1,31 @@
+from typing import List
+
+import torch
+
+__all__ = 'InferenceConfig'
+
+
+class InferenceConfig:
+    '''
+    InferenceConfig is a class that stores the configuration for inference.
+
+    Args:
+        pp_size (int): the number of pipeline stages.
+        stage_unit (List[str]): the unit module name can be sliced as a stage, should be `nn.Module`.
+        target_length (int): the target length of the input sequence.
+        padding_token_id (int): the token id for padding.
+
+    '''
+
+    def __init__(
+        self,
+        pp_size: int,
+        stage_unit: List[str],
+        target_length: int = 32,
+        padding_token_id: int = 0,
+    ):
+        assert isinstance(pp_size, int), f'pp_size must be an integer, got {type(pp_size)}'
+        self.pp_size = pp_size
+        self.stage_unit = stage_unit
+        self.target_length = target_length
+        self.padding_token_id = padding_token_id
diff --git a/colossalai/ppinference/utils.py b/colossalai/ppinference/utils.py
new file mode 100644
index 000000000000..1a6e8a519397
--- /dev/null
+++ b/colossalai/ppinference/utils.py
@@ -0,0 +1,35 @@
+from typing import List, Optional, Set
+
+import torch.nn as nn
+
+from colossalai.shardformer._utils import getattr_, setattr_
+
+
+def set_tensors_to_none(model: nn.Module, include: Set[str] = set()) -> None:
+    """
+    Set all parameters and buffers of model to None
+
+    Args:
+        model (nn.Module): The model to set
+    """
+    for module_suffix in include:
+        set_module = getattr_(model, module_suffix)
+        for n, p in set_module.named_parameters():
+            setattr_(set_module, n, None)
+        for n, buf in set_module.named_buffers():
+            setattr_(set_module, n, None)
+        setattr_(model, module_suffix, None)
+
+
+def get_suffix_name(suffix: str, name: str):
+    """
+    Get the suffix name of the module, as `suffix.name` when name is string or `suffix[name]` when name is a digit,
+    and 'name' when `suffix` is empty.
+
+    Args:
+        suffix (str): The suffix of the suffix module
+        name (str): The name of the current module
+    """
+    point = '' if suffix is '' else '.'
+    suffix_name = suffix + f'[{name}]' if name.isdigit() else suffix + f'{point}{name}'
+    return suffix_name
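The stage allocation in `_partion_model` above splits `module_num` modules over `stage_size` stages as evenly as possible, with the first `module_num % stage_size` stages taking one extra module. A standalone check of that arithmetic with made-up sizes (10 modules, 4 stages):

```python
# Each stage gets a contiguous [start, end) slice of the module list.
module_num, stage_size = 10, 4
for stage in range(stage_size):
    start = stage * (module_num // stage_size) + min(stage, module_num % stage_size)
    end = start + (module_num // stage_size) + (1 if stage < module_num % stage_size else 0)
    print(f"stage {stage}: modules [{start}, {end})")
# stage 0: modules [0, 3)
# stage 1: modules [3, 6)
# stage 2: modules [6, 8)
# stage 3: modules [8, 10)
```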
From 99b740c28ae6aa2cde9279c94d220e7793f067f2 Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Mon, 14 Aug 2023 18:07:00 +0800
Subject: [PATCH 04/17] add micro batch manager and support kvcache-pp gpt2 fwd

---
 colossalai/ppinference/engine.py             |  45 ++-
 colossalai/ppinference/microbatch_manager.py |  46 +++
 colossalai/ppinference/modeling/gpt2.py     | 307 +++++++++++++++++++
 3 files changed, 386 insertions(+), 12 deletions(-)
 create mode 100644 colossalai/ppinference/microbatch_manager.py
 create mode 100644 colossalai/ppinference/modeling/gpt2.py

diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py
index 6af82871f3e3..89629c293e78 100644
--- a/colossalai/ppinference/engine.py
+++ b/colossalai/ppinference/engine.py
@@ -1,3 +1,5 @@
+from functools import partial
+from types import MethodType
 from typing import List, Optional, Set

 import torch
@@ -9,6 +11,8 @@
 from colossalai.shardformer._utils import getattr_

 from .inference_config import InferenceConfig
+from .microbatch_manager import MicroBatchManager
+from .modeling.gpt2 import GPT2PipelineForwards
 from .utils import get_suffix_name, set_tensors_to_none


@@ -23,31 +27,45 @@ def __init__(
         self.model = model
         self.pg_mesh = ProcessGroupMesh(gerneration_config.pp_size)
         self.stage_manager = PipelineStageManager(self.pg_mesh, 0, True)
-        self._stage_to_module = {}
+        self.held_layer = None
+        self.mb_manager = MicroBatchManager(self.gerneration_config)

         self._partion_model()
+        self._inject_fwd()
+
+    def inference(self, **kwargs):
+        output = self.model(**kwargs)
+        return output
+
+    def _inject_fwd(self):
+        new_fwd = partial(GPT2PipelineForwards.gpt2_lmhead_model_forward,
+                          stage_manager=self.stage_manager,
+                          stage_index=[0, 1])
+        bound_method = MethodType(new_fwd, self.model)
+        setattr(self.model, 'forward', bound_method)

     def _partion_model(self):
         # get module list
-        self.module_list = self._recursive_partion(self.model, [], '')
+        module_list = self._recursive_partion(self.model, [], '')

         # allocate module to each stage
-        module_num = len(self.module_list)
+        module_num = len(module_list)
         stage_size = self.gerneration_config.pp_size
-        for stage in range(stage_size):
-            start = stage * (module_num // stage_size) + min(stage, module_num % stage_size)
-            end = start + (module_num // stage_size) + (1 if stage < module_num % stage_size else 0)
-            self._stage_to_module[stage] = self.module_list[start:end]
+        stage = self.stage_manager.stage
+        start = stage * (module_num // stage_size) + min(stage, module_num % stage_size)
+        end = start + (module_num // stage_size) + (1 if stage < module_num % stage_size else 0)
+        self.held_layer = module_list[start:end]

         # release layers dose not belong to current stage
-        self._release_unheld_layers()
+        self._release_unheld_layers(module_list)

         # load model to cuda
-        print(dist.get_rank(), torch.cuda.memory_allocated())
         self.model = self.model.cuda()
         print(dist.get_rank(), torch.cuda.memory_allocated())

     def _recursive_partion(self, module: nn.Module, module_list: List[str], suffix: str):
+        if len(list(module.children())) == 0:
+            module_list.append(suffix)
         for name, child in module.named_children():
             suffix_name = get_suffix_name(suffix, name)
             if child.__class__.__name__ in self.gerneration_config.stage_unit:
@@ -56,9 +74,12 @@ def _recursive_partion(self, module: nn.Module, module_list: List[str], suffix:
                 self._recursive_partion(child, module_list, suffix_name)
         return module_list

-    def _release_unheld_layers(self):
+    def _partion_batch(self):
+        pass
+
+    def _release_unheld_layers(self, module_list: List[str]):
         r"""
         Release the unheld layers in the model
         """
-        held_layers = self._stage_to_module[self.stage_manager.stage]
-        set_tensors_to_none(self.model, include=set(self.module_list) - set(held_layers))
+        held_layers = self.held_layer
+        set_tensors_to_none(self.model, include=set(module_list) - set(held_layers))
diff --git a/colossalai/ppinference/microbatch_manager.py b/colossalai/ppinference/microbatch_manager.py
new file mode 100644
index 000000000000..e4f7c42607c4
--- /dev/null
+++ b/colossalai/ppinference/microbatch_manager.py
@@ -0,0 +1,46 @@
+from .inference_config import InferenceConfig
+
+__all__ = 'MicroBatchManager'
+
+
+class MicroBatchManager():
+
+    def __init__(
+        self,
+        pp_inference_config: InferenceConfig,
+    ):
+        self.pp_inference_config = pp_inference_config
+        self.mb_to_kvcache = self._init_kvcache()
+        self.cur_mb = 0
+
+    def step(self, present_kv=None):
+        self._update_kvcahe(present_kv)
+        self.cur_mb = self.next_mb
+
+    def _init_kvcache(self):
+        mb_to_kvcache = {i: () for i in range(self.pp_inference_config.pp_size)}
+        return mb_to_kvcache
+
+    def _update_kvcahe(self, present_kv):
+        self.mb_to_kvcache[self.cur_mb] += (present_kv,)
+
+        if self.mb_to_kvcache[self.cur_mb][0][0][-2] == self.pp_inference_config.target_length:
+            self.mb_to_kvcache.pop(self.cur_mb)
+
+    @property
+    def is_done(self):
+        return len(self.mb_to_kvcache) == 0
+
+    @property
+    def next_mb(self):
+        if self.is_done:
+            return None
+
+        nxt_mb = (self.cur_mb + 1) % self.pp_inference_config.pp_size
+        while nxt_mb % self.pp_inference_config.pp_size not in self.mb_to_kvcache:
+            nxt_mb = (nxt_mb + 1) % self.pp_inference_config.pp_size
+        return nxt_mb
+
+    @property
+    def cur_kvcache(self):
+        return self.mb_to_kvcache[self.cur_mb]
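The manager's completion test hinges on the cached sequence length. Under the Hugging Face GPT-2 convention the rest of this series assumes, `past_key_values` holds one `(key, value)` pair per layer, each of shape `(batch, num_heads, seq_len, head_dim)`, so the cached length is the `-2` axis of any key tensor — read via `.shape[-2]`, as the micro-batch manager rewrite later in this series does. A small sketch of that layout:

```python
import torch

# One (key, value) pair per layer, HF GPT-2 convention.
batch, num_heads, seq_len, head_dim = 2, 12, 16, 64
layer_past = (torch.zeros(batch, num_heads, seq_len, head_dim),
              torch.zeros(batch, num_heads, seq_len, head_dim))
past_key_values = (layer_past,) * 12    # 12 decoder layers

# The cached sequence length lives on the -2 dimension of a cached key.
print(past_key_values[0][0].shape[-2])    # 16
```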
diff --git a/colossalai/ppinference/modeling/gpt2.py b/colossalai/ppinference/modeling/gpt2.py
new file mode 100644
index 000000000000..f8fe2bc42d19
--- /dev/null
+++ b/colossalai/ppinference/modeling/gpt2.py
@@ -0,0 +1,307 @@
+from typing import Dict, List, Optional, Tuple, Union
+
+import torch
+from torch.nn import CrossEntropyLoss
+from transformers.modeling_outputs import BaseModelOutputWithPastAndCrossAttentions, CausalLMOutputWithCrossAttentions
+from transformers.models.gpt2.modeling_gpt2 import GPT2LMHeadModel, GPT2Model
+from transformers.utils import logging
+
+from colossalai.pipeline.stage_manager import PipelineStageManager
+
+from ..microbatch_manager import MicroBatchManager
+
+
+class GPT2PipelineForwards:
+    '''
+    This class serves as a micro library for forward function substitution of GPT2 models
+    under pipeline setting.
+    '''
+
+    @staticmethod
+    def gpt2_model_forward(
+            self: GPT2Model,
+            input_ids: Optional[torch.LongTensor] = None,
+            past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None,
+            attention_mask: Optional[torch.FloatTensor] = None,
+            token_type_ids: Optional[torch.LongTensor] = None,
+            position_ids: Optional[torch.LongTensor] = None,
+            head_mask: Optional[torch.FloatTensor] = None,
+            inputs_embeds: Optional[torch.FloatTensor] = None,
+            encoder_hidden_states: Optional[torch.Tensor] = None,
+            encoder_attention_mask: Optional[torch.FloatTensor] = None,
+            use_cache: Optional[bool] = None,
+            output_attentions: Optional[bool] = None,
+            output_hidden_states: Optional[bool] = None,
+            return_dict: Optional[bool] = None,
+            stage_manager: Optional[PipelineStageManager] = None,
+            hidden_states: Optional[torch.FloatTensor] = None,
+            stage_index: Optional[List[int]] = None) -> Union[Dict, Tuple, BaseModelOutputWithPastAndCrossAttentions]:
+
+        # This function is modified on the basis of transformers.models.gpt2.modeling_gpt2.GPT2Model.forward.
+        # Please refer to original code of transformers for more details.
+        logger = logging.get_logger(__name__)
+
+        # Preprocess passed in arguments
+        if output_attentions:
+            logger.warning_once('output_attentions=True is not supported for pipeline models at the moment.')
+            output_attentions = False
+        if output_hidden_states:
+            logger.warning_once('output_hidden_states=True is not supported for pipeline models at the moment.')
+            output_hidden_states = False
+
+        use_cache = use_cache if use_cache is not None else self.config.use_cache
+        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
+
+        if past_key_values is None:
+            past_length = 0
+            past_key_values = tuple([None] * len(self.h))
+        else:
+            past_length = past_key_values[0][0].size(-2)
+
+        if stage_manager.is_first_stage():
+            if input_ids is not None and inputs_embeds is not None:
+                raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
+            elif input_ids is not None:
+                input_shape = input_ids.size()
+                input_ids = input_ids.view(-1, input_shape[-1])
+                batch_size = input_ids.shape[0]
+            elif inputs_embeds is not None:
+                input_shape = inputs_embeds.size()[:-1]
+                batch_size = inputs_embeds.shape[0]
+            else:
+                raise ValueError("You have to specify either input_ids or inputs_embeds")
+
+            device = input_ids.device if input_ids is not None else inputs_embeds.device
+            if token_type_ids is not None:
+                token_type_ids = token_type_ids.view(-1, input_shape[-1])
+        else:
+            if hidden_states is None:
+                raise ValueError("hidden_states shouldn't be None for stages other than the first stage.")
+            input_shape = hidden_states.size()[:-1]
+            batch_size, seq_length = input_shape[0], input_shape[1]
+            device = hidden_states.device
+
+        # GPT2Attention mask.
+        if attention_mask is not None:
+            if batch_size <= 0:
+                raise ValueError("batch_size has to be defined and > 0")
+            attention_mask = attention_mask.view(batch_size, -1)
+            # We create a 3D attention mask from a 2D tensor mask.
+            # Sizes are [batch_size, 1, 1, to_seq_length]
+            # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
+            # this attention mask is more simple than the triangular masking of causal attention
+            # used in OpenAI GPT, we just need to prepare the broadcast dimension here.
+            attention_mask = attention_mask[:, None, None, :]
+
+            # Since attention_mask is 1.0 for positions we want to attend and 0.0 for
+            # masked positions, this operation will create a tensor which is 0.0 for
+            # positions we want to attend and the dtype's smallest value for masked positions.
+            # Since we are adding it to the raw scores before the softmax, this is
+            # effectively the same as removing these entirely.
+            attention_mask = attention_mask.to(dtype=self.dtype)    # fp16 compatibility
+            attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min
+
+        # If a 2D or 3D attention mask is provided for the cross-attention
+        # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
+        if self.config.add_cross_attention and encoder_hidden_states is not None:
+            encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
+            encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
+            if encoder_attention_mask is None:
+                encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
+            encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
+        else:
+            encoder_attention_mask = None
+
+        # Prepare head mask if needed
+        # 1.0 in head_mask indicate we keep the head
+        # attention_probs has shape bsz x n_heads x N x N
+        # head_mask has shape n_layer x batch x n_heads x N x N
+        head_mask = self.get_head_mask(head_mask, self.config.n_layer)
+
+        if stage_manager.is_first_stage():
+            if position_ids is not None:
+                position_ids = position_ids.view(-1, input_shape[-1])
+            else:
+                position_ids = torch.arange(past_length, input_shape[-1] + past_length, dtype=torch.long, device=device)
+                position_ids = position_ids.unsqueeze(0).view(-1, input_shape[-1])
+
+            if inputs_embeds is None:
+                inputs_embeds = self.wte(input_ids)
+            position_embeds = self.wpe(position_ids)
+            hidden_states = inputs_embeds + position_embeds
+            if token_type_ids is not None:
+                token_type_embeds = self.wte(token_type_ids)
+                hidden_states = hidden_states + token_type_embeds
+            hidden_states = self.drop(hidden_states)
+
+        output_shape = input_shape + (hidden_states.size(-1),)
+
+        if self.gradient_checkpointing and self.training:
+            if use_cache:
+                logger.warning_once(
+                    "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`...")
+                use_cache = False
+
+        presents = () if use_cache else None
+        all_self_attentions = () if output_attentions else None
+        all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None
+        all_hidden_states = () if output_hidden_states else None
+
+        # Going through held blocks.
+        start_idx, end_idx = stage_index[0], stage_index[1]
+        for i, (block, layer_past) in enumerate(zip(range(start_idx, end_idx), past_key_values)):
+            block = self.h[i]
+            # Model parallel
+            if self.model_parallel:
+                torch.cuda.set_device(hidden_states.device)
+                # Ensure layer_past is on same device as hidden_states (might not be correct)
+                if layer_past is not None:
+                    layer_past = tuple(past_state.to(hidden_states.device) for past_state in layer_past)
+                # Ensure that attention_mask is always on the same device as hidden_states
+                if attention_mask is not None:
+                    attention_mask = attention_mask.to(hidden_states.device)
+                if isinstance(head_mask, torch.Tensor):
+                    head_mask = head_mask.to(hidden_states.device)
+            if output_hidden_states:
+                all_hidden_states = all_hidden_states + (hidden_states,)
+
+            if self.gradient_checkpointing and self.training:
+
+                def create_custom_forward(module):
+
+                    def custom_forward(*inputs):
+                        # None for past_key_value
+                        return module(*inputs, use_cache, output_attentions)
+
+                    return custom_forward
+
+                outputs = torch.utils.checkpoint.checkpoint(
+                    create_custom_forward(block),
+                    hidden_states,
+                    None,
+                    attention_mask,
+                    head_mask[i],
+                    encoder_hidden_states,
+                    encoder_attention_mask,
+                )
+            else:
+                outputs = block(
+                    hidden_states,
+                    layer_past=layer_past,
+                    attention_mask=attention_mask,
+                    head_mask=head_mask[i],
+                    encoder_hidden_states=encoder_hidden_states,
+                    encoder_attention_mask=encoder_attention_mask,
+                    use_cache=use_cache,
+                    output_attentions=output_attentions,
+                )
+
+            hidden_states = outputs[0]
+            if use_cache is True:
+                presents = presents + (outputs[1],)
+
+            if output_attentions:
+                all_self_attentions = all_self_attentions + (outputs[2 if use_cache else 1],)
+                if self.config.add_cross_attention:
+                    all_cross_attentions = all_cross_attentions + (outputs[3 if use_cache else 2],)
+
+            # Model Parallel: If it's the last layer for that device, put things on the next device
+            if self.model_parallel:
+                for k, v in self.device_map.items():
+                    if i == v[-1] and "cuda:" + str(k) != self.last_device:
+                        hidden_states = hidden_states.to("cuda:" + str(k + 1))
+
+        if stage_manager.is_last_stage():
+            hidden_states = self.ln_f(hidden_states)
+
+        hidden_states = hidden_states.view(output_shape)
+
+        # Add last hidden state
+        if output_hidden_states:
+            all_hidden_states = all_hidden_states + (hidden_states,)
+
+        return BaseModelOutputWithPastAndCrossAttentions(
+            last_hidden_state=hidden_states,
+            past_key_values=presents,
+            hidden_states=all_hidden_states,
+            attentions=all_self_attentions,
+            cross_attentions=all_cross_attentions,
+        )
+
+    @staticmethod
+    def gpt2_lmhead_model_forward(
+            self: GPT2LMHeadModel,
+            input_ids: Optional[torch.LongTensor] = None,
+            past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None,
+            attention_mask: Optional[torch.FloatTensor] = None,
+            token_type_ids: Optional[torch.LongTensor] = None,
+            position_ids: Optional[torch.LongTensor] = None,
+            head_mask: Optional[torch.FloatTensor] = None,
+            inputs_embeds: Optional[torch.FloatTensor] = None,
+            encoder_hidden_states: Optional[torch.Tensor] = None,
+            encoder_attention_mask: Optional[torch.FloatTensor] = None,
+            labels: Optional[torch.LongTensor] = None,
+            use_cache: Optional[bool] = None,
+            output_attentions: Optional[bool] = None,
+            output_hidden_states: Optional[bool] = None,
+            return_dict: Optional[bool] = None,
+            stage_manager: Optional[PipelineStageManager] = None,
+            hidden_states: Optional[torch.FloatTensor] = None,
+            stage_index: Optional[List[int]] = None) -> Union[Dict, Tuple, CausalLMOutputWithCrossAttentions]:
+        r"""
+        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
+            Labels for language modeling. Note that the labels **are shifted** inside the model, i.e. you can set
+            `labels = input_ids` Indices are selected in `[-100, 0, ..., config.vocab_size]` All labels set to `-100`
+            are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
+
+        This function is modified on the basis of transformers.models.gpt2.modeling_gpt2.GPT2LMHeadModel.forward.
+        Please refer to original code of transformers for more details.
+        """
+        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
+
+        outputs = GPT2PipelineForwards.gpt2_model_forward(self.transformer,
+                                                          input_ids,
+                                                          past_key_values=past_key_values,
+                                                          attention_mask=attention_mask,
+                                                          token_type_ids=token_type_ids,
+                                                          position_ids=position_ids,
+                                                          head_mask=head_mask,
+                                                          inputs_embeds=inputs_embeds,
+                                                          encoder_hidden_states=encoder_hidden_states,
+                                                          encoder_attention_mask=encoder_attention_mask,
+                                                          use_cache=use_cache,
+                                                          output_attentions=output_attentions,
+                                                          output_hidden_states=output_hidden_states,
+                                                          return_dict=return_dict,
+                                                          stage_manager=stage_manager,
+                                                          hidden_states=hidden_states,
+                                                          stage_index=stage_index)
+
+        # If not at the last stage, return hidden_states as in GPT2Model
+        if not stage_manager.is_last_stage():
+            return outputs
+
+        hidden_states = outputs[0]
+        lm_logits = self.lm_head(hidden_states)
+        loss = None
+        if labels is not None:
+            # move labels to correct device to enable model parallelism
+            labels = labels.to(lm_logits.device)
+            # Shift so that tokens < n predict n
+            shift_logits = lm_logits[..., :-1, :].contiguous()
+            shift_labels = labels[..., 1:].contiguous()
+            # Flatten the tokens
+            loss_fct = CrossEntropyLoss()
+            loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
+
+        if not return_dict:
+            output = (lm_logits,) + outputs[1:]
+            return ((loss,) + output) if loss is not None else output
+
+        return CausalLMOutputWithCrossAttentions(
+            loss=loss,
+            logits=lm_logits,
+            past_key_values=outputs.past_key_values,
+            hidden_states=outputs.hidden_states,
+            attentions=outputs.attentions,
+            cross_attentions=outputs.cross_attentions,
+        )
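What `stage_index` buys in the forward above is that each pipeline stage runs only its own contiguous slice of `self.h`. A toy illustration with stand-in blocks (not ColossalAI code):

```python
import torch.nn as nn

blocks = nn.ModuleList(nn.Linear(8, 8) for _ in range(12))    # stand-in for self.h
stage_index = [3, 6]    # this stage owns blocks 3, 4 and 5

start_idx, end_idx = stage_index[0], stage_index[1]
for i in range(start_idx, end_idx):
    block = blocks[i]    # blocks outside [start_idx, end_idx) were released to None
    print(f"running block {i}")
```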
From b023f59d4729d7be9cfa8e7b59cd00071e61b896 Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Fri, 18 Aug 2023 11:20:03 +0800
Subject: [PATCH 05/17] add generate schedule

---
 colossalai/pipeline/schedule/generate.py     | 96 ++++++++++++++++++++
 colossalai/ppinference/engine.py             | 36 ++++++--
 colossalai/ppinference/microbatch_manager.py | 10 ++
 colossalai/ppinference/modeling/__init__.py  |  0
 colossalai/ppinference/modeling/gpt2.py      | 24 +----
 5 files changed, 137 insertions(+), 29 deletions(-)
 create mode 100644 colossalai/pipeline/schedule/generate.py
 create mode 100644 colossalai/ppinference/modeling/__init__.py

diff --git a/colossalai/pipeline/schedule/generate.py b/colossalai/pipeline/schedule/generate.py
new file mode 100644
index 000000000000..5c7c85d0018e
--- /dev/null
+++ b/colossalai/pipeline/schedule/generate.py
@@ -0,0 +1,96 @@
+from functools import partial
+from typing import Any, Callable, Iterable, List, Optional, Union
+
+import torch
+import torch.cuda
+from torch.nn import Module
+from torch.utils._pytree import tree_map
+
+from colossalai.interface import OptimizerWrapper
+from colossalai.pipeline.p2p import PipelineP2PCommunication
+from colossalai.pipeline.stage_manager import PipelineStageManager
+from colossalai.ppinference.microbatch_manager import MicroBatchManager
+from colossalai.utils.cuda import get_current_device
+
+from ._utils import detach, get_batch_size, get_micro_batch, merge_batch, model_forward, retain_grad, to_device
+from .base import PipelineSchedule
+
+
+class GenerateSchedule(PipelineSchedule):
+
+    def __init__(self, stage_manager: PipelineStageManager, mb_manager: MicroBatchManager) -> None:
+        super().__init__(stage_manager)
+        self.comm = PipelineP2PCommunication(stage_manager)
+        self.num_microbatches = stage_manager.num_stages
+        self.mb_manager = mb_manager
+        self.batch: Optional[Any] = None
+        self.batch_size: Optional[int] = None
+        self.microbatch_offset: Optional[int] = None
+        self.microbatch_size: Optional[int] = None
+
+    def load_batch(self, data_iter: Iterable, device: Optional[torch.device] = None) -> None:
+        """Load a batch from data iterator.
+
+        Args:
+            data_iter (Iterable): Data iterator.
+            device (Optional[torch.device], optional): Target device. Defaults to None.
+        """
+        batch = next(data_iter)
+        if device is not None:
+            batch = tree_map(partial(to_device, device=device), batch)
+        self.batch = batch
+        self.batch_size = get_batch_size(batch)
+        self.microbatch_offset = 0
+        assert self.batch_size % self.num_microbatches == 0, \
+            f"Batch size should divided by the number of microbatches, {self.batch_size}, {self.num_microbatches}"
+        self.microbatch_size = self.batch_size // self.num_microbatches
+
+    def load_micro_batch(self) -> Any:
+        """Load a micro batch from the current batch.
+
+        Returns:
+            Any: Micro batch.
+        """
+        micro_batch = get_micro_batch(self.batch, self.microbatch_offset, self.microbatch_size)
+        self.microbatch_offset += self.microbatch_size
+        return tree_map(partial(to_device, device=get_current_device()), micro_batch)
+
+    def generate_step(self,
+                      model: Module,
+                      data_iter: Iterable,
+                      outputs: Optional[List[Any]] = None) -> Union[torch.Tensor, dict]:
+        """Forward one step of the pipeline
+
+        Args:
+            model (Module): Model to be run
+            input_obj (Optional[dict]): The output from the previous stage. If it is the first stage, the `input_obj` is None.
+            criterion (Callable): Criterion to calculate loss.
+            accum_loss (Optional[torch.Tensor], optional): Accumulated loss. Defaults to None.
+            outputs (Optional[List[Any]], optional): List to store the output of the last stage (final output). Defaults to None.
+
+        Returns:
+            Union[torch.Tensor, dict]: The intermediate output (dict) of the current stage. If it is the last stage, the output is the loss (Tensor).
+        """
+        output_sequence = []
+        self.load_batch(data_iter)
+
+        # prepare for warmup
+        num_warmup_microbatch = self.stage_manager.num_stages - self.stage_manager.stage
+        num_warmup_microbatch = min(num_warmup_microbatch, self.num_microbatches)
+        num_microbatch_remaining = self.num_microbatches - num_warmup_microbatch
+
+        # run warmup round
+        for _ in range(num_warmup_microbatch):
+            micro_batch = None
+            if self.stage_manager.is_first_stage:
+                micro_batch = self.load_micro_batch()
+            input_obj = self.comm.recv_forward()
+            # if self.stage_manager.is_first_stage:
+            #     print("warmup",self.microbatch_offset,micro_batch, input_obj)
+            hidden_states = None if input_obj is None else {'hidden_states': input_obj.get('hidden_states', None)}
+            output_obj = model_forward(model, micro_batch, hidden_states)
+            self.comm.send_forward(output_obj)
+            if self.stage_manager.is_last_stage:
+                output_sequence.append(output_obj)
+
+        return output_sequence
diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py
index 89629c293e78..7c0138e0f9ac 100644
--- a/colossalai/ppinference/engine.py
+++ b/colossalai/ppinference/engine.py
@@ -1,18 +1,19 @@
+import re
 from functools import partial
 from types import MethodType
-from typing import List, Optional, Set
+from typing import Callable, List, Optional, Set

 import torch
 import torch.distributed as dist
 import torch.nn as nn

 from colossalai.cluster import ProcessGroupMesh
+from colossalai.pipeline.schedule.generate import GenerateSchedule
 from colossalai.pipeline.stage_manager import PipelineStageManager
 from colossalai.shardformer._utils import getattr_

 from .inference_config import InferenceConfig
 from .microbatch_manager import MicroBatchManager
-from .modeling.gpt2 import GPT2PipelineForwards
 from .utils import get_suffix_name, set_tensors_to_none


@@ -22,6 +23,7 @@ def __init__(
         self,
         gerneration_config: InferenceConfig,
         model: nn.Module,
+        pp_fwd: Callable,
     ) -> None:
         self.gerneration_config = gerneration_config
         self.model = model
@@ -29,18 +31,20 @@ def __init__(
         self.stage_manager = PipelineStageManager(self.pg_mesh, 0, True)
         self.held_layer = None
         self.mb_manager = MicroBatchManager(self.gerneration_config)
+        self.schedule = GenerateSchedule(self.stage_manager, self.mb_manager)

         self._partion_model()
-        self._inject_fwd()
+        self._inject_fwd(pp_fwd)

-    def inference(self, **kwargs):
-        output = self.model(**kwargs)
-        return output
+    def inference(self, input_list):
+        out = self.schedule.generate_step(self.model, iter(input_list))

-    def _inject_fwd(self):
-        new_fwd = partial(GPT2PipelineForwards.gpt2_lmhead_model_forward,
-                          stage_manager=self.stage_manager,
-                          stage_index=[0, 1])
+        # print(out)
+
+    def _inject_fwd(self, pp_fwd: Callable):
+        stage_index = self._get_stage_index()
+        print(stage_index)
+        new_fwd = partial(pp_fwd, stage_manager=self.stage_manager, stage_index=stage_index)
         bound_method = MethodType(new_fwd, self.model)
         setattr(self.model, 'forward', bound_method)

@@ -83,3 +87,15 @@ def _release_unheld_layers(self, module_list: List[str]):
         """
         held_layers = self.held_layer
         set_tensors_to_none(self.model, include=set(module_list) - set(held_layers))
+
+    def _get_stage_index(self):
+        re_pattern = r'\[\d+\]'
+        prog = re.compile(re_pattern)
+        stage_idx = []
+        for item in self.held_layer:
+            result = prog.search(item)
+            if result:
+                idx = result.group().replace('[', '').replace(']', '')
+                stage_idx.append(int(idx))
+
+        return [min(stage_idx), max(stage_idx) + 1]
diff --git a/colossalai/ppinference/microbatch_manager.py b/colossalai/ppinference/microbatch_manager.py
index e4f7c42607c4..97952f66ab24 100644
--- a/colossalai/ppinference/microbatch_manager.py
+++ b/colossalai/ppinference/microbatch_manager.py
@@ -2,6 +2,10 @@

 __all__ = 'MicroBatchManager'

+BEGIN = 1
+GENERATE = 2
+DONE = 3
+

 class MicroBatchManager():

@@ -44,3 +48,9 @@ def next_mb(self):
     @property
     def cur_kvcache(self):
         return self.mb_to_kvcache[self.cur_mb]
+
+    # @property
+    # def mb_state(self):
+    #     if len(self.cur_kvcache) == 0:
+    #         return BEGIN
+    #     elif len(self.cur_kvcache) ==
diff --git a/colossalai/ppinference/modeling/__init__.py b/colossalai/ppinference/modeling/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/colossalai/ppinference/modeling/gpt2.py b/colossalai/ppinference/modeling/gpt2.py
index f8fe2bc42d19..63d9435fc777 100644
--- a/colossalai/ppinference/modeling/gpt2.py
+++ b/colossalai/ppinference/modeling/gpt2.py
@@ -8,8 +8,6 @@

 from colossalai.pipeline.stage_manager import PipelineStageManager

-from ..microbatch_manager import MicroBatchManager
-

 class GPT2PipelineForwards:
     '''
@@ -149,7 +147,7 @@ def gpt2_model_forward(

         # Going through held blocks.
         start_idx, end_idx = stage_index[0], stage_index[1]
-        for i, (block, layer_past) in enumerate(zip(range(start_idx, end_idx), past_key_values)):
+        for i, layer_past in zip(range(start_idx, end_idx), past_key_values):
             block = self.h[i]
             # Model parallel
             if self.model_parallel:
@@ -185,6 +183,7 @@ def custom_forward(*inputs):
                     encoder_attention_mask,
                 )
             else:
+                print(torch.distributed.get_rank(), "forward", i)
                 outputs = block(
                     hidden_states,
                     layer_past=layer_past,
@@ -220,13 +219,7 @@ def custom_forward(*inputs):
         if output_hidden_states:
             all_hidden_states = all_hidden_states + (hidden_states,)

-        return BaseModelOutputWithPastAndCrossAttentions(
-            last_hidden_state=hidden_states,
-            past_key_values=presents,
-            hidden_states=all_hidden_states,
-            attentions=all_self_attentions,
-            cross_attentions=all_cross_attentions,
-        )
+        return {'hidden_states': hidden_states, 'past_kv_cache': presents}

     @staticmethod
     def gpt2_lmhead_model_forward(
@@ -281,7 +274,7 @@ def gpt2_lmhead_model_forward(
         if not stage_manager.is_last_stage():
             return outputs

-        hidden_states = outputs[0]
+        hidden_states = outputs['hidden_states']
         lm_logits = self.lm_head(hidden_states)
         loss = None
         if labels is not None:
@@ -297,11 +290,4 @@ def gpt2_lmhead_model_forward(
             output = (lm_logits,) + outputs[1:]
             return ((loss,) + output) if loss is not None else output

-        return CausalLMOutputWithCrossAttentions(
-            loss=loss,
-            logits=lm_logits,
-            past_key_values=outputs.past_key_values,
-            hidden_states=outputs.hidden_states,
-            attentions=outputs.attentions,
-            cross_attentions=outputs.cross_attentions,
-        )
+        return {'hidden_states': lm_logits, 'past_kv_cache': outputs['past_kv_cache']}
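The engine's `_inject_fwd` combines `functools.partial` with `types.MethodType` to swap a pipeline-aware forward onto an existing model. A minimal sketch with toy stand-ins for the model and forward (names here are not ColossalAI code):

```python
from functools import partial
from types import MethodType

class ToyModel:
    def forward(self, x):
        return x

def pipeline_forward(self, x, stage_manager=None, stage_index=None):
    # Stand-in for a pipeline forward such as gpt2_lmhead_model_forward.
    return f"stage {stage_index}: {x}"

model = ToyModel()
# partial pins the pipeline arguments; MethodType rebinds the result to the instance.
new_fwd = partial(pipeline_forward, stage_manager=None, stage_index=[0, 1])
model.forward = MethodType(new_fwd, model)
print(model.forward('hello'))    # stage [0, 1]: hello
```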
From 63b71d1d65a6ce63306ed88e44e54d0b46d86445 Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Fri, 18 Aug 2023 14:37:54 +0800
Subject: [PATCH 06/17] use mb size to control mb number

---
 colossalai/pipeline/schedule/generate.py   | 18 ++++++++++--------
 colossalai/ppinference/engine.py           |  2 +-
 colossalai/ppinference/inference_config.py |  3 +++
 colossalai/ppinference/modeling/gpt2.py    |  1 -
 4 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/colossalai/pipeline/schedule/generate.py b/colossalai/pipeline/schedule/generate.py
index 5c7c85d0018e..f24036bededf 100644
--- a/colossalai/pipeline/schedule/generate.py
+++ b/colossalai/pipeline/schedule/generate.py
@@ -21,12 +21,12 @@ class GenerateSchedule(PipelineSchedule):
     def __init__(self, stage_manager: PipelineStageManager, mb_manager: MicroBatchManager) -> None:
         super().__init__(stage_manager)
         self.comm = PipelineP2PCommunication(stage_manager)
-        self.num_microbatches = stage_manager.num_stages
         self.mb_manager = mb_manager
+        self.microbatch_size = mb_manager.pp_inference_config.micro_batch_size
         self.batch: Optional[Any] = None
         self.batch_size: Optional[int] = None
         self.microbatch_offset: Optional[int] = None
-        self.microbatch_size: Optional[int] = None
+        self.num_microbatches: Optional[int] = None

     def load_batch(self, data_iter: Iterable, device: Optional[torch.device] = None) -> None:
         """Load a batch from data iterator.
@@ -41,9 +41,9 @@ def load_batch(self, data_iter: Iterable, device: Optional[torch.device] = None)
         self.batch = batch
         self.batch_size = get_batch_size(batch)
         self.microbatch_offset = 0
-        assert self.batch_size % self.num_microbatches == 0, \
+        assert self.batch_size % self.microbatch_size == 0, \
             f"Batch size should divided by the number of microbatches, {self.batch_size}, {self.num_microbatches}"
-        self.microbatch_size = self.batch_size // self.num_microbatches
+        self.num_microbatches = self.batch_size // self.microbatch_size

     def load_micro_batch(self) -> Any:
         """Load a micro batch from the current batch.
@@ -55,6 +55,7 @@ def load_micro_batch(self) -> Any:
         self.microbatch_offset += self.microbatch_size
         return tree_map(partial(to_device, device=get_current_device()), micro_batch)

+    @torch.no_grad()
     def generate_step(self,
                       model: Module,
                       data_iter: Iterable,
@@ -73,6 +74,7 @@ def generate_step(self,
         """
         output_sequence = []
         self.load_batch(data_iter)
+        model.eval()

         # prepare for warmup
         num_warmup_microbatch = self.stage_manager.num_stages - self.stage_manager.stage
@@ -80,9 +82,9 @@ def generate_step(self,
         num_microbatch_remaining = self.num_microbatches - num_warmup_microbatch

         # run warmup round
-        for _ in range(num_warmup_microbatch):
+        for _ in range(self.num_microbatches):
             micro_batch = None
-            if self.stage_manager.is_first_stage:
+            if self.stage_manager.is_first_stage():
                 micro_batch = self.load_micro_batch()
             input_obj = self.comm.recv_forward()
             # if self.stage_manager.is_first_stage:
@@ -90,7 +92,7 @@ def generate_step(self,
             hidden_states = None if input_obj is None else {'hidden_states': input_obj.get('hidden_states', None)}
             output_obj = model_forward(model, micro_batch, hidden_states)
             self.comm.send_forward(output_obj)
-            if self.stage_manager.is_last_stage:
-                output_sequence.append(output_obj)
+            if self.stage_manager.is_last_stage():
+                output_sequence.append(output_obj['hidden_states'])

         return output_sequence
diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py
index 7c0138e0f9ac..2ed76383e088 100644
--- a/colossalai/ppinference/engine.py
+++ b/colossalai/ppinference/engine.py
@@ -38,7 +38,7 @@ def __init__(

     def inference(self, input_list):
         out = self.schedule.generate_step(self.model, iter(input_list))
-
+        return out
         # print(out)

     def _inject_fwd(self, pp_fwd: Callable):
diff --git a/colossalai/ppinference/inference_config.py b/colossalai/ppinference/inference_config.py
index c8ebe7dbc5c8..03117532630c 100644
--- a/colossalai/ppinference/inference_config.py
+++ b/colossalai/ppinference/inference_config.py
@@ -12,6 +12,7 @@ class InferenceConfig:
     Args:
         pp_size (int): the number of pipeline stages.
         stage_unit (List[str]): the unit module name can be sliced as a stage, should be `nn.Module`.
+        micro_batch_size (int): the micro batch size.
         target_length (int): the target length of the input sequence.
         padding_token_id (int): the token id for padding.

@@ -21,11 +22,13 @@ def __init__(
         self,
         pp_size: int,
         stage_unit: List[str],
+        micro_batch_size: int = 1,
         target_length: int = 32,
         padding_token_id: int = 0,
     ):
         assert isinstance(pp_size, int), f'pp_size must be an integer, got {type(pp_size)}'
         self.pp_size = pp_size
         self.stage_unit = stage_unit
+        self.micro_batch_size = micro_batch_size
         self.target_length = target_length
         self.padding_token_id = padding_token_id
diff --git a/colossalai/ppinference/modeling/gpt2.py b/colossalai/ppinference/modeling/gpt2.py
index 63d9435fc777..2289cdf157ff 100644
--- a/colossalai/ppinference/modeling/gpt2.py
+++ b/colossalai/ppinference/modeling/gpt2.py
@@ -183,7 +183,6 @@ def custom_forward(*inputs):
                     encoder_attention_mask,
                 )
             else:
-                print(torch.distributed.get_rank(), "forward", i)
                 outputs = block(
                     hidden_states,
                     layer_past=layer_past,
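With this patch the number of micro-batches follows from the configured micro-batch size rather than from the stage count. The bookkeeping in `load_batch` reduces to a few lines, sketched standalone:

```python
batch_size, microbatch_size = 8, 2
assert batch_size % microbatch_size == 0, \
    "batch size must be divisible by the micro batch size"
num_microbatches = batch_size // microbatch_size             # 4
offsets = [i * microbatch_size for i in range(num_microbatches)]
print(num_microbatches, offsets)                             # 4 [0, 2, 4, 6]
```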
""" - if not self.stage_manager.is_last_stage(): + if not self.stage_manager.is_last_stage() or self.stage_manager.circle_stage: if next_rank is None: next_rank = self.stage_manager.get_next_rank() cur_rank = self.stage_manager.get_rank() diff --git a/colossalai/pipeline/schedule/generate.py b/colossalai/pipeline/schedule/generate.py index f24036bededf..d4727fe0e3f2 100644 --- a/colossalai/pipeline/schedule/generate.py +++ b/colossalai/pipeline/schedule/generate.py @@ -9,7 +9,7 @@ from colossalai.interface import OptimizerWrapper from colossalai.pipeline.p2p import PipelineP2PCommunication from colossalai.pipeline.stage_manager import PipelineStageManager -from colossalai.ppinference.microbatch_manager import MicroBatchManager +from colossalai.ppinference.microbatch_manager import DONE, GENERATE, PREFILL, MicroBatchManager from colossalai.utils.cuda import get_current_device from ._utils import detach, get_batch_size, get_micro_batch, merge_batch, model_forward, retain_grad, to_device @@ -55,6 +55,19 @@ def load_micro_batch(self) -> Any: self.microbatch_offset += self.microbatch_size return tree_map(partial(to_device, device=get_current_device()), micro_batch) + def postprocess_new_inputs(self, input_ids): + new_mask = self.mb_manager.cur_descrption.attn_mask + new_mask = torch.cat((new_mask, torch.ones((new_mask.shape[0], 1), dtype=torch.int64).cuda()), dim=-1) + self.mb_manager.cur_descrption.attn_mask = new_mask + past_key_values = self.mb_manager.cur_descrption.kv_cache + + return dict(input_ids=input_ids['new_token'], attention_mask=new_mask, past_key_values=past_key_values) + + def get_token_id(self, hidden_state: torch.Tensor) -> torch.Tensor: + last_hidden_state = hidden_state[:, -1] + input_ids = torch.argmax(last_hidden_state, dim=-1).unsqueeze(1) + return input_ids + @torch.no_grad() def generate_step(self, model: Module, @@ -82,17 +95,36 @@ def generate_step(self, num_microbatch_remaining = self.num_microbatches - num_warmup_microbatch # run warmup round - for _ in range(self.num_microbatches): - micro_batch = None - if self.stage_manager.is_first_stage(): - micro_batch = self.load_micro_batch() - input_obj = self.comm.recv_forward() - # if self.stage_manager.is_first_stage: - # print("warmup",self.microbatch_offset,micro_batch, input_obj) - hidden_states = None if input_obj is None else {'hidden_states': input_obj.get('hidden_states', None)} - output_obj = model_forward(model, micro_batch, hidden_states) - self.comm.send_forward(output_obj) - if self.stage_manager.is_last_stage(): - output_sequence.append(output_obj['hidden_states']) - + for gen_word in range(self.mb_manager.pp_inference_config.new_length): + for mb in range(self.num_microbatches): + # first stage and in prefill phase + if self.stage_manager.is_first_stage() and self.mb_manager.cur_state is PREFILL: + input_obj = None + micro_batch = self.load_micro_batch() + hidden_states = None + # first stage and in generate phase + elif self.stage_manager.is_first_stage(): + input_obj = self.comm.recv_forward() + micro_batch = self.postprocess_new_inputs(input_obj) + hidden_states = None + # not first stage and in gererate phase + else: + input_obj = self.comm.recv_forward() + micro_batch = None + hidden_states = input_obj + + print( + f"stage:{self.stage_manager.stage}, micro batch id:{self.mb_manager.idx}, new token id:{gen_word}") + output_obj = model_forward(model, micro_batch, hidden_states) + + past_kv_cache = output_obj.get('past_kv_cache', None) + state = self.mb_manager.step(micro_batch, input_obj, past_kv_cache) 
+ if self.stage_manager.is_last_stage(): + new_token = self.get_token_id(output_obj['hidden_states']) + output_sequence.append(new_token) + if state is not DONE: + self.comm.send_forward({'new_token': new_token}) + else: + self.comm.send_forward({'hidden_states': output_obj['hidden_states']}) + self.mb_manager.next() return output_sequence diff --git a/colossalai/pipeline/stage_manager.py b/colossalai/pipeline/stage_manager.py index 6a4e97006b43..97657aee4487 100644 --- a/colossalai/pipeline/stage_manager.py +++ b/colossalai/pipeline/stage_manager.py @@ -13,7 +13,7 @@ class PipelineStageManager: Args: pg_mesh (ProcessGroupMesh): Process group mesh. pipeline_axis (int): The axis along which the pipeline is constructed. - circle_p2p (bool): Whether to use circle p2p communication, it will make the first and last stage communicate with each other. + circle_stage (bool): Whether to use circle p2p communication, it will make the first and last stage communicate with each other. Attributes: num_stages (int): Number of stages in the pipeline. @@ -135,7 +135,8 @@ def get_prev_rank(self) -> int: Returns: int: Rank of the previous stage. """ - assert not self.is_first_stage(), "Cannot get previous rank in the first stage." + if not self.circle_stage: + assert not self.is_first_stage(), "Cannot get previous rank in the first stage." return self.prev_rank def get_next_rank(self) -> int: @@ -144,7 +145,8 @@ def get_next_rank(self) -> int: Returns: int: Rank of the next stage. """ - assert not self.is_last_stage(), "Cannot get next rank in the last stage." + if not self.circle_stage: + assert not self.is_last_stage(), "Cannot get next rank in the last stage." return self.next_rank def set_num_virtual_stages(self, num_virtual_stages: int) -> None: diff --git a/colossalai/ppinference/inference_config.py b/colossalai/ppinference/inference_config.py index 03117532630c..8fdc9df3dd0a 100644 --- a/colossalai/ppinference/inference_config.py +++ b/colossalai/ppinference/inference_config.py @@ -13,7 +13,7 @@ class InferenceConfig: pp_size (int): the number of pipeline stages. stage_unit (List[str]): the unit module name can be sliced as a stage, should be `nn.Module`. micro_batch_size (int): the micro batch size. - target_length (int): the target length of the input sequence. + new_length (int): the new length of the input sequence. padding_token_id (int): the token id for padding. 
''' @@ -23,12 +23,14 @@ def __init__( pp_size: int, stage_unit: List[str], micro_batch_size: int = 1, - target_length: int = 32, + micro_batch_buffer_size: int = None, + new_length: int = 32, padding_token_id: int = 0, ): assert isinstance(pp_size, int), f'pp_size must be an integer, got {type(pp_size)}' self.pp_size = pp_size self.stage_unit = stage_unit self.micro_batch_size = micro_batch_size - self.target_length = target_length + self.micro_batch_buffer_size = pp_size if micro_batch_buffer_size is None else micro_batch_buffer_size + self.new_length = new_length self.padding_token_id = padding_token_id diff --git a/colossalai/ppinference/microbatch_manager.py b/colossalai/ppinference/microbatch_manager.py index 97952f66ab24..763918e629be 100644 --- a/colossalai/ppinference/microbatch_manager.py +++ b/colossalai/ppinference/microbatch_manager.py @@ -1,12 +1,67 @@ +from typing import Dict + +import torch + from .inference_config import InferenceConfig __all__ = 'MicroBatchManager' -BEGIN = 1 +PREFILL = 1 GENERATE = 2 DONE = 3 +class MicroBatchDescription(): + + def __init__( + self, + mb_inputs: torch.Tensor, + inter_inputs, + new_length: int, + ) -> None: + if mb_inputs is not None: + assert mb_inputs.get('input_ids') is not None and mb_inputs.get('attention_mask') is not None + self.mb_length = mb_inputs['input_ids'].shape[-1] + self.attn_mask = mb_inputs['attention_mask'] + self.input_ids = mb_inputs['input_ids'] + + elif inter_inputs is not None: + assert inter_inputs.get('hidden_states') is not None + # print(inter_inputs['hidden_states'].shape) + self.mb_length = inter_inputs['hidden_states'].shape[-2] + else: + raise ValueError('mb_inputs and inter_inputs can not be None at the same time') + + self.target_length = self.mb_length + new_length + self.kv_cache = () + + def update(self, kv_cache): + self.kv_cache = kv_cache + + @property + def cur_length(self): + """ + Return the current sequnence length of micro batch, when there is no kv_cache, the length is mb_length, + otherwise the sequence length is `kv_cache[0][0].shape[-2]` plus 1 + + """ + if len(self.kv_cache) == 0: + return self.mb_length + return self.kv_cache[0][0].shape[-2] + 1 + + @property + def state(self): + """ + Return the state of current micro batch, when current length is equal to target length, + the state is DONE, otherwise GENERATE + + """ + if self.cur_length == self.target_length: + return DONE + else: + return GENERATE + + class MicroBatchManager(): def __init__( @@ -14,43 +69,54 @@ def __init__( pp_inference_config: InferenceConfig, ): self.pp_inference_config = pp_inference_config - self.mb_to_kvcache = self._init_kvcache() - self.cur_mb = 0 + self.mb_descrption_buffer = {} + self.buffer_size = pp_inference_config.micro_batch_buffer_size + self.idx = 0 - def step(self, present_kv=None): - self._update_kvcahe(present_kv) - self.cur_mb = self.next_mb + def _add_descrption(self, mb_inputs: Dict[str, torch.Tensor], inter_inputs: Dict[str, torch.Tensor]): + self.mb_descrption_buffer[self.idx] = MicroBatchDescription(mb_inputs, inter_inputs, + self.pp_inference_config.new_length) - def _init_kvcache(self): - mb_to_kvcache = {i: () for i in range(self.pp_inference_config.pp_size)} - return mb_to_kvcache + def _update_descrption(self, present_kv): + self.mb_descrption_buffer[self.idx].update(present_kv) - def _update_kvcahe(self, present_kv): - self.mb_to_kvcache[self.cur_mb] += (present_kv,) + def _remove_descrption(self): + self.mb_descrption_buffer.pop(self.idx) - if self.mb_to_kvcache[self.cur_mb][0][0][-2] 
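`get_token_id` above implements plain greedy decoding: take the logits at the final position and keep a `(batch, 1)` shape so the result can be fed back to the first stage as the next `input_ids`. In isolation:

```python
import torch

hidden_state = torch.randn(2, 5, 50257)                 # (batch, seq, vocab) logits
last = hidden_state[:, -1]                              # (batch, vocab)
new_token = torch.argmax(last, dim=-1).unsqueeze(1)     # (batch, 1)
print(new_token.shape)                                  # torch.Size([2, 1])
```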
diff --git a/colossalai/ppinference/microbatch_manager.py b/colossalai/ppinference/microbatch_manager.py
index 97952f66ab24..763918e629be 100644
--- a/colossalai/ppinference/microbatch_manager.py
+++ b/colossalai/ppinference/microbatch_manager.py
@@ -1,12 +1,67 @@
+from typing import Dict
+
+import torch
+
 from .inference_config import InferenceConfig

 __all__ = 'MicroBatchManager'

-BEGIN = 1
+PREFILL = 1
 GENERATE = 2
 DONE = 3


+class MicroBatchDescription():
+
+    def __init__(
+        self,
+        mb_inputs: torch.Tensor,
+        inter_inputs,
+        new_length: int,
+    ) -> None:
+        if mb_inputs is not None:
+            assert mb_inputs.get('input_ids') is not None and mb_inputs.get('attention_mask') is not None
+            self.mb_length = mb_inputs['input_ids'].shape[-1]
+            self.attn_mask = mb_inputs['attention_mask']
+            self.input_ids = mb_inputs['input_ids']
+
+        elif inter_inputs is not None:
+            assert inter_inputs.get('hidden_states') is not None
+            # print(inter_inputs['hidden_states'].shape)
+            self.mb_length = inter_inputs['hidden_states'].shape[-2]
+        else:
+            raise ValueError('mb_inputs and inter_inputs can not be None at the same time')
+
+        self.target_length = self.mb_length + new_length
+        self.kv_cache = ()
+
+    def update(self, kv_cache):
+        self.kv_cache = kv_cache
+
+    @property
+    def cur_length(self):
+        """
+        Return the current sequnence length of micro batch, when there is no kv_cache, the length is mb_length,
+        otherwise the sequence length is `kv_cache[0][0].shape[-2]` plus 1
+
+        """
+        if len(self.kv_cache) == 0:
+            return self.mb_length
+        return self.kv_cache[0][0].shape[-2] + 1
+
+    @property
+    def state(self):
+        """
+        Return the state of current micro batch, when current length is equal to target length,
+        the state is DONE, otherwise GENERATE
+
+        """
+        if self.cur_length == self.target_length:
+            return DONE
+        else:
+            return GENERATE
+
+
 class MicroBatchManager():

     def __init__(
@@ -14,43 +69,54 @@ def __init__(
         pp_inference_config: InferenceConfig,
     ):
         self.pp_inference_config = pp_inference_config
-        self.mb_to_kvcache = self._init_kvcache()
-        self.cur_mb = 0
+        self.mb_descrption_buffer = {}
+        self.buffer_size = pp_inference_config.micro_batch_buffer_size
+        self.idx = 0

-    def step(self, present_kv=None):
-        self._update_kvcahe(present_kv)
-        self.cur_mb = self.next_mb
+    def _add_descrption(self, mb_inputs: Dict[str, torch.Tensor], inter_inputs: Dict[str, torch.Tensor]):
+        self.mb_descrption_buffer[self.idx] = MicroBatchDescription(mb_inputs, inter_inputs,
+                                                                    self.pp_inference_config.new_length)

-    def _init_kvcache(self):
-        mb_to_kvcache = {i: () for i in range(self.pp_inference_config.pp_size)}
-        return mb_to_kvcache
+    def _update_descrption(self, present_kv):
+        self.mb_descrption_buffer[self.idx].update(present_kv)

-    def _update_kvcahe(self, present_kv):
-        self.mb_to_kvcache[self.cur_mb] += (present_kv,)
+    def _remove_descrption(self):
+        self.mb_descrption_buffer.pop(self.idx)

-        if self.mb_to_kvcache[self.cur_mb][0][0][-2] == self.pp_inference_config.target_length:
-            self.mb_to_kvcache.pop(self.cur_mb)
+    def step(self, mb_inputs=None, inter_inputs=None, present_kv=None):
+        """
+        Update the state if microbatch manager
+
+        Args:
+            mb_inputs (int, optional): The input of first stage when in prefill, should be a dict like {'input_ids': torch.Tensor, 'attention_mask': torch.Tensor}.
+            inter_inputs ([type], optional): The input of intermediate stage (the output of previous stage), should be a dict like {'hidden_state': torch.Tensor}.
+            present_kv ([type], optional): The kvcache of current microbatch in current stage.
+        """
+        if self.mb_descrption_buffer.get(self.idx) is None:
+            self._add_descrption(mb_inputs, inter_inputs)
+        self._update_descrption(present_kv)
+        state = self.cur_state
+        if state == DONE:
+            self._remove_descrption()
+        return state
+
+    def next(self):
+        self.idx = (self.idx + 1) % self.buffer_size

     @property
-    def is_done(self):
-        return len(self.mb_to_kvcache) == 0
+    def cur_descrption(self) -> MicroBatchDescription:
+        return self.mb_descrption_buffer.get(self.idx)

     @property
-    def next_mb(self):
-        if self.is_done:
-            return None
+    def cur_kv_cache(self):
+        return self.cur_descrption.kv_cache

-        nxt_mb = (self.cur_mb + 1) % self.pp_inference_config.pp_size
-        while nxt_mb % self.pp_inference_config.pp_size not in self.mb_to_kvcache:
-            nxt_mb = (nxt_mb + 1) % self.pp_inference_config.pp_size
-        return nxt_mb
-
-    @property
-    def cur_kvcache(self):
-        return self.mb_to_kvcache[self.cur_mb]
+    @property
+    def cur_state(self):
+        """
+        Return the state of current micro batch, when current descrption is None, the state is PREFILL
+
+        """
+        if self.cur_descrption is None:
+            return PREFILL
+        return self.cur_descrption.state
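The `cur_length`/`state` pair above forms a small per-micro-batch state machine: PREFILL until the first KV cache arrives, GENERATE until the cache reaches `target_length`, then DONE. A hand-run of the length bookkeeping, assuming a 4-token prompt, `new_length = 2`, and the HF-style cache shape used earlier:

```python
import torch

mb_length, new_length = 4, 2
target_length = mb_length + new_length
kv_cache = ()

def cur_length():
    # No cache yet: still at prompt length. Otherwise: cached positions plus
    # the one token in flight, matching MicroBatchDescription.cur_length.
    return mb_length if len(kv_cache) == 0 else kv_cache[0][0].shape[-2] + 1

print(cur_length())                              # 4 (prefill just ran)
kv_cache = ((torch.zeros(1, 12, 5, 64),) * 2,)   # cache now holds 5 positions
print(cur_length() == target_length)             # True -> state flips to DONE
```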
@@ -69,10 +70,7 @@ def get_token_id(self, hidden_state: torch.Tensor) -> torch.Tensor: return input_ids @torch.no_grad() - def generate_step(self, - model: Module, - data_iter: Iterable, - outputs: Optional[List[Any]] = None) -> Union[torch.Tensor, dict]: + def generate_step(self, model: Module, data_iter: Iterable) -> Union[torch.Tensor, dict]: """Forward one step of the pipeline Args: @@ -89,14 +87,11 @@ def generate_step(self, self.load_batch(data_iter) model.eval() - # prepare for warmup - num_warmup_microbatch = self.stage_manager.num_stages - self.stage_manager.stage - num_warmup_microbatch = min(num_warmup_microbatch, self.num_microbatches) - num_microbatch_remaining = self.num_microbatches - num_warmup_microbatch - - # run warmup round - for gen_word in range(self.mb_manager.pp_inference_config.new_length): - for mb in range(self.num_microbatches): + # run by round + for _ in range(self.round): + state = PREFILL + step = 1 + while self.mb_manager.is_micro_batch_done() is False: # first stage and in prefill phase if self.stage_manager.is_first_stage() and self.mb_manager.cur_state is PREFILL: input_obj = None @@ -110,21 +105,21 @@ def generate_step(self, # not first stage and in gererate phase else: input_obj = self.comm.recv_forward() - micro_batch = None + micro_batch = { + 'past_key_values': self.mb_manager.cur_kv_cache + } if self.mb_manager.cur_kv_cache is not None else None hidden_states = input_obj - print( - f"stage:{self.stage_manager.stage}, micro batch id:{self.mb_manager.idx}, new token id:{gen_word}") output_obj = model_forward(model, micro_batch, hidden_states) past_kv_cache = output_obj.get('past_kv_cache', None) state = self.mb_manager.step(micro_batch, input_obj, past_kv_cache) if self.stage_manager.is_last_stage(): new_token = self.get_token_id(output_obj['hidden_states']) - output_sequence.append(new_token) + self.mb_manager.add_new_tokens(new_token) if state is not DONE: self.comm.send_forward({'new_token': new_token}) else: self.comm.send_forward({'hidden_states': output_obj['hidden_states']}) - self.mb_manager.next() + output_sequence.extend(self.mb_manager.export_new_tokens()) return output_sequence diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py index 2ed76383e088..0f8aeb4ebf39 100644 --- a/colossalai/ppinference/engine.py +++ b/colossalai/ppinference/engine.py @@ -39,11 +39,9 @@ def __init__( def inference(self, input_list): out = self.schedule.generate_step(self.model, iter(input_list)) return out - # print(out) def _inject_fwd(self, pp_fwd: Callable): stage_index = self._get_stage_index() - print(stage_index) new_fwd = partial(pp_fwd, stage_manager=self.stage_manager, stage_index=stage_index) bound_method = MethodType(new_fwd, self.model) setattr(self.model, 'forward', bound_method) @@ -65,7 +63,6 @@ def _partion_model(self): # load model to cuda self.model = self.model.cuda() - print(dist.get_rank(), torch.cuda.memory_allocated()) def _recursive_partion(self, module: nn.Module, module_list: List[str], suffix: str): if len(list(module.children())) == 0: diff --git a/colossalai/ppinference/microbatch_manager.py b/colossalai/ppinference/microbatch_manager.py index 763918e629be..83a3b99e27e8 100644 --- a/colossalai/ppinference/microbatch_manager.py +++ b/colossalai/ppinference/microbatch_manager.py @@ -16,7 +16,7 @@ class MicroBatchDescription(): def __init__( self, mb_inputs: torch.Tensor, - inter_inputs, + interval_inputs, new_length: int, ) -> None: if mb_inputs is not None: @@ -25,12 +25,11 @@ def __init__( self.attn_mask 
= mb_inputs['attention_mask'] self.input_ids = mb_inputs['input_ids'] - elif inter_inputs is not None: - assert inter_inputs.get('hidden_states') is not None - # print(inter_inputs['hidden_states'].shape) - self.mb_length = inter_inputs['hidden_states'].shape[-2] + elif interval_inputs is not None: + assert interval_inputs.get('hidden_states') is not None + self.mb_length = interval_inputs['hidden_states'].shape[-2] else: - raise ValueError('mb_inputs and inter_inputs can not be None at the same time') + raise ValueError('mb_inputs and interval_inputs can not be None at the same time') self.target_length = self.mb_length + new_length self.kv_cache = () @@ -70,6 +69,7 @@ def __init__( ): self.pp_inference_config = pp_inference_config self.mb_descrption_buffer = {} + self.new_tokens_buffer = {} self.buffer_size = pp_inference_config.micro_batch_buffer_size self.idx = 0 @@ -96,19 +96,41 @@ def step(self, mb_inputs=None, inter_inputs=None, present_kv=None): self._add_descrption(mb_inputs, inter_inputs) self._update_descrption(present_kv) state = self.cur_state - if state == DONE: - self._remove_descrption() + self.next() return state def next(self): self.idx = (self.idx + 1) % self.buffer_size + def is_micro_batch_done(self): + if len(self.mb_descrption_buffer) == 0: + return False + for mb in self.mb_descrption_buffer.values(): + if mb.state != DONE: + return False + self.mb_descrption_buffer.clear() + return True + + def add_new_tokens(self, new_token): + if self.idx not in self.new_tokens_buffer: + self.new_tokens_buffer[self.idx] = new_token + else: + self.new_tokens_buffer[self.idx] = torch.cat([self.new_tokens_buffer[self.idx], new_token], dim=-1) + + def export_new_tokens(self): + list = [item.tolist() for item in self.new_tokens_buffer.values()] + flat_list = [item for sublist in list for item in sublist] + self.new_tokens_buffer.clear() + return flat_list + @property def cur_descrption(self) -> MicroBatchDescription: return self.mb_descrption_buffer.get(self.idx) @property def cur_kv_cache(self): + if self.cur_descrption is None: + return None return self.cur_descrption.kv_cache @property From e2911e4a933bea6c0ad7f0493917da8893865613 Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Mon, 21 Aug 2023 18:02:18 +0800 Subject: [PATCH 09/17] add test --- colossalai/ppinference/test.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 colossalai/ppinference/test.py diff --git a/colossalai/ppinference/test.py b/colossalai/ppinference/test.py new file mode 100644 index 000000000000..6a1fa7b2855f --- /dev/null +++ b/colossalai/ppinference/test.py @@ -0,0 +1,33 @@ +import torch +import torch.distributed as dist +import transformers +from modeling.gpt2 import GPT2PipelineForwards + +import colossalai +from colossalai.ppinference import InferenceConfig, PPInferEngine + +colossalai.launch_from_torch(config={}) + + +def data_gen(): + input_ids = torch.tensor([[15496, 11, 616, 3290, 318, 13779, 318, 13779]], dtype=torch.int64) + attention_mask = torch.tensor([[1, 1, 1, 1, 1, 1, 1, 1]], dtype=torch.int64) + return dict(input_ids=input_ids, attention_mask=attention_mask) + + +inputs = data_gen() +for k, v in inputs.items(): + if torch.is_tensor(v) or 'Tensor' in v.__class__.__name__: + new_shape = [1] * v.dim() + new_shape[0] = 8 + inputs[k] = v.to('cuda').repeat(*new_shape) + +model = transformers.GPT2LMHeadModel(transformers.GPT2Config(n_layer=8)) +infer_config = InferenceConfig(pp_size=4, new_length=8, micro_batch_size=2, 
stage_unit=['GPT2Block']) +engine = PPInferEngine(infer_config, model, GPT2PipelineForwards.gpt2_lmhead_model_forward) + +print(engine.held_layer) + +output = engine.inference([inputs]) +if dist.get_rank() == 3: + print(len(output)) From f3b61220d188f65c61a76bf3ff99f77ea70952b5 Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Tue, 22 Aug 2023 17:38:20 +0800 Subject: [PATCH 10/17] reuse shardformer to build model --- colossalai/pipeline/stage_manager.py | 40 ++-------- colossalai/ppinference/engine.py | 79 +++++-------------- colossalai/ppinference/inference_config.py | 4 +- colossalai/ppinference/policy/gpt2_ppinfer.py | 69 ++++++++++++++++ colossalai/ppinference/test.py | 4 +- 5 files changed, 97 insertions(+), 99 deletions(-) create mode 100644 colossalai/ppinference/policy/gpt2_ppinfer.py diff --git a/colossalai/pipeline/stage_manager.py b/colossalai/pipeline/stage_manager.py index 36a04a01a13e..e1edd77a2d20 100644 --- a/colossalai/pipeline/stage_manager.py +++ b/colossalai/pipeline/stage_manager.py @@ -13,7 +13,7 @@ class PipelineStageManager: Args: pg_mesh (ProcessGroupMesh): Process group mesh. pipeline_axis (int): The axis along which the pipeline is constructed. - circle_stage (bool): Whether to use circle p2p communication, it will make the first and last stage communicate with each other. + is_virtual (bool): Whether to use circle p2p communication, it will make the first and last stage communicate with each other. Attributes: num_stages (int): Number of stages in the pipeline. @@ -27,27 +27,6 @@ def __init__(self, pg_mesh: ProcessGroupMesh, pipeline_axis: int, is_virtual: bo self.next_rank: Optional[Tuple[int, ...]] = None self.p2p_groups: Dict[Tuple[int, int], ProcessGroup] = {} - # init prev and next coord - if self.circle_stage: - self._circle_coord_init() - else: - self._none_circle_coord_init() - - # init p2p process groups - stages = list(range(self.num_stages)) - for prev, cur in zip(stages[:-1], stages[1:]): - group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [prev, cur]) - if self.stage in [prev, cur]: - ranks_in_group = self.pg_mesh.get_ranks_in_group(group) - self.p2p_groups[tuple(ranks_in_group)] = group - - if self.circle_stage: - group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [stages[0], stages[-1]]) - if self.stage in [stages[0], stages[-1]]: - ranks_in_group = self.pg_mesh.get_ranks_in_group(group) - self.p2p_groups[tuple(ranks_in_group)] = group - - def _none_circle_coord_init(self): # init prev and next coord coord = self.pg_mesh.coordinate() # the prev rank of rank0 is the last rank @@ -59,16 +38,13 @@ def _none_circle_coord_init(self): (coord[self.pipeline_axis] + 1,) + coord[self.pipeline_axis + 1:] self.next_rank = self.pg_mesh.ravel(next_coord, self.pg_mesh.shape, mode='wrap') - def _circle_coord_init(self): - # init prev and next coord cyclically - coord = self.pg_mesh.coordinate() - prev_coord = coord[: self.pipeline_axis] + \ - ((coord[self.pipeline_axis] + self.num_stages - 1 )% self.num_stages,) + coord[self.pipeline_axis + 1:] - self.prev_rank = self.pg_mesh.ravel(prev_coord, self.pg_mesh.shape) - - next_coord = coord[: self.pipeline_axis] + \ - ((coord[self.pipeline_axis] + self.num_stages + 1 )% self.num_stages,) + coord[self.pipeline_axis + 1:] - self.next_rank = self.pg_mesh.ravel(next_coord, self.pg_mesh.shape) + # init p2p process groups + stages = list(range(self.num_stages)) + for prev, cur in zip(stages[:-1], stages[1:]): + group = self.pg_mesh.get_group_along_axis(self.pipeline_axis, [prev, cur]) 
+ if self.stage in [prev, cur]: + ranks_in_group = self.pg_mesh.get_ranks_in_group(group) + self.p2p_groups[tuple(ranks_in_group)] = group if is_virtual: # add the process group of the first rank and the last rank diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py index 0f8aeb4ebf39..716d5cbe1442 100644 --- a/colossalai/ppinference/engine.py +++ b/colossalai/ppinference/engine.py @@ -3,6 +3,7 @@ from types import MethodType from typing import Callable, List, Optional, Set +import numpy as np import torch import torch.distributed as dist import torch.nn as nn @@ -10,10 +11,12 @@ from colossalai.cluster import ProcessGroupMesh from colossalai.pipeline.schedule.generate import GenerateSchedule from colossalai.pipeline.stage_manager import PipelineStageManager +from colossalai.shardformer import ShardConfig, ShardFormer from colossalai.shardformer._utils import getattr_ from .inference_config import InferenceConfig from .microbatch_manager import MicroBatchManager +from .policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy from .utils import get_suffix_name, set_tensors_to_none @@ -26,73 +29,27 @@ def __init__( pp_fwd: Callable, ) -> None: self.gerneration_config = gerneration_config - self.model = model self.pg_mesh = ProcessGroupMesh(gerneration_config.pp_size) self.stage_manager = PipelineStageManager(self.pg_mesh, 0, True) - self.held_layer = None self.mb_manager = MicroBatchManager(self.gerneration_config) self.schedule = GenerateSchedule(self.stage_manager, self.mb_manager) - - self._partion_model() - self._inject_fwd(pp_fwd) + self.model = self._shardformer(model) def inference(self, input_list): out = self.schedule.generate_step(self.model, iter(input_list)) return out - def _inject_fwd(self, pp_fwd: Callable): - stage_index = self._get_stage_index() - new_fwd = partial(pp_fwd, stage_manager=self.stage_manager, stage_index=stage_index) - bound_method = MethodType(new_fwd, self.model) - setattr(self.model, 'forward', bound_method) - - def _partion_model(self): - # get module list - module_list = self._recursive_partion(self.model, [], '') - - # allocate module to each stage - module_num = len(module_list) - stage_size = self.gerneration_config.pp_size - stage = self.stage_manager.stage - start = stage * (module_num // stage_size) + min(stage, module_num % stage_size) - end = start + (module_num // stage_size) + (1 if stage < module_num % stage_size else 0) - self.held_layer = module_list[start:end] - - # release layers dose not belong to current stage - self._release_unheld_layers(module_list) - - # load model to cuda - self.model = self.model.cuda() - - def _recursive_partion(self, module: nn.Module, module_list: List[str], suffix: str): - if len(list(module.children())) == 0: - module_list.append(suffix) - for name, child in module.named_children(): - suffix_name = get_suffix_name(suffix, name) - if child.__class__.__name__ in self.gerneration_config.stage_unit: - module_list.append(suffix_name) - else: - self._recursive_partion(child, module_list, suffix_name) - return module_list - - def _partion_batch(self): - pass - - def _release_unheld_layers(self, module_list: List[str]): - r""" - Release the unheld layers in the model - """ - held_layers = self.held_layer - set_tensors_to_none(self.model, include=set(module_list) - set(held_layers)) - - def _get_stage_index(self): - re_pattern = r'\[\d+\]' - prog = re.compile(re_pattern) - stage_idx = [] - for item in self.held_layer: - result = prog.search(item) - if result: - idx = 
result.group().replace('[', '').replace(']', '') - stage_idx.append(int(idx)) - - return [min(stage_idx), max(stage_idx) + 1] + def _shardformer(self, model): + shardconfig = ShardConfig( + tensor_parallel_process_group=None, + pipeline_stage_manager=self.stage_manager, + enable_tensor_parallelism=False, + enable_fused_normalization=False, + enable_all_optimization=False, + enable_flash_attention=False, + enable_jit_fused=False, + enable_sequence_parallelism=False, + ) + shardformer = ShardFormer(shard_config=shardconfig) + shard_model, _ = shardformer.optimize(model, GPT2LMHeadModelPipelinePolicy()) + return shard_model.cuda() diff --git a/colossalai/ppinference/inference_config.py b/colossalai/ppinference/inference_config.py index 8fdc9df3dd0a..019562a7a145 100644 --- a/colossalai/ppinference/inference_config.py +++ b/colossalai/ppinference/inference_config.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from typing import List import torch @@ -11,7 +12,6 @@ class InferenceConfig: Args: pp_size (int): the number of pipeline stages. - stage_unit (List[str]): the unit module name can be sliced as a stage, should be `nn.Module`. micro_batch_size (int): the micro batch size. new_length (int): the new length of the input sequence. padding_token_id (int): the token id for padding. @@ -21,7 +21,6 @@ class InferenceConfig: def __init__( self, pp_size: int, - stage_unit: List[str], micro_batch_size: int = 1, micro_batch_buffer_size: int = None, new_length: int = 32, @@ -29,7 +28,6 @@ def __init__( ): assert isinstance(pp_size, int), f'pp_size must be an integer, got {type(pp_size)}' self.pp_size = pp_size - self.stage_unit = stage_unit self.micro_batch_size = micro_batch_size self.micro_batch_buffer_size = pp_size if micro_batch_buffer_size is None else micro_batch_buffer_size self.new_length = new_length diff --git a/colossalai/ppinference/policy/gpt2_ppinfer.py b/colossalai/ppinference/policy/gpt2_ppinfer.py new file mode 100644 index 000000000000..3e4ad30f96ed --- /dev/null +++ b/colossalai/ppinference/policy/gpt2_ppinfer.py @@ -0,0 +1,69 @@ +from functools import partial +from typing import Callable, Dict, List + +from torch import Tensor, nn + +import colossalai.shardformer.layer as col_nn +from colossalai.shardformer.policies.base_policy import ModulePolicyDescription, Policy, SubModuleReplacementDescription +from colossalai.shardformer.policies.gpt2 import GPT2Policy + +from ..modeling.gpt2 import GPT2PipelineForwards + + +class GPT2LMHeadModelPipelinePolicy(GPT2Policy): + + def __init__(self) -> None: + super().__init__() + + def module_policy(self): + from transformers.models.gpt2.modeling_gpt2 import GPT2LMHeadModel + + module_policy = super().module_policy() + + if self.shard_config.enable_tensor_parallelism: + addon_module = { + GPT2LMHeadModel: + ModulePolicyDescription(sub_module_replacement=[ + SubModuleReplacementDescription( + suffix="lm_head", target_module=col_nn.Linear1D_Col, kwargs={"gather_output": True}) + ]) + } + module_policy.update(addon_module) + + if self.pipeline_stage_manager is not None: + self.set_pipeline_forward(model_cls=GPT2LMHeadModel, + new_forward=GPT2PipelineForwards.gpt2_lmhead_model_forward, + policy=module_policy) + return module_policy + + def get_held_layers(self) -> List[nn.Module]: + held_layers = super().get_held_layers() + if self.pipeline_stage_manager.is_last_stage(): + held_layers.append(self.model.lm_head) + return held_layers + + def get_shared_params(self) -> List[Dict[int, Tensor]]: + '''The weights of wte and lm_head are shared.''' + 
module = self.model
+        stage_manager = self.pipeline_stage_manager
+        if stage_manager is not None:
+            if stage_manager.num_stages > 1 and id(module.transformer.wte.weight) == id(module.lm_head.weight):
+                first_stage, last_stage = 0, stage_manager.num_stages - 1
+                return [{first_stage: module.transformer.wte.weight, last_stage: module.lm_head.weight}]
+        return []
+
+    def set_pipeline_forward(self, model_cls: nn.Module, new_forward: Callable, policy: Dict) -> None:
+        """If under pipeline parallel setting, replace the original huggingface forward method
+            with a customized one, and register this change in the policy."""
+        if not self.pipeline_stage_manager:
+            raise ValueError("set_pipeline_forward method can only be called when pipeline parallel is enabled.")
+        stage_manager = self.pipeline_stage_manager
+        if self.model.__class__.__name__ == 'GPT2Model':
+            module = self.model
+        else:
+            module = self.model.transformer
+
+        layers_per_stage = Policy.distribute_layers(len(module.h), stage_manager.num_stages)
+        stage_index = Policy.get_stage_index(layers_per_stage, stage_manager.stage)
+        method_replacement = {'forward': partial(new_forward, stage_manager=stage_manager, stage_index=stage_index)}
+        self.append_or_create_method_replacement(description=method_replacement, policy=policy, target_key=model_cls)
diff --git a/colossalai/ppinference/test.py b/colossalai/ppinference/test.py
index 6a1fa7b2855f..894ff44b035d 100644
--- a/colossalai/ppinference/test.py
+++ b/colossalai/ppinference/test.py
@@ -23,11 +23,9 @@ def data_gen():
         inputs[k] = v.to('cuda').repeat(*new_shape)
 
 model = transformers.GPT2LMHeadModel(transformers.GPT2Config(n_layer=8))
-infer_config = InferenceConfig(pp_size=4, new_length=8, micro_batch_size=2, stage_unit=['GPT2Block'])
+infer_config = InferenceConfig(pp_size=4, new_length=8, micro_batch_size=2)
 engine = PPInferEngine(infer_config, model, GPT2PipelineForwards.gpt2_lmhead_model_forward)
 
-print(engine.held_layer)
-
 output = engine.inference([inputs])
 if dist.get_rank() == 3:
     print(len(output))

From 33a9cf86d1dd5b73a36ab15328c826e69c607447 Mon Sep 17 00:00:00 2001
From: FoolPlayer <498107402@qq.com>
Date: Thu, 24 Aug 2023 18:09:21 +0800
Subject: [PATCH 11/17] refactor some code and use the same attribute name as hf

---
 colossalai/pipeline/schedule/generate.py | 47 +++++++++++++-----------
 colossalai/ppinference/modeling/gpt2.py  |  4 +-
 2 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/colossalai/pipeline/schedule/generate.py b/colossalai/pipeline/schedule/generate.py
index 567dffb26c5f..0bd620b00706 100644
--- a/colossalai/pipeline/schedule/generate.py
+++ b/colossalai/pipeline/schedule/generate.py
@@ -56,7 +56,27 @@ def load_micro_batch(self) -> Any:
         self.microbatch_offset += self.microbatch_size
         return tree_map(partial(to_device, device=get_current_device()), micro_batch)
 
-    def postprocess_new_inputs(self, input_ids):
+    def _prepare_stage_inputs(self):
+        # first stage and in prefill phase
+        if self.stage_manager.is_first_stage() and self.mb_manager.cur_state is PREFILL:
+            pre_stage_out = None
+            model_inputs = self.load_micro_batch()
+            hidden_states = None
+        # first stage and in generate phase
+        elif self.stage_manager.is_first_stage():
+            pre_stage_out = self.comm.recv_forward()
+            model_inputs = self._prepare_next_token(pre_stage_out)
+            hidden_states = None
+        # not first stage and in generate phase
+        else:
+            pre_stage_out = self.comm.recv_forward()
+            model_inputs = {
+                'past_key_values': self.mb_manager.cur_kv_cache
+            } if self.mb_manager.cur_kv_cache is not
None else None + hidden_states = pre_stage_out + return pre_stage_out, model_inputs, hidden_states + + def _prepare_next_token(self, input_ids): new_mask = self.mb_manager.cur_descrption.attn_mask new_mask = torch.cat((new_mask, torch.ones((new_mask.shape[0], 1), dtype=torch.int64).cuda()), dim=-1) self.mb_manager.cur_descrption.attn_mask = new_mask @@ -90,30 +110,13 @@ def generate_step(self, model: Module, data_iter: Iterable) -> Union[torch.Tenso # run by round for _ in range(self.round): state = PREFILL - step = 1 while self.mb_manager.is_micro_batch_done() is False: - # first stage and in prefill phase - if self.stage_manager.is_first_stage() and self.mb_manager.cur_state is PREFILL: - input_obj = None - micro_batch = self.load_micro_batch() - hidden_states = None - # first stage and in generate phase - elif self.stage_manager.is_first_stage(): - input_obj = self.comm.recv_forward() - micro_batch = self.postprocess_new_inputs(input_obj) - hidden_states = None - # not first stage and in gererate phase - else: - input_obj = self.comm.recv_forward() - micro_batch = { - 'past_key_values': self.mb_manager.cur_kv_cache - } if self.mb_manager.cur_kv_cache is not None else None - hidden_states = input_obj + pre_stage_out, model_inputs, hidden_states = self._prepare_stage_inputs() - output_obj = model_forward(model, micro_batch, hidden_states) + output_obj = model_forward(model, model_inputs, hidden_states) - past_kv_cache = output_obj.get('past_kv_cache', None) - state = self.mb_manager.step(micro_batch, input_obj, past_kv_cache) + past_key_values = output_obj.get('past_key_values', None) + state = self.mb_manager.step(model_inputs, pre_stage_out, past_key_values) if self.stage_manager.is_last_stage(): new_token = self.get_token_id(output_obj['hidden_states']) self.mb_manager.add_new_tokens(new_token) diff --git a/colossalai/ppinference/modeling/gpt2.py b/colossalai/ppinference/modeling/gpt2.py index 2289cdf157ff..773fb2a07899 100644 --- a/colossalai/ppinference/modeling/gpt2.py +++ b/colossalai/ppinference/modeling/gpt2.py @@ -218,7 +218,7 @@ def custom_forward(*inputs): if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) - return {'hidden_states': hidden_states, 'past_kv_cache': presents} + return {'hidden_states': hidden_states, 'past_key_values': presents} @staticmethod def gpt2_lmhead_model_forward( @@ -289,4 +289,4 @@ def gpt2_lmhead_model_forward( output = (lm_logits,) + outputs[1:] return ((loss,) + output) if loss is not None else output - return {'hidden_states': lm_logits, 'past_kv_cache': outputs['past_kv_cache']} + return {'hidden_states': lm_logits, 'past_key_values': outputs['past_key_values']} From ba01e8a37ade8f1e54ed26cb68b6d37eb07b0554 Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Mon, 28 Aug 2023 16:41:14 +0800 Subject: [PATCH 12/17] fix review and add test for generation --- colossalai/inference/__init__.py | 3 + colossalai/inference/pipeline/__init__.py | 3 + colossalai/inference/pipeline/engine.py | 93 +++++++++++++++++++ .../pipeline}/microbatch_manager.py | 46 +++++---- .../pipeline}/modeling/__init__.py | 0 .../pipeline}/modeling/gpt2.py | 0 .../pipeline}/policy/gpt2_ppinfer.py | 0 .../pipeline}/test.py | 7 +- .../pipeline}/utils.py | 0 colossalai/pipeline/schedule/generate.py | 21 ++--- colossalai/ppinference/__init__.py | 4 - colossalai/ppinference/engine.py | 55 ----------- colossalai/ppinference/inference_config.py | 34 ------- tests/test_inference/test_pipeline_infer.py | 58 ++++++++++++ 14 files changed, 196 
insertions(+), 128 deletions(-)
 create mode 100644 colossalai/inference/__init__.py
 create mode 100644 colossalai/inference/pipeline/__init__.py
 create mode 100644 colossalai/inference/pipeline/engine.py
 rename colossalai/{ppinference => inference/pipeline}/microbatch_manager.py (81%)
 rename colossalai/{ppinference => inference/pipeline}/modeling/__init__.py (100%)
 rename colossalai/{ppinference => inference/pipeline}/modeling/gpt2.py (100%)
 rename colossalai/{ppinference => inference/pipeline}/policy/gpt2_ppinfer.py (100%)
 rename colossalai/{ppinference => inference/pipeline}/test.py (73%)
 rename colossalai/{ppinference => inference/pipeline}/utils.py (100%)
 delete mode 100644 colossalai/ppinference/__init__.py
 delete mode 100644 colossalai/ppinference/engine.py
 delete mode 100644 colossalai/ppinference/inference_config.py
 create mode 100644 tests/test_inference/test_pipeline_infer.py

diff --git a/colossalai/inference/__init__.py b/colossalai/inference/__init__.py
new file mode 100644
index 000000000000..999f90e8414e
--- /dev/null
+++ b/colossalai/inference/__init__.py
@@ -0,0 +1,3 @@
+from .pipeline import PPInferEngine
+
+__all__ = "pipeline"
diff --git a/colossalai/inference/pipeline/__init__.py b/colossalai/inference/pipeline/__init__.py
new file mode 100644
index 000000000000..aff4568f7d08
--- /dev/null
+++ b/colossalai/inference/pipeline/__init__.py
@@ -0,0 +1,3 @@
+from .engine import PPInferEngine
+
+__all__ = ['PPInferEngine']
diff --git a/colossalai/inference/pipeline/engine.py b/colossalai/inference/pipeline/engine.py
new file mode 100644
index 000000000000..0c748d725d5d
--- /dev/null
+++ b/colossalai/inference/pipeline/engine.py
@@ -0,0 +1,93 @@
+import re
+from functools import partial
+from types import MethodType
+from typing import Callable, List, Optional, Set
+
+import numpy as np
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+
+from colossalai.cluster import ProcessGroupMesh
+from colossalai.pipeline.schedule.generate import GenerateSchedule
+from colossalai.pipeline.stage_manager import PipelineStageManager
+from colossalai.shardformer import ShardConfig, ShardFormer
+from colossalai.shardformer._utils import getattr_
+from colossalai.shardformer.policies.base_policy import Policy
+
+from .microbatch_manager import MicroBatchManager
+from .policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy
+from .utils import get_suffix_name, set_tensors_to_none
+
+
+class PPInferEngine:
+    '''
+    PPInferEngine is a class that handles the pipeline parallel inference.
+
+    Args:
+        pp_size (int): the number of pipeline stages.
+        pp_model (`nn.Module`): the model already in pipeline parallelism style.
+        model (`nn.Module`): the model not in pipeline style, and will be modified with `ShardFormer`.
+        model_policy (`colossalai.shardformer.policies.base_policy.Policy`): the policy used to shard the model with `ShardFormer`.
+        micro_batch_size (int): the micro batch size.
+        micro_batch_buffer_size (int): the buffer size for micro batch. Normally, it should be the same as the number of pipeline stages.
+        new_length (int): the number of new tokens to generate.
+        early_stopping (bool): whether to stop early.
+
+    Example:
+
+    ```python
+    from colossalai.inference import PPInferEngine
+    from transformers import GPT2LMHeadModel, GPT2Tokenizer
+
+    model = GPT2LMHeadModel.from_pretrained('gpt2')
+    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
+    # assume the model is inferred with 4 pipeline stages
+    engine = PPInferEngine(pp_size=4, model=model, model_policy={Your own policy for pipeline sharding})
+
+    input = ["Hello, my dog is cute, and I like"]
+    tokenized_input = tokenizer(input, return_tensors='pt')
+    output = engine.inference([tokenized_input])
+    ```
+
+    '''
+
+    def __init__(
+        self,
+        pp_size: int,
+        pp_model: nn.Module = None,
+        model: nn.Module = None,
+        model_policy: Policy = None,
+        new_length: int = 32,
+        micro_batch_size: int = 1,
+        micro_batch_buffer_size: int = None,
+        # TODO: implement early_stopping, and various generation options
+        early_stopping: bool = False,
+        do_sample: bool = False,
+        num_beams: int = 1,
+    ) -> None:
+        assert pp_model or (model and model_policy), "Either pp_model or model with model_policy should be provided."
+        self.pp_size = pp_size
+        self.pg_mesh = ProcessGroupMesh(pp_size)
+        self.stage_manager = PipelineStageManager(self.pg_mesh, 0, True)
+        self.mb_manager = MicroBatchManager(new_length, micro_batch_size, micro_batch_buffer_size or pp_size)
+        self.schedule = GenerateSchedule(self.stage_manager, self.mb_manager)
+        self.model = pp_model or self._shardformer(model, model_policy)
+
+    def inference(self, input_list):
+        out = self.schedule.generate_step(self.model, iter(input_list))
+        return out
+
+    def _shardformer(self, model, model_policy):
+        shardconfig = ShardConfig(
+            tensor_parallel_process_group=None,
+            pipeline_stage_manager=self.stage_manager,
+            enable_tensor_parallelism=False,
+            enable_fused_normalization=False,
+            enable_all_optimization=False,
+            enable_flash_attention=False,
+            enable_jit_fused=False,
+            enable_sequence_parallelism=False,
+        )
+        shardformer = ShardFormer(shard_config=shardconfig)
+        shard_model, _ = shardformer.optimize(model, model_policy)
+        return shard_model.cuda()
diff --git a/colossalai/ppinference/microbatch_manager.py b/colossalai/inference/pipeline/microbatch_manager.py
similarity index 81%
rename from colossalai/ppinference/microbatch_manager.py
rename to colossalai/inference/pipeline/microbatch_manager.py
index 83a3b99e27e8..f54396bb3747 100644
--- a/colossalai/ppinference/microbatch_manager.py
+++ b/colossalai/inference/pipeline/microbatch_manager.py
@@ -1,22 +1,23 @@
+from enum import Enum
 from typing import Dict
 
 import torch
 
-from .inference_config import InferenceConfig
-
 __all__ = 'MicroBatchManager'
 
-PREFILL = 1
-GENERATE = 2
-DONE = 3
+
+class Status(Enum):
+    PREFILL = 1
+    GENERATE = 2
+    DONE = 3
 
 
 class MicroBatchDescription():
 
     def __init__(
         self,
-        mb_inputs: torch.Tensor,
-        interval_inputs,
+        mb_inputs: Dict[str, torch.Tensor],
+        interval_inputs: Dict[str, torch.Tensor],
         new_length: int,
     ) -> None:
         if mb_inputs is not None:
@@ -56,26 +57,31 @@ def state(self):
 
         """
         if self.cur_length == self.target_length:
-            return DONE
+            return Status.DONE
         else:
-            return GENERATE
+            return Status.GENERATE
 
 
 class MicroBatchManager():
-
-    def __init__(
-        self,
-        pp_inference_config: InferenceConfig,
-    ):
-        self.pp_inference_config = pp_inference_config
+    '''
+    MicroBatchManager is a class that manages the micro batch.
+
+    Args:
+        new_length (int): the number of new tokens to generate.
+        micro_batch_size (int): the micro batch size.
+        micro_batch_buffer_size (int): the buffer size for micro batch. Normally, it should be the same as the number of pipeline stages.
+ ''' + + def __init__(self, new_length: int, micro_batch_size: int, micro_batch_buffer_size: int): + self.new_length = new_length + self.micro_batch_size = micro_batch_size + self.buffer_size = micro_batch_buffer_size self.mb_descrption_buffer = {} self.new_tokens_buffer = {} - self.buffer_size = pp_inference_config.micro_batch_buffer_size self.idx = 0 def _add_descrption(self, mb_inputs: Dict[str, torch.Tensor], inter_inputs: Dict[str, torch.Tensor]): - self.mb_descrption_buffer[self.idx] = MicroBatchDescription(mb_inputs, inter_inputs, - self.pp_inference_config.new_length) + self.mb_descrption_buffer[self.idx] = MicroBatchDescription(mb_inputs, inter_inputs, self.new_length) def _update_descrption(self, present_kv): self.mb_descrption_buffer[self.idx].update(present_kv) @@ -106,7 +112,7 @@ def is_micro_batch_done(self): if len(self.mb_descrption_buffer) == 0: return False for mb in self.mb_descrption_buffer.values(): - if mb.state != DONE: + if mb.state != Status.DONE: return False self.mb_descrption_buffer.clear() return True @@ -140,5 +146,5 @@ def cur_state(self): """ if self.cur_descrption is None: - return PREFILL + return Status.PREFILL return self.cur_descrption.state diff --git a/colossalai/ppinference/modeling/__init__.py b/colossalai/inference/pipeline/modeling/__init__.py similarity index 100% rename from colossalai/ppinference/modeling/__init__.py rename to colossalai/inference/pipeline/modeling/__init__.py diff --git a/colossalai/ppinference/modeling/gpt2.py b/colossalai/inference/pipeline/modeling/gpt2.py similarity index 100% rename from colossalai/ppinference/modeling/gpt2.py rename to colossalai/inference/pipeline/modeling/gpt2.py diff --git a/colossalai/ppinference/policy/gpt2_ppinfer.py b/colossalai/inference/pipeline/policy/gpt2_ppinfer.py similarity index 100% rename from colossalai/ppinference/policy/gpt2_ppinfer.py rename to colossalai/inference/pipeline/policy/gpt2_ppinfer.py diff --git a/colossalai/ppinference/test.py b/colossalai/inference/pipeline/test.py similarity index 73% rename from colossalai/ppinference/test.py rename to colossalai/inference/pipeline/test.py index 894ff44b035d..952044f0ff5a 100644 --- a/colossalai/ppinference/test.py +++ b/colossalai/inference/pipeline/test.py @@ -1,10 +1,10 @@ import torch import torch.distributed as dist import transformers -from modeling.gpt2 import GPT2PipelineForwards import colossalai -from colossalai.ppinference import InferenceConfig, PPInferEngine +from colossalai.inference import PPInferEngine +from colossalai.inference.pipeline.policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy colossalai.launch_from_torch(config={}) @@ -23,8 +23,7 @@ def data_gen(): inputs[k] = v.to('cuda').repeat(*new_shape) model = transformers.GPT2LMHeadModel(transformers.GPT2Config(n_layer=8)) -infer_config = InferenceConfig(pp_size=4, new_length=8, micro_batch_size=2) -engine = PPInferEngine(infer_config, model, GPT2PipelineForwards.gpt2_lmhead_model_forward) +engine = PPInferEngine(pp_size=4, model=model, model_policy=GPT2LMHeadModelPipelinePolicy(), new_length=8) output = engine.inference([inputs]) if dist.get_rank() == 3: diff --git a/colossalai/ppinference/utils.py b/colossalai/inference/pipeline/utils.py similarity index 100% rename from colossalai/ppinference/utils.py rename to colossalai/inference/pipeline/utils.py diff --git a/colossalai/pipeline/schedule/generate.py b/colossalai/pipeline/schedule/generate.py index 0bd620b00706..e12616655d32 100644 --- a/colossalai/pipeline/schedule/generate.py +++ 
b/colossalai/pipeline/schedule/generate.py @@ -1,18 +1,17 @@ from functools import partial -from typing import Any, Iterable, Optional, Union +from typing import Any, Dict, Iterable, Optional, Union import torch import torch.cuda from torch.nn import Module from torch.utils._pytree import tree_map -from colossalai.interface import OptimizerWrapper +from colossalai.inference.pipeline.microbatch_manager import MicroBatchManager, Status from colossalai.pipeline.p2p import PipelineP2PCommunication from colossalai.pipeline.stage_manager import PipelineStageManager -from colossalai.ppinference.microbatch_manager import DONE, GENERATE, PREFILL, MicroBatchManager from colossalai.utils.cuda import get_current_device -from ._utils import detach, get_batch_size, get_micro_batch, merge_batch, model_forward, retain_grad, to_device +from ._utils import get_batch_size, get_micro_batch, model_forward, to_device from .base import PipelineSchedule @@ -22,7 +21,7 @@ def __init__(self, stage_manager: PipelineStageManager, mb_manager: MicroBatchMa super().__init__(stage_manager) self.comm = PipelineP2PCommunication(stage_manager) self.mb_manager = mb_manager - self.microbatch_size = mb_manager.pp_inference_config.micro_batch_size + self.microbatch_size = mb_manager.micro_batch_size self.batch: Optional[Any] = None self.batch_size: Optional[int] = None self.microbatch_offset: Optional[int] = None @@ -58,7 +57,7 @@ def load_micro_batch(self) -> Any: def _prepare_stage_inputs(self): # first stage and in prefill phase - if self.stage_manager.is_first_stage() and self.mb_manager.cur_state is PREFILL: + if self.stage_manager.is_first_stage() and self.mb_manager.cur_state is Status.PREFILL: pre_stage_out = None model_inputs = self.load_micro_batch() hidden_states = None @@ -76,13 +75,13 @@ def _prepare_stage_inputs(self): hidden_states = pre_stage_out return pre_stage_out, model_inputs, hidden_states - def _prepare_next_token(self, input_ids): + def _prepare_next_token(self, inputs: Dict[str, torch.Tensor]): new_mask = self.mb_manager.cur_descrption.attn_mask - new_mask = torch.cat((new_mask, torch.ones((new_mask.shape[0], 1), dtype=torch.int64).cuda()), dim=-1) + new_mask = torch.cat((new_mask, torch.ones((new_mask.shape[0], 1), dtype=torch.int64, device='cuda')), dim=-1) self.mb_manager.cur_descrption.attn_mask = new_mask past_key_values = self.mb_manager.cur_descrption.kv_cache - return dict(input_ids=input_ids['new_token'], attention_mask=new_mask, past_key_values=past_key_values) + return dict(input_ids=inputs['new_token'], attention_mask=new_mask, past_key_values=past_key_values) def get_token_id(self, hidden_state: torch.Tensor) -> torch.Tensor: last_hidden_state = hidden_state[:, -1] @@ -109,7 +108,7 @@ def generate_step(self, model: Module, data_iter: Iterable) -> Union[torch.Tenso # run by round for _ in range(self.round): - state = PREFILL + state = Status.PREFILL while self.mb_manager.is_micro_batch_done() is False: pre_stage_out, model_inputs, hidden_states = self._prepare_stage_inputs() @@ -120,7 +119,7 @@ def generate_step(self, model: Module, data_iter: Iterable) -> Union[torch.Tenso if self.stage_manager.is_last_stage(): new_token = self.get_token_id(output_obj['hidden_states']) self.mb_manager.add_new_tokens(new_token) - if state is not DONE: + if state is not Status.DONE: self.comm.send_forward({'new_token': new_token}) else: self.comm.send_forward({'hidden_states': output_obj['hidden_states']}) diff --git a/colossalai/ppinference/__init__.py b/colossalai/ppinference/__init__.py deleted file mode 
100644 index 95adf4865b81..000000000000 --- a/colossalai/ppinference/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .engine import PPInferEngine -from .inference_config import InferenceConfig - -__all__ = ['PPInferEngine', 'InferenceConfig'] diff --git a/colossalai/ppinference/engine.py b/colossalai/ppinference/engine.py deleted file mode 100644 index 716d5cbe1442..000000000000 --- a/colossalai/ppinference/engine.py +++ /dev/null @@ -1,55 +0,0 @@ -import re -from functools import partial -from types import MethodType -from typing import Callable, List, Optional, Set - -import numpy as np -import torch -import torch.distributed as dist -import torch.nn as nn - -from colossalai.cluster import ProcessGroupMesh -from colossalai.pipeline.schedule.generate import GenerateSchedule -from colossalai.pipeline.stage_manager import PipelineStageManager -from colossalai.shardformer import ShardConfig, ShardFormer -from colossalai.shardformer._utils import getattr_ - -from .inference_config import InferenceConfig -from .microbatch_manager import MicroBatchManager -from .policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy -from .utils import get_suffix_name, set_tensors_to_none - - -class PPInferEngine: - - def __init__( - self, - gerneration_config: InferenceConfig, - model: nn.Module, - pp_fwd: Callable, - ) -> None: - self.gerneration_config = gerneration_config - self.pg_mesh = ProcessGroupMesh(gerneration_config.pp_size) - self.stage_manager = PipelineStageManager(self.pg_mesh, 0, True) - self.mb_manager = MicroBatchManager(self.gerneration_config) - self.schedule = GenerateSchedule(self.stage_manager, self.mb_manager) - self.model = self._shardformer(model) - - def inference(self, input_list): - out = self.schedule.generate_step(self.model, iter(input_list)) - return out - - def _shardformer(self, model): - shardconfig = ShardConfig( - tensor_parallel_process_group=None, - pipeline_stage_manager=self.stage_manager, - enable_tensor_parallelism=False, - enable_fused_normalization=False, - enable_all_optimization=False, - enable_flash_attention=False, - enable_jit_fused=False, - enable_sequence_parallelism=False, - ) - shardformer = ShardFormer(shard_config=shardconfig) - shard_model, _ = shardformer.optimize(model, GPT2LMHeadModelPipelinePolicy()) - return shard_model.cuda() diff --git a/colossalai/ppinference/inference_config.py b/colossalai/ppinference/inference_config.py deleted file mode 100644 index 019562a7a145..000000000000 --- a/colossalai/ppinference/inference_config.py +++ /dev/null @@ -1,34 +0,0 @@ -from dataclasses import dataclass -from typing import List - -import torch - -__all__ = 'InferenceConfig' - - -class InferenceConfig: - ''' - InferenceConfig is a class that stores the configuration for inference. - - Args: - pp_size (int): the number of pipeline stages. - micro_batch_size (int): the micro batch size. - new_length (int): the new length of the input sequence. - padding_token_id (int): the token id for padding. 
- - ''' - - def __init__( - self, - pp_size: int, - micro_batch_size: int = 1, - micro_batch_buffer_size: int = None, - new_length: int = 32, - padding_token_id: int = 0, - ): - assert isinstance(pp_size, int), f'pp_size must be an integer, got {type(pp_size)}' - self.pp_size = pp_size - self.micro_batch_size = micro_batch_size - self.micro_batch_buffer_size = pp_size if micro_batch_buffer_size is None else micro_batch_buffer_size - self.new_length = new_length - self.padding_token_id = padding_token_id diff --git a/tests/test_inference/test_pipeline_infer.py b/tests/test_inference/test_pipeline_infer.py new file mode 100644 index 000000000000..71e46e252b7d --- /dev/null +++ b/tests/test_inference/test_pipeline_infer.py @@ -0,0 +1,58 @@ +from copy import deepcopy + +import torch +import torch.distributed as dist +import torch.nn as nn +import transformers + +import colossalai +from colossalai.inference.pipeline.engine import PPInferEngine +from colossalai.inference.pipeline.policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn + + +def data_gen(): + input_ids = torch.tensor([[15496, 11, 616, 3290, 318, 13779, 318, 13779]], dtype=torch.int64) + attention_mask = torch.tensor([[1, 1, 1, 1, 1, 1, 1, 1]], dtype=torch.int64) + return dict(input_ids=input_ids, attention_mask=attention_mask) + + +inputs = data_gen() +for k, v in inputs.items(): + if torch.is_tensor(v) or 'Tensor' in v.__class__.__name__: + new_shape = [1] * v.dim() + new_shape[0] = 16 + inputs[k] = v.to('cuda').repeat(*new_shape) + + +def pipeline_inference_test(pp_size, new_length, micro_batch_size): + model = transformers.GPT2LMHeadModel(transformers.GPT2Config(n_layer=8)) + engine = PPInferEngine(pp_size=pp_size, + model=model, + model_policy=GPT2LMHeadModelPipelinePolicy(), + new_length=new_length, + micro_batch_size=micro_batch_size) + output = engine.inference([inputs]) + if dist.get_rank() == dist.get_world_size() - 1: + assert len(output[0]) == new_length, f"{len(output)}, {new_length}" + + +@parameterize('pp_size', [4]) +@parameterize('new_length', [4, 8, 16]) +@parameterize('micro_batch_size', [1, 4]) +def run_pipeline_inference_test(pp_size, new_length, micro_batch_size): + pipeline_inference_test(pp_size, new_length, micro_batch_size) + + +def check_pipeline_inference(rank, world_size, port): + colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + run_pipeline_inference_test() + + +@rerun_if_address_is_in_use() +def test_pipeline_inference(): + spawn(check_pipeline_inference, nprocs=4) + + +if __name__ == '__main__': + test_pipeline_inference() From 0e3a5c1853f7e5717f35f1fe9b33b9c0cddc0abc Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Mon, 28 Aug 2023 16:52:26 +0800 Subject: [PATCH 13/17] remove unused file --- colossalai/inference/pipeline/test.py | 30 --------------------------- 1 file changed, 30 deletions(-) delete mode 100644 colossalai/inference/pipeline/test.py diff --git a/colossalai/inference/pipeline/test.py b/colossalai/inference/pipeline/test.py deleted file mode 100644 index 952044f0ff5a..000000000000 --- a/colossalai/inference/pipeline/test.py +++ /dev/null @@ -1,30 +0,0 @@ -import torch -import torch.distributed as dist -import transformers - -import colossalai -from colossalai.inference import PPInferEngine -from colossalai.inference.pipeline.policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy - -colossalai.launch_from_torch(config={}) - - 
-def data_gen(): - input_ids = torch.tensor([[15496, 11, 616, 3290, 318, 13779, 318, 13779]], dtype=torch.int64) - attention_mask = torch.tensor([[1, 1, 1, 1, 1, 1, 1, 1]], dtype=torch.int64) - return dict(input_ids=input_ids, attention_mask=attention_mask) - - -inputs = data_gen() -for k, v in inputs.items(): - if torch.is_tensor(v) or 'Tensor' in v.__class__.__name__: - new_shape = [1] * v.dim() - new_shape[0] = 8 - inputs[k] = v.to('cuda').repeat(*new_shape) - -model = transformers.GPT2LMHeadModel(transformers.GPT2Config(n_layer=8)) -engine = PPInferEngine(pp_size=4, model=model, model_policy=GPT2LMHeadModelPipelinePolicy(), new_length=8) - -output = engine.inference([inputs]) -if dist.get_rank() == 3: - print(len(output)) From b07321d3ae0d90b851f686c7ce43c69c3420df1a Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Wed, 30 Aug 2023 09:59:35 +0800 Subject: [PATCH 14/17] fix CI --- .../test_pipeline_infer.py | 6 +++++- tests/test_lazy/test_models.py | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) rename tests/{test_inference => test_generate}/test_pipeline_infer.py (91%) diff --git a/tests/test_inference/test_pipeline_infer.py b/tests/test_generate/test_pipeline_infer.py similarity index 91% rename from tests/test_inference/test_pipeline_infer.py rename to tests/test_generate/test_pipeline_infer.py index 71e46e252b7d..f328d08e3078 100644 --- a/tests/test_inference/test_pipeline_infer.py +++ b/tests/test_generate/test_pipeline_infer.py @@ -1,5 +1,6 @@ from copy import deepcopy +import pytest import torch import torch.distributed as dist import torch.nn as nn @@ -8,7 +9,7 @@ import colossalai from colossalai.inference.pipeline.engine import PPInferEngine from colossalai.inference.pipeline.policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn +from colossalai.testing import clear_cache_before_run, parameterize, rerun_if_address_is_in_use, spawn def data_gen(): @@ -40,6 +41,7 @@ def pipeline_inference_test(pp_size, new_length, micro_batch_size): @parameterize('pp_size', [4]) @parameterize('new_length', [4, 8, 16]) @parameterize('micro_batch_size', [1, 4]) +@clear_cache_before_run() def run_pipeline_inference_test(pp_size, new_length, micro_batch_size): pipeline_inference_test(pp_size, new_length, micro_batch_size) @@ -49,7 +51,9 @@ def check_pipeline_inference(rank, world_size, port): run_pipeline_inference_test() +@pytest.mark.dist @rerun_if_address_is_in_use() +@clear_cache_before_run() def test_pipeline_inference(): spawn(check_pipeline_inference, nprocs=4) diff --git a/tests/test_lazy/test_models.py b/tests/test_lazy/test_models.py index 18a737fcec85..204c8bfb5e28 100644 --- a/tests/test_lazy/test_models.py +++ b/tests/test_lazy/test_models.py @@ -10,6 +10,7 @@ def test_torchvision_models_lazy_init(subset, default_device): sub_model_zoo = model_zoo.get_sub_registry(subset) for name, entry in sub_model_zoo.items(): + print(subset, default_device, name) # TODO(ver217): lazy init does not support weight norm, skip these models if name in ('torchaudio_wav2vec2_base', 'torchaudio_hubert_base') or name.startswith('transformers_llama') or name.startswith( From 9eabcb52e45191bf5f6098022136edc3ba560a7a Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Wed, 30 Aug 2023 13:58:01 +0800 Subject: [PATCH 15/17] add cache clear --- tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py | 2 ++ tests/test_generate/test_pipeline_infer.py | 1 + 2 files changed, 3 insertions(+) 
diff --git a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py index a94e8d42c78e..4261f5ae26c6 100644 --- a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py @@ -51,6 +51,7 @@ def check_low_level_zero_checkpointIO(stage: int, shard: bool): booster.load_optimizer(new_optimizer, optimizer_ckpt_path) check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict(), False) + torch.cuda.empty_cache() def run_dist(rank, world_size, port): @@ -59,6 +60,7 @@ def run_dist(rank, world_size, port): @rerun_if_address_is_in_use() +@clear_cache_before_run() def test_low_level_zero_checkpointIO(): spawn(run_dist, 2) diff --git a/tests/test_generate/test_pipeline_infer.py b/tests/test_generate/test_pipeline_infer.py index f328d08e3078..5bc2f1857536 100644 --- a/tests/test_generate/test_pipeline_infer.py +++ b/tests/test_generate/test_pipeline_infer.py @@ -44,6 +44,7 @@ def pipeline_inference_test(pp_size, new_length, micro_batch_size): @clear_cache_before_run() def run_pipeline_inference_test(pp_size, new_length, micro_batch_size): pipeline_inference_test(pp_size, new_length, micro_batch_size) + torch.cuda.empty_cache() def check_pipeline_inference(rank, world_size, port): From 7578ab9eb29fceba0de7f69fc99275d77cdafdc6 Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Wed, 30 Aug 2023 16:55:40 +0800 Subject: [PATCH 16/17] fix code error --- colossalai/inference/__init__.py | 2 +- tests/test_lazy/test_models.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/colossalai/inference/__init__.py b/colossalai/inference/__init__.py index 999f90e8414e..d62ba0d5de66 100644 --- a/colossalai/inference/__init__.py +++ b/colossalai/inference/__init__.py @@ -1,3 +1,3 @@ from .pipeline import PPInferEngine -__all__ = "pipeline" +__all__ = "PPInferEngine" diff --git a/tests/test_lazy/test_models.py b/tests/test_lazy/test_models.py index 204c8bfb5e28..18a737fcec85 100644 --- a/tests/test_lazy/test_models.py +++ b/tests/test_lazy/test_models.py @@ -10,7 +10,6 @@ def test_torchvision_models_lazy_init(subset, default_device): sub_model_zoo = model_zoo.get_sub_registry(subset) for name, entry in sub_model_zoo.items(): - print(subset, default_device, name) # TODO(ver217): lazy init does not support weight norm, skip these models if name in ('torchaudio_wav2vec2_base', 'torchaudio_hubert_base') or name.startswith('transformers_llama') or name.startswith( From 07f56b3ad6b195338988fda08cf7200cb5516bac Mon Sep 17 00:00:00 2001 From: FoolPlayer <498107402@qq.com> Date: Wed, 30 Aug 2023 17:06:36 +0800 Subject: [PATCH 17/17] fix typo --- colossalai/inference/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/colossalai/inference/__init__.py b/colossalai/inference/__init__.py index d62ba0d5de66..db33ae6fe998 100644 --- a/colossalai/inference/__init__.py +++ b/colossalai/inference/__init__.py @@ -1,3 +1,3 @@ from .pipeline import PPInferEngine -__all__ = "PPInferEngine" +__all__ = ['PPInferEngine']
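For reference, the microbatch patches above reduce each micro batch to a three-state machine: PREFILL until a kv cache exists, GENERATE while the cache grows by one entry per decoding step, and DONE once mb_length + new_length tokens are accounted for. A standalone sketch of that arithmetic, using the plain integer states of the pre-enum version (the names mirror the patched code, but nothing here is part of the series itself):

# Sketch of MicroBatchDescription's state arithmetic. `cached_len` stands in
# for kv_cache[0][0].shape[-2]; None means the prefill pass has not run yet.
PREFILL, GENERATE, DONE = 1, 2, 3


def mb_state(mb_length: int, new_length: int, cached_len=None) -> int:
    if cached_len is None:
        return PREFILL
    cur_length = cached_len + 1    # cached tokens plus the one being decoded
    target_length = mb_length + new_length
    return DONE if cur_length == target_length else GENERATE


# A prompt of 8 tokens with new_length=4: prefill caches the prompt, then each
# generate step appends one entry to the kv cache until the target is reached.
assert mb_state(8, 4) == PREFILL
assert [mb_state(8, 4, c) for c in range(8, 12)] == [GENERATE, GENERATE, GENERATE, DONE]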
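The stage manager rewrite in patch 10 drops the hand-rolled modular arithmetic in favor of wrap-mode raveling. Assuming ProcessGroupMesh.ravel follows the semantics of np.ravel_multi_index (an assumption made here purely for illustration; only the wrap behavior matters), the ring on a 4-stage pipeline falls out like this:

# Sketch: prev/next ranks on a 1-D pipeline mesh of 4 stages, assuming
# ravel(coord, shape, mode='wrap') behaves like np.ravel_multi_index.
import numpy as np

pp_size = 4
shape = (pp_size,)
for stage in range(pp_size):
    prev_rank = int(np.ravel_multi_index((stage - 1,), shape, mode='wrap'))
    next_rank = int(np.ravel_multi_index((stage + 1,), shape, mode='wrap'))
    print(f"stage {stage}: prev={prev_rank}, next={next_rank}")
# stage 0's prev wraps to rank 3 and stage 3's next wraps to rank 0, which is
# why `is_virtual` additionally opens a p2p group between the first and last
# stage: new tokens produced on the last stage feed the first stage's next step.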
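Finally, an end-to-end driver in the spirit of tests/test_generate/test_pipeline_infer.py. The prompts, the tokenizer handling and the torchrun launch line are illustrative assumptions on top of the series; note that the batch has to fill at least one scheduling round (batch_size / micro_batch_size >= pp_size), because generate_step runs num_microbatches // num_stages rounds:

# Assumed launch: torchrun --nproc_per_node 4 run_pp_infer.py
import torch.distributed as dist
import transformers

import colossalai
from colossalai.inference import PPInferEngine
from colossalai.inference.pipeline.policy.gpt2_ppinfer import GPT2LMHeadModelPipelinePolicy

colossalai.launch_from_torch(config={})

tokenizer = transformers.GPT2Tokenizer.from_pretrained('gpt2')
model = transformers.GPT2LMHeadModel.from_pretrained('gpt2')

# four identical prompts -> four micro batches -> exactly one round on 4 stages
prompts = ["Hello, my dog is cute, and I like"] * 4
inputs = {k: v.cuda() for k, v in tokenizer(prompts, return_tensors='pt').items()}

engine = PPInferEngine(pp_size=4,
                       model=model,
                       model_policy=GPT2LMHeadModelPipelinePolicy(),
                       new_length=8,
                       micro_batch_size=1)

output = engine.inference([inputs])
# only the last stage collects the generated token ids
if dist.get_rank() == dist.get_world_size() - 1:
    for token_ids in output:
        print(tokenizer.decode(token_ids))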