From e7b6fc934af76387997b4d35a13ceaec0da9bbb9 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Wed, 29 May 2024 06:57:12 +0000 Subject: [PATCH 1/2] use async stream to prefetch and h2d data moving --- colossalai/booster/plugin/gemini_plugin.py | 3 +++ colossalai/zero/gemini/chunk/chunk.py | 5 ++--- colossalai/zero/gemini/chunk/manager.py | 2 ++ colossalai/zero/gemini/chunk/utils.py | 7 ++----- colossalai/zero/gemini/gemini_ddp.py | 5 ++--- colossalai/zero/gemini/gemini_hook.py | 10 ++++++---- 6 files changed, 17 insertions(+), 15 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index ab554d21dc95..e4758e18c7f8 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -363,12 +363,15 @@ def __init__( enable_jit_fused: bool = False, enable_sequence_overlap: bool = False, enable_async_reduce: bool = True, + enable_prefetch: bool = True, verbose: bool = False, ) -> None: super().__init__() assert precision in SUPPORTED_PRECISION, f"precision {precision} is not supported" if get_accelerator().name == "npu": assert placement_policy == "static", "NPU only supports static placement policy" + if enable_prefetch: + pin_memory = True self.gemini_config = dict( chunk_config_dict=chunk_config_dict, chunk_init_device=(chunk_init_device or get_accelerator().get_current_device()), diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index ed5b96519441..4cd9933d180a 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -338,8 +338,7 @@ def shard_move(self, device: torch.device, force_copy: bool = False): if self.cuda_shard: return - - self.cuda_shard = self.cpu_shard.to(get_accelerator().get_current_device()) + self.cuda_shard = self.cpu_shard.to(get_accelerator().get_current_device(), non_blocking=True) if not self.pin_memory: self.cpu_shard = None @@ -349,7 +348,7 @@ def shard_move(self, device: torch.device, force_copy: bool = False): if self.pin_memory: if force_copy or not self.cpu_vis_flag: - self.cpu_shard.copy_(self.cuda_shard) + self.cpu_shard.copy_(self.cuda_shard, non_blocking=True) # if cpu_shard has been visited # copy operation is not need else: diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index 36e7ee57bad4..6df4e08c8e49 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -25,6 +25,7 @@ def __init__( chunk_configuration, init_device: Optional[torch.device] = None, reuse_fp16_chunk: bool = True, + max_prefetch: int = 0, ) -> None: self.device = init_device or get_accelerator().get_current_device() self.dp_degree_chunk_size_dict: Dict[int, int] = dict() @@ -42,6 +43,7 @@ def __init__( # Whether model is accumulating gradients, self.accumulating_grads = False self.overflow_counter = torch.tensor([0], dtype=torch.int, device=get_accelerator().get_current_device()) + self._prefetch_stream = get_accelerator().Stream() if max_prefetch else None def register_tensor( self, diff --git a/colossalai/zero/gemini/chunk/utils.py b/colossalai/zero/gemini/chunk/utils.py index 049c5c10255b..884d1306ef77 100644 --- a/colossalai/zero/gemini/chunk/utils.py +++ b/colossalai/zero/gemini/chunk/utils.py @@ -21,6 +21,7 @@ def init_chunk_manager( hidden_dim: Optional[int] = None, reuse_fp16_chunk: bool = True, verbose: bool = False, + max_prefetch: int = 0, **kwargs, ) -> ChunkManager: if hidden_dim: @@ -51,9 +52,5 @@ def init_chunk_manager( ) dist.barrier() - chunk_manager = ChunkManager( - config_dict, - init_device, - reuse_fp16_chunk=reuse_fp16_chunk, - ) + chunk_manager = ChunkManager(config_dict, init_device, reuse_fp16_chunk=reuse_fp16_chunk, max_prefetch=max_prefetch) return chunk_manager diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 050643dfa610..3ac85ff69ef2 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -104,9 +104,7 @@ def __init__( self.enable_gradient_accumulation = enable_gradient_accumulation if chunk_config_dict is not None: self.chunk_manager = ChunkManager( - chunk_config_dict, - chunk_init_device, - reuse_fp16_chunk=reuse_fp16_chunk, + chunk_config_dict, chunk_init_device, reuse_fp16_chunk=reuse_fp16_chunk, max_prefetch=max_prefetch ) else: # some ugly hotfix for the compatibility with Lightning @@ -122,6 +120,7 @@ def __init__( process_group=zero_group, reuse_fp16_chunk=reuse_fp16_chunk, verbose=verbose, + max_prefetch=max_prefetch, ) self.gemini_manager = GeminiManager( placement_policy, diff --git a/colossalai/zero/gemini/gemini_hook.py b/colossalai/zero/gemini/gemini_hook.py index 736238a0992d..9e297c2a8a19 100644 --- a/colossalai/zero/gemini/gemini_hook.py +++ b/colossalai/zero/gemini/gemini_hook.py @@ -5,6 +5,7 @@ import torch +from colossalai.accelerator import get_accelerator from colossalai.tensor.param_op_hook import ColoParamOpHook from colossalai.utils import is_ddp_ignored from colossalai.zero.gemini import TensorState @@ -54,10 +55,11 @@ def pre_op(self, params): ) # prefetch - for chunk in chunks_fetch_async: - maybe_work = self._chunk_manager.access_chunk(chunk, async_access=True) - if maybe_work is not None: - self._gemini_manager.add_work(chunk, maybe_work) + with get_accelerator().stream(self._gemini_manager.chunk_manager._prefetch_stream): + for chunk in chunks_fetch_async: + maybe_work = self._chunk_manager.access_chunk(chunk, async_access=True) + if maybe_work is not None: + self._gemini_manager.add_work(chunk, maybe_work) # record cuda model data of the current OP, including memory for prefetched chunks self._gemini_manager.record_model_data_volume() From 7a6bb0020c76667c7575e39cefe359fb13d0a9d5 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Wed, 12 Jun 2024 06:54:04 +0000 Subject: [PATCH 2/2] Remove redundant code --- colossalai/booster/plugin/gemini_plugin.py | 1 - 1 file changed, 1 deletion(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 314fad983f29..474b78aa26b8 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -363,7 +363,6 @@ def __init__( enable_jit_fused: bool = False, enable_sequence_overlap: bool = False, enable_async_reduce: bool = True, - enable_prefetch: bool = True, verbose: bool = False, ) -> None: super().__init__()