From 179b7aeb14c64d79238bdb4be6842322094fabf5 Mon Sep 17 00:00:00 2001 From: lclgy Date: Thu, 10 Aug 2023 14:50:32 +0800 Subject: [PATCH 1/3] improve stablility of zero --- .../low_level/bookkeeping/bucket_store.py | 56 ++++++++++++------- colossalai/zero/low_level/low_level_optim.py | 7 +++ .../test_zero/test_low_level/test_zero1_2.py | 2 +- 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 98f1b78d0049..7312febe60e9 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -13,15 +13,20 @@ class BucketStore(BaseStore): def __init__(self, torch_pg: ProcessGroup): super().__init__(torch_pg) - # init and reset + # init self.current_group_id = 0 + self._num_elements_in_bucket = 0 # mapping gardient slices and parameter self.grad_to_param_mapping = dict() + self._grad_in_bucket = dict() self._param_list = [] self._padding_size = [] + for rank in range(self._world_size): + self._grad_in_bucket[rank] = [] - self.reset() + # offset_list records number of tensors in the bucket before each reduction + self.offset_list = [0] def num_elements_in_bucket(self) -> int: """Return the total number of elements in bucket @@ -32,6 +37,12 @@ def num_elements_in_bucket(self) -> int: return self._num_elements_in_bucket + def reset_num_elements_in_bucket(self): + """Set the number of elements in bucket to zero. + """ + + self._num_elements_in_bucket = 0 + def add_param_grad(self, group_id: int, param: Tensor, padding_size: int): """Add a param to bucket and record the padding size of a param for gradient padding @@ -46,28 +57,33 @@ def add_param_grad(self, group_id: int, param: Tensor, padding_size: int): self._num_elements_in_bucket += (param.numel() + padding_size) self.current_group_id = group_id + # number of tensors in current bucket + self.offset_list[-1] += 1 + def build_grad_in_bucket(self): """Orgnize parameters' gradient(padding and split), follows the paramters' splitting method Data structure of self._grad_in_bucket: { rank0: [grad0_rank0, grad1_rank0, ...] - rank1: [grad1_rank1, grad1_rank1, ...] + rank1: [grad0_rank1, grad1_rank1, ...] } """ - for param, padding_size in zip(self._param_list, self._padding_size): - with torch.no_grad(): - grad = param.grad.detach().flatten() - if padding_size > 0: - grad = torch.nn.functional.pad(grad, [0, padding_size]) - grad_list = grad.split(grad.numel() // self._world_size) - for rank in range(self._world_size): - grad_current_rank = grad_list[rank].detach() - self.grad_to_param_mapping[id(grad_current_rank)] = id(param) - self._grad_in_bucket[rank].append(grad_current_rank) + + grad = param.grad.clone().detach().flatten() + if padding_size > 0: + with torch.no_grad(): + grad = torch.nn.functional.pad(grad.view(-1), [0, padding_size]) + grad_list = grad.split(grad.numel() // self._world_size) + for rank in range(self._world_size): + grad_current_rank = grad_list[rank].clone().detach() + self.grad_to_param_mapping[id(grad_current_rank)] = id(param) + self._grad_in_bucket[rank].append(grad_current_rank) param.grad = None + self.offset_list.append(0) + def get_grad(self) -> Dict: """Return the dictionary of gradients slices, of which the keys are ranks @@ -104,10 +120,12 @@ def get_param_id_of_grad(self, grad: Tensor) -> int: return self.grad_to_param_mapping[id(grad)] def reset(self): - self.grad_to_param_mapping = dict() - self._num_elements_in_bucket = 0 - self._param_list = [] - self._padding_size = [] - self._grad_in_bucket = dict() + """Reset the bucket storage after reduction, only release the tensors have been reduced + """ + cur_offset = self.offset_list.pop(0) + self._param_list = self._param_list[:cur_offset] + self._padding_size = self._padding_size[:cur_offset] + for _ in range(cur_offset): + del self.grad_to_param_mapping[next(iter(self.grad_to_param_mapping))] for rank in range(self._world_size): - self._grad_in_bucket[rank] = [] + self._grad_in_bucket[rank] = self._grad_in_bucket[rank][:cur_offset] diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 2b3f50ed4fd4..3f395bb79a2e 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -242,10 +242,17 @@ def _attach_reduction_hook(self): def _run_reduction(self): if self._bucket_store.num_elements_in_bucket() > 0: self._bucket_store.build_grad_in_bucket() + flat_grads = self._bucket_store.get_flatten_grad() flat_grads /= self._world_size + + # ready to add other tensors to bucket + self._bucket_store.reset_num_elements_in_bucket() + if self._overlap_communication: stream = self._comm_stream + # waiting for ops in the default stream finishing + stream.wait_stream(torch.cuda.current_stream()) else: stream = torch.cuda.current_stream() diff --git a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py index 5a0609bff192..9c4474aff5c3 100644 --- a/tests/test_zero/test_low_level/test_zero1_2.py +++ b/tests/test_zero/test_low_level/test_zero1_2.py @@ -137,7 +137,7 @@ def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, overlap_communication=True, initial_scale=1, - reduce_bucket_size=262144) + reduce_bucket_size=1024 * 1024) torch_optimizer = torch.optim.SGD(torch_model.parameters(), lr=1) From a182f735f5c83e6b07c32bd98907cf0729ce3d9c Mon Sep 17 00:00:00 2001 From: lclgy Date: Thu, 10 Aug 2023 15:29:20 +0800 Subject: [PATCH 2/3] fix wrong index --- colossalai/zero/low_level/bookkeeping/bucket_store.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 7312febe60e9..0ab10e25d407 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -70,7 +70,6 @@ def build_grad_in_bucket(self): } """ for param, padding_size in zip(self._param_list, self._padding_size): - grad = param.grad.clone().detach().flatten() if padding_size > 0: with torch.no_grad(): @@ -123,9 +122,9 @@ def reset(self): """Reset the bucket storage after reduction, only release the tensors have been reduced """ cur_offset = self.offset_list.pop(0) - self._param_list = self._param_list[:cur_offset] - self._padding_size = self._padding_size[:cur_offset] + self._param_list = self._param_list[cur_offset:] + self._padding_size = self._padding_size[cur_offset:] for _ in range(cur_offset): del self.grad_to_param_mapping[next(iter(self.grad_to_param_mapping))] for rank in range(self._world_size): - self._grad_in_bucket[rank] = self._grad_in_bucket[rank][:cur_offset] + self._grad_in_bucket[rank] = self._grad_in_bucket[rank][cur_offset:] From fa974a23160c700a468c9c36765151f83cc109bc Mon Sep 17 00:00:00 2001 From: lclgy Date: Thu, 10 Aug 2023 15:40:43 +0800 Subject: [PATCH 3/3] add record stream --- colossalai/zero/low_level/low_level_optim.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 3f395bb79a2e..64d6a5395120 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -251,6 +251,8 @@ def _run_reduction(self): if self._overlap_communication: stream = self._comm_stream + # in case of the memory being reused in the default stream + flat_grads.record_stream(stream) # waiting for ops in the default stream finishing stream.wait_stream(torch.cuda.current_stream()) else: