From 9675109d4310176ed14acf082029dd71656646af Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Sun, 8 Oct 2023 15:36:57 +0800 Subject: [PATCH 01/13] add test --- .../test_zero/test_gemini/test_grad_accum.py | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 tests/test_zero/test_gemini/test_grad_accum.py diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py new file mode 100644 index 000000000000..755172d1e1a2 --- /dev/null +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -0,0 +1,121 @@ +import pytest +import torch +import torch.distributed as dist +from torch.testing import assert_close + +import colossalai +from colossalai.nn.optimizer import HybridAdam +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn +from colossalai.utils import set_seed +from colossalai.utils.cuda import get_current_device +from colossalai.zero import GeminiDDP, GeminiOptimizer +from colossalai.zero.gemini.chunk import search_chunk_configuration +from tests.components_to_test import run_fwd +from tests.components_to_test.registry import non_distributed_component_funcs + +PLACEMENT_CONFIGS = [ + {"placement_policy": "static", "shard_param_frac": 0.0}, # zero2 + {"placement_policy": "static", "shard_param_frac": 1.0}, # zero3 + {"placement_policy": "static", "shard_param_frac": 0.5}, # zero3-half + {"placement_policy": "auto"}, +] + + +def check_grad(model: GeminiDDP, torch_model: torch.nn.Module, no_sync: bool = False): + if no_sync: + for (n, p0), (_, p1) in zip(model.named_parameters(), torch_model.named_parameters()): + grad = p0.grad.to(p1.grad.dtype) + assert_close(grad, p1.grad, rtol=1e-3, atol=1e-4, msg=f"{n}, gemini_grad: {grad}, torch_grad: {p1.grad}") + else: + chunk_manager = model.chunk_manager + param_list = [p for p in model.parameters()] + chunk_list = chunk_manager.get_chunks(param_list) + for chunk in chunk_list: + chunk_manager.access_chunk(chunk) + + for (n, p0), (_, p1) in zip(model.named_parameters(), torch_model.named_parameters()): + grad = p0.to(p1.grad.dtype) + assert_close(grad, p1.grad, rtol=1e-3, atol=1e-3, msg=f"{n}, gemini_grad: {grad}, torch_grad: {p1.grad}") + + +@parameterize("placement_config", PLACEMENT_CONFIGS) +@parameterize("keep_gathered", [False, True]) +@parameterize("model_name", ["gpt2"]) +@parameterize("use_grad_checkpoint", [False, True]) +def exam_gemini_grad_acc(placement_config, keep_gathered: bool, use_grad_checkpoint: bool, model_name: str): + init_device = get_current_device() + get_components_func = non_distributed_component_funcs.get_callable(model_name) + model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func() + + set_seed(42) + gemini_model = model_builder(use_grad_checkpoint) + + set_seed(42) + torch_model = model_builder(use_grad_checkpoint).cuda() + for torch_p, p in zip(torch_model.parameters(), gemini_model.parameters()): + torch_p.data.copy_(p.data) + + world_size = torch.distributed.get_world_size() + config_dict, *_ = search_chunk_configuration(gemini_model, search_range_m=1, search_interval=100) + config_dict[world_size]["chunk_size"] = 5000 + config_dict[world_size]["keep_gathered"] = keep_gathered + gemini_model = GeminiDDP(gemini_model, config_dict, init_device, pin_memory=True, **placement_config) + optimizer = HybridAdam(gemini_model.parameters(), lr=1e-3) + gemini_optim = GeminiOptimizer(optimizer, gemini_model, initial_scale=1) + + rank = dist.get_rank() + torch_optim = torch.optim.Adam(torch_model.parameters(), lr=1e-3) + + set_seed(rank) + accum_iter = 3 + for i, (input_ids, label) in enumerate(train_dataloader): + input_ids, label = input_ids.cuda(), label.cuda() + + torch_model.train() + gemini_model.train() + + set_seed(42 + rank) + torch_loss = run_fwd(torch_model, input_ids, label, criterion) + torch_loss = torch_loss / accum_iter + torch_loss.backward() + + set_seed(42 + rank) + gemini_loss = run_fwd(gemini_model, input_ids, label, criterion) + gemini_loss = gemini_loss / accum_iter + gemini_optim.backward(gemini_loss) + + assert torch.allclose( + torch_loss, gemini_loss, rtol=1e-3, atol=1e-5 + ), f"torch_loss: {torch_loss}, gemini_loss: {gemini_loss}" + + if (i + 1) % accum_iter == 0: + torch_optim.step() + gemini_optim.step() + break + + check_grad(gemini_model, torch_model) + + # check updated param + torch_dict = torch_model.state_dict() + gemini_dict = gemini_model.state_dict(only_rank_0=False) + + for key, value in gemini_dict.items(): + torch_key = "module." + key + torch_value = torch_dict[torch_key].to(device=value.device, dtype=value.dtype) + assert_close(value, torch_value, rtol=1e-3, atol=2e-3) + + +def run_dist(rank, world_size, port): + config = {} + colossalai.launch(config=config, rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl") + exam_gemini_grad_acc() + + +@pytest.mark.dist +@rerun_if_address_is_in_use() +def test_grad_accumulation(): + spawn(run_dist, 2) + + +if __name__ == "__main__": + test_grad_accumulation() From 85d96c2caa197e856ed2f4885dddb4c2fdb74d5b Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Sun, 8 Oct 2023 15:39:05 +0800 Subject: [PATCH 02/13] fix no_sync bug in low level zero plugin --- colossalai/booster/plugin/low_level_zero_plugin.py | 2 +- tests/test_zero/test_gemini/test_grad_accum.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/colossalai/booster/plugin/low_level_zero_plugin.py b/colossalai/booster/plugin/low_level_zero_plugin.py index 088b67c8c533..dc78fe8c094c 100644 --- a/colossalai/booster/plugin/low_level_zero_plugin.py +++ b/colossalai/booster/plugin/low_level_zero_plugin.py @@ -335,4 +335,4 @@ def get_checkpoint_io(self) -> CheckpointIO: def no_sync(self, model: nn.Module, optimizer: OptimizerWrapper) -> Iterator[None]: assert isinstance(optimizer, LowLevelZeroOptimizer) - return optimizer.optim.no_sync() + return optimizer.no_sync() diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index 755172d1e1a2..c1e75ec007ba 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -67,7 +67,7 @@ def exam_gemini_grad_acc(placement_config, keep_gathered: bool, use_grad_checkpo torch_optim = torch.optim.Adam(torch_model.parameters(), lr=1e-3) set_seed(rank) - accum_iter = 3 + accum_iter = 4 for i, (input_ids, label) in enumerate(train_dataloader): input_ids, label = input_ids.cuda(), label.cuda() From 79c148f3f1e75d9ccfb5c37256afc79386222507 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Thu, 12 Oct 2023 10:51:03 +0800 Subject: [PATCH 03/13] fix test --- .../test_zero/test_gemini/test_grad_accum.py | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index c1e75ec007ba..2439b780be0d 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -1,6 +1,7 @@ import pytest import torch import torch.distributed as dist +from torch.nn.parallel import DistributedDataParallel as DDP from torch.testing import assert_close import colossalai @@ -21,21 +22,18 @@ ] -def check_grad(model: GeminiDDP, torch_model: torch.nn.Module, no_sync: bool = False): - if no_sync: - for (n, p0), (_, p1) in zip(model.named_parameters(), torch_model.named_parameters()): - grad = p0.grad.to(p1.grad.dtype) - assert_close(grad, p1.grad, rtol=1e-3, atol=1e-4, msg=f"{n}, gemini_grad: {grad}, torch_grad: {p1.grad}") - else: - chunk_manager = model.chunk_manager - param_list = [p for p in model.parameters()] - chunk_list = chunk_manager.get_chunks(param_list) - for chunk in chunk_list: - chunk_manager.access_chunk(chunk) +def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): + chunk_manager = model.chunk_manager + param_list = [p for p in model.parameters()] + chunk_list = chunk_manager.get_chunks(param_list) + for chunk in chunk_list: + chunk_manager.access_chunk(chunk) - for (n, p0), (_, p1) in zip(model.named_parameters(), torch_model.named_parameters()): - grad = p0.to(p1.grad.dtype) - assert_close(grad, p1.grad, rtol=1e-3, atol=1e-3, msg=f"{n}, gemini_grad: {grad}, torch_grad: {p1.grad}") + for (n, p0), (_, p1) in zip(model.named_parameters(), torch_model.named_parameters()): + # after backward, gradient is placed at parameters chunks + assert_close( + p0, p1.grad.to(p0.dtype), rtol=1e-3, atol=1e-3, msg=f"{n}, gemini_grad: {p0}, torch_grad: {p1.grad}" + ) @parameterize("placement_config", PLACEMENT_CONFIGS) @@ -59,12 +57,15 @@ def exam_gemini_grad_acc(placement_config, keep_gathered: bool, use_grad_checkpo config_dict, *_ = search_chunk_configuration(gemini_model, search_range_m=1, search_interval=100) config_dict[world_size]["chunk_size"] = 5000 config_dict[world_size]["keep_gathered"] = keep_gathered - gemini_model = GeminiDDP(gemini_model, config_dict, init_device, pin_memory=True, **placement_config) + gemini_model = GeminiDDP( + gemini_model, config_dict, init_device, pin_memory=True, enable_gradient_accumulation=True, **placement_config + ) optimizer = HybridAdam(gemini_model.parameters(), lr=1e-3) gemini_optim = GeminiOptimizer(optimizer, gemini_model, initial_scale=1) rank = dist.get_rank() torch_optim = torch.optim.Adam(torch_model.parameters(), lr=1e-3) + torch_model = DDP(torch_model) set_seed(rank) accum_iter = 4 @@ -84,17 +85,19 @@ def exam_gemini_grad_acc(placement_config, keep_gathered: bool, use_grad_checkpo gemini_loss = gemini_loss / accum_iter gemini_optim.backward(gemini_loss) - assert torch.allclose( - torch_loss, gemini_loss, rtol=1e-3, atol=1e-5 - ), f"torch_loss: {torch_loss}, gemini_loss: {gemini_loss}" + print(i, torch_loss, gemini_loss) + assert torch.allclose(torch_loss, gemini_loss, rtol=1e-3, atol=1e-5) if (i + 1) % accum_iter == 0: torch_optim.step() gemini_optim.step() - break + continue check_grad(gemini_model, torch_model) + if i == accum_iter: + break + # check updated param torch_dict = torch_model.state_dict() gemini_dict = gemini_model.state_dict(only_rank_0=False) From b55370cef5660515869c95c773232ed5ba530e36 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Thu, 12 Oct 2023 10:52:26 +0800 Subject: [PATCH 04/13] add argument for grad accum --- colossalai/booster/plugin/gemini_plugin.py | 3 +++ colossalai/zero/gemini/gemini_ddp.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 6c165857506c..89f4859881ec 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -245,6 +245,7 @@ class GeminiPlugin(DPPluginBase): chunk_config_dict (dict, optional): chunk configuration dictionary. chunk_init_device (torch.device, optional): device to initialize the chunk. placement_policy (str, optional): "static" and "auto". Defaults to "static". + enable_gradient_accumulation (bool, optional): Whether to enable gradient accumulation. When set to True, gradient will be stored after doing backward pass. Defaults to False. shard_param_frac (float, optional): fraction of parameters to be sharded. Only for "static" placement. If `shard_param_frac` is 1.0, it's equal to zero-3. If `shard_param_frac` is 0.0, it's equal to zero-2. Defaults to 1.0. offload_optim_frac (float, optional): fraction of optimizer states to be offloaded. Only for "static" placement. @@ -291,6 +292,7 @@ def __init__( chunk_config_dict: Optional[dict] = None, chunk_init_device: Optional[torch.device] = None, placement_policy: str = "static", + enable_gradient_accumulation: bool = False, shard_param_frac: float = 1.0, # only for static placement offload_optim_frac: float = 0.0, # only for static placement offload_param_frac: float = 0.0, # only for static placement @@ -323,6 +325,7 @@ def __init__( chunk_config_dict=chunk_config_dict, chunk_init_device=(chunk_init_device or get_current_device()), placement_policy=placement_policy, + enable_gradient_accumulation=enable_gradient_accumulation, shard_param_frac=shard_param_frac, offload_optim_frac=offload_optim_frac, offload_param_frac=offload_param_frac, diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index a4871f7e4b40..2c6fa3835a77 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -59,6 +59,7 @@ def __init__( chunk_config_dict: Optional[dict] = None, chunk_init_device: torch.device = torch.device("cpu"), placement_policy: str = "static", + enable_gradient_accumulation: bool = False, shard_param_frac: float = 1.0, # only for static placement offload_optim_frac: float = 0.0, # only for static placement offload_param_frac: float = 0.0, # only for static placement @@ -115,6 +116,7 @@ def __init__( self.scatter_after_inference = scatter_after_inference self.mixed_precision = mixed_precision self.dp_process_group = process_group or _get_default_group() + self.enable_gradient_accumulation = enable_gradient_accumulation self.reuse_fp16_chunk = master_weights self.master_weights = master_weights From 5834aed7ef03a5a62e03843c2a1c6c6a62c4c5c0 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Thu, 12 Oct 2023 12:30:15 +0800 Subject: [PATCH 05/13] add grad accum in backward hook for gemini --- colossalai/booster/plugin/gemini_plugin.py | 2 +- colossalai/zero/gemini/chunk/chunk.py | 15 +++++++++++ colossalai/zero/gemini/gemini_ddp.py | 18 ++++++++++--- colossalai/zero/gemini/gemini_optimizer.py | 1 + .../test_zero/test_gemini/test_grad_accum.py | 25 +++++++++++-------- 5 files changed, 47 insertions(+), 14 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 89f4859881ec..20a931b816ea 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -258,7 +258,7 @@ class GeminiPlugin(DPPluginBase): warmup_non_model_data_ratio (float, optional): ratio of expected non-model data memory during warmup. Only for "auto" placement. Defaults to 0.8. steady_cuda_cap_ratio (float, optional): ratio of allowed cuda capacity for model data during steady state. Only for "auto" placement. Defaults to 0.9. precision (str, optional): precision. Support 'fp16' and 'bf16'. Defaults to 'fp16'. - master_weights (bool, optional): master weights. Defaults to True. + master_weights (bool, optional): Whether to keep fp32 master parameter weights in optimizer. Defaults to True. pin_memory (bool, optional): use pin memory on CPU. Defaults to False. force_outputs_fp32 (bool, optional): force outputs are fp32. Defaults to False. strict_ddp_mode (bool, optional): use strict ddp mode (only use dp without other parallelism). Defaults to False. diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index c8be773b2c4f..d3309fc5364f 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -434,6 +434,21 @@ def copy_tensor_to_chunk_slice( if update_ptr: tensor.data = self.cuda_global_chunk[tensor_info.offset : tensor_info.end].view(tensor.shape) + def add_tensor_to_chunk_slice(self, tensor: torch.Tensor, data_slice: torch.Tensor) -> None: + """ + Add data slice to the memory space indexed by the input tensor in the chunk. + Only used when accumulating gradient chunks. + + Args: + tensor (torch.Tensor): the tensor used to retrieve meta information + data_slice (torch.Tensor): the tensor to be added to the chunk + """ + # sanity check + assert self.is_gathered + + tensor_info = self.tensors_info[tensor] + self.cuda_global_chunk[tensor_info.offset : tensor_info.end].add_(data_slice.data.flatten()) + def get_valid_length(self) -> int: """Get the valid length of the chunk's payload.""" if self.keep_gathered: diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 2c6fa3835a77..5a0de36aeb00 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -116,11 +116,15 @@ def __init__( self.scatter_after_inference = scatter_after_inference self.mixed_precision = mixed_precision self.dp_process_group = process_group or _get_default_group() - self.enable_gradient_accumulation = enable_gradient_accumulation self.reuse_fp16_chunk = master_weights self.master_weights = master_weights + self.enable_gradient_accumulation = enable_gradient_accumulation + if self.enable_gradient_accumulation: + self.reuse_fp16_chunk = False + self.accumulating_grads = False # Whether model is accumulating gradients + self._logger = get_dist_logger() if self.gemini_manager._premade_memstats_: @@ -329,7 +333,10 @@ def grad_handle(self, p, grad): ) grad_chunk = chunk if not self.reuse_fp16_chunk: - grad_chunk = self.chunk_manager.init_grad_chunk(chunk) + if not self.accumulating_grads: + grad_chunk = self.chunk_manager.init_grad_chunk(chunk) + else: + grad_chunk = chunk.grad_chunk # hold -> compute -> hold after bwd grad_chunk.tensor_trans_state(p, TensorState.COMPUTE) grad_chunk.tensor_trans_state(p, TensorState.HOLD_AFTER_BWD) @@ -338,7 +345,12 @@ def grad_handle(self, p, grad): chunk.tensor_trans_state(p, TensorState.HOLD) grad_chunk.tensor_trans_state(p, TensorState.READY_FOR_REDUCE) - grad_chunk.copy_tensor_to_chunk_slice(p, grad, update_ptr=self.reuse_fp16_chunk) + if not self.accumulating_grads: + grad_chunk.copy_tensor_to_chunk_slice(p, grad, update_ptr=self.reuse_fp16_chunk) + else: + grad_chunk.add_tensor_to_chunk_slice(p, grad) + if self.enable_gradient_accumulation: + self.accumulating_grads = True reduced = self.chunk_manager.reduce_chunk(grad_chunk) if reduced: if not self.reuse_fp16_chunk: diff --git a/colossalai/zero/gemini/gemini_optimizer.py b/colossalai/zero/gemini/gemini_optimizer.py index 3c42e96cb803..0d0298e067f3 100644 --- a/colossalai/zero/gemini/gemini_optimizer.py +++ b/colossalai/zero/gemini/gemini_optimizer.py @@ -263,6 +263,7 @@ def step(self, *args, **kwargs): self.zero_grad() if self.module.master_weights: self._update_fp16_params() + self.module.accumulating_grads = False return ret def clip_grad_norm(self, model: torch.nn.Module, max_norm: float, norm_type: float = 2.0): diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index 2439b780be0d..56ee3a483d79 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -26,24 +26,23 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): chunk_manager = model.chunk_manager param_list = [p for p in model.parameters()] chunk_list = chunk_manager.get_chunks(param_list) - for chunk in chunk_list: - chunk_manager.access_chunk(chunk) + chunk_list = [chunk.grad_chunk for chunk in chunk_list] - for (n, p0), (_, p1) in zip(model.named_parameters(), torch_model.named_parameters()): - # after backward, gradient is placed at parameters chunks - assert_close( - p0, p1.grad.to(p0.dtype), rtol=1e-3, atol=1e-3, msg=f"{n}, gemini_grad: {p0}, torch_grad: {p1.grad}" - ) + for p0, p1 in zip(model.parameters(), torch_model.parameters()): + assert_close(p0, p1.grad, rtol=1e-3, atol=5e-5) @parameterize("placement_config", PLACEMENT_CONFIGS) @parameterize("keep_gathered", [False, True]) @parameterize("model_name", ["gpt2"]) @parameterize("use_grad_checkpoint", [False, True]) -def exam_gemini_grad_acc(placement_config, keep_gathered: bool, use_grad_checkpoint: bool, model_name: str): +@parameterize("master_weights", [True, False]) +def exam_gemini_grad_acc( + placement_config, keep_gathered: bool, model_name: str, use_grad_checkpoint: bool, master_weights: bool +): init_device = get_current_device() get_components_func = non_distributed_component_funcs.get_callable(model_name) - model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func() + model_builder, train_dataloader, _, _, criterion = get_components_func() set_seed(42) gemini_model = model_builder(use_grad_checkpoint) @@ -58,7 +57,13 @@ def exam_gemini_grad_acc(placement_config, keep_gathered: bool, use_grad_checkpo config_dict[world_size]["chunk_size"] = 5000 config_dict[world_size]["keep_gathered"] = keep_gathered gemini_model = GeminiDDP( - gemini_model, config_dict, init_device, pin_memory=True, enable_gradient_accumulation=True, **placement_config + gemini_model, + config_dict, + init_device, + pin_memory=True, + enable_gradient_accumulation=True, + master_weights=master_weights, + **placement_config, ) optimizer = HybridAdam(gemini_model.parameters(), lr=1e-3) gemini_optim = GeminiOptimizer(optimizer, gemini_model, initial_scale=1) From de5b9441778ee9ac615c0f5829a3b16b65a30242 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Thu, 12 Oct 2023 19:20:52 +0800 Subject: [PATCH 06/13] finish implementation, rewrite tests --- colossalai/zero/gemini/gemini_ddp.py | 5 +- .../test_zero/test_gemini/test_grad_accum.py | 46 +++++++++++-------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 5a0de36aeb00..be20650fdb3d 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -304,6 +304,8 @@ def _post_backward(self): f"{error_str}", ) self._setup_grads_ptr() + if self.enable_gradient_accumulation and not self.accumulating_grads: + self.accumulating_grads = True # Turn on the state of gradient accumulation. self._logger.debug( f"comp cuda demand time: {self.gemini_manager._comp_cuda_demand_time}, layout time: {self.gemini_manager._layout_time}, evict time: {self.gemini_manager._evict_time}, CPU->CUDA vol: {self.gemini_manager._h2d_volume}B, CUDA->CPU vol: {self.gemini_manager._d2h_volume}" ) @@ -336,6 +338,7 @@ def grad_handle(self, p, grad): if not self.accumulating_grads: grad_chunk = self.chunk_manager.init_grad_chunk(chunk) else: + self.chunk_manager.access_chunk(chunk.grad_chunk) grad_chunk = chunk.grad_chunk # hold -> compute -> hold after bwd grad_chunk.tensor_trans_state(p, TensorState.COMPUTE) @@ -349,8 +352,6 @@ def grad_handle(self, p, grad): grad_chunk.copy_tensor_to_chunk_slice(p, grad, update_ptr=self.reuse_fp16_chunk) else: grad_chunk.add_tensor_to_chunk_slice(p, grad) - if self.enable_gradient_accumulation: - self.accumulating_grads = True reduced = self.chunk_manager.reduce_chunk(grad_chunk) if reduced: if not self.reuse_fp16_chunk: diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index 56ee3a483d79..f6b734c03911 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -1,6 +1,7 @@ import pytest import torch import torch.distributed as dist +from apex import amp from torch.nn.parallel import DistributedDataParallel as DDP from torch.testing import assert_close @@ -27,19 +28,25 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): param_list = [p for p in model.parameters()] chunk_list = chunk_manager.get_chunks(param_list) chunk_list = [chunk.grad_chunk for chunk in chunk_list] + for chunk in chunk_list: + chunk_manager.access_chunk(chunk) for p0, p1 in zip(model.parameters(), torch_model.parameters()): assert_close(p0, p1.grad, rtol=1e-3, atol=5e-5) + for chunk in chunk_list: + chunk_manager.release_chunk(chunk) + @parameterize("placement_config", PLACEMENT_CONFIGS) @parameterize("keep_gathered", [False, True]) @parameterize("model_name", ["gpt2"]) @parameterize("use_grad_checkpoint", [False, True]) -@parameterize("master_weights", [True, False]) +@parameterize("master_weights", [False, True]) def exam_gemini_grad_acc( placement_config, keep_gathered: bool, model_name: str, use_grad_checkpoint: bool, master_weights: bool ): + print(placement_config, keep_gathered, use_grad_checkpoint, master_weights, flush=True) init_device = get_current_device() get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, train_dataloader, _, _, criterion = get_components_func() @@ -69,49 +76,52 @@ def exam_gemini_grad_acc( gemini_optim = GeminiOptimizer(optimizer, gemini_model, initial_scale=1) rank = dist.get_rank() + amp_config = dict( + opt_level="O2", keep_batchnorm_fp32=False, loss_scale=1, min_loss_scale=1, max_loss_scale=1, master_weights=True + ) torch_optim = torch.optim.Adam(torch_model.parameters(), lr=1e-3) - torch_model = DDP(torch_model) + torch_model, torch_optim = amp.initialize(torch_model, torch_optim, **amp_config) + torch_model = DDP(torch_model, device_ids=[rank]) set_seed(rank) accum_iter = 4 for i, (input_ids, label) in enumerate(train_dataloader): - input_ids, label = input_ids.cuda(), label.cuda() + delay_unscale = False if (i + 1) % accum_iter == 0 else True - torch_model.train() - gemini_model.train() + input_ids, label = input_ids.cuda(), label.cuda() set_seed(42 + rank) torch_loss = run_fwd(torch_model, input_ids, label, criterion) torch_loss = torch_loss / accum_iter - torch_loss.backward() + with amp.scale_loss(torch_loss, torch_optim, delay_unscale=delay_unscale) as scaled_loss: + scaled_loss.backward() set_seed(42 + rank) gemini_loss = run_fwd(gemini_model, input_ids, label, criterion) gemini_loss = gemini_loss / accum_iter gemini_optim.backward(gemini_loss) - print(i, torch_loss, gemini_loss) assert torch.allclose(torch_loss, gemini_loss, rtol=1e-3, atol=1e-5) + check_grad(gemini_model, torch_model) + if (i + 1) % accum_iter == 0: torch_optim.step() gemini_optim.step() - continue + torch_optim.zero_grad() - check_grad(gemini_model, torch_model) + # check updated param + torch_dict = torch_model.state_dict() + gemini_dict = gemini_model.state_dict(only_rank_0=False) + + for key, value in gemini_dict.items(): + torch_key = "module." + key + torch_value = torch_dict[torch_key].to(value.device).to(value.dtype) + assert_close(value, torch_value, rtol=1e-3, atol=2e-3) if i == accum_iter: break - # check updated param - torch_dict = torch_model.state_dict() - gemini_dict = gemini_model.state_dict(only_rank_0=False) - - for key, value in gemini_dict.items(): - torch_key = "module." + key - torch_value = torch_dict[torch_key].to(device=value.device, dtype=value.dtype) - assert_close(value, torch_value, rtol=1e-3, atol=2e-3) - def run_dist(rank, world_size, port): config = {} From 8804c485f17bb1a2619495ca63d9191907e7c24b Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Thu, 12 Oct 2023 21:53:07 +0800 Subject: [PATCH 07/13] fix test --- .../test_zero/test_gemini/test_grad_accum.py | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index f6b734c03911..72292df224a1 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -25,28 +25,37 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): chunk_manager = model.chunk_manager - param_list = [p for p in model.parameters()] - chunk_list = chunk_manager.get_chunks(param_list) - chunk_list = [chunk.grad_chunk for chunk in chunk_list] - for chunk in chunk_list: - chunk_manager.access_chunk(chunk) - + grad_chunk_list = [] + device_list = [] + + # Access gradient chunks. + for p in model.parameters(): + grad_chunk = chunk_manager.get_chunk(p).grad_chunk + if grad_chunk not in grad_chunk_list: + chunk_manager.access_chunk(grad_chunk) + grad_chunk_list.append(grad_chunk) + device_list.append(model.grads_device[p]) + + # Compare gradients. for p0, p1 in zip(model.parameters(), torch_model.parameters()): + print(p0, p1.grad) assert_close(p0, p1.grad, rtol=1e-3, atol=5e-5) - for chunk in chunk_list: - chunk_manager.release_chunk(chunk) + # Release gradient chunks and move them to gradient device. + for grad_chunk, device in zip(grad_chunk_list, device_list): + chunk_manager.release_chunk(grad_chunk) + chunk_manager.move_chunk(grad_chunk, device, force_copy=True) @parameterize("placement_config", PLACEMENT_CONFIGS) @parameterize("keep_gathered", [False, True]) @parameterize("model_name", ["gpt2"]) -@parameterize("use_grad_checkpoint", [False, True]) +@parameterize("use_grad_checkpoint", [False]) # TODO(Baizhou): debug for gradient checkpointing case @parameterize("master_weights", [False, True]) def exam_gemini_grad_acc( placement_config, keep_gathered: bool, model_name: str, use_grad_checkpoint: bool, master_weights: bool ): - print(placement_config, keep_gathered, use_grad_checkpoint, master_weights, flush=True) + # print(placement_config, keep_gathered, use_grad_checkpoint, master_weights, flush=True) init_device = get_current_device() get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, train_dataloader, _, _, criterion = get_components_func() @@ -76,6 +85,8 @@ def exam_gemini_grad_acc( gemini_optim = GeminiOptimizer(optimizer, gemini_model, initial_scale=1) rank = dist.get_rank() + + # setting master_weights to False will cause overflow after optimizer.step() amp_config = dict( opt_level="O2", keep_batchnorm_fp32=False, loss_scale=1, min_loss_scale=1, max_loss_scale=1, master_weights=True ) @@ -106,6 +117,10 @@ def exam_gemini_grad_acc( check_grad(gemini_model, torch_model) if (i + 1) % accum_iter == 0: + # TODO(Baizhou): Delete following two lines after cpu_adam for fp16 has been merged into main branch (auto policy put gradients in cpu). + if placement_config["placement_policy"] == "auto": + break + torch_optim.step() gemini_optim.step() torch_optim.zero_grad() From 9f89aa1463e09cba1af156bcbd8c7073b3e98efd Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Fri, 13 Oct 2023 10:58:06 +0800 Subject: [PATCH 08/13] skip stuck model in low level zero test --- tests/test_booster/test_plugin/test_low_level_zero_plugin.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_booster/test_plugin/test_low_level_zero_plugin.py b/tests/test_booster/test_plugin/test_low_level_zero_plugin.py index 9cc12f96bd4d..104ca254c572 100644 --- a/tests/test_booster/test_plugin/test_low_level_zero_plugin.py +++ b/tests/test_booster/test_plugin/test_low_level_zero_plugin.py @@ -14,6 +14,8 @@ _AMP_ERR_MODELS = ["timm_convit", "deepfm_interactionarch"] # These models have no parameters _LOW_LEVEL_ZERO_ERR_MODELS = ["dlrm_interactionarch"] +# These models will cause stuck, to be fixed +_STUCK_MODELS = ["transformers_albert_for_multiple_choice"] def run_fn(stage, model_fn, data_gen_fn, output_transform_fn) -> Optional[str]: @@ -53,7 +55,7 @@ def check_low_level_zero_plugin(stage: int, early_stop: bool = True): """ passed_models = [] failed_info = {} # (model_name, error) pair - ignore_models = _AMP_ERR_MODELS + _LOW_LEVEL_ZERO_ERR_MODELS + ignore_models = _AMP_ERR_MODELS + _LOW_LEVEL_ZERO_ERR_MODELS + _STUCK_MODELS skipped_models = [] for name, (model_fn, data_gen_fn, output_transform_fn, _, _) in model_zoo.items(): From d84e6d6ac03f402b71396fd9a8200f447f70f7a3 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Fri, 13 Oct 2023 12:40:58 +0800 Subject: [PATCH 09/13] update doc --- .../features/gradient_accumulation_with_booster.md | 13 +++++++++++++ .../features/gradient_accumulation_with_booster.md | 13 +++++++++++++ tests/test_zero/test_gemini/test_grad_accum.py | 1 - 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/docs/source/en/features/gradient_accumulation_with_booster.md b/docs/source/en/features/gradient_accumulation_with_booster.md index 347cd6e519bb..35248f1af0b3 100644 --- a/docs/source/en/features/gradient_accumulation_with_booster.md +++ b/docs/source/en/features/gradient_accumulation_with_booster.md @@ -126,6 +126,19 @@ for idx, (img, label) in enumerate(train_dataloader): ``` +Currently the plugins supporting `no_sync()` method include `TorchDDPPlugin` and `LowLevelZeroPlugin` set to stage 1. `GeminiPlugin` doesn't support `no_sync()` method, but it can also enable synchronized gradient accumulation in a torch-like way. Following is the code snippet of enabling gradient accumulation for `GeminiPlugin`: + +```python + output = gemini_model(input) + train_loss = criterion(output, label) + train_loss = train_loss / GRADIENT_ACCUMULATION + booster.backward(train_loss, gemini_optimizer) + + if idx % (GRADIENT_ACCUMULATION - 1) == 0: + gemini_optimizer.step() # zero_grad is automatically done +``` + + ### Step 6. Invoke Training Scripts To verify gradient accumulation, we can just check the change of parameter values. When gradient accumulation is set, parameters are only updated in the last step. You can run the script using this command: ```shell diff --git a/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md b/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md index 3ad9b2e07a95..17a60a8c7909 100644 --- a/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md +++ b/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md @@ -93,6 +93,19 @@ model, optimizer, criterion, train_dataloader, _ = booster.boost(model=model, dataloader=train_dataloader) ``` +目前支持`no_sync()`方法的插件包括 `TorchDDPPlugin` 和 `LowLevelZeroPlugin`(需要设置参数`stage`为1). `GeminiPlugin` 不支持 `no_sync()` 方法, 但是它可以通过和`pytorch`类似的方式来使用同步的梯度累积。以下是 `GeminiPlugin` 进行梯度累积的代码片段: + +```python + output = gemini_model(input) + train_loss = criterion(output, label) + train_loss = train_loss / GRADIENT_ACCUMULATION + booster.backward(train_loss, gemini_optimizer) + + if idx % (GRADIENT_ACCUMULATION - 1) == 0: + gemini_optimizer.step() # zero_grad is automatically done +``` + + ### 步骤 5. 使用booster训练 使用booster构建一个普通的训练循环,验证梯度累积。 `param_by_iter` 记录分布训练的信息。 ```python diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index 72292df224a1..1abfc15c4861 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -38,7 +38,6 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): # Compare gradients. for p0, p1 in zip(model.parameters(), torch_model.parameters()): - print(p0, p1.grad) assert_close(p0, p1.grad, rtol=1e-3, atol=5e-5) # Release gradient chunks and move them to gradient device. From 86e5502aea729bd7fcd6b6b8ffba1edadae7d332 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Fri, 13 Oct 2023 15:48:19 +0800 Subject: [PATCH 10/13] optimize communication & fix gradient checkpoint --- colossalai/zero/gemini/gemini_ddp.py | 38 ++++++++++++++++++- .../test_zero/test_gemini/test_grad_accum.py | 4 +- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index be20650fdb3d..a2acc6320976 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -338,8 +338,42 @@ def grad_handle(self, p, grad): if not self.accumulating_grads: grad_chunk = self.chunk_manager.init_grad_chunk(chunk) else: - self.chunk_manager.access_chunk(chunk.grad_chunk) - grad_chunk = chunk.grad_chunk + if chunk.grad_chunk not in self.chunk_manager.accessed_chunks: + # Make a backup for gradient accumulated before. + # Here backup gradients should be multiplied, since it will be divided after gradient reduction. + if chunk.grad_chunk.is_gathered: + accumulated_grad = chunk.grad_chunk.cuda_global_chunk.clone().detach().mul_(chunk.pg_size) + accumulated_grad_gathered = True + else: + if chunk.grad_chunk.cuda_shard is not None: + accumulated_grad = chunk.grad_chunk.cuda_shard.clone().detach().mul_(chunk.pg_size) + else: + accumulated_grad = ( + chunk.grad_chunk.cpu_shard.to(get_current_device()) + .clone() + .detach() + .mul_(chunk.pg_size) + ) + accumulated_grad_gathered = False + + # Reset grad_chunk, and chunk.grad_chunk will be accessed. + grad_chunk = self.chunk_manager.init_grad_chunk(chunk) + grad_chunk.cuda_global_chunk.zero_() + + # Add backup gradients to grad_chunk. + if accumulated_grad_gathered: + grad_chunk.cuda_global_chunk.add_(accumulated_grad) + else: + grad_chunk.cuda_global_chunk[grad_chunk.shard_begin : grad_chunk.shard_end].add_( + accumulated_grad + ) + + # Release accumulated_grad + free_storage(accumulated_grad) + accumulated_grad = None + else: + grad_chunk = chunk.grad_chunk + # hold -> compute -> hold after bwd grad_chunk.tensor_trans_state(p, TensorState.COMPUTE) grad_chunk.tensor_trans_state(p, TensorState.HOLD_AFTER_BWD) diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index 1abfc15c4861..c4c58a0bd88f 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -49,12 +49,11 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): @parameterize("placement_config", PLACEMENT_CONFIGS) @parameterize("keep_gathered", [False, True]) @parameterize("model_name", ["gpt2"]) -@parameterize("use_grad_checkpoint", [False]) # TODO(Baizhou): debug for gradient checkpointing case +@parameterize("use_grad_checkpoint", [False, True]) @parameterize("master_weights", [False, True]) def exam_gemini_grad_acc( placement_config, keep_gathered: bool, model_name: str, use_grad_checkpoint: bool, master_weights: bool ): - # print(placement_config, keep_gathered, use_grad_checkpoint, master_weights, flush=True) init_device = get_current_device() get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, train_dataloader, _, _, criterion = get_components_func() @@ -97,7 +96,6 @@ def exam_gemini_grad_acc( accum_iter = 4 for i, (input_ids, label) in enumerate(train_dataloader): delay_unscale = False if (i + 1) % accum_iter == 0 else True - input_ids, label = input_ids.cuda(), label.cuda() set_seed(42 + rank) From 60f1a68617c4d8be278f8d72ad37b07eb7cad5fa Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Fri, 13 Oct 2023 16:10:54 +0800 Subject: [PATCH 11/13] modify doc --- .../gradient_accumulation_with_booster.md | 40 +++++++++++++------ .../gradient_accumulation_with_booster.md | 39 ++++++++++++------ .../test_zero/test_gemini/test_grad_accum.py | 2 +- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/docs/source/en/features/gradient_accumulation_with_booster.md b/docs/source/en/features/gradient_accumulation_with_booster.md index 35248f1af0b3..ea97dd92e885 100644 --- a/docs/source/en/features/gradient_accumulation_with_booster.md +++ b/docs/source/en/features/gradient_accumulation_with_booster.md @@ -1,6 +1,6 @@ # Gradient Accumulation -Author: [Mingyan Jiang](https://github.com/jiangmingyan) +Author: [Mingyan Jiang](https://github.com/jiangmingyan), [Baizhou Zhang](https://github.com/Fridge003) **Prerequisite** - [Training Booster](../basics/booster_api.md) @@ -126,18 +126,6 @@ for idx, (img, label) in enumerate(train_dataloader): ``` -Currently the plugins supporting `no_sync()` method include `TorchDDPPlugin` and `LowLevelZeroPlugin` set to stage 1. `GeminiPlugin` doesn't support `no_sync()` method, but it can also enable synchronized gradient accumulation in a torch-like way. Following is the code snippet of enabling gradient accumulation for `GeminiPlugin`: - -```python - output = gemini_model(input) - train_loss = criterion(output, label) - train_loss = train_loss / GRADIENT_ACCUMULATION - booster.backward(train_loss, gemini_optimizer) - - if idx % (GRADIENT_ACCUMULATION - 1) == 0: - gemini_optimizer.step() # zero_grad is automatically done -``` - ### Step 6. Invoke Training Scripts To verify gradient accumulation, we can just check the change of parameter values. When gradient accumulation is set, parameters are only updated in the last step. You can run the script using this command: @@ -155,4 +143,30 @@ iteration 2, first 10 elements of param: tensor([-0.0208, 0.0189, 0.0234, 0.0 iteration 3, first 10 elements of param: tensor([-0.0141, 0.0464, 0.0507, 0.0321, 0.0356, -0.0150, 0.0172, -0.0118, 0.0222, 0.0473], device='cuda:0', grad_fn=) ``` + +## Gradient Accumulation on GeminiPlugin + +Currently the plugins supporting `no_sync()` method include `TorchDDPPlugin` and `LowLevelZeroPlugin` set to stage 1. `GeminiPlugin` doesn't support `no_sync()` method, but it can enable synchronized gradient accumulation in a torch-like way. + +To enable gradient accumulation feature, the argument `enable_gradient_accumulation` should be set to `True` when initializing `GeminiPlugin`. Following is the pseudocode snippet of enabling gradient accumulation for `GeminiPlugin`: + +```python +... +plugin = GeminiPlugin(..., enable_gradient_accumulation=True) +booster = Booster(plugin=plugin) +... + +... +for idx, (input, label) in enumerate(train_dataloader): + output = gemini_model(input.cuda()) + train_loss = criterion(output, label.cuda()) + train_loss = train_loss / GRADIENT_ACCUMULATION + booster.backward(train_loss, gemini_optimizer) + + if idx % (GRADIENT_ACCUMULATION - 1) == 0: + gemini_optimizer.step() # zero_grad is automatically done +... +``` + + diff --git a/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md b/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md index 17a60a8c7909..824308f94654 100644 --- a/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md +++ b/docs/source/zh-Hans/features/gradient_accumulation_with_booster.md @@ -1,6 +1,6 @@ # 梯度累积 -作者: [Mingyan Jiang](https://github.com/jiangmingyan) +作者: [Mingyan Jiang](https://github.com/jiangmingyan), [Baizhou Zhang](https://github.com/Fridge003) **前置教程** - [训练中使用Booster](../basics/booster_api.md) @@ -93,18 +93,6 @@ model, optimizer, criterion, train_dataloader, _ = booster.boost(model=model, dataloader=train_dataloader) ``` -目前支持`no_sync()`方法的插件包括 `TorchDDPPlugin` 和 `LowLevelZeroPlugin`(需要设置参数`stage`为1). `GeminiPlugin` 不支持 `no_sync()` 方法, 但是它可以通过和`pytorch`类似的方式来使用同步的梯度累积。以下是 `GeminiPlugin` 进行梯度累积的代码片段: - -```python - output = gemini_model(input) - train_loss = criterion(output, label) - train_loss = train_loss / GRADIENT_ACCUMULATION - booster.backward(train_loss, gemini_optimizer) - - if idx % (GRADIENT_ACCUMULATION - 1) == 0: - gemini_optimizer.step() # zero_grad is automatically done -``` - ### 步骤 5. 使用booster训练 使用booster构建一个普通的训练循环,验证梯度累积。 `param_by_iter` 记录分布训练的信息。 @@ -157,4 +145,29 @@ iteration 2, first 10 elements of param: tensor([-0.0208, 0.0189, 0.0234, 0.0 iteration 3, first 10 elements of param: tensor([-0.0141, 0.0464, 0.0507, 0.0321, 0.0356, -0.0150, 0.0172, -0.0118, 0.0222, 0.0473], device='cuda:0', grad_fn=) ``` +## 在Gemini插件中使用梯度累积 + +目前支持`no_sync()`方法的插件包括 `TorchDDPPlugin` 和 `LowLevelZeroPlugin`(需要设置参数`stage`为1). `GeminiPlugin` 不支持 `no_sync()` 方法, 但是它可以通过和`pytorch`类似的方式来使用同步的梯度累积。 + +为了开启梯度累积功能,在初始化`GeminiPlugin`的时候需要将参数`enable_gradient_accumulation`设置为`True`。以下是 `GeminiPlugin` 进行梯度累积的伪代码片段: + +```python +... +plugin = GeminiPlugin(..., enable_gradient_accumulation=True) +booster = Booster(plugin=plugin) +... + +... +for idx, (input, label) in enumerate(train_dataloader): + output = gemini_model(input.cuda()) + train_loss = criterion(output, label.cuda()) + train_loss = train_loss / GRADIENT_ACCUMULATION + booster.backward(train_loss, gemini_optimizer) + + if idx % (GRADIENT_ACCUMULATION - 1) == 0: + gemini_optimizer.step() # zero_grad is automatically done +... +``` + + diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index c4c58a0bd88f..5e0c393229f9 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -48,7 +48,7 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module): @parameterize("placement_config", PLACEMENT_CONFIGS) @parameterize("keep_gathered", [False, True]) -@parameterize("model_name", ["gpt2"]) +@parameterize("model_name", ["gpt2", "bert"]) @parameterize("use_grad_checkpoint", [False, True]) @parameterize("master_weights", [False, True]) def exam_gemini_grad_acc( From c7a600457b4b4bc345541371e8f6bb09418bded9 Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Fri, 13 Oct 2023 16:52:56 +0800 Subject: [PATCH 12/13] cleaning codes --- colossalai/zero/gemini/chunk/manager.py | 36 ++++++++++++++++++++++++- colossalai/zero/gemini/gemini_ddp.py | 34 ++--------------------- tests/components_to_test/bert.py | 1 - 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index 713c11742e15..d3c512fe978d 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -5,7 +5,7 @@ import torch.distributed as dist from torch.distributed import ProcessGroup -from colossalai.utils import get_current_device +from colossalai.utils import free_storage, get_current_device from .chunk import Chunk, ChunkFullError, TensorState @@ -255,3 +255,37 @@ def init_grad_chunk(self, chunk: Chunk) -> Chunk: self.accessed_chunks.add(grad_chunk) self.accessed_mem += grad_chunk.chunk_mem return grad_chunk + + def rearrange_accumulated_grad_chunk(self, chunk: Chunk) -> Chunk: + """Rearrange gradients accumulated in chunk.grad_chunk, and getP prepared for gradient reduction.""" + + assert chunk.grad_chunk is not None + + # Make a backup for gradient accumulated before. + # Here backup gradients should be multiplied, since it will be divided after gradient reduction. + if chunk.grad_chunk.is_gathered: + accumulated_grad = chunk.grad_chunk.cuda_global_chunk.clone().detach().mul_(chunk.pg_size) + accumulated_grad_gathered = True + else: + if chunk.grad_chunk.cuda_shard is not None: + accumulated_grad = chunk.grad_chunk.cuda_shard.clone().detach().mul_(chunk.pg_size) + else: + accumulated_grad = ( + chunk.grad_chunk.cpu_shard.to(get_current_device()).clone().detach().mul_(chunk.pg_size) + ) + accumulated_grad_gathered = False + + # Reset grad_chunk, and chunk.grad_chunk will be accessed. + grad_chunk = self.init_grad_chunk(chunk) + grad_chunk.cuda_global_chunk.zero_() + + # Add backup gradients to grad_chunk. + if accumulated_grad_gathered: + grad_chunk.cuda_global_chunk.add_(accumulated_grad) + else: + grad_chunk.cuda_global_chunk[grad_chunk.shard_begin : grad_chunk.shard_end].add_(accumulated_grad) + + # Release accumulated_grad + free_storage(accumulated_grad) + + return grad_chunk diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index a2acc6320976..4c0c8d984539 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -338,39 +338,9 @@ def grad_handle(self, p, grad): if not self.accumulating_grads: grad_chunk = self.chunk_manager.init_grad_chunk(chunk) else: + assert chunk.grad_chunk is not None if chunk.grad_chunk not in self.chunk_manager.accessed_chunks: - # Make a backup for gradient accumulated before. - # Here backup gradients should be multiplied, since it will be divided after gradient reduction. - if chunk.grad_chunk.is_gathered: - accumulated_grad = chunk.grad_chunk.cuda_global_chunk.clone().detach().mul_(chunk.pg_size) - accumulated_grad_gathered = True - else: - if chunk.grad_chunk.cuda_shard is not None: - accumulated_grad = chunk.grad_chunk.cuda_shard.clone().detach().mul_(chunk.pg_size) - else: - accumulated_grad = ( - chunk.grad_chunk.cpu_shard.to(get_current_device()) - .clone() - .detach() - .mul_(chunk.pg_size) - ) - accumulated_grad_gathered = False - - # Reset grad_chunk, and chunk.grad_chunk will be accessed. - grad_chunk = self.chunk_manager.init_grad_chunk(chunk) - grad_chunk.cuda_global_chunk.zero_() - - # Add backup gradients to grad_chunk. - if accumulated_grad_gathered: - grad_chunk.cuda_global_chunk.add_(accumulated_grad) - else: - grad_chunk.cuda_global_chunk[grad_chunk.shard_begin : grad_chunk.shard_end].add_( - accumulated_grad - ) - - # Release accumulated_grad - free_storage(accumulated_grad) - accumulated_grad = None + grad_chunk = self.chunk_manager.rearrange_accumulated_grad_chunk(chunk) else: grad_chunk = chunk.grad_chunk diff --git a/tests/components_to_test/bert.py b/tests/components_to_test/bert.py index f0061ad18c84..9f0eef75ae93 100644 --- a/tests/components_to_test/bert.py +++ b/tests/components_to_test/bert.py @@ -52,7 +52,6 @@ def bert_model_builder(checkpoint: bool = False): hidden_dropout_prob=0.0, attention_probs_dropout_prob=0.0, ) - print("building BertForSequenceClassification model") # adapting huggingface BertForSequenceClassification for single unittest calling interface class ModelAdaptor(BertForSequenceClassification): From ebc9e52c8b14accfcbf0518ad23043422828fcfd Mon Sep 17 00:00:00 2001 From: Baizhou Zhang Date: Tue, 17 Oct 2023 11:27:48 +0800 Subject: [PATCH 13/13] update cpu adam fp16 case --- colossalai/zero/gemini/gemini_ddp.py | 2 +- tests/test_zero/test_gemini/test_grad_accum.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 4c0c8d984539..df7e1163c3d9 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -373,7 +373,7 @@ def grad_handle(self, p, grad): if chunk.l2_norm_flag: grad_chunk.set_l2_norm() self.chunk_manager.move_chunk(grad_chunk, self.grads_device[p], force_copy=True) - if not self.master_weights: + if not (self.master_weights) or (self.enable_gradient_accumulation): self.chunk_manager.move_chunk(chunk, self.grads_device[p], force_copy=True) return empty_grad diff --git a/tests/test_zero/test_gemini/test_grad_accum.py b/tests/test_zero/test_gemini/test_grad_accum.py index 5e0c393229f9..334a57410817 100644 --- a/tests/test_zero/test_gemini/test_grad_accum.py +++ b/tests/test_zero/test_gemini/test_grad_accum.py @@ -114,10 +114,6 @@ def exam_gemini_grad_acc( check_grad(gemini_model, torch_model) if (i + 1) % accum_iter == 0: - # TODO(Baizhou): Delete following two lines after cpu_adam for fp16 has been merged into main branch (auto policy put gradients in cpu). - if placement_config["placement_policy"] == "auto": - break - torch_optim.step() gemini_optim.step() torch_optim.zero_grad()