From 388678cbf8f82e5b931020c7288a34f5ec3c57cf Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Mon, 13 Nov 2023 12:20:03 +0800 Subject: [PATCH 01/13] support ddp --- colossalai/booster/plugin/gemini_plugin.py | 41 ++++++++++++++++--- colossalai/zero/gemini/chunk/chunk.py | 22 +++++++--- colossalai/zero/gemini/chunk/manager.py | 12 ++++-- colossalai/zero/gemini/gemini_ddp.py | 34 ++++++++++----- .../test_plugin/test_gemini_plugin.py | 12 +++--- .../test_gemini_checkpoint_io.py | 9 ++-- 6 files changed, 96 insertions(+), 34 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 9c7dc6836c1e..7f4a797b0c70 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -10,6 +10,7 @@ from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler as LRScheduler from torch.utils.data import DataLoader +from torch.nn.parallel import DistributedDataParallel as DDP from colossalai.checkpoint_io import CheckpointIndexFile, CheckpointIO, GeneralCheckpointIO from colossalai.checkpoint_io.utils import ( @@ -34,8 +35,9 @@ SUPPORTED_PRECISION = ["fp16", "bf16"] PRECISION_STR_TO_DTYPE = {"fp16": torch.half, "bf16": torch.bfloat16} -DP_AXIS = 0 +ZERO_AXIS = 0 TP_AXIS = 1 +DDP_AXIS = 2 def get_param_info(optim: Optimizer): # Get a backup of necessary information of parameters for future use, which includes: @@ -315,6 +317,12 @@ class GeminiPlugin(DPPluginBase): enable_sequence_parallelism (bool): Whether to turn on sequence parallelism in Shardformer. Defaults to False. enable_sequence_overlap (bool): Whether to turn on sequence overlap in Shardformer. Defaults to False. verbose (bool, optional): verbose mode. Debug info including chunk search result will be printed. Defaults to False. + broadcast_buffers (bool, optional): Whether to broadcast buffers in the beginning of training when using DDP. Defaults to True. 
+ ddp_bucket_cap_mb (int, optional): The bucket size in MB when using DDP. Defaults to 25. + find_unused_parameters (bool, optional): Whether to find unused parameters when using DDP. Defaults to False. + check_reduction (bool, optional): Whether to check reduction when using DDP. Defaults to False. + gradient_as_bucket_view (bool, optional): Whether to use gradient as bucket view when using DDP. Defaults to False. + static_graph (bool, optional): Whether to use static graph when using DDP. Defaults to False. """ def __init__( @@ -349,12 +357,19 @@ def __init__( norm_type: float = 2.0, enable_tensor_parallelism: bool = False, tp_size: int = 1, + ddp_size:int = 1, enable_all_optimization: bool = False, enable_fused_normalization: bool = False, enable_flash_attention: bool = False, enable_sequence_parallelism: bool = False, enable_jit_fused: bool = False, enable_sequence_overlap: bool = False, + broadcast_buffers: bool = True, + ddp_bucket_cap_mb: int = 25, + find_unused_parameters: bool = False, + check_reduction: bool = False, + gradient_as_bucket_view: bool = False, + static_graph: bool = False, verbose: bool = False, ) -> None: super().__init__() @@ -403,11 +418,13 @@ def __init__( self.verbose = verbose self.tp_size = tp_size if self.enable_tensor_parallelism else 1 - self.dp_size = dist.get_world_size() // self.tp_size - assert self.dp_size > 1, f"The size of the DP group should be greater than 1. Please reduce the TP group size." 
- self.pg_mesh = ProcessGroupMesh(self.dp_size, self.tp_size) - self.dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) + self.ddp_size = ddp_size + self.use_ddp = self.ddp_size > 1 + self.zero_size = dist.get_world_size() // (self.tp_size * self.ddp_size) + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size, self.ddp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) + self.ddp_group = self.pg_mesh.get_group_along_axis(DDP_AXIS) self.shard_config = ShardConfig( tensor_parallel_process_group=self.tp_group, enable_tensor_parallelism=self.enable_tensor_parallelism, @@ -418,6 +435,14 @@ def __init__( enable_sequence_parallelism=self.enable_sequence_parallelism, enable_sequence_overlap=self.enable_sequence_overlap, ) + self.ddp_config = dict( + broadcast_buffers=broadcast_buffers, + bucket_cap_mb=ddp_bucket_cap_mb, + find_unused_parameters=find_unused_parameters, + check_reduction=check_reduction, + gradient_as_bucket_view=gradient_as_bucket_view, + static_graph=static_graph, + ) def support_no_sync(self) -> bool: return False @@ -458,7 +483,11 @@ def configure( shardformer = ShardFormer(self.shard_config) model, _ = shardformer.optimize(model) - model = GeminiDDP(model, **self.gemini_config, process_group=self.dp_group, verbose=self.verbose) + if self.use_ddp: + model = model.cuda() + model = DDP(model, process_group=self.ddp_group, **self.ddp_config) + + model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, ddp_group=self.ddp_group, use_ddp=self.use_ddp, verbose=self.verbose) if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): optimizer = GeminiOptimizer( diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index 4ea6cc662025..088bf948e657 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -61,12 +61,14 @@ class Chunk: def __init__( self, chunk_size: 
int, - process_group: ProcessGroup, + zero_group: ProcessGroup, dtype: torch.dtype, init_device: Optional[torch.device] = None, cpu_shard_init: bool = False, keep_gathered: bool = False, pin_memory: bool = False, + ddp_group: ProcessGroup = None, + use_ddp: bool = False, ) -> None: """ Chunk: A container owning a piece of contiguous memory space for tensors @@ -76,7 +78,7 @@ def __init__( Args: chunk_size (int): the number of elements in the chunk - process_group (ProcessGroup): the process group of this chunk + zero_group (ProcessGroup): the process group of this chunk dtype (torch.dtype): the data type of the chunk init_device (torch.device): optional, During the chunk construction process, where the tensor is stored. The default value is None, which is the current GPU @@ -90,9 +92,12 @@ def __init__( self.chunk_size = chunk_size self.utilized_size = 0 - self.torch_pg = process_group + self.torch_pg = zero_group self.pg_size = dist.get_world_size(self.torch_pg) self.pg_rank = dist.get_rank(self.torch_pg) + self.ddp_group = ddp_group + self.use_ddp = use_ddp + self.ddp_size = dist.get_world_size(self.ddp_group) if use_ddp else 1 # the chunk size should be divisible by the dp degree if not keep_gathered: @@ -384,14 +389,20 @@ def reduce(self): # just move cuda_global_chunk to cuda_shard # the communication is not necessary self.__scatter() + if self.use_ddp: + dist.all_reduce(self.cuda_shard, group=self.ddp_group) elif self.keep_gathered: # we use all-reduce here dist.all_reduce(self.cuda_global_chunk, group=self.torch_pg) + if self.use_ddp: + dist.all_reduce(self.cuda_global_chunk, group=self.ddp_group) else: self.cuda_shard = torch.empty(self.shard_size, dtype=self.dtype, device=get_current_device()) input_list = list(torch.chunk(self.cuda_global_chunk, chunks=self.pg_size, dim=0)) dist.reduce_scatter(self.cuda_shard, input_list, group=self.torch_pg) + if self.use_ddp: + dist.all_reduce(self.cuda_shard, group=self.ddp_group) free_storage(self.cuda_global_chunk) 
self.is_gathered = False @@ -608,10 +619,11 @@ def init_grad_chunk(self) -> "Chunk": # grad chunk is not initialized grad_chunk = Chunk( chunk_size=self.chunk_size, - process_group=self.torch_pg, + zero_group=self.torch_pg, dtype=self.dtype, keep_gathered=self.keep_gathered, pin_memory=self.pin_memory, + ddp_group=self.ddp_group, ) grad_chunk.num_tensors = self.num_tensors grad_chunk.utilized_size = self.utilized_size @@ -640,4 +652,4 @@ def init_grad_chunk(self) -> "Chunk": self.grad_chunk.l2_norm = None alloc_storage(self.grad_chunk.cuda_global_chunk) - return self.grad_chunk + return self.grad_chunk \ No newline at end of file diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index d3c512fe978d..46511d7c6991 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -38,7 +38,9 @@ def register_tensor( tensor: torch.Tensor, group_type: str, config_key: int, - process_group: ProcessGroup, + zero_group: ProcessGroup, + ddp_group: ProcessGroup = None, + use_ddp: bool = False, cpu_offload: bool = False, pin_memory: bool = False, ) -> None: @@ -76,15 +78,17 @@ def register_tensor( if tensor.numel() > chunk_size: chunk_size = tensor.numel() - dp_size = dist.get_world_size(process_group) + dp_size = dist.get_world_size(zero_group) chunk_size = chunk_size + (-chunk_size % dp_size) chunk = Chunk( chunk_size=chunk_size, - process_group=process_group, + zero_group=zero_group, dtype=tensor.dtype, cpu_shard_init=cpu_offload, pin_memory=pin_memory, + ddp_group=ddp_group, + use_ddp=use_ddp, **chunk_kwargs, ) @@ -288,4 +292,4 @@ def rearrange_accumulated_grad_chunk(self, chunk: Chunk) -> Chunk: # Release accumulated_grad free_storage(accumulated_grad) - return grad_chunk + return grad_chunk \ No newline at end of file diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index ade0a4909902..58beb0488668 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ 
b/colossalai/zero/gemini/gemini_ddp.py @@ -86,9 +86,11 @@ def __init__( strict_ddp_mode: bool = False, scatter_after_inference: bool = True, mixed_precision: torch.dtype = torch.float16, - process_group: Optional[ProcessGroup] = None, + zero_group: Optional[ProcessGroup] = None, memstats: Optional[MemStats] = None, # genimi memory stats master_weights: bool = True, + ddp_group: Optional[ProcessGroup] = None, + use_ddp: bool = False, verbose: bool = False, ) -> None: assert mixed_precision in (torch.float16, torch.bfloat16) @@ -105,7 +107,7 @@ def __init__( search_range_m=search_range_m, min_chunk_size_m=min_chunk_size_m, strict_ddp_flag=strict_ddp_mode, - process_group=process_group, + process_group=zero_group, verbose=verbose, ) self.gemini_manager = GeminiManager( @@ -128,7 +130,9 @@ def __init__( self.name2param: Dict[str, nn.Parameter] = dict() self.scatter_after_inference = scatter_after_inference self.mixed_precision = mixed_precision - self.dp_process_group = process_group or _get_default_group() + self.zero_group = zero_group or _get_default_group() + self.ddp_group = ddp_group + self.use_ddp = use_ddp self.reuse_fp16_chunk = master_weights self.master_weights = master_weights @@ -267,8 +271,10 @@ def forward(self, *args, **kwargs): outputs = self._inference_forward(*args, **kwargs) else: self.gemini_manager.pre_iter(*args) + sync_ctx = self.module.no_sync() if self.use_ddp else nullcontext() with ColoParamOpHookManager.use_hooks(self.param_op_hook): - outputs = self.module(*args, **kwargs) + with sync_ctx: + outputs = self.module(*args, **kwargs) if self.force_outputs_fp32: return _cast_float(outputs, torch.float) @@ -377,8 +383,12 @@ def grad_handle(self, p, grad): self.chunk_manager.release_chunk(chunk) if grad_chunk.is_gathered: grad_chunk.cuda_global_chunk.div_(chunk.pg_size) + if self.use_ddp: + grad_chunk.cuda_global_chunk.div_(chunk.ddp_size) else: grad_chunk.cuda_shard.div_(chunk.pg_size) + if self.use_ddp: + 
grad_chunk.cuda_shard.div_(chunk.ddp_size) # check overflow elements self.overflow_counter += grad_chunk.has_inf_or_nan # record l2 norm for gradient clipping. flag is bound to fp16 chunk @@ -733,7 +743,7 @@ def load_parameter(chunk_slice, data): unexpected_keys.append(key) def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pin_memory: bool): - dp_world_size = dist.get_world_size(self.dp_process_group) + zero_world_size = dist.get_world_size(self.zero_group) for p in param_order.generate(): self._preprocess_param(p) assert type(p) is ColoParameter @@ -753,8 +763,10 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi self.chunk_manager.register_tensor( tensor=p, group_type="fp16_param", - config_key=dp_world_size, - process_group=self.dp_process_group, + config_key=zero_world_size, + zero_group=self.zero_group, + ddp_group=self.ddp_group, + use_ddp=self.use_ddp, cpu_offload=cpu_offload, pin_memory=pin_memory, ) @@ -767,8 +779,10 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi self.chunk_manager.register_tensor( tensor=fp32_p, group_type="fp32_param", - config_key=dp_world_size, - process_group=self.dp_process_group, + config_key=zero_world_size, + zero_group=self.zero_group, + ddp_group=self.ddp_group, + use_ddp=self.use_ddp, cpu_offload=cpu_offload, pin_memory=pin_memory, ) @@ -881,4 +895,4 @@ def state_dict_shard( if block is not None: yield block, block_size - yield sharder.current_block, sharder.current_block_size + yield sharder.current_block, sharder.current_block_size \ No newline at end of file diff --git a/tests/test_booster/test_plugin/test_gemini_plugin.py b/tests/test_booster/test_plugin/test_gemini_plugin.py index 97ec0233f766..4e9608ccf0eb 100644 --- a/tests/test_booster/test_plugin/test_gemini_plugin.py +++ b/tests/test_booster/test_plugin/test_gemini_plugin.py @@ -17,14 +17,14 @@ from tests.kit.model_zoo import model_zoo -def run_fn(init_method, model_fn, 
data_gen_fn, output_transform_fn, enable_tensor_parallelism) -> Optional[str]: +def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) -> Optional[str]: try: if init_method == "lazy": ctx = LazyInitContext() else: ctx = nullcontext() enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, enable_tensor_parallelism=enable_tensor_parallelism, enable_all_optimization=enable_all_optimization) + plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) with ctx: model = model_fn() @@ -63,7 +63,9 @@ def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tenso @parameterize("subset", ["torchvision", "transformers", "diffusers"]) @parameterize("init_method", ["none"]) @parameterize("enable_tensor_parallelism", [True, False]) -def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_parallelism: bool = True, early_stop: bool = True): +@parameterize("tp_size", [1]) +@parameterize("ddp_size", [2]) +def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_parallelism: bool = True, early_stop: bool = True, tp_size: int = 1, ddp_size: int = 1): """check gemini plugin over model zoo Args: @@ -127,7 +129,7 @@ def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_pa if "transformers_blip2" in name: enable_tensor_parallelism = False - err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism) + err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) torch.cuda.empty_cache() if err is None: passed_models.append(name) @@ -155,4 +157,4 @@ def test_gemini_plugin(early_stop: bool = True): if __name__ == 
"__main__": - test_gemini_plugin(early_stop=False) + test_gemini_plugin(early_stop=False) \ No newline at end of file diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index 821ce9fbbbd9..3cb79f75aa15 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -71,11 +71,12 @@ def exam_state_dict_with_origin(placement_config, model_name, use_safetensors: b @parameterize("size_per_shard", [32]) @parameterize("enable_tensor_parallelism", [True, False]) @parameterize("tp_size", [2]) -def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, enable_tensor_parallelism: bool, tp_size: int): +@parameterize("ddp_size", [2]) +def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, enable_tensor_parallelism: bool, tp_size: int, ddp_size: int): (model_fn, data_gen_fn, output_transform_fn, _, _) = next(iter(model_zoo.get_sub_registry(model_name).values())) criterion = lambda x: x.mean() enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, enable_all_optimization=enable_all_optimization) + plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) model = model_fn() @@ -154,7 +155,7 @@ def run_dist(rank, world_size, port): @pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) +@pytest.mark.parametrize("world_size", [8]) @rerun_if_address_is_in_use() def test_gemini_ckpIO(world_size): - spawn(run_dist, world_size) + spawn(run_dist, world_size) \ No newline at end of file From 
61dd28b7d26d659ef9fca3ca9d60aa24dc34a84f Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Mon, 13 Nov 2023 15:32:19 +0800 Subject: [PATCH 02/13] fix --- tests/test_zero/test_gemini/test_chunkv2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_zero/test_gemini/test_chunkv2.py b/tests/test_zero/test_gemini/test_chunkv2.py index a31c888e966d..5977c706fdd1 100644 --- a/tests/test_zero/test_gemini/test_chunkv2.py +++ b/tests/test_zero/test_gemini/test_chunkv2.py @@ -39,7 +39,7 @@ def exam_chunk_basic(init_device, keep_gathered, pin_memory): pg = _get_default_group() my_chunk = Chunk( chunk_size=1024, - process_group=pg, + zero_group=pg, dtype=torch.float32, init_device=init_device, cpu_shard_init=True, From 9eca1bd06b2457bc51480bbb662942886562d316 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Tue, 14 Nov 2023 19:39:14 +0800 Subject: [PATCH 03/13] fix --- colossalai/booster/plugin/gemini_plugin.py | 25 ---------------------- colossalai/zero/gemini/gemini_ddp.py | 4 +--- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 7f4a797b0c70..ae007f65c598 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -10,7 +10,6 @@ from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler as LRScheduler from torch.utils.data import DataLoader -from torch.nn.parallel import DistributedDataParallel as DDP from colossalai.checkpoint_io import CheckpointIndexFile, CheckpointIO, GeneralCheckpointIO from colossalai.checkpoint_io.utils import ( @@ -317,12 +316,6 @@ class GeminiPlugin(DPPluginBase): enable_sequence_parallelism (bool): Whether to turn on sequence parallelism in Shardformer. Defaults to False. enable_sequence_overlap (bool): Whether to turn on sequence overlap in Shardformer. Defaults to False. 
verbose (bool, optional): verbose mode. Debug info including chunk search result will be printed. Defaults to False. - broadcast_buffers (bool, optional): Whether to broadcast buffers in the beginning of training when using DDP. Defaults to True. - ddp_bucket_cap_mb (int, optional): The bucket size in MB when using DDP. Defaults to 25. - find_unused_parameters (bool, optional): Whether to find unused parameters when using DDP. Defaults to False. - check_reduction (bool, optional): Whether to check reduction when using DDP. Defaults to False. - gradient_as_bucket_view (bool, optional): Whether to use gradient as bucket view when using DDP. Defaults to False. - static_graph (bool, optional): Whether to use static graph when using DDP. Defaults to False. """ def __init__( @@ -364,12 +357,6 @@ def __init__( enable_sequence_parallelism: bool = False, enable_jit_fused: bool = False, enable_sequence_overlap: bool = False, - broadcast_buffers: bool = True, - ddp_bucket_cap_mb: int = 25, - find_unused_parameters: bool = False, - check_reduction: bool = False, - gradient_as_bucket_view: bool = False, - static_graph: bool = False, verbose: bool = False, ) -> None: super().__init__() @@ -435,14 +422,6 @@ def __init__( enable_sequence_parallelism=self.enable_sequence_parallelism, enable_sequence_overlap=self.enable_sequence_overlap, ) - self.ddp_config = dict( - broadcast_buffers=broadcast_buffers, - bucket_cap_mb=ddp_bucket_cap_mb, - find_unused_parameters=find_unused_parameters, - check_reduction=check_reduction, - gradient_as_bucket_view=gradient_as_bucket_view, - static_graph=static_graph, - ) def support_no_sync(self) -> bool: return False @@ -483,10 +462,6 @@ def configure( shardformer = ShardFormer(self.shard_config) model, _ = shardformer.optimize(model) - if self.use_ddp: - model = model.cuda() - model = DDP(model, process_group=self.ddp_group, **self.ddp_config) - model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, ddp_group=self.ddp_group, 
use_ddp=self.use_ddp, verbose=self.verbose) if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 58beb0488668..03afcef76a22 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -271,10 +271,8 @@ def forward(self, *args, **kwargs): outputs = self._inference_forward(*args, **kwargs) else: self.gemini_manager.pre_iter(*args) - sync_ctx = self.module.no_sync() if self.use_ddp else nullcontext() with ColoParamOpHookManager.use_hooks(self.param_op_hook): - with sync_ctx: - outputs = self.module(*args, **kwargs) + outputs = self.module(*args, **kwargs) if self.force_outputs_fp32: return _cast_float(outputs, torch.float) From df0080f332ab5358fa902a3955ee3b0c8f1dbc46 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Wed, 15 Nov 2023 19:24:51 +0800 Subject: [PATCH 04/13] fix fix --- colossalai/booster/plugin/gemini_plugin.py | 48 ++++++++++++------- colossalai/zero/gemini/chunk/chunk.py | 22 ++++----- colossalai/zero/gemini/chunk/manager.py | 6 +-- colossalai/zero/gemini/gemini_ddp.py | 20 ++++---- .../test_plugin/test_gemini_plugin.py | 24 ++++++---- .../test_gemini_checkpoint_io.py | 31 +++++++----- 6 files changed, 85 insertions(+), 66 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index ae007f65c598..06a491e4756a 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -34,10 +34,6 @@ SUPPORTED_PRECISION = ["fp16", "bf16"] PRECISION_STR_TO_DTYPE = {"fp16": torch.half, "bf16": torch.bfloat16} -ZERO_AXIS = 0 -TP_AXIS = 1 -DDP_AXIS = 2 - def get_param_info(optim: Optimizer): # Get a backup of necessary information of parameters for future use, which includes: # 1. A mapping from integer param_id to param32 shape. 
@@ -305,8 +301,8 @@ class GeminiPlugin(DPPluginBase): max_norm (float, optional): max_norm used for `clip_grad_norm`. You should notice that you shall not do clip_grad_norm by yourself when using ZeRO DDP. The ZeRO optimizer will take care of clip_grad_norm. norm_type (float, optional): norm_type used for `clip_grad_norm`. - enable_tensor_parallelism (bool, optional): Whether to use tensor parallelism strategy, which is implemented in Shardformer. Default to False. - tp_size (int, optional): If 'enable_tensor_parallelism' is set to true, please configure 'tp_size' which determines the size of the tensor parallel process group. Default to 1. + tp_size (int, optional): If 'tp_size' is set to be greater than 1, it means using tensor parallelism strategy, which is implemented in Shardformer, 'tp_size' determines the size of the tensor parallel process group. Default to 1. + extra_dp_size (int, optional): If 'extra_dp_size' is set to be greater than 1, it means creating another group to run with a ddp-like strategy. Default to 1. enable_all_optimization (bool, optional): Whether to switch on all the optimizations supported by Shardformer. Currently all the optimization methods include fused normalization, flash attention and JIT. Defaults to False. 
@@ -350,7 +346,7 @@ def __init__( norm_type: float = 2.0, enable_tensor_parallelism: bool = False, tp_size: int = 1, - ddp_size:int = 1, + extra_dp_size:int = 1, enable_all_optimization: bool = False, enable_fused_normalization: bool = False, enable_flash_attention: bool = False, @@ -395,7 +391,7 @@ def __init__( max_norm=max_norm, norm_type=norm_type, ) - self.enable_tensor_parallelism = enable_tensor_parallelism + self.enable_tensor_parallelism = tp_size > 1 self.enable_all_optimization = enable_all_optimization self.enable_fused_normalization = enable_fused_normalization self.enable_flash_attention = enable_flash_attention @@ -404,14 +400,32 @@ def __init__( self.enable_sequence_overlap = enable_sequence_overlap self.verbose = verbose - self.tp_size = tp_size if self.enable_tensor_parallelism else 1 - self.ddp_size = ddp_size - self.use_ddp = self.ddp_size > 1 - self.zero_size = dist.get_world_size() // (self.tp_size * self.ddp_size) - self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size, self.ddp_size) - self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) - self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) - self.ddp_group = self.pg_mesh.get_group_along_axis(DDP_AXIS) + self.tp_size = tp_size + self.extra_dp_size = extra_dp_size + world_size = dist.get_world_size() + self.zero_size = world_size // (self.tp_size * self.extra_dp_size) + assert world_size == (self.tp_size * self.extra_dp_size) * self.zero_size, f"The global group size can't be evenly divided by the subgroup size." + assert self.zero_size > 1, f"Gemini's group size should be greater than 1, please reduce tensor parallelism group size or extra-dp group size." 
+ if self.tp_size == 1 and self.extra_dp_size == 1: + self.extra_dp_group = None + self.tp_group = None + self.zero_group = None + elif self.tp_size == 1: + self.tp_group = None + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(0) + self.extra_dp_group = self.pg_mesh.get_group_along_axis(1) + elif self.extra_dp_size == 1: + self.extra_dp_group = None + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(0) + self.tp_group = self.pg_mesh.get_group_along_axis(1) + else: + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size, self.tp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(0) + self.extra_dp_group = self.pg_mesh.get_group_along_axis(1) + self.tp_group = self.pg_mesh.get_group_along_axis(2) + self.shard_config = ShardConfig( tensor_parallel_process_group=self.tp_group, enable_tensor_parallelism=self.enable_tensor_parallelism, @@ -462,7 +476,7 @@ def configure( shardformer = ShardFormer(self.shard_config) model, _ = shardformer.optimize(model) - model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, ddp_group=self.ddp_group, use_ddp=self.use_ddp, verbose=self.verbose) + model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, extra_dp_group=self.extra_dp_group, verbose=self.verbose) if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): optimizer = GeminiOptimizer( diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index 088bf948e657..8785bb429dd8 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -67,8 +67,7 @@ def __init__( cpu_shard_init: bool = False, keep_gathered: bool = False, pin_memory: bool = False, - ddp_group: ProcessGroup = None, - use_ddp: bool = False, + extra_dp_group: ProcessGroup = None, ) -> None: """ Chunk: A container owning a piece of 
contiguous memory space for tensors @@ -95,9 +94,8 @@ def __init__( self.torch_pg = zero_group self.pg_size = dist.get_world_size(self.torch_pg) self.pg_rank = dist.get_rank(self.torch_pg) - self.ddp_group = ddp_group - self.use_ddp = use_ddp - self.ddp_size = dist.get_world_size(self.ddp_group) if use_ddp else 1 + self.extra_dp_group = extra_dp_group + self.extra_dp_size = dist.get_world_size(self.extra_dp_group) # the chunk size should be divisible by the dp degree if not keep_gathered: @@ -389,20 +387,20 @@ def reduce(self): # just move cuda_global_chunk to cuda_shard # the communication is not necessary self.__scatter() - if self.use_ddp: - dist.all_reduce(self.cuda_shard, group=self.ddp_group) + if self.extra_dp_group is not None: + dist.all_reduce(self.cuda_shard, group=self.extra_dp_group) elif self.keep_gathered: # we use all-reduce here dist.all_reduce(self.cuda_global_chunk, group=self.torch_pg) - if self.use_ddp: - dist.all_reduce(self.cuda_global_chunk, group=self.ddp_group) + if self.extra_dp_group is not None: + dist.all_reduce(self.cuda_global_chunk, group=self.extra_dp_group) else: self.cuda_shard = torch.empty(self.shard_size, dtype=self.dtype, device=get_current_device()) input_list = list(torch.chunk(self.cuda_global_chunk, chunks=self.pg_size, dim=0)) dist.reduce_scatter(self.cuda_shard, input_list, group=self.torch_pg) - if self.use_ddp: - dist.all_reduce(self.cuda_shard, group=self.ddp_group) + if self.extra_dp_group is not None: + dist.all_reduce(self.cuda_shard, group=self.extra_dp_group) free_storage(self.cuda_global_chunk) self.is_gathered = False @@ -623,7 +621,7 @@ def init_grad_chunk(self) -> "Chunk": dtype=self.dtype, keep_gathered=self.keep_gathered, pin_memory=self.pin_memory, - ddp_group=self.ddp_group, + extra_dp_group=self.extra_dp_group, ) grad_chunk.num_tensors = self.num_tensors grad_chunk.utilized_size = self.utilized_size diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index 
46511d7c6991..5ad622a13910 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -39,8 +39,7 @@ def register_tensor( group_type: str, config_key: int, zero_group: ProcessGroup, - ddp_group: ProcessGroup = None, - use_ddp: bool = False, + extra_dp_group: ProcessGroup = None, cpu_offload: bool = False, pin_memory: bool = False, ) -> None: @@ -87,8 +86,7 @@ def register_tensor( dtype=tensor.dtype, cpu_shard_init=cpu_offload, pin_memory=pin_memory, - ddp_group=ddp_group, - use_ddp=use_ddp, + extra_dp_group=extra_dp_group, **chunk_kwargs, ) diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 03afcef76a22..ff943f4b49e0 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -89,8 +89,7 @@ def __init__( zero_group: Optional[ProcessGroup] = None, memstats: Optional[MemStats] = None, # genimi memory stats master_weights: bool = True, - ddp_group: Optional[ProcessGroup] = None, - use_ddp: bool = False, + extra_dp_group: Optional[ProcessGroup] = None, verbose: bool = False, ) -> None: assert mixed_precision in (torch.float16, torch.bfloat16) @@ -131,8 +130,7 @@ def __init__( self.scatter_after_inference = scatter_after_inference self.mixed_precision = mixed_precision self.zero_group = zero_group or _get_default_group() - self.ddp_group = ddp_group - self.use_ddp = use_ddp + self.extra_dp_group = extra_dp_group self.reuse_fp16_chunk = master_weights self.master_weights = master_weights @@ -381,12 +379,12 @@ def grad_handle(self, p, grad): self.chunk_manager.release_chunk(chunk) if grad_chunk.is_gathered: grad_chunk.cuda_global_chunk.div_(chunk.pg_size) - if self.use_ddp: - grad_chunk.cuda_global_chunk.div_(chunk.ddp_size) + if self.extra_dp_group is not None: + grad_chunk.cuda_global_chunk.div_(chunk.extra_dp_size) else: grad_chunk.cuda_shard.div_(chunk.pg_size) - if self.use_ddp: - grad_chunk.cuda_shard.div_(chunk.ddp_size) + if 
self.extra_dp_group is not None: + grad_chunk.cuda_shard.div_(chunk.extra_dp_size) # check overflow elements self.overflow_counter += grad_chunk.has_inf_or_nan # record l2 norm for gradient clipping. flag is bound to fp16 chunk @@ -763,8 +761,7 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi group_type="fp16_param", config_key=zero_world_size, zero_group=self.zero_group, - ddp_group=self.ddp_group, - use_ddp=self.use_ddp, + extra_dp_group=self.extra_dp_group, cpu_offload=cpu_offload, pin_memory=pin_memory, ) @@ -779,8 +776,7 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi group_type="fp32_param", config_key=zero_world_size, zero_group=self.zero_group, - ddp_group=self.ddp_group, - use_ddp=self.use_ddp, + extra_dp_group=self.extra_dp_group, cpu_offload=cpu_offload, pin_memory=pin_memory, ) diff --git a/tests/test_booster/test_plugin/test_gemini_plugin.py b/tests/test_booster/test_plugin/test_gemini_plugin.py index 4e9608ccf0eb..11f4c6e1185d 100644 --- a/tests/test_booster/test_plugin/test_gemini_plugin.py +++ b/tests/test_booster/test_plugin/test_gemini_plugin.py @@ -1,5 +1,6 @@ from contextlib import nullcontext from typing import Optional +import pytest import torch import torch.distributed as dist @@ -17,14 +18,15 @@ from tests.kit.model_zoo import model_zoo -def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) -> Optional[str]: +def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, zero_size, tp_size) -> Optional[str]: try: if init_method == "lazy": ctx = LazyInitContext() else: ctx = nullcontext() - enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) + extra_dp_size = dist.get_world_size() // (zero_size * 
tp_size) + enable_all_optimization = True if tp_size > 1 else False + plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, tp_size=tp_size, extra_dp_size=extra_dp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) with ctx: model = model_fn() @@ -62,10 +64,9 @@ def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tenso @parameterize("subset", ["torchvision", "transformers", "diffusers"]) @parameterize("init_method", ["none"]) -@parameterize("enable_tensor_parallelism", [True, False]) -@parameterize("tp_size", [1]) -@parameterize("ddp_size", [2]) -def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_parallelism: bool = True, early_stop: bool = True, tp_size: int = 1, ddp_size: int = 1): +@parameterize("zero_size", [2]) +@parameterize("tp_size", [1, 2]) +def check_gemini_plugin(subset: str, init_method: str = "none", early_stop: bool = True, zero_size: int = 1, tp_size: int = 1): """check gemini plugin over model zoo Args: @@ -127,9 +128,9 @@ def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_pa # TODO debug blip2 when using tp, something wrong with shift_logits's shape if "transformers_blip2" in name: - enable_tensor_parallelism = False + tp_size = 1 - err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) + err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, zero_size, tp_size) torch.cuda.empty_cache() if err is None: passed_models.append(name) @@ -155,6 +156,11 @@ def run_dist(rank, world_size, port, early_stop: bool = True): def test_gemini_plugin(early_stop: bool = True): spawn(run_dist, 4, early_stop=early_stop) +@pytest.mark.largedist +@rerun_if_address_is_in_use() +def test_gemini_plugin_3d(early_stop: bool = True): + spawn(run_dist, 8, early_stop=early_stop) + if __name__ == "__main__": test_gemini_plugin(early_stop=False) \ No newline at end of file diff --git 
a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index 3cb79f75aa15..8343c5f07e30 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -37,20 +37,21 @@ @parameterize("placement_config", MODEL_PLACEMENT_CONFIGS) @parameterize("model_name", ["transformers_bert_for_sequence_classification"]) @parameterize("use_safetensors", [False, True]) -@parameterize("enable_tensor_parallelism", [True, False]) -@parameterize("tp_size", [2]) -def exam_state_dict_with_origin(placement_config, model_name, use_safetensors: bool, enable_tensor_parallelism: bool, tp_size: int): +@parameterize("tp_size", [1, 2]) +@parameterize("zero_size", [2]) +def exam_state_dict_with_origin(placement_config, model_name, use_safetensors: bool, tp_size: int, zero_size: int): from transformers import BertForSequenceClassification (model_fn, data_gen_fn, output_transform_fn, _, _) = next(iter(model_zoo.get_sub_registry(model_name).values())) bert_model = model_fn() - enable_all_optimization = True if enable_tensor_parallelism else False + enable_all_optimization = True if tp_size > 1 else False with shared_tempdir() as tempdir: pretrained_path = os.path.join(tempdir, "pretrained") bert_model.config.save_pretrained(save_directory=pretrained_path) - plugin = GeminiPlugin(**placement_config, enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, enable_all_optimization=enable_all_optimization) + extra_dp_size = dist.get_world_size() // (zero_size * tp_size) + plugin = GeminiPlugin(**placement_config, tp_size=tp_size, enable_all_optimization=enable_all_optimization, extra_dp_size=extra_dp_size) booster = Booster(plugin=plugin) bert_model, _, _, _, _ = booster.boost(bert_model) model_size = sum(p.numel() * p.element_size() for p in bert_model.parameters()) / 1024**2 @@ -69,14 +70,14 @@ def exam_state_dict_with_origin(placement_config, model_name, 
use_safetensors: b @parameterize("shard", [True, False]) @parameterize("model_name", ["transformers_gpt"]) @parameterize("size_per_shard", [32]) -@parameterize("enable_tensor_parallelism", [True, False]) -@parameterize("tp_size", [2]) -@parameterize("ddp_size", [2]) -def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, enable_tensor_parallelism: bool, tp_size: int, ddp_size: int): +@parameterize("tp_size", [1, 2]) +@parameterize("zero_size", [2]) +def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, tp_size: int, zero_size: int): (model_fn, data_gen_fn, output_transform_fn, _, _) = next(iter(model_zoo.get_sub_registry(model_name).values())) criterion = lambda x: x.mean() - enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) + enable_all_optimization = True if tp_size > 1 else False + extra_dp_size = dist.get_world_size() // (zero_size * tp_size) + plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), tp_size=tp_size, extra_dp_size=extra_dp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) model = model_fn() @@ -155,7 +156,13 @@ def run_dist(rank, world_size, port): @pytest.mark.dist -@pytest.mark.parametrize("world_size", [8]) +@pytest.mark.parametrize("world_size", [4]) @rerun_if_address_is_in_use() def test_gemini_ckpIO(world_size): + spawn(run_dist, world_size) + +@pytest.mark.largedist +@pytest.mark.parametrize("world_size", [8]) +@rerun_if_address_is_in_use() +def test_gemini_ckpIO_3d(world_size): spawn(run_dist, world_size) \ No newline at end of file From 9ac59f852eb75b939dc51f1784fa2d602c477a56 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Mon, 13 Nov 2023 
12:20:03 +0800 Subject: [PATCH 05/13] support ddp --- colossalai/booster/plugin/gemini_plugin.py | 41 ++++++++++++++++--- colossalai/zero/gemini/chunk/chunk.py | 22 +++++++--- colossalai/zero/gemini/chunk/manager.py | 12 ++++-- colossalai/zero/gemini/gemini_ddp.py | 34 ++++++++++----- .../test_plugin/test_gemini_plugin.py | 12 +++--- .../test_gemini_checkpoint_io.py | 9 ++-- 6 files changed, 96 insertions(+), 34 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 9c7dc6836c1e..7f4a797b0c70 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -10,6 +10,7 @@ from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler as LRScheduler from torch.utils.data import DataLoader +from torch.nn.parallel import DistributedDataParallel as DDP from colossalai.checkpoint_io import CheckpointIndexFile, CheckpointIO, GeneralCheckpointIO from colossalai.checkpoint_io.utils import ( @@ -34,8 +35,9 @@ SUPPORTED_PRECISION = ["fp16", "bf16"] PRECISION_STR_TO_DTYPE = {"fp16": torch.half, "bf16": torch.bfloat16} -DP_AXIS = 0 +ZERO_AXIS = 0 TP_AXIS = 1 +DDP_AXIS = 2 def get_param_info(optim: Optimizer): # Get a backup of necessary information of parameters for future use, which includes: @@ -315,6 +317,12 @@ class GeminiPlugin(DPPluginBase): enable_sequence_parallelism (bool): Whether to turn on sequence parallelism in Shardformer. Defaults to False. enable_sequence_overlap (bool): Whether to turn on sequence overlap in Shardformer. Defaults to False. verbose (bool, optional): verbose mode. Debug info including chunk search result will be printed. Defaults to False. + broadcast_buffers (bool, optional): Whether to broadcast buffers in the beginning of training when using DDP. Defaults to True. + ddp_bucket_cap_mb (int, optional): The bucket size in MB when using DDP. Defaults to 25. 
+ find_unused_parameters (bool, optional): Whether to find unused parameters when using DDP. Defaults to False. + check_reduction (bool, optional): Whether to check reduction when using DDP. Defaults to False. + gradient_as_bucket_view (bool, optional): Whether to use gradient as bucket view when using DDP. Defaults to False. + static_graph (bool, optional): Whether to use static graph when using DDP. Defaults to False. """ def __init__( @@ -349,12 +357,19 @@ def __init__( norm_type: float = 2.0, enable_tensor_parallelism: bool = False, tp_size: int = 1, + ddp_size:int = 1, enable_all_optimization: bool = False, enable_fused_normalization: bool = False, enable_flash_attention: bool = False, enable_sequence_parallelism: bool = False, enable_jit_fused: bool = False, enable_sequence_overlap: bool = False, + broadcast_buffers: bool = True, + ddp_bucket_cap_mb: int = 25, + find_unused_parameters: bool = False, + check_reduction: bool = False, + gradient_as_bucket_view: bool = False, + static_graph: bool = False, verbose: bool = False, ) -> None: super().__init__() @@ -403,11 +418,13 @@ def __init__( self.verbose = verbose self.tp_size = tp_size if self.enable_tensor_parallelism else 1 - self.dp_size = dist.get_world_size() // self.tp_size - assert self.dp_size > 1, f"The size of the DP group should be greater than 1. Please reduce the TP group size." 
- self.pg_mesh = ProcessGroupMesh(self.dp_size, self.tp_size) - self.dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) + self.ddp_size = ddp_size + self.use_ddp = self.ddp_size > 1 + self.zero_size = dist.get_world_size() // (self.tp_size * self.ddp_size) + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size, self.ddp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) + self.ddp_group = self.pg_mesh.get_group_along_axis(DDP_AXIS) self.shard_config = ShardConfig( tensor_parallel_process_group=self.tp_group, enable_tensor_parallelism=self.enable_tensor_parallelism, @@ -418,6 +435,14 @@ def __init__( enable_sequence_parallelism=self.enable_sequence_parallelism, enable_sequence_overlap=self.enable_sequence_overlap, ) + self.ddp_config = dict( + broadcast_buffers=broadcast_buffers, + bucket_cap_mb=ddp_bucket_cap_mb, + find_unused_parameters=find_unused_parameters, + check_reduction=check_reduction, + gradient_as_bucket_view=gradient_as_bucket_view, + static_graph=static_graph, + ) def support_no_sync(self) -> bool: return False @@ -458,7 +483,11 @@ def configure( shardformer = ShardFormer(self.shard_config) model, _ = shardformer.optimize(model) - model = GeminiDDP(model, **self.gemini_config, process_group=self.dp_group, verbose=self.verbose) + if self.use_ddp: + model = model.cuda() + model = DDP(model, process_group=self.ddp_group, **self.ddp_config) + + model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, ddp_group=self.ddp_group, use_ddp=self.use_ddp, verbose=self.verbose) if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): optimizer = GeminiOptimizer( diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index 4ea6cc662025..088bf948e657 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -61,12 +61,14 @@ class Chunk: def __init__( self, chunk_size: 
int, - process_group: ProcessGroup, + zero_group: ProcessGroup, dtype: torch.dtype, init_device: Optional[torch.device] = None, cpu_shard_init: bool = False, keep_gathered: bool = False, pin_memory: bool = False, + ddp_group: ProcessGroup = None, + use_ddp: bool = False, ) -> None: """ Chunk: A container owning a piece of contiguous memory space for tensors @@ -76,7 +78,7 @@ def __init__( Args: chunk_size (int): the number of elements in the chunk - process_group (ProcessGroup): the process group of this chunk + zero_group (ProcessGroup): the process group of this chunk dtype (torch.dtype): the data type of the chunk init_device (torch.device): optional, During the chunk construction process, where the tensor is stored. The default value is None, which is the current GPU @@ -90,9 +92,12 @@ def __init__( self.chunk_size = chunk_size self.utilized_size = 0 - self.torch_pg = process_group + self.torch_pg = zero_group self.pg_size = dist.get_world_size(self.torch_pg) self.pg_rank = dist.get_rank(self.torch_pg) + self.ddp_group = ddp_group + self.use_ddp = use_ddp + self.ddp_size = dist.get_world_size(self.ddp_group) if use_ddp else 1 # the chunk size should be divisible by the dp degree if not keep_gathered: @@ -384,14 +389,20 @@ def reduce(self): # just move cuda_global_chunk to cuda_shard # the communication is not necessary self.__scatter() + if self.use_ddp: + dist.all_reduce(self.cuda_shard, group=self.ddp_group) elif self.keep_gathered: # we use all-reduce here dist.all_reduce(self.cuda_global_chunk, group=self.torch_pg) + if self.use_ddp: + dist.all_reduce(self.cuda_global_chunk, group=self.ddp_group) else: self.cuda_shard = torch.empty(self.shard_size, dtype=self.dtype, device=get_current_device()) input_list = list(torch.chunk(self.cuda_global_chunk, chunks=self.pg_size, dim=0)) dist.reduce_scatter(self.cuda_shard, input_list, group=self.torch_pg) + if self.use_ddp: + dist.all_reduce(self.cuda_shard, group=self.ddp_group) free_storage(self.cuda_global_chunk) 
self.is_gathered = False @@ -608,10 +619,11 @@ def init_grad_chunk(self) -> "Chunk": # grad chunk is not initialized grad_chunk = Chunk( chunk_size=self.chunk_size, - process_group=self.torch_pg, + zero_group=self.torch_pg, dtype=self.dtype, keep_gathered=self.keep_gathered, pin_memory=self.pin_memory, + ddp_group=self.ddp_group, ) grad_chunk.num_tensors = self.num_tensors grad_chunk.utilized_size = self.utilized_size @@ -640,4 +652,4 @@ def init_grad_chunk(self) -> "Chunk": self.grad_chunk.l2_norm = None alloc_storage(self.grad_chunk.cuda_global_chunk) - return self.grad_chunk + return self.grad_chunk \ No newline at end of file diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index d3c512fe978d..46511d7c6991 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -38,7 +38,9 @@ def register_tensor( tensor: torch.Tensor, group_type: str, config_key: int, - process_group: ProcessGroup, + zero_group: ProcessGroup, + ddp_group: ProcessGroup = None, + use_ddp: bool = False, cpu_offload: bool = False, pin_memory: bool = False, ) -> None: @@ -76,15 +78,17 @@ def register_tensor( if tensor.numel() > chunk_size: chunk_size = tensor.numel() - dp_size = dist.get_world_size(process_group) + dp_size = dist.get_world_size(zero_group) chunk_size = chunk_size + (-chunk_size % dp_size) chunk = Chunk( chunk_size=chunk_size, - process_group=process_group, + zero_group=zero_group, dtype=tensor.dtype, cpu_shard_init=cpu_offload, pin_memory=pin_memory, + ddp_group=ddp_group, + use_ddp=use_ddp, **chunk_kwargs, ) @@ -288,4 +292,4 @@ def rearrange_accumulated_grad_chunk(self, chunk: Chunk) -> Chunk: # Release accumulated_grad free_storage(accumulated_grad) - return grad_chunk + return grad_chunk \ No newline at end of file diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index ade0a4909902..58beb0488668 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ 
b/colossalai/zero/gemini/gemini_ddp.py @@ -86,9 +86,11 @@ def __init__( strict_ddp_mode: bool = False, scatter_after_inference: bool = True, mixed_precision: torch.dtype = torch.float16, - process_group: Optional[ProcessGroup] = None, + zero_group: Optional[ProcessGroup] = None, memstats: Optional[MemStats] = None, # genimi memory stats master_weights: bool = True, + ddp_group: Optional[ProcessGroup] = None, + use_ddp: bool = False, verbose: bool = False, ) -> None: assert mixed_precision in (torch.float16, torch.bfloat16) @@ -105,7 +107,7 @@ def __init__( search_range_m=search_range_m, min_chunk_size_m=min_chunk_size_m, strict_ddp_flag=strict_ddp_mode, - process_group=process_group, + process_group=zero_group, verbose=verbose, ) self.gemini_manager = GeminiManager( @@ -128,7 +130,9 @@ def __init__( self.name2param: Dict[str, nn.Parameter] = dict() self.scatter_after_inference = scatter_after_inference self.mixed_precision = mixed_precision - self.dp_process_group = process_group or _get_default_group() + self.zero_group = zero_group or _get_default_group() + self.ddp_group = ddp_group + self.use_ddp = use_ddp self.reuse_fp16_chunk = master_weights self.master_weights = master_weights @@ -267,8 +271,10 @@ def forward(self, *args, **kwargs): outputs = self._inference_forward(*args, **kwargs) else: self.gemini_manager.pre_iter(*args) + sync_ctx = self.module.no_sync() if self.use_ddp else nullcontext() with ColoParamOpHookManager.use_hooks(self.param_op_hook): - outputs = self.module(*args, **kwargs) + with sync_ctx: + outputs = self.module(*args, **kwargs) if self.force_outputs_fp32: return _cast_float(outputs, torch.float) @@ -377,8 +383,12 @@ def grad_handle(self, p, grad): self.chunk_manager.release_chunk(chunk) if grad_chunk.is_gathered: grad_chunk.cuda_global_chunk.div_(chunk.pg_size) + if self.use_ddp: + grad_chunk.cuda_global_chunk.div_(chunk.ddp_size) else: grad_chunk.cuda_shard.div_(chunk.pg_size) + if self.use_ddp: + 
grad_chunk.cuda_shard.div_(chunk.ddp_size) # check overflow elements self.overflow_counter += grad_chunk.has_inf_or_nan # record l2 norm for gradient clipping. flag is bound to fp16 chunk @@ -733,7 +743,7 @@ def load_parameter(chunk_slice, data): unexpected_keys.append(key) def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pin_memory: bool): - dp_world_size = dist.get_world_size(self.dp_process_group) + zero_world_size = dist.get_world_size(self.zero_group) for p in param_order.generate(): self._preprocess_param(p) assert type(p) is ColoParameter @@ -753,8 +763,10 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi self.chunk_manager.register_tensor( tensor=p, group_type="fp16_param", - config_key=dp_world_size, - process_group=self.dp_process_group, + config_key=zero_world_size, + zero_group=self.zero_group, + ddp_group=self.ddp_group, + use_ddp=self.use_ddp, cpu_offload=cpu_offload, pin_memory=pin_memory, ) @@ -767,8 +779,10 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi self.chunk_manager.register_tensor( tensor=fp32_p, group_type="fp32_param", - config_key=dp_world_size, - process_group=self.dp_process_group, + config_key=zero_world_size, + zero_group=self.zero_group, + ddp_group=self.ddp_group, + use_ddp=self.use_ddp, cpu_offload=cpu_offload, pin_memory=pin_memory, ) @@ -881,4 +895,4 @@ def state_dict_shard( if block is not None: yield block, block_size - yield sharder.current_block, sharder.current_block_size + yield sharder.current_block, sharder.current_block_size \ No newline at end of file diff --git a/tests/test_booster/test_plugin/test_gemini_plugin.py b/tests/test_booster/test_plugin/test_gemini_plugin.py index 97ec0233f766..4e9608ccf0eb 100644 --- a/tests/test_booster/test_plugin/test_gemini_plugin.py +++ b/tests/test_booster/test_plugin/test_gemini_plugin.py @@ -17,14 +17,14 @@ from tests.kit.model_zoo import model_zoo -def run_fn(init_method, model_fn, 
data_gen_fn, output_transform_fn, enable_tensor_parallelism) -> Optional[str]: +def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) -> Optional[str]: try: if init_method == "lazy": ctx = LazyInitContext() else: ctx = nullcontext() enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, enable_tensor_parallelism=enable_tensor_parallelism, enable_all_optimization=enable_all_optimization) + plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) with ctx: model = model_fn() @@ -63,7 +63,9 @@ def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tenso @parameterize("subset", ["torchvision", "transformers", "diffusers"]) @parameterize("init_method", ["none"]) @parameterize("enable_tensor_parallelism", [True, False]) -def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_parallelism: bool = True, early_stop: bool = True): +@parameterize("tp_size", [1]) +@parameterize("ddp_size", [2]) +def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_parallelism: bool = True, early_stop: bool = True, tp_size: int = 1, ddp_size: int = 1): """check gemini plugin over model zoo Args: @@ -127,7 +129,7 @@ def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_pa if "transformers_blip2" in name: enable_tensor_parallelism = False - err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism) + err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) torch.cuda.empty_cache() if err is None: passed_models.append(name) @@ -155,4 +157,4 @@ def test_gemini_plugin(early_stop: bool = True): if __name__ == 
"__main__": - test_gemini_plugin(early_stop=False) + test_gemini_plugin(early_stop=False) \ No newline at end of file diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index 821ce9fbbbd9..3cb79f75aa15 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -71,11 +71,12 @@ def exam_state_dict_with_origin(placement_config, model_name, use_safetensors: b @parameterize("size_per_shard", [32]) @parameterize("enable_tensor_parallelism", [True, False]) @parameterize("tp_size", [2]) -def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, enable_tensor_parallelism: bool, tp_size: int): +@parameterize("ddp_size", [2]) +def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, enable_tensor_parallelism: bool, tp_size: int, ddp_size: int): (model_fn, data_gen_fn, output_transform_fn, _, _) = next(iter(model_zoo.get_sub_registry(model_name).values())) criterion = lambda x: x.mean() enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, enable_all_optimization=enable_all_optimization) + plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) model = model_fn() @@ -154,7 +155,7 @@ def run_dist(rank, world_size, port): @pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) +@pytest.mark.parametrize("world_size", [8]) @rerun_if_address_is_in_use() def test_gemini_ckpIO(world_size): - spawn(run_dist, world_size) + spawn(run_dist, world_size) \ No newline at end of file From 
10fba53a31c32c936b0eb2ada37b4af1970591cb Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Mon, 13 Nov 2023 15:32:19 +0800 Subject: [PATCH 06/13] fix --- tests/test_zero/test_gemini/test_chunkv2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_zero/test_gemini/test_chunkv2.py b/tests/test_zero/test_gemini/test_chunkv2.py index a31c888e966d..5977c706fdd1 100644 --- a/tests/test_zero/test_gemini/test_chunkv2.py +++ b/tests/test_zero/test_gemini/test_chunkv2.py @@ -39,7 +39,7 @@ def exam_chunk_basic(init_device, keep_gathered, pin_memory): pg = _get_default_group() my_chunk = Chunk( chunk_size=1024, - process_group=pg, + zero_group=pg, dtype=torch.float32, init_device=init_device, cpu_shard_init=True, From 39c824e30e6e01db28e8c01955051e365d6b319b Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Tue, 14 Nov 2023 19:39:14 +0800 Subject: [PATCH 07/13] fix --- colossalai/booster/plugin/gemini_plugin.py | 25 ---------------------- colossalai/zero/gemini/gemini_ddp.py | 4 +--- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 7f4a797b0c70..ae007f65c598 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -10,7 +10,6 @@ from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler as LRScheduler from torch.utils.data import DataLoader -from torch.nn.parallel import DistributedDataParallel as DDP from colossalai.checkpoint_io import CheckpointIndexFile, CheckpointIO, GeneralCheckpointIO from colossalai.checkpoint_io.utils import ( @@ -317,12 +316,6 @@ class GeminiPlugin(DPPluginBase): enable_sequence_parallelism (bool): Whether to turn on sequence parallelism in Shardformer. Defaults to False. enable_sequence_overlap (bool): Whether to turn on sequence overlap in Shardformer. Defaults to False. 
verbose (bool, optional): verbose mode. Debug info including chunk search result will be printed. Defaults to False. - broadcast_buffers (bool, optional): Whether to broadcast buffers in the beginning of training when using DDP. Defaults to True. - ddp_bucket_cap_mb (int, optional): The bucket size in MB when using DDP. Defaults to 25. - find_unused_parameters (bool, optional): Whether to find unused parameters when using DDP. Defaults to False. - check_reduction (bool, optional): Whether to check reduction when using DDP. Defaults to False. - gradient_as_bucket_view (bool, optional): Whether to use gradient as bucket view when using DDP. Defaults to False. - static_graph (bool, optional): Whether to use static graph when using DDP. Defaults to False. """ def __init__( @@ -364,12 +357,6 @@ def __init__( enable_sequence_parallelism: bool = False, enable_jit_fused: bool = False, enable_sequence_overlap: bool = False, - broadcast_buffers: bool = True, - ddp_bucket_cap_mb: int = 25, - find_unused_parameters: bool = False, - check_reduction: bool = False, - gradient_as_bucket_view: bool = False, - static_graph: bool = False, verbose: bool = False, ) -> None: super().__init__() @@ -435,14 +422,6 @@ def __init__( enable_sequence_parallelism=self.enable_sequence_parallelism, enable_sequence_overlap=self.enable_sequence_overlap, ) - self.ddp_config = dict( - broadcast_buffers=broadcast_buffers, - bucket_cap_mb=ddp_bucket_cap_mb, - find_unused_parameters=find_unused_parameters, - check_reduction=check_reduction, - gradient_as_bucket_view=gradient_as_bucket_view, - static_graph=static_graph, - ) def support_no_sync(self) -> bool: return False @@ -483,10 +462,6 @@ def configure( shardformer = ShardFormer(self.shard_config) model, _ = shardformer.optimize(model) - if self.use_ddp: - model = model.cuda() - model = DDP(model, process_group=self.ddp_group, **self.ddp_config) - model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, ddp_group=self.ddp_group, 
use_ddp=self.use_ddp, verbose=self.verbose) if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 58beb0488668..03afcef76a22 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -271,10 +271,8 @@ def forward(self, *args, **kwargs): outputs = self._inference_forward(*args, **kwargs) else: self.gemini_manager.pre_iter(*args) - sync_ctx = self.module.no_sync() if self.use_ddp else nullcontext() with ColoParamOpHookManager.use_hooks(self.param_op_hook): - with sync_ctx: - outputs = self.module(*args, **kwargs) + outputs = self.module(*args, **kwargs) if self.force_outputs_fp32: return _cast_float(outputs, torch.float) From 0a46a59bc1de0e541863ee0d6c40fa8c4a42b700 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Wed, 15 Nov 2023 19:24:51 +0800 Subject: [PATCH 08/13] fix fix --- colossalai/booster/plugin/gemini_plugin.py | 48 ++++++++++++------- colossalai/zero/gemini/chunk/chunk.py | 22 ++++----- colossalai/zero/gemini/chunk/manager.py | 6 +-- colossalai/zero/gemini/gemini_ddp.py | 20 ++++---- .../test_plugin/test_gemini_plugin.py | 24 ++++++---- .../test_gemini_checkpoint_io.py | 31 +++++++----- 6 files changed, 85 insertions(+), 66 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index ae007f65c598..06a491e4756a 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -34,10 +34,6 @@ SUPPORTED_PRECISION = ["fp16", "bf16"] PRECISION_STR_TO_DTYPE = {"fp16": torch.half, "bf16": torch.bfloat16} -ZERO_AXIS = 0 -TP_AXIS = 1 -DDP_AXIS = 2 - def get_param_info(optim: Optimizer): # Get a backup of necessary information of parameters for future use, which includes: # 1. A mapping from integer param_id to param32 shape. 
@@ -305,8 +301,8 @@ class GeminiPlugin(DPPluginBase): max_norm (float, optional): max_norm used for `clip_grad_norm`. You should notice that you shall not do clip_grad_norm by yourself when using ZeRO DDP. The ZeRO optimizer will take care of clip_grad_norm. norm_type (float, optional): norm_type used for `clip_grad_norm`. - enable_tensor_parallelism (bool, optional): Whether to use tensor parallelism strategy, which is implemented in Shardformer. Default to False. - tp_size (int, optional): If 'enable_tensor_parallelism' is set to true, please configure 'tp_size' which determines the size of the tensor parallel process group. Default to 1. + tp_size (int, optional): If 'tp_size' is set to be greater than 1, it means using tensor parallelism strategy, which is implemented in Shardformer, 'tp_size' determines the size of the tensor parallel process group. Default to 1. + extra_dp_size (int, optional): If 'extra_dp_size' is set to be greater than 1, it means creating another group to run with a ddp-like strategy. Default to 1. enable_all_optimization (bool, optional): Whether to switch on all the optimizations supported by Shardformer. Currently all the optimization methods include fused normalization, flash attention and JIT. Defaults to False. 
@@ -350,7 +346,7 @@ def __init__( norm_type: float = 2.0, enable_tensor_parallelism: bool = False, tp_size: int = 1, - ddp_size:int = 1, + extra_dp_size:int = 1, enable_all_optimization: bool = False, enable_fused_normalization: bool = False, enable_flash_attention: bool = False, @@ -395,7 +391,7 @@ def __init__( max_norm=max_norm, norm_type=norm_type, ) - self.enable_tensor_parallelism = enable_tensor_parallelism + self.enable_tensor_parallelism = tp_size > 1 self.enable_all_optimization = enable_all_optimization self.enable_fused_normalization = enable_fused_normalization self.enable_flash_attention = enable_flash_attention @@ -404,14 +400,32 @@ def __init__( self.enable_sequence_overlap = enable_sequence_overlap self.verbose = verbose - self.tp_size = tp_size if self.enable_tensor_parallelism else 1 - self.ddp_size = ddp_size - self.use_ddp = self.ddp_size > 1 - self.zero_size = dist.get_world_size() // (self.tp_size * self.ddp_size) - self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size, self.ddp_size) - self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) - self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) - self.ddp_group = self.pg_mesh.get_group_along_axis(DDP_AXIS) + self.tp_size = tp_size + self.extra_dp_size = extra_dp_size + world_size = dist.get_world_size() + self.zero_size = world_size // (self.tp_size * self.extra_dp_size) + assert world_size == (self.tp_size * self.extra_dp_size) * self.zero_size, f"The global group size can't be evenly divided by the subgroup size." + assert self.zero_size > 1, f"Gemini's group size should greater than 1, please reduce tensor paralllelism group size or extra-dp group size." 
+ if self.tp_size == 1 and self.extra_dp_size == 1: + self.extra_dp_group = None + self.tp_group = None + self.zero_group = None + elif self.tp_size == 1: + self.tp_group = None + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(0) + self.extra_dp_group = self.pg_mesh.get_group_along_axis(1) + elif self.extra_dp_size == 1: + self.extra_dp_group = None + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(0) + self.tp_group = self.pg_mesh.get_group_along_axis(1) + else: + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size, self.tp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(0) + self.extra_dp_group = self.pg_mesh.get_group_along_axis(1) + self.tp_group = self.pg_mesh.get_group_along_axis(2) + self.shard_config = ShardConfig( tensor_parallel_process_group=self.tp_group, enable_tensor_parallelism=self.enable_tensor_parallelism, @@ -462,7 +476,7 @@ def configure( shardformer = ShardFormer(self.shard_config) model, _ = shardformer.optimize(model) - model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, ddp_group=self.ddp_group, use_ddp=self.use_ddp, verbose=self.verbose) + model = GeminiDDP(model, **self.gemini_config, zero_group=self.zero_group, extra_dp_group=self.extra_dp_group, verbose=self.verbose) if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): optimizer = GeminiOptimizer( diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index 088bf948e657..8785bb429dd8 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -67,8 +67,7 @@ def __init__( cpu_shard_init: bool = False, keep_gathered: bool = False, pin_memory: bool = False, - ddp_group: ProcessGroup = None, - use_ddp: bool = False, + extra_dp_group: ProcessGroup = None, ) -> None: """ Chunk: A container owning a piece of 
contiguous memory space for tensors @@ -95,9 +94,8 @@ def __init__( self.torch_pg = zero_group self.pg_size = dist.get_world_size(self.torch_pg) self.pg_rank = dist.get_rank(self.torch_pg) - self.ddp_group = ddp_group - self.use_ddp = use_ddp - self.ddp_size = dist.get_world_size(self.ddp_group) if use_ddp else 1 + self.extra_dp_group = extra_dp_group + self.extra_dp_size = dist.get_world_size(self.extra_dp_group) # the chunk size should be divisible by the dp degree if not keep_gathered: @@ -389,20 +387,20 @@ def reduce(self): # just move cuda_global_chunk to cuda_shard # the communication is not necessary self.__scatter() - if self.use_ddp: - dist.all_reduce(self.cuda_shard, group=self.ddp_group) + if self.extra_dp_group is not None: + dist.all_reduce(self.cuda_shard, group=self.extra_dp_group) elif self.keep_gathered: # we use all-reduce here dist.all_reduce(self.cuda_global_chunk, group=self.torch_pg) - if self.use_ddp: - dist.all_reduce(self.cuda_global_chunk, group=self.ddp_group) + if self.extra_dp_group is not None: + dist.all_reduce(self.cuda_global_chunk, group=self.extra_dp_group) else: self.cuda_shard = torch.empty(self.shard_size, dtype=self.dtype, device=get_current_device()) input_list = list(torch.chunk(self.cuda_global_chunk, chunks=self.pg_size, dim=0)) dist.reduce_scatter(self.cuda_shard, input_list, group=self.torch_pg) - if self.use_ddp: - dist.all_reduce(self.cuda_shard, group=self.ddp_group) + if self.extra_dp_group is not None: + dist.all_reduce(self.cuda_shard, group=self.extra_dp_group) free_storage(self.cuda_global_chunk) self.is_gathered = False @@ -623,7 +621,7 @@ def init_grad_chunk(self) -> "Chunk": dtype=self.dtype, keep_gathered=self.keep_gathered, pin_memory=self.pin_memory, - ddp_group=self.ddp_group, + extra_dp_group=self.extra_dp_group, ) grad_chunk.num_tensors = self.num_tensors grad_chunk.utilized_size = self.utilized_size diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index 
46511d7c6991..5ad622a13910 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -39,8 +39,7 @@ def register_tensor( group_type: str, config_key: int, zero_group: ProcessGroup, - ddp_group: ProcessGroup = None, - use_ddp: bool = False, + extra_dp_group: ProcessGroup = None, cpu_offload: bool = False, pin_memory: bool = False, ) -> None: @@ -87,8 +86,7 @@ def register_tensor( dtype=tensor.dtype, cpu_shard_init=cpu_offload, pin_memory=pin_memory, - ddp_group=ddp_group, - use_ddp=use_ddp, + extra_dp_group=extra_dp_group, **chunk_kwargs, ) diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 03afcef76a22..ff943f4b49e0 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -89,8 +89,7 @@ def __init__( zero_group: Optional[ProcessGroup] = None, memstats: Optional[MemStats] = None, # genimi memory stats master_weights: bool = True, - ddp_group: Optional[ProcessGroup] = None, - use_ddp: bool = False, + extra_dp_group: Optional[ProcessGroup] = None, verbose: bool = False, ) -> None: assert mixed_precision in (torch.float16, torch.bfloat16) @@ -131,8 +130,7 @@ def __init__( self.scatter_after_inference = scatter_after_inference self.mixed_precision = mixed_precision self.zero_group = zero_group or _get_default_group() - self.ddp_group = ddp_group - self.use_ddp = use_ddp + self.extra_dp_group = extra_dp_group self.reuse_fp16_chunk = master_weights self.master_weights = master_weights @@ -381,12 +379,12 @@ def grad_handle(self, p, grad): self.chunk_manager.release_chunk(chunk) if grad_chunk.is_gathered: grad_chunk.cuda_global_chunk.div_(chunk.pg_size) - if self.use_ddp: - grad_chunk.cuda_global_chunk.div_(chunk.ddp_size) + if self.extra_dp_group is not None: + grad_chunk.cuda_global_chunk.div_(chunk.extra_dp_size) else: grad_chunk.cuda_shard.div_(chunk.pg_size) - if self.use_ddp: - grad_chunk.cuda_shard.div_(chunk.ddp_size) + if 
self.extra_dp_group is not None: + grad_chunk.cuda_shard.div_(chunk.extra_dp_size) # check overflow elements self.overflow_counter += grad_chunk.has_inf_or_nan # record l2 norm for gradient clipping. flag is bound to fp16 chunk @@ -763,8 +761,7 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi group_type="fp16_param", config_key=zero_world_size, zero_group=self.zero_group, - ddp_group=self.ddp_group, - use_ddp=self.use_ddp, + extra_dp_group=self.extra_dp_group, cpu_offload=cpu_offload, pin_memory=pin_memory, ) @@ -779,8 +776,7 @@ def _init_chunks(self, param_order, strict_ddp_mode: bool, cpu_offload: bool, pi group_type="fp32_param", config_key=zero_world_size, zero_group=self.zero_group, - ddp_group=self.ddp_group, - use_ddp=self.use_ddp, + extra_dp_group=self.extra_dp_group, cpu_offload=cpu_offload, pin_memory=pin_memory, ) diff --git a/tests/test_booster/test_plugin/test_gemini_plugin.py b/tests/test_booster/test_plugin/test_gemini_plugin.py index 4e9608ccf0eb..11f4c6e1185d 100644 --- a/tests/test_booster/test_plugin/test_gemini_plugin.py +++ b/tests/test_booster/test_plugin/test_gemini_plugin.py @@ -1,5 +1,6 @@ from contextlib import nullcontext from typing import Optional +import pytest import torch import torch.distributed as dist @@ -17,14 +18,15 @@ from tests.kit.model_zoo import model_zoo -def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) -> Optional[str]: +def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, zero_size, tp_size) -> Optional[str]: try: if init_method == "lazy": ctx = LazyInitContext() else: ctx = nullcontext() - enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) + extra_dp_size = dist.get_world_size() // (zero_size * 
tp_size) + enable_all_optimization = True if tp_size > 1 else False + plugin = GeminiPlugin(max_norm=1.0, initial_scale=2**5, tp_size=tp_size, extra_dp_size=extra_dp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) with ctx: model = model_fn() @@ -62,10 +64,9 @@ def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tenso @parameterize("subset", ["torchvision", "transformers", "diffusers"]) @parameterize("init_method", ["none"]) -@parameterize("enable_tensor_parallelism", [True, False]) -@parameterize("tp_size", [1]) -@parameterize("ddp_size", [2]) -def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_parallelism: bool = True, early_stop: bool = True, tp_size: int = 1, ddp_size: int = 1): +@parameterize("zero_size", [2]) +@parameterize("tp_size", [1, 2]) +def check_gemini_plugin(subset: str, init_method: str = "none", early_stop: bool = True, zero_size: int = 1, tp_size: int = 1): """check gemini plugin over model zoo Args: @@ -127,9 +128,9 @@ def check_gemini_plugin(subset: str, init_method: str = "none", enable_tensor_pa # TODO debug blip2 when using tp, something wrong with shift_logits's shape if "transformers_blip2" in name: - enable_tensor_parallelism = False + tp_size = 1 - err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, enable_tensor_parallelism, tp_size, ddp_size) + err = run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, zero_size, tp_size) torch.cuda.empty_cache() if err is None: passed_models.append(name) @@ -155,6 +156,11 @@ def run_dist(rank, world_size, port, early_stop: bool = True): def test_gemini_plugin(early_stop: bool = True): spawn(run_dist, 4, early_stop=early_stop) +@pytest.mark.largedist +@rerun_if_address_is_in_use() +def test_gemini_plugin_3d(early_stop: bool = True): + spawn(run_dist, 8, early_stop=early_stop) + if __name__ == "__main__": test_gemini_plugin(early_stop=False) \ No newline at end of file diff --git 
a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index 3cb79f75aa15..8343c5f07e30 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -37,20 +37,21 @@ @parameterize("placement_config", MODEL_PLACEMENT_CONFIGS) @parameterize("model_name", ["transformers_bert_for_sequence_classification"]) @parameterize("use_safetensors", [False, True]) -@parameterize("enable_tensor_parallelism", [True, False]) -@parameterize("tp_size", [2]) -def exam_state_dict_with_origin(placement_config, model_name, use_safetensors: bool, enable_tensor_parallelism: bool, tp_size: int): +@parameterize("tp_size", [1, 2]) +@parameterize("zero_size", [2]) +def exam_state_dict_with_origin(placement_config, model_name, use_safetensors: bool, tp_size: int, zero_size: int): from transformers import BertForSequenceClassification (model_fn, data_gen_fn, output_transform_fn, _, _) = next(iter(model_zoo.get_sub_registry(model_name).values())) bert_model = model_fn() - enable_all_optimization = True if enable_tensor_parallelism else False + enable_all_optimization = True if tp_size > 1 else False with shared_tempdir() as tempdir: pretrained_path = os.path.join(tempdir, "pretrained") bert_model.config.save_pretrained(save_directory=pretrained_path) - plugin = GeminiPlugin(**placement_config, enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, enable_all_optimization=enable_all_optimization) + extra_dp_size = dist.get_world_size() // (zero_size * tp_size) + plugin = GeminiPlugin(**placement_config, tp_size=tp_size, enable_all_optimization=enable_all_optimization, extra_dp_size=extra_dp_size) booster = Booster(plugin=plugin) bert_model, _, _, _, _ = booster.boost(bert_model) model_size = sum(p.numel() * p.element_size() for p in bert_model.parameters()) / 1024**2 @@ -69,14 +70,14 @@ def exam_state_dict_with_origin(placement_config, model_name, 
use_safetensors: b @parameterize("shard", [True, False]) @parameterize("model_name", ["transformers_gpt"]) @parameterize("size_per_shard", [32]) -@parameterize("enable_tensor_parallelism", [True, False]) -@parameterize("tp_size", [2]) -@parameterize("ddp_size", [2]) -def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, enable_tensor_parallelism: bool, tp_size: int, ddp_size: int): +@parameterize("tp_size", [1, 2]) +@parameterize("zero_size", [2]) +def exam_state_dict(placement_config, shard: bool, model_name: str, size_per_shard: int, tp_size: int, zero_size: int): (model_fn, data_gen_fn, output_transform_fn, _, _) = next(iter(model_zoo.get_sub_registry(model_name).values())) criterion = lambda x: x.mean() - enable_all_optimization = True if enable_tensor_parallelism else False - plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), enable_tensor_parallelism=enable_tensor_parallelism, tp_size=tp_size, ddp_size=ddp_size, enable_all_optimization=enable_all_optimization) + enable_all_optimization = True if tp_size > 1 else False + extra_dp_size = dist.get_world_size() // (zero_size * tp_size) + plugin = GeminiPlugin(**placement_config, precision="fp16", initial_scale=(2**14), tp_size=tp_size, extra_dp_size=extra_dp_size, enable_all_optimization=enable_all_optimization) booster = Booster(plugin=plugin) model = model_fn() @@ -155,7 +156,13 @@ def run_dist(rank, world_size, port): @pytest.mark.dist -@pytest.mark.parametrize("world_size", [8]) +@pytest.mark.parametrize("world_size", [4]) @rerun_if_address_is_in_use() def test_gemini_ckpIO(world_size): + spawn(run_dist, world_size) + +@pytest.mark.largedist +@pytest.mark.parametrize("world_size", [8]) +@rerun_if_address_is_in_use() +def test_gemini_ckpIO_3d(world_size): spawn(run_dist, world_size) \ No newline at end of file From a96e1a7289873707dd2b479b7d0aef5e30132542 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Thu, 16 Nov 2023 
14:48:35 +0800 Subject: [PATCH 09/13] simplify tests --- colossalai/booster/plugin/gemini_plugin.py | 1 - .../test_plugin/test_gemini_plugin.py | 2 +- .../test_amp_optimizer.py | 36 ------------------- .../test_naive_optimizer.py | 21 ++--------- .../test_zero_optimizer.py | 36 +++++-------------- 5 files changed, 11 insertions(+), 85 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 06a491e4756a..faf98723e44f 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -344,7 +344,6 @@ def __init__( max_scale: float = 2**32, max_norm: float = 0.0, norm_type: float = 2.0, - enable_tensor_parallelism: bool = False, tp_size: int = 1, extra_dp_size:int = 1, enable_all_optimization: bool = False, diff --git a/tests/test_booster/test_plugin/test_gemini_plugin.py b/tests/test_booster/test_plugin/test_gemini_plugin.py index 11f4c6e1185d..61debe47b599 100644 --- a/tests/test_booster/test_plugin/test_gemini_plugin.py +++ b/tests/test_booster/test_plugin/test_gemini_plugin.py @@ -65,7 +65,7 @@ def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn, zero_size, t @parameterize("subset", ["torchvision", "transformers", "diffusers"]) @parameterize("init_method", ["none"]) @parameterize("zero_size", [2]) -@parameterize("tp_size", [1, 2]) +@parameterize("tp_size", [2]) def check_gemini_plugin(subset: str, init_method: str = "none", early_stop: bool = True, zero_size: int = 1, tp_size: int = 1): """check gemini plugin over model zoo diff --git a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_amp_optimizer.py b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_amp_optimizer.py index 9e7336b93b3a..f652d18e9494 100644 --- a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_amp_optimizer.py +++ b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_amp_optimizer.py @@ -124,25 +124,6 @@ def 
check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, @parameterize( "test_config", [ - { - "tp_size": 1, - "pp_size": 2, - "num_microbatches": 4, - "enable_all_optimization": True, - "use_lazy_init": True, - "precision": "fp16", - "max_norm": 5, - "initial_scale": 1, - }, - { - "tp_size": 2, - "pp_size": 1, - "enable_all_optimization": True, - "use_lazy_init": False, - "precision": "fp16", - "max_norm": 5, - "initial_scale": 1, - }, { "tp_size": 2, "pp_size": 2, @@ -153,23 +134,6 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "max_norm": 5, "initial_scale": 1, }, - { - "tp_size": 1, - "pp_size": 2, - "num_microbatches": 4, - "enable_all_optimization": True, - "use_lazy_init": True, - "precision": "bf16", - "max_norm": 5, - }, - { - "tp_size": 2, - "pp_size": 1, - "enable_all_optimization": True, - "use_lazy_init": False, - "precision": "bf16", - "max_norm": 5, - }, { "tp_size": 2, "pp_size": 2, diff --git a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_naive_optimizer.py b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_naive_optimizer.py index b8ead795da76..a749a2966fde 100644 --- a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_naive_optimizer.py +++ b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_naive_optimizer.py @@ -102,28 +102,11 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, @parameterize( "test_config", [ - { - "tp_size": 1, - "pp_size": 2, - "num_microbatches": 4, - "enable_all_optimization": True, - "use_lazy_init": True, - "precision": "fp32", - "max_norm": 5, - }, - { - "tp_size": 2, - "pp_size": 1, - "enable_all_optimization": True, - "use_lazy_init": False, - "precision": "fp32", - "max_norm": 5, - }, { "tp_size": 2, "pp_size": 2, "num_microbatches": 4, - "enable_all_optimization": True, + "enable_all_optimization": False, "use_lazy_init": False, "precision": "fp32", "max_norm": 5, @@ 
-148,7 +131,7 @@ def run_test(test_config): "tp_size": 2, "pp_size": 2, "num_microbatches": 4, - "enable_all_optimization": True, + "enable_all_optimization": False, "use_lazy_init": False, "precision": "fp32", "max_norm": 5, diff --git a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py index 061c702552cf..c0ce82660de4 100644 --- a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py +++ b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py @@ -106,17 +106,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "pp_size": 2, "num_microbatches": 4, "zero_stage": 1, - "enable_all_optimization": True, - "use_lazy_init": True, - "precision": "fp16", - "max_norm": 5, - "initial_scale": 1, - }, - { - "tp_size": 2, - "pp_size": 1, - "zero_stage": 1, - "enable_all_optimization": True, + "enable_all_optimization": False, "use_lazy_init": False, "precision": "fp16", "max_norm": 5, @@ -126,38 +116,28 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "tp_size": 2, "pp_size": 1, "zero_stage": 2, - "enable_all_optimization": True, + "enable_all_optimization": False, "use_lazy_init": False, "precision": "fp16", "max_norm": 5, "initial_scale": 1, }, - { - "tp_size": 1, - "pp_size": 2, - "num_microbatches": 4, - "zero_stage": 1, - "enable_all_optimization": True, - "use_lazy_init": True, - "precision": "bf16", - "max_norm": 5, - }, { "tp_size": 2, "pp_size": 1, "zero_stage": 1, - "enable_all_optimization": True, + "enable_all_optimization": False, "use_lazy_init": False, "precision": "bf16", "max_norm": 5, - }, + } { - "tp_size": 2, - "pp_size": 1, + "tp_size": 1, + "pp_size": 2, "zero_stage": 2, - "enable_all_optimization": True, + "enable_all_optimization": False, "use_lazy_init": False, - "precision": "bf16", + "precision": "fp16", "max_norm": 
5, }, ], From b76c1760a9bac582602262071386feba966b95db Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Thu, 16 Nov 2023 14:57:53 +0800 Subject: [PATCH 10/13] fix --- colossalai/booster/plugin/gemini_plugin.py | 2 +- .../test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index faf98723e44f..2a59a14119d9 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -404,7 +404,7 @@ def __init__( world_size = dist.get_world_size() self.zero_size = world_size // (self.tp_size * self.extra_dp_size) assert world_size == (self.tp_size * self.extra_dp_size) * self.zero_size, f"The global group size can't be evenly divided by the subgroup size." - assert self.zero_size > 1, f"Gemini's group size should greater than 1, please reduce tensor paralllelism group size or extra-dp group size." + assert self.zero_size > 1, f"Gemini's group size should be greater than 1, please reduce tensor paralllelism group size or extra-dp group size." 
if self.tp_size == 1 and self.extra_dp_size == 1: self.extra_dp_group = None self.tp_group = None diff --git a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py index c0ce82660de4..68ead4cca2b3 100644 --- a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py +++ b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py @@ -130,7 +130,7 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "use_lazy_init": False, "precision": "bf16", "max_norm": 5, - } + }, { "tp_size": 1, "pp_size": 2, From 3cf515ea422a6fa82b528a8697a53660eb568cd8 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Thu, 16 Nov 2023 15:57:16 +0800 Subject: [PATCH 11/13] fix --- .../test_zero_optimizer.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py index 68ead4cca2b3..41f06a4c3888 100644 --- a/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py +++ b/tests/test_shardformer/test_hybrid_parallel_grad_clip_norm/test_zero_optimizer.py @@ -131,15 +131,6 @@ def check_forward_backward(model_fn, data_gen_fn, output_transform_fn, loss_fn, "precision": "bf16", "max_norm": 5, }, - { - "tp_size": 1, - "pp_size": 2, - "zero_stage": 2, - "enable_all_optimization": False, - "use_lazy_init": False, - "precision": "fp16", - "max_norm": 5, - }, ], ) def run_test(test_config): From bea4577ec53b99ebacb487568bd819bd02153147 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Thu, 16 Nov 2023 18:38:25 +0800 Subject: [PATCH 12/13] fix fix fix --- colossalai/booster/plugin/gemini_plugin.py | 27 ++++++---------------- colossalai/zero/gemini/chunk/chunk.py | 2 +- 2 files changed, 8 
insertions(+), 21 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 2a59a14119d9..b0ec4abac813 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -34,6 +34,8 @@ SUPPORTED_PRECISION = ["fp16", "bf16"] PRECISION_STR_TO_DTYPE = {"fp16": torch.half, "bf16": torch.bfloat16} +ZERO_AXIS, DP_AXIS, TP_AXIS = 0, 1, 2 + def get_param_info(optim: Optimizer): # Get a backup of necessary information of parameters for future use, which includes: # 1. A mapping from integer param_id to param32 shape. @@ -404,26 +406,11 @@ def __init__( world_size = dist.get_world_size() self.zero_size = world_size // (self.tp_size * self.extra_dp_size) assert world_size == (self.tp_size * self.extra_dp_size) * self.zero_size, f"The global group size can't be evenly divided by the subgroup size." - assert self.zero_size > 1, f"Gemini's group size should be greater than 1, please reduce tensor paralllelism group size or extra-dp group size." 
- if self.tp_size == 1 and self.extra_dp_size == 1: - self.extra_dp_group = None - self.tp_group = None - self.zero_group = None - elif self.tp_size == 1: - self.tp_group = None - self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size) - self.zero_group = self.pg_mesh.get_group_along_axis(0) - self.extra_dp_group = self.pg_mesh.get_group_along_axis(1) - elif self.extra_dp_size == 1: - self.extra_dp_group = None - self.pg_mesh = ProcessGroupMesh(self.zero_size, self.tp_size) - self.zero_group = self.pg_mesh.get_group_along_axis(0) - self.tp_group = self.pg_mesh.get_group_along_axis(1) - else: - self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size, self.tp_size) - self.zero_group = self.pg_mesh.get_group_along_axis(0) - self.extra_dp_group = self.pg_mesh.get_group_along_axis(1) - self.tp_group = self.pg_mesh.get_group_along_axis(2) + + self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size, self.tp_size) + self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) + self.extra_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) + self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) self.shard_config = ShardConfig( tensor_parallel_process_group=self.tp_group, diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index 8785bb429dd8..42a8cdbb34cb 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -95,7 +95,7 @@ def __init__( self.pg_size = dist.get_world_size(self.torch_pg) self.pg_rank = dist.get_rank(self.torch_pg) self.extra_dp_group = extra_dp_group - self.extra_dp_size = dist.get_world_size(self.extra_dp_group) + self.extra_dp_size = dist.get_world_size(self.extra_dp_group) if self.extra_dp_group is not None else 1 # the chunk size should be divisible by the dp degree if not keep_gathered: From d78a6b07e8fef02c6d1fcb7133439742973c04a5 Mon Sep 17 00:00:00 2001 From: flybird11111 <1829166702@qq.com> Date: Thu, 16 Nov 2023 20:20:33 
+0800 Subject: [PATCH 13/13] fix --- colossalai/booster/plugin/gemini_plugin.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index b0ec4abac813..0908fa40dcb8 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -10,6 +10,7 @@ from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler as LRScheduler from torch.utils.data import DataLoader +from torch.distributed.distributed_c10d import _get_default_group from colossalai.checkpoint_io import CheckpointIndexFile, CheckpointIO, GeneralCheckpointIO from colossalai.checkpoint_io.utils import ( @@ -408,9 +409,9 @@ def __init__( assert world_size == (self.tp_size * self.extra_dp_size) * self.zero_size, f"The global group size can't be evenly divided by the subgroup size." self.pg_mesh = ProcessGroupMesh(self.zero_size, self.extra_dp_size, self.tp_size) - self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) - self.extra_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) - self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) + self.zero_group = self.pg_mesh.get_group_along_axis(ZERO_AXIS) if self.zero_size < world_size else _get_default_group() + self.extra_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) if self.extra_dp_size > 1 else None + self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) if self.tp_size > 1 else None self.shard_config = ShardConfig( tensor_parallel_process_group=self.tp_group,