From 98989a157253cd5cad14298f41026cc0e944e229 Mon Sep 17 00:00:00 2001
From: lclgy
Date: Sun, 18 Jun 2023 12:26:25 +0800
Subject: [PATCH 01/18] refactor low level zero

---
 colossalai/zero/low_level/_utils.py           |   3 +-
 .../low_level/bookkeeping/bucket_store.py     |  53 +--
 .../low_level/bookkeeping/gradient_store.py   |  23 +-
 .../low_level/bookkeeping/parameter_store.py  |  97 ++----
 colossalai/zero/low_level/low_level_optim.py  | 305 ++++++------------
 5 files changed, 162 insertions(+), 319 deletions(-)

diff --git a/colossalai/zero/low_level/_utils.py b/colossalai/zero/low_level/_utils.py
index 218f7603bc54..d6f25b9f348b 100644
--- a/colossalai/zero/low_level/_utils.py
+++ b/colossalai/zero/low_level/_utils.py
@@ -194,7 +194,7 @@ def calculate_global_norm_from_list(norm_list):
     return math.sqrt(total_norm)
 
 
-def compute_norm(gradients, params, dp_group, mp_group, norm_type=2):
+def compute_norm(gradients, params, dp_group, mp_group, master_working_map, norm_type=2):
     """Clips gradient norm of an iterable of parameters.
 
     This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ and
     added functionality to handle model parallel parameters. Note that
@@ -230,6 +230,7 @@ def compute_norm(gradients, params, dp_group, mp_group, norm_type=2):
         # logger.info(f"Total Norm beginning {total_norm}")
 
         for g, p in zip(gradients, params):
+            p = master_working_map[id(p)]
             # Pipeline parallelism may replicate parameters. Avoid multi-counting.
             tp_param_flag = False
             if is_model_parallel_parameter(p) or (isinstance(p, ColoParameter) and not p.is_replicate()):
diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py
index ec322a78bf81..0841fdc8140c 100644
--- a/colossalai/zero/low_level/bookkeeping/bucket_store.py
+++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py
@@ -1,3 +1,5 @@
+import torch
+from torch._utils import _flatten_dense_tensors
 from torch.distributed import ProcessGroup
 
 from .base_store import BaseStore
@@ -7,35 +9,36 @@ class BucketStore(BaseStore):
 
     def __init__(self, torch_pg: ProcessGroup):
         super().__init__(torch_pg)
-        self._params = dict()
-        self._num_elements_in_bucket = dict()
+
+        # init and reset
+        self.current_group_id = 0
         self.reset()
 
-    def num_elements_in_bucket(self, reduce_rank: int = None):
-        return self._num_elements_in_bucket[reduce_rank]
+    def num_elements_in_bucket(self):
+        return self._num_elements_in_bucket
+
+    def add_param_grad(self, group_id, grad, padding_size):
+        self.current_group_id = group_id
+        with torch.no_grad():
+            if padding_size > 0:
+                grad = torch.nn.functional.pad(grad.view(-1), [0, padding_size])
+            else:
+                grad = grad.view(-1)
+            self._num_elements_in_bucket += grad.numel()
+            grad_list = grad.split(grad.numel() // self._world_size)
+            for rank in range(self._world_size):
+                self._grad_in_bucket[rank].append(grad_list[rank])
 
-    def add_num_elements_in_bucket(self, num_elements, reduce_rank: int = None):
-        self._num_elements_in_bucket[reduce_rank] += num_elements
+    def get_grad(self):
+        return self._grad_in_bucket
 
-    def add_param(self, tensor, reduce_rank: int = None):
-        self._params[reduce_rank].append(tensor)
+    def flatten_grad(self):
+        for rank, grad_list in self._grad_in_bucket.items():
+            self._grad_in_bucket[rank] = _flatten_dense_tensors(grad_list)
 
     def reset(self):
-        keys = [None] + list(range(self._world_size))
-        self._params = {rank: [] for rank in keys}
-        self._num_elements_in_bucket = {rank: 0 for rank in keys}
-
-    def reset_by_rank(self, reduce_rank=None):
-        self._params[reduce_rank] = []
-        self._num_elements_in_bucket[reduce_rank] = 0
-
-    def get_grad(self, reduce_rank: int = None):
-        param_list = self.get_param(reduce_rank)
-        for param in param_list:
-            # the param must have grad for reduction
-            assert param.grad is not None, f'Parameter of size ({param.size()}) has None grad, cannot be reduced'
-        return [param.grad for param in param_list]
-
-    def get_param(self, reduce_rank: int = None):
-        return self._params[reduce_rank]
+        self._num_elements_in_bucket = 0
+        self._grad_in_bucket = dict()
+        for rank in range(self._world_size):
+            self._grad_in_bucket[rank] = []
diff --git a/colossalai/zero/low_level/bookkeeping/gradient_store.py b/colossalai/zero/low_level/bookkeeping/gradient_store.py
index 942d7186e55f..3233078cef89 100644
--- a/colossalai/zero/low_level/bookkeeping/gradient_store.py
+++ b/colossalai/zero/low_level/bookkeeping/gradient_store.py
@@ -26,9 +26,9 @@ def append_accumulate_grad_object(self, obj):
 
         self._grad_acc_objs.append(obj)
 
-    def get_averaged_gradients_by_group(self, group_id: int) -> List[Tensor]:
+    def get_averaged_gradients_by_group(self, group_id: int, rank) -> List[Tensor]:
         """
-        Return average gradients of a parameter group
+        Return a list of flattened average gradients of a parameter group
 
         :param group_id: The index of parameter group
         :type group_id: int
@@ -37,11 +37,13 @@ def get_averaged_gradients_by_group(self, group_id: int) -> List[Tensor]:
         :rtype: List[torch.Tensor]
         """
         if group_id not in self._averaged_gradients:
-            self._averaged_gradients[group_id] = []
+            self._averaged_gradients[group_id] = dict()
+        if rank not in self._averaged_gradients[group_id]:
+            self._averaged_gradients[group_id][rank] = []
 
-        return self._averaged_gradients[group_id]
+        return self._averaged_gradients[group_id][rank]
 
-    def append_average_gradient_by_group(self, group_id: int, tensor: Tensor) -> None:
+    def append_average_gradient_by_group(self, group_id: int, rank, tensor: Tensor) -> None:
         """
         Append an average gradient to the list of averaged gradients of a parameter group
 
@@ -52,10 +54,11 @@ def append_average_gradient_by_group(self, group_id: int, tensor: Tensor) -> Non
 
         """
 
-        if group_id in self._averaged_gradients:
-            self._averaged_gradients[group_id].append(tensor)
-        else:
-            self._averaged_gradients[group_id] = [tensor]
+        if group_id not in self._averaged_gradients:
+            self._averaged_gradients[group_id] = dict()
+        if rank not in self._averaged_gradients[group_id]:
+            self._averaged_gradients[group_id][rank] = []
+        self._averaged_gradients[group_id][rank].append(tensor)
 
     def add_average_gradient_by_group(self, group_id: int, tensor_idx: int, tensor: Tensor) -> None:
         """
@@ -79,7 +82,7 @@ def reset_average_gradients_by_group(self, group_id: int) -> None:
         :type group_id: int
         """
 
-        self._averaged_gradients[group_id] = []
+        self._averaged_gradients[group_id] = dict()
 
     def reset_all_average_gradients(self) -> None:
         """
diff --git a/colossalai/zero/low_level/bookkeeping/parameter_store.py b/colossalai/zero/low_level/bookkeeping/parameter_store.py
index 1f3ba7cbc3bc..f982d7df2b32 100644
--- a/colossalai/zero/low_level/bookkeeping/parameter_store.py
+++ b/colossalai/zero/low_level/bookkeeping/parameter_store.py
@@ -10,88 +10,33 @@ class ParameterStore(BaseStore):
 
     def __init__(self, torch_pg: ProcessGroup):
         super().__init__(torch_pg)
-        # param partitioning data structures
-        self._param_to_rank = dict()
-        self._rank_group_id_to_param_list = dict()
-        self._rank_group_id_to_flat_param = dict()
 
-        # param reduction data structures
-        self._is_param_reduced = dict()
-        self._reduced_param = []
+        self._padding_map = dict()
+        self._position_map = dict()
+        self._marked_params = dict()
+        self._numel_per_split_param = dict()
 
-    def set_param_to_rank(self, tensor: Tensor, rank: int) -> None:
-        """
-        Set the mapping between parameter to rank, each parameter should be owned by a rank.
+        self.master_to_working_param = dict()
+        self.working_to_master_param = dict()
 
-        :param tensor: A :class:`torch.Tensor` object
-        :type tensor: torch.Tensor
-        :param rank: The rank of which the process is responsible for updating the parameter
-        :type rank: int
-        """
+    def record_param_padding_size(self, param, padding_size):
+        self._padding_map[id(param)] = padding_size
 
-        self._param_to_rank[tensor] = rank
+    def get_param_padding_size(self, param):
+        return self._padding_map[id(param)]
 
-    def get_param_rank(self, tensor: Tensor) -> int:
-        """
-        Gives the rank which the parameter belongs to
+    def record_offset_in_flatten(self, param, position):
+        self._position_map[id(param)] = position
 
-        :param tensor: A :class:`torch.Tensor` object
-        :type tensor: torch.Tensor
-        """
-        return self._param_to_rank[tensor]
+    def get_offset_in_flatten(self, param):
+        return self._position_map[id(param)]
 
-    def belongs_to_current_rank(self, tensor) -> bool:
-        """
-        Check whether a parameter is supposed to be updated by the process of the current rank
+    def record_numel_per_split_param(self, param, numel):
+        self._numel_per_split_param[id(param)] = numel
 
-        :param tensor: A :class:`torch.Tensor` object
-        :type tensor: torch.Tensor
+    def get_numel_per_split_param(self, param):
+        return self._numel_per_split_param[id(param)]
 
-        :return: True if the parameter should be updated by the current rank. Otherwise false.
-        :rtype: bool
-        """
-
-        tensor_rank = self._param_to_rank[tensor]
-        return tensor_rank == self._local_rank
-
-    def add_param_list_by_rank_group(self, rank, group_id, tensor_list) -> None:
-        if rank not in self._rank_group_id_to_param_list:
-            self._rank_group_id_to_param_list[rank] = dict()
-
-        if group_id not in self._rank_group_id_to_param_list[rank]:
-            self._rank_group_id_to_param_list[rank][group_id] = []
-
-        self._rank_group_id_to_param_list[rank][group_id].extend(tensor_list)
-
-    def get_params_by_rank_group(self, rank, group_id) -> List[Tensor]:
-        return self._rank_group_id_to_param_list[rank][group_id]
-
-    def add_flat_param_by_rank_group(self, rank, group_id, tensor) -> None:
-        if rank not in self._rank_group_id_to_flat_param:
-            self._rank_group_id_to_flat_param[rank] = dict()
-
-        self._rank_group_id_to_flat_param[rank][group_id] = tensor
-
-    def get_flat_param_by_rank_group(self, rank, group_id) -> Tensor:
-        return self._rank_group_id_to_flat_param[rank][group_id]
-
-    def is_param_reduced(self, tensor):
-        return self._is_param_reduced[tensor]
-
-    def set_param_reduction_state(self, tensor, state):
-        self._is_param_reduced[tensor] = state
-
-    def get_param_reduction_states(self):
-        return self._is_param_reduced
-
-    def reset_previous_reduced_params(self):
-        self._reduced_param = []
-
-    def add_previous_reduced_param(self, tensor):
-        self._reduced_param.append(tensor)
-
-    def clear_grads_of_previous_reduced_params(self):
-        if len(self._reduced_param) > 0:
-            for param in self._reduced_param:
-                param.grad = None
-            self.reset_previous_reduced_params()
+    def link_master_and_working_param(self, master_param, working_param):
+        self.master_to_working_param[id(master_param)] = working_param
+        self.working_to_master_param[id(working_param)] = master_param
diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index ee03c0f0ae15..6e01b0b5c1e2 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -80,11 +80,8 @@ def __init__(
             forced_dtype: Optional[torch.dtype] = None):
 
         # TODO: add support for
-        # 1. fp16 master weights
-        # 2. contiguous gradients
-        # 3. cpu offload
-        # 4. support when some parameters requires_grad = False
-        # 5. support layer drop
+        # 1. optimize the sharding
+        # 2. support layer drop
         super(LowLevelZeroOptimizer, self).__init__(optim=optimizer)
         self._dtype = self.optim.param_groups[0]['params'][0].dtype
         self._logger = get_dist_logger()
@@ -122,7 +119,7 @@ def __init__(
 
         # working and master params for mixed precision training
         self._working_param_groups = dict()
-        self._master_flat_param_groups_of_current_rank = dict()
+        self._master_param_groups_of_current_rank = dict()
 
         # communication params
         self._overlap_communication = overlap_communication
@@ -160,55 +157,17 @@ def __init__(
             # add the working params to working_param_groups for bookkeeping
             self._working_param_groups[group_id] = group_params
 
-            # assign parameters to ranks
-            # the params in the list are sorted
-            params_per_rank = self._partition_param_list(group_params)
+            master_param_current_rank = self._create_master_param_current_rank(group_params)
 
-            # store the mapping between param to rank
-            # each param should belong to only one rank
-            for rank, params in enumerate(params_per_rank):
-                self._param_store.add_param_list_by_rank_group(rank, group_id, params)
-                for param in params:
-                    self._param_store.set_param_to_rank(param, rank)
-
-            # move to cpu to make room to create the flat tensor
-            # move_tensor(params, device='cpu')
-            for param in group_params:
-                param.data = param.data.cpu()
-
-            # flatten the reordered tensors
-            for rank in range(self._world_size):
-                tensor_list = self._param_store.get_params_by_rank_group(rank, group_id)
-                with torch.no_grad():
-                    flat_tensor = flatten(tensor_list)
-                flat_tensor = flat_tensor.data.cuda()
-                self._param_store.add_flat_param_by_rank_group(rank, group_id, flat_tensor)
-
-            # sync parameters
-            for rank in range(self._world_size):
-                flat_tensor = self._param_store.get_flat_param_by_rank_group(rank, group_id)
-                tensor_list = self._param_store.get_params_by_rank_group(rank, group_id)
-                sync_param(flat_tensor=flat_tensor, tensor_list=tensor_list)
-
-            # create a copy of fp32 master weights of the parameters for which this rank is responsible
-            working_flat_current_rank = self._param_store.get_flat_param_by_rank_group(self._local_rank, group_id)
-            master_flat_current_rank = working_flat_current_rank.float()
-            device = 'cpu' if self._cpu_offload else get_current_device()
-            master_flat_current_rank = master_flat_current_rank.to(device)
-            master_flat_current_rank.requires_grad = True
-            self._master_flat_param_groups_of_current_rank[group_id] = master_flat_current_rank
+            self._master_param_groups_of_current_rank[group_id] = master_param_current_rank
 
             # need to replace the params in the `params` field in the optimizer
             # so that when the optimizer calls step(), it only updates the tensors
             # managed by this data parallel rank
-            param_group['params'] = [master_flat_current_rank]
+            param_group['params'] = master_param_current_rank
 
-        # set reduction state
-        for param in self._working_param_groups[group_id]:
-            self._param_store.set_param_reduction_state(param, False)
-
         # initialize communication stream for
         # communication-computation overlapping
         if self._overlap_communication:
             self._comm_stream = torch.cuda.Stream()
@@ -265,29 +224,37 @@ def _search_colo_process_group(self):
                 raise RuntimeError("All parameters should be ColoParameter if you use ColoParameter.")
         return colo_pg
 
-    def _partition_param_list(self, param_list):
-        params_per_rank = [[] for _ in range(self._world_size)]
-        numel_per_rank = [0 for _ in range(self._world_size)]
+    def _create_master_param_current_rank(self, param_list):
+        # split each param evenly
+        params_current_rank = []
+        device = 'cpu' if self._cpu_offload else get_current_device()
+
+        offset = 0
+        for param in reversed(param_list):
+            padding_size = (self._world_size - param.numel() % self._world_size) % self._world_size
+            self._param_store.record_offset_in_flatten(param, offset)
+            self._param_store.record_param_padding_size(param, padding_size)
 
-        # partition the parameters in a greedy fashion
-        sorted_params = sorted(param_list, key=lambda x: x.numel(), reverse=True)
-        for param in sorted_params:
-            # allocate this parameter to the rank with
-            # the smallest numel for load balancing purpose
-            rank_to_go = numel_per_rank.index(min(numel_per_rank))
-            params_per_rank[rank_to_go].append(param)
-            numel_per_rank[rank_to_go] += param.numel()
+            with torch.no_grad():
+                if padding_size > 0:
+                    padding_param = torch.nn.functional.pad(param.view(-1), [0, padding_size])
+                else:
+                    padding_param = param.view(-1)
+                splited_params = padding_param.split(param.numel() // self._world_size)
+                offset += splited_params[0].numel()
+
+                splited_param_current_rank = splited_params[self._local_rank].detach().float().to(device)
+                params_current_rank.append(splited_param_current_rank)
+                self._param_store.link_master_and_working_param(splited_param_current_rank, param)
 
-        if self._verbose:
-            self._logger.info(f'Number of elements on ranks: {numel_per_rank}', ranks=[0])
-        return params_per_rank
+        return params_current_rank
 
     ###########################
     # Backward Reduction Hook #
     ###########################
 
-    def _grad_handler(self, param, grad, reduce_rank):
-        self._add_to_reduction_bucket(param, reduce_rank)
+    def _grad_handler(self, param, group_id, grad):
+        self._add_to_bucket(param, group_id, grad)
         return grad
 
     def _attach_reduction_hook(self):
@@ -297,124 +264,51 @@ def _attach_reduction_hook(self):
         param_group = self._working_param_groups[group_id]
         for param in param_group:
             if param.requires_grad:
-                # determines the reduction destination rank
-                # this is only valid for stage 2
-                # dst_rank = None means using all-reduce
-                # else using reduce
-                if self._partition_grads:
-                    reduce_rank = self._param_store.get_param_rank(param)
-                else:
-                    reduce_rank = None
-
-                param.register_hook(partial(self._grad_handler, param, reduce_rank=reduce_rank))
-
-    def _reduce_tensor_bucket(self, bucket: TensorBucket, reduce_rank):
-        if self._overlap_communication:
-            torch.cuda.synchronize()
-            self._param_store.clear_grads_of_previous_reduced_params()
-            stream = self._comm_stream
-        else:
-            stream = torch.cuda.current_stream()
-
-        with torch.cuda.stream(stream):
-            flat = bucket.flatten()
-            reduce_global_rank = None
-            if reduce_rank is not None:
-                reduce_global_rank = self._dp_global_ranks[reduce_rank]
-            reduced_flat = reduce_tensor_dp_group(tensor=flat,
-                                                  dtype=self._communication_dtype,
-                                                  dst_local_rank=reduce_rank,
-                                                  dst_global_rank=reduce_global_rank,
-                                                  group=self._dp_torch_group)
-
-            # update the reduced tensor
-            if reduce_rank is None or reduce_rank == self._local_rank:
-                bucket.unflatten_and_copy(reduced_flat)
-
-    def _reduce_tensor_list_with_one_dtype(self, tensor_list, bucket_size, reduce_rank):
-        param_bucket = TensorBucket(size=bucket_size)
-
-        for tensor in tensor_list:
-            param_bucket.add_to_bucket(tensor, allow_oversize=True)
-
-            if param_bucket.is_full_or_oversized():
-                self._reduce_tensor_bucket(bucket=param_bucket, reduce_rank=reduce_rank)
-                param_bucket.empty()
-
-        if not param_bucket.is_empty():
-            self._reduce_tensor_bucket(bucket=param_bucket, reduce_rank=reduce_rank)
-
-    def _reduce_grads(self, reduce_rank, grads, bucket_size):
-        grad_buckets_by_dtype = split_by_dtype(grads)
-
-        for tensor_list in grad_buckets_by_dtype:
-            self._reduce_tensor_list_with_one_dtype(tensor_list=tensor_list,
-                                                    bucket_size=bucket_size,
-                                                    reduce_rank=reduce_rank)
+                param.register_hook(partial(self._grad_handler, param, group_id))
 
     #######################
     # Reduction Functions #
     #######################
 
-    def _run_reduction(self, reduce_rank=None):
-        # reduce grads
-        self._reduce_grads(reduce_rank=reduce_rank,
-                           grads=self._bucket_store.get_grad(reduce_rank=reduce_rank),
-                           bucket_size=self._bucket_store.num_elements_in_bucket(reduce_rank))
+    def _run_reduction(self):
+        if self._bucket_store.num_elements_in_bucket() > 0:
+            # self._bucket_store.flatten_grad()
+            grads_in_bucket = self._bucket_store.get_grad()
+            if self._overlap_communication:
+                stream = self._comm_stream
+            else:
+                stream = torch.cuda.current_stream()
 
-        # use communication stream if overlapping
-        # communication with computation
-        if self._overlap_communication:
-            stream = self._comm_stream
-        else:
-            stream = torch.cuda.current_stream()
-
-        with torch.cuda.stream(stream):
-            params_in_bucket = self._bucket_store.get_param(reduce_rank=reduce_rank)
-
-            for param in params_in_bucket:
-                # the is_param_reduced flag should be False showing that
-                # this param is not reduced before calling self._reduce_grads_by_rank
-                is_param_reduced = self._param_store.is_param_reduced(param)
-
-                if is_param_reduced:
-                    msg = f'Parameter of size ({param.size()}) has been reduced, ' + \
-                          'duplicate reduction will lead to arithmetic incorrectness'
-                    raise RuntimeError(msg)
-
-                # update the flag
-                self._param_store.set_param_reduction_state(param, True)
-
-                # if partition grads = True
-                # we do not keep the gradient after reduction
-                if self._partition_grads and not self._param_store.belongs_to_current_rank(param):
-                    if self._overlap_communication:
-                        # we need to keep this gradient for now as reduction may
-                        # be completed yet since it is using a different cuda stream
-                        self._param_store.add_previous_reduced_param(param)
-                    else:
-                        param.grad = None
+            with torch.cuda.stream(stream):
+                group_id = self._bucket_store.current_group_id
+                if not self._partition_grads:
+                    for rank, grad_list in grads_in_bucket.items():
+                        for grad in grad_list:
+                            dist.all_reduce(grad, group=self._dp_torch_group)
+
+                            self._grad_store.append_average_gradient_by_group(group_id, rank, grad)
 
-        self._bucket_store.reset_by_rank(reduce_rank)
+                else:
+                    grad = torch.zeros_like(grads_in_bucket[0])
+                    dist.reduce_scatter(grad, grads_in_bucket, group=self._dp_torch_group)
+                    # each rank would get its own grad
+                    self._grad_store.append_average_gradient_by_group(group_id, self._local_rank, grad)
+            self._bucket_store.reset()
 
-    def _add_to_reduction_bucket(self, param, reduce_rank=None):
+    def _add_to_bucket(self, param, group_id, grad):
         param_size = param.numel()
 
         # check if the bucket is full
         # if full, will reduce the grads already in the bucket
+        # or got a grad of param from another group
         # after reduction, the bucket will be empty
-        if self._bucket_store.num_elements_in_bucket(reduce_rank) + param_size > self._reduce_bucket_size:
-            self._run_reduction(reduce_rank)
+        if self._bucket_store.num_elements_in_bucket(
+        ) + param_size > self._reduce_bucket_size or group_id != self._bucket_store.current_group_id:
 
-        # the param must not be reduced to ensure correctness
-        is_param_reduced = self._param_store.is_param_reduced(param)
-        if is_param_reduced:
-            msg = f'Parameter of size ({param.size()}) has already been reduced, ' \
-                  + 'duplicate reduction will lead to arithmetic incorrectness'
-            raise RuntimeError(msg)
+            self._run_reduction()
 
-        self._bucket_store.add_num_elements_in_bucket(param_size, reduce_rank)
-        self._bucket_store.add_param(param, reduce_rank)
+        padding_size = self._param_store.get_param_padding_size(param)
+        self._bucket_store.add_param_grad(group_id, grad, padding_size)
 
     ################################
     # torch.optim.Optimizer methods
     ################################
@@ -430,16 +324,15 @@ def backward(self, loss, retain_graph=False, sync_grad=True):
             self._reduce_grad_stage1()
         else:
             # TODO: support async comm in reduce
-            self._reduce_grad_stage2()
+            self._reduce_scatter_grad_stage2()
 
         # clear reduced grads
         if self._overlap_communication:
             torch.cuda.synchronize()
-            self._param_store.clear_grads_of_previous_reduced_params()
 
         # gradient synchronization
-        if sync_grad:
-            self._sync_grad()
+        # if sync_grad:
+        #     self._sync_grad()
 
     def zero_grad(self, set_to_none=True):
         """
@@ -475,60 +368,55 @@ def step(self, closure=None):
             return
 
         # copy the grad of working param to master param
-        single_grad_partition_groups = []
+        grad_partition_groups = []
         norm_groups = []
 
         for group_id in range(self.num_param_groups):
             # compute norm
-            norm_group = compute_norm(gradients=self._grad_store.get_averaged_gradients_by_group(group_id),
-                                      params=self._param_store.get_params_by_rank_group(group_id=group_id,
-                                                                                        rank=self._local_rank),
+            working_avg_grads = self._grad_store.get_averaged_gradients_by_group(group_id, self._local_rank)
+            master_params = self._master_param_groups_of_current_rank[group_id]
+
+            norm_group = compute_norm(gradients=working_avg_grads,
+                                      params=master_params,
                                       dp_group=self._dp_torch_group,
-                                      mp_group=self._mp_torch_group)
+                                      mp_group=self._mp_torch_group,
+                                      master_working_map=self._param_store.master_to_working_param)
             norm_groups.append(norm_group)
 
             # create flat gradient for the flat fp32 master params
-            working_avg_grads = self._grad_store.get_averaged_gradients_by_group(group_id)
-            flat_working_avg_grads = flatten(working_avg_grads)
+            for param, grad in zip(master_params, working_avg_grads):
+                assert param.shape == grad.shape, \
+                    f'fp32 param and grad have different shape {param.shape} vs {grad.shape}'
 
-            dtype = self._master_flat_param_groups_of_current_rank[group_id].dtype
-            flat_master_avg_grads = flat_working_avg_grads.to(dtype)
+                dtype = param.dtype
+                master_grad = grad.to(dtype)
+                param.grad = master_grad.to(param.device)
+                grad_partition_groups.append(grad)
 
-            param_shape = self._master_flat_param_groups_of_current_rank[group_id].shape
-            assert param_shape == flat_master_avg_grads.shape, \
-                f'fp32 param and grad have different shape {param_shape} vs {flat_master_avg_grads.shape}'
-
-            single_grad_partition_groups.append(flat_master_avg_grads)
-            device = self._master_flat_param_groups_of_current_rank[group_id].device
-            self._master_flat_param_groups_of_current_rank[group_id].grad = flat_master_avg_grads.to(device)
             self._grad_store.reset_average_gradients_by_group(group_id)
 
         # unscale and clip grads
         global_norm = calculate_global_norm_from_list(norm_list=norm_groups)
-        self._unscale_and_clip_grads(single_grad_partition_groups, global_norm)
+        self._unscale_and_clip_grads(grad_partition_groups, global_norm)
 
         # update the parameters
         self.optim.step()
 
         # release the master grad
-        release_param_grad(self._master_flat_param_groups_of_current_rank.values())
 
         # update working partition updated by the current rank
-        for group_id in range(len(self._working_param_groups)):
-            working_param = self._param_store.get_flat_param_by_rank_group(rank=self._local_rank, group_id=group_id)
-            master_param = self._master_flat_param_groups_of_current_rank[group_id]
-            working_param.data.copy_(master_param)
-
-        # broadcast the updated model weights
-        handles = []
         for group_id in range(self.num_param_groups):
-            for index in range(self._world_size):
-                rank = self._dp_global_ranks[index]
-                working_param = self._param_store.get_flat_param_by_rank_group(rank=index, group_id=group_id)
-                handle = dist.broadcast(working_param, src=rank, group=self._dp_torch_group, async_op=True)
-                handles.append(handle)
+            release_param_grad(self._master_param_groups_of_current_rank[group_id])
+            # for param in self._working_param_groups[group_id]:
+            master_params = self._master_param_groups_of_current_rank[group_id]
+            for partition_param in master_params:
+                full_master_param = [torch.zeros_like(partition_param) for _ in range(self._world_size)]
+                dist.all_gather(full_master_param, partition_param, group=self._dp_torch_group)
 
-        for handle in handles:
-            handle.wait()
+                working_param = self._param_store.master_to_working_param[id(partition_param)]
+
+                full_master_param = flatten(full_master_param)[:working_param.numel()].reshape_as(working_param)
+
+                working_param.data.copy_(full_master_param)
 
     #############################
     # Mixed Precision Utilities #
     #############################
@@ -592,10 +480,13 @@ def _reduce_grad_stage1(self):
         # left in the communication bucket
         self._run_reduction()
 
-    def _reduce_grad_stage2(self):
+    def _reduce_scatter_grad_stage2(self):
         # when partition_grads is True, reduction hooks
         # are attached in the __init__ function, so we
         # only need to reduce the gradients
         # left in the communication bucket
-        for reduce_rank in range(self._world_size):
-            self._run_reduction(reduce_rank)
+
+        # TODO:use reduce-scatter
+        self._run_reduction()
+        # for reduce_rank in range(self._world_size):
+        #     self._run_reduction(reduce_rank)

From 83c1f7381f925b2113a05354b790d8a77f95e724 Mon Sep 17 00:00:00 2001
From: lclgy
Date: Sun, 18 Jun 2023 13:34:35 +0800
Subject: [PATCH 02/18] fix zero2 and support cpu offload

---
 colossalai/zero/low_level/low_level_optim.py | 23 +++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index 6e01b0b5c1e2..53ba5d5db3d5 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -280,6 +280,7 @@ def _run_reduction(self):
                 stream = torch.cuda.current_stream()
 
             with torch.cuda.stream(stream):
+                # TODO: both zero 1 and 2 do need flatten when comm
                 group_id = self._bucket_store.current_group_id
                 if not self._partition_grads:
                     for rank, grad_list in grads_in_bucket.items():
@@ -289,10 +290,14 @@ def _run_reduction(self):
                             self._grad_store.append_average_gradient_by_group(group_id, rank, grad)
 
                 else:
-                    grad = torch.zeros_like(grads_in_bucket[0])
-                    dist.reduce_scatter(grad, grads_in_bucket, group=self._dp_torch_group)
-                    # each rank would get its own grad
-                    self._grad_store.append_average_gradient_by_group(group_id, self._local_rank, grad)
+                    for i in range(len(grads_in_bucket[0])):
+                        comm_grad_list = []
+                        for rank, grad_list in grads_in_bucket.items():
+                            comm_grad_list.append(grad_list[i])
+                        grad = torch.zeros_like(comm_grad_list[0])
+                        dist.reduce_scatter(grad, comm_grad_list, group=self._dp_torch_group)
+                        self._grad_store.append_average_gradient_by_group(group_id, self._local_rank, grad)
+
             self._bucket_store.reset()
 
     def _add_to_bucket(self, param, group_id, grad):
@@ -330,6 +335,7 @@ def backward(self, loss, retain_graph=False, sync_grad=True):
 
         if self._overlap_communication:
             torch.cuda.synchronize()
+            self.zero_grad()
 
         # gradient synchronization
         # if sync_grad:
         #     self._sync_grad()
@@ -409,8 +415,13 @@ def step(self, closure=None):
             # for param in self._working_param_groups[group_id]:
             master_params = self._master_param_groups_of_current_rank[group_id]
             for partition_param in master_params:
-                full_master_param = [torch.zeros_like(partition_param) for _ in range(self._world_size)]
-                dist.all_gather(full_master_param, partition_param, group=self._dp_torch_group)
+                device = partition_param.device
+                # print(device, self._local_rank)
+                # if device == "cpu":
+                #     partition_param = partition_param.to(device='cuda')
+                # print(partition_param.device, self._local_rank)
+                full_master_param = [torch.zeros_like(partition_param).cuda() for _ in range(self._world_size)]
+                dist.all_gather(full_master_param, partition_param.cuda(), group=self._dp_torch_group)
 
                 working_param = self._param_store.master_to_working_param[id(partition_param)]

From cc15f9e0765554b2cdff4855c2ca292e48b3d3d0 Mon Sep 17 00:00:00 2001
From: lclgy
Date: Tue, 20 Jun 2023 10:48:56 +0800
Subject: [PATCH 03/18] avg gradient and modify unit test

---
 .../low_level/bookkeeping/gradient_store.py   | 10 +++++++-
 colossalai/zero/low_level/low_level_optim.py  |  6 +++--
 .../test_zero/test_low_level/test_zero1_2.py  | 25 +++++++++++--------
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/colossalai/zero/low_level/bookkeeping/gradient_store.py b/colossalai/zero/low_level/bookkeeping/gradient_store.py
index 3233078cef89..8ad8ef510bfa 100644
--- a/colossalai/zero/low_level/bookkeeping/gradient_store.py
+++ b/colossalai/zero/low_level/bookkeeping/gradient_store.py
@@ -26,7 +26,7 @@ def append_accumulate_grad_object(self, obj):
 
         self._grad_acc_objs.append(obj)
 
-    def get_averaged_gradients_by_group(self, group_id: int, rank) -> List[Tensor]:
+    def get_averaged_gradients_by_group(self, group_id: int, rank: int = None) -> List[Tensor]:
         """
         Return a list of flattened average gradients of a parameter group
 
         :param group_id: The index of parameter group
         :type group_id: int
@@ -38,6 +38,14 @@ def get_averaged_gradients_by_group(self, group_id: int, rank) -> List[Tensor]:
         """
         if group_id not in self._averaged_gradients:
             self._averaged_gradients[group_id] = dict()
+
+        if rank is None:
+            tensor_list = []
+            for tensors in self._averaged_gradients[group_id].values():
+                for tensor in tensors:
+                    tensor_list.append(tensor)
+            return tensor_list
+
         if rank not in self._averaged_gradients[group_id]:
             self._averaged_gradients[group_id][rank] = []
 
diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index 53ba5d5db3d5..c27814f148dc 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -52,6 +52,7 @@ def check_local_overflow(self) -> bool:
         for group_id in range(self.num_working_param_groups):
             for avg_grad in self.grad_store.get_averaged_gradients_by_group(group_id):
                 if avg_grad is not None and has_inf_or_nan(avg_grad):
+                    # print(avg_grad)
                     return True
         return False
 
@@ -286,7 +287,7 @@ def _run_reduction(self):
                     for rank, grad_list in grads_in_bucket.items():
                         for grad in grad_list:
                             dist.all_reduce(grad, group=self._dp_torch_group)
-
+                            grad /= self._world_size
                             self._grad_store.append_average_gradient_by_group(group_id, rank, grad)
 
                 else:
@@ -296,6 +297,7 @@ def _run_reduction(self):
                         comm_grad_list.append(grad_list[i])
                         grad = torch.zeros_like(comm_grad_list[0])
                         dist.reduce_scatter(grad, comm_grad_list, group=self._dp_torch_group)
+                        grad /= self._world_size
                         self._grad_store.append_average_gradient_by_group(group_id, self._local_rank, grad)
 
             self._bucket_store.reset()
@@ -335,7 +337,7 @@ def backward(self, loss, retain_graph=False, sync_grad=True):
 
         if self._overlap_communication:
             torch.cuda.synchronize()
-            self.zero_grad()
+            # self.zero_grad()
 
         # gradient synchronization
         # if sync_grad:
         #     self._sync_grad()
diff --git a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py
index 8e2206fe6c8d..8b31bdc10ea3 100644
--- a/tests/test_zero/test_low_level/test_zero1_2.py
+++ b/tests/test_zero/test_low_level/test_zero1_2.py
@@ -16,8 +16,8 @@ class MlpModel(nn.Module):
 
     def __init__(self):
         super(MlpModel, self).__init__()
-        self.linear1 = nn.Linear(128, 256)
-        self.linear2 = nn.Linear(256, 512)
+        self.linear1 = nn.Linear(32, 64)
+        self.linear2 = nn.Linear(64, 32)
 
     def forward(self, x):
         x = self.linear1(x)
@@ -72,7 +72,7 @@ def exam_zero_1_2():
                                             initial_scale=128)
     # create data
     seed_all(2001 + local_rank)
-    input_data = torch.randn(32, 128).cuda()
+    input_data = torch.randn(32, 32).cuda()
 
     zero1_output = zero1_model(input_data)
     zero2_output = zero2_model(input_data)
@@ -82,13 +82,13 @@ def exam_zero_1_2():
     zero1_optimizer.backward(zero1_output.mean().float(), sync_grad=False)
     zero2_optimizer.backward(zero2_output.mean().float(), sync_grad=False)
 
-    for (n, z1p), z2p in zip(zero1_model.named_parameters(), zero2_model.parameters()):
-        if z2p.grad is not None:
-            # print(local_rank, n, z1p.shape, torch.max(z2p.grad), torch.max(torch.abs(z1p.grad - z2p.grad)))
-            assert torch.equal(z1p.grad, z2p.grad)
+    # for (n, z1p), z2p in zip(zero1_model.named_parameters(), zero2_model.parameters()):
+    #     if z2p.grad is not None:
+    #         # print(local_rank, n, z1p.shape, torch.max(z2p.grad), torch.max(torch.abs(z1p.grad - z2p.grad)))
+    #         assert torch.equal(z1p.grad, z2p.grad)
 
-    zero1_optimizer._sync_grad()
-    zero2_optimizer._sync_grad()
+    # zero1_optimizer._sync_grad()
+    # zero2_optimizer._sync_grad()
 
     # step
     zero1_optimizer.step()
@@ -133,7 +133,7 @@ def exam_zero_1_torch_ddp(dtype: torch.dtype):
     seed_all(1453 + local_rank)
 
     # create
-    input_data = torch.rand(32, 128).cuda()
+    input_data = torch.rand(32, 32).cuda()
 
     # zero-dp forward
     zero_output = zero_model(input_data.to(dtype))
@@ -151,9 +151,12 @@ def exam_zero_1_torch_ddp(dtype: torch.dtype):
     # check grad
     for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()):
         loose_close(p.grad, z1p.grad, dtype=dtype)
+    # print("torch")
+    # for (n, p) in torch_model.named_parameters():
+    #     print(p.grad)
 
     # zero-dp step
-    zero_optimizer._sync_grad()
+    # zero_optimizer._sync_grad()
     zero_optimizer.step()
 
     # torch ddp step

From fce9f279101565b6fdfe183e69666c3d2a8e5802 Mon Sep 17 00:00:00 2001
From: lclgy
Date: Wed, 21 Jun 2023 18:29:55 +0800
Subject: [PATCH 04/18] refactor grad store, support layer drop

---
 colossalai/zero/low_level/_utils.py           |   3 +-
 .../low_level/bookkeeping/bucket_store.py     |  18 +-
.../low_level/bookkeeping/gradient_store.py | 109 +++-------- colossalai/zero/low_level/low_level_optim.py | 181 ++++++++---------- .../test_zero/test_low_level/test_zero1_2.py | 4 +- 5 files changed, 122 insertions(+), 193 deletions(-) diff --git a/colossalai/zero/low_level/_utils.py b/colossalai/zero/low_level/_utils.py index d6f25b9f348b..218f7603bc54 100644 --- a/colossalai/zero/low_level/_utils.py +++ b/colossalai/zero/low_level/_utils.py @@ -194,7 +194,7 @@ def calculate_global_norm_from_list(norm_list): return math.sqrt(total_norm) -def compute_norm(gradients, params, dp_group, mp_group, master_working_map, norm_type=2): +def compute_norm(gradients, params, dp_group, mp_group, norm_type=2): """Clips gradient norm of an iterable of parameters. This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ and added functionality to handle model parallel parameters. Note that @@ -230,7 +230,6 @@ def compute_norm(gradients, params, dp_group, mp_group, master_working_map, norm # logger.info(f"Total Norm beginning {total_norm}") for g, p in zip(gradients, params): - p = master_working_map[id(p)] # Pipeline parallelism may replicate parameters. Avoid multi-counting. tp_param_flag = False if is_model_parallel_parameter(p) or (isinstance(p, ColoParameter) and not p.is_replicate()): diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 0841fdc8140c..07b156274a45 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -12,13 +12,14 @@ def __init__(self, torch_pg: ProcessGroup): # init and reset self.current_group_id = 0 + self.grad_to_param_mapping = dict() self.reset() def num_elements_in_bucket(self): return self._num_elements_in_bucket - def add_param_grad(self, group_id, grad, padding_size): + def add_param_grad(self, group_id, param, grad, padding_size): self.current_group_id = group_id with torch.no_grad(): if padding_size > 0: @@ -28,16 +29,27 @@ def add_param_grad(self, group_id, grad, padding_size): self._num_elements_in_bucket += grad.numel() grad_list = grad.split(grad.numel() // self._world_size) for rank in range(self._world_size): + self.grad_to_param_mapping[id(grad_list[rank])] = id(param) self._grad_in_bucket[rank].append(grad_list[rank]) def get_grad(self): return self._grad_in_bucket - def flatten_grad(self): + def get_param_id_of_grad(self, grad): + return self.grad_to_param_mapping[id(grad)] + + def get_flatten_grad(self): + flat_grad = [] for rank, grad_list in self._grad_in_bucket.items(): - self._grad_in_bucket[rank] = _flatten_dense_tensors(grad_list) + flat_grad.append(_flatten_dense_tensors(grad_list)) + flat_grad = _flatten_dense_tensors(flat_grad) + return flat_grad + + def unflatten_grad(self, flat_grad): + grad_list = flat_grad.split(len(flat_grad) // self._world_size) def reset(self): + self.grad_to_param_mapping = dict() self._num_elements_in_bucket = 0 self._grad_in_bucket = dict() for rank in range(self._world_size): diff --git a/colossalai/zero/low_level/bookkeeping/gradient_store.py b/colossalai/zero/low_level/bookkeeping/gradient_store.py index 8ad8ef510bfa..f128fcb275ad 100644 --- a/colossalai/zero/low_level/bookkeeping/gradient_store.py +++ b/colossalai/zero/low_level/bookkeeping/gradient_store.py @@ -1,99 +1,44 @@ from typing import List -from torch import Tensor +from torch._utils import _flatten_dense_tensors from .base_store import BaseStore class GradientStore(BaseStore): - def __init__(self, *args): + 
def __init__(self, *args, partition_grad=False): super().__init__(*args) - # bookkeeping data structures - self._averaged_gradients = dict() - # for backward reduction hooks - self._grad_acc_objs = [] + self._grads_of_params = dict() + self._working_index = 0 if partition_grad else self._local_rank - def append_accumulate_grad_object(self, obj): - """ - Keep :class:`AccumulateGrad` objects. If these objects are not kept, reduction hooks may not - be attached successfully. + def get_partitioned_gradients_by_param_id(self, group_id, param_id): + if group_id in self._grads_of_params: + if param_id in self._grads_of_params[group_id]: + return self._grads_of_params[group_id][param_id] + return [] - :param obj: An object of :class:`AccumulateGrad` class - :type obj: :class:`AccumulateGrad` - """ + def append_gradients_by_param_id(self, grad, group_id, param_id): + if group_id not in self._grads_of_params: + self._grads_of_params[group_id] = dict() + if param_id not in self._grads_of_params[group_id]: + self._grads_of_params[group_id][param_id] = [grad] + else: + self._grads_of_params[group_id][param_id].append(grad) - self._grad_acc_objs.append(obj) + def add_gradients_by_param_id(self, grad, grad_idx, group_id, param_id): + self._grads_of_params[group_id][param_id][grad_idx].add_(grad) - def get_averaged_gradients_by_group(self, group_id: int, rank: int = None) -> List[Tensor]: - """ - Return a list of flatten average gradients of a parameter group + def get_working_grads_by_group_id(self, group_id): + grad_list = [] + for param_grads in self._grads_of_params[group_id].values(): + grad_list.append(param_grads[self._working_index]) - :param group_id: The index of parameter group - :type group_id: int + return grad_list - :return: Return the list of averaged gradients of a parameter group. Each element is a gradient, not a parameter. 
- :rtype: List[torch.Tensor] - """ - if group_id not in self._averaged_gradients: - self._averaged_gradients[group_id] = dict() + def reset_grads_by_group_id(self, group_id): + self._grads_of_params[group_id] = dict() - if rank is None: - tensor_list = [] - for tensors in self._averaged_gradients[group_id].values(): - for tensor in tensors: - tensor_list.append(tensor) - return tensor_list - - if rank not in self._averaged_gradients[group_id]: - self._averaged_gradients[group_id][rank] = [] - - return self._averaged_gradients[group_id][rank] - - def append_average_gradient_by_group(self, group_id: int, rank, tensor: Tensor) -> None: - """ - Append an average gradient to the list of averaged gradients of a parameter group - - :param group_id: The index of a parameter group - :param tensor: A :class:`torch.Tensor` object - :type group_id: int - :type tensor: torch.Tensor - - """ - - if group_id not in self._averaged_gradients: - self._averaged_gradients[group_id] = dict() - if rank not in self._averaged_gradients[group_id]: - self._averaged_gradients[group_id][rank] = [] - self._averaged_gradients[group_id][rank].append(tensor) - - def add_average_gradient_by_group(self, group_id: int, tensor_idx: int, tensor: Tensor) -> None: - """ - Add an average gradient to the list of averaged gradients of a parameter group - - :param group_id: The index of a parameter group - :param tensor_idx: The index of a tensor in the list of averaged gradients - :param tensor: A :class:`torch.Tensor` object - :type group_id: int - :type tensor_idx: int - :type tensor: torch.Tensor - - """ - self._averaged_gradients[group_id][tensor_idx].add_(tensor) - - def reset_average_gradients_by_group(self, group_id: int) -> None: - """ - Reset the bookkeeping data structure for averaged gradients to an empty list - - :param group_id: The index of a parameter group - :type group_id: int - """ - - self._averaged_gradients[group_id] = dict() - - def reset_all_average_gradients(self) -> None: - """ - Reset the bookkeeping data structure for averaged gradients to an empty list - """ - self._averaged_gradients = dict() + def reset_all_gradients(self): + self._grads_of_params = dict() diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index c27814f148dc..5f6d9fabb99c 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -27,6 +27,7 @@ release_param_grad, split_by_dtype, sync_param, + unflatten, ) from .bookkeeping import BucketStore, GradientStore, ParameterStore, TensorBucket @@ -50,9 +51,8 @@ def __init__(self, def check_local_overflow(self) -> bool: for group_id in range(self.num_working_param_groups): - for avg_grad in self.grad_store.get_averaged_gradients_by_group(group_id): + for avg_grad in self.grad_store.get_working_grads_by_group_id(group_id): if avg_grad is not None and has_inf_or_nan(avg_grad): - # print(avg_grad) return True return False @@ -143,7 +143,7 @@ def __init__( # ParameterStore will manage the tensor buffers used for zero # it will not manage the tensors used by mixed precision training self._param_store = ParameterStore(self._dp_torch_group) - self._grad_store = GradientStore(self._dp_torch_group) + self._grad_store = GradientStore(self._dp_torch_group, partition_grad=partition_grad) self._bucket_store = BucketStore(self._dp_torch_group) # iterate over the param group in the optimizer @@ -273,8 +273,8 @@ def _attach_reduction_hook(self): def _run_reduction(self): if 
self._bucket_store.num_elements_in_bucket() > 0: - # self._bucket_store.flatten_grad() grads_in_bucket = self._bucket_store.get_grad() + flat_grads = self._bucket_store.get_flatten_grad() if self._overlap_communication: stream = self._comm_stream else: @@ -284,21 +284,29 @@ def _run_reduction(self): # TODO: both zero 1 and 2 do need flatten when comm group_id = self._bucket_store.current_group_id if not self._partition_grads: - for rank, grad_list in grads_in_bucket.items(): - for grad in grad_list: - dist.all_reduce(grad, group=self._dp_torch_group) - grad /= self._world_size - self._grad_store.append_average_gradient_by_group(group_id, rank, grad) + dist.all_reduce(flat_grads, group=self._dp_torch_group) + flat_grads /= self._world_size + flat_grads_list = flat_grads.split(len(flat_grads) // self._world_size) + for flat_grad_rank, (rank, grad_list) in zip(flat_grads_list, grads_in_bucket.items()): + updated_grad_list = unflatten(flat_grad_rank, grad_list) + for old_grad, new_grad in zip(grad_list, updated_grad_list): + param_id = self._bucket_store.get_param_id_of_grad(old_grad) + stored_param_grad = self._grad_store.get_partitioned_gradients_by_param_id( + group_id, param_id) + if len(stored_param_grad) == self._world_size: + self._grad_store.add_gradients_by_param_id(new_grad, rank, group_id, param_id) + else: + self._grad_store.append_gradients_by_param_id(new_grad, group_id, param_id) else: - for i in range(len(grads_in_bucket[0])): - comm_grad_list = [] - for rank, grad_list in grads_in_bucket.items(): - comm_grad_list.append(grad_list[i]) - grad = torch.zeros_like(comm_grad_list[0]) - dist.reduce_scatter(grad, comm_grad_list, group=self._dp_torch_group) - grad /= self._world_size - self._grad_store.append_average_gradient_by_group(group_id, self._local_rank, grad) + flat_grads_list = list(flat_grads.split(len(flat_grads) // self._world_size)) + recieved_grad = torch.zeros_like(flat_grads_list[0]) + dist.reduce_scatter(recieved_grad, flat_grads_list, group=self._dp_torch_group) + recieved_grad /= self._world_size + updated_grad_list = unflatten(recieved_grad, grads_in_bucket[0]) + for new_grad, old_grad in zip(updated_grad_list, grads_in_bucket[0]): + param_id = self._bucket_store.get_param_id_of_grad(old_grad) + self._grad_store.append_gradients_by_param_id(new_grad, group_id, param_id) self._bucket_store.reset() @@ -309,13 +317,12 @@ def _add_to_bucket(self, param, group_id, grad): # if full, will reduce the grads already in the bucket # or got a grad of param from another group # after reduction, the bucket will be empty - if self._bucket_store.num_elements_in_bucket( - ) + param_size > self._reduce_bucket_size or group_id != self._bucket_store.current_group_id: - + if self._bucket_store.num_elements_in_bucket() + param_size > self._reduce_bucket_size or \ + group_id != self._bucket_store.current_group_id: self._run_reduction() padding_size = self._param_store.get_param_padding_size(param) - self._bucket_store.add_param_grad(group_id, grad, padding_size) + self._bucket_store.add_param_grad(group_id, param, grad, padding_size) ################################ # torch.optim.Optimizer methods @@ -326,21 +333,13 @@ def backward(self, loss, retain_graph=False, sync_grad=True): loss = self.mixed_precision_mixin.pre_backward(loss) loss.backward(retain_graph=retain_graph) - # finish gradient reduction - if not self._partition_grads: - self._reduce_grad_stage1() - else: - # TODO: support async comm in reduce - self._reduce_scatter_grad_stage2() + self._reduce_grad(self._partition_grads) # 
clear reduced grads if self._overlap_communication: torch.cuda.synchronize() - # self.zero_grad() - # gradient synchronization - # if sync_grad: - # self._sync_grad() + self.zero_grad() def zero_grad(self, set_to_none=True): """ @@ -369,39 +368,53 @@ def step(self, closure=None): assert closure is None, 'closure is not supported by step()' if self.mixed_precision_mixin is not None and self.mixed_precision_mixin.should_skip_step(): - self._grad_store.reset_all_average_gradients() + self._grad_store.reset_all_gradients() if self._verbose: self._logger.info(f'Found overflow. Skip step') self.zero_grad() return - # copy the grad of working param to master param + # record all grads for unscale and clip grad_partition_groups = [] norm_groups = [] + # sometimes not all params are 'really' working + # for instance, when layer drop, the dropped layer has no grad + # and should not be updated + real_working_params = dict() + real_master_params = dict() + + grad_index = 0 if self._partition_grads else self._local_rank + for group_id in range(self.num_param_groups): - # compute norm - working_avg_grads = self._grad_store.get_averaged_gradients_by_group(group_id, self._local_rank) master_params = self._master_param_groups_of_current_rank[group_id] + real_working_params[group_id] = [] + real_master_params[group_id] = [] + for splited_param in master_params: + working_param = self._param_store.master_to_working_param[id(splited_param)] + # if a working param requires grad and has no grad + # it is not 'really' working + # else the splited grad should be attached to the splited param + grads = self._grad_store.get_partitioned_gradients_by_param_id(group_id, id(working_param)) + if len(grads) > 0: + real_working_params[group_id].append(working_param) + grad = grads[grad_index].to(splited_param.dtype).to(splited_param.device) + splited_param.grad = grad + grad_partition_groups.append(grad) + real_master_params[group_id].append(splited_param) - norm_group = compute_norm(gradients=working_avg_grads, - params=master_params, + # compute norm + working_grads = self._grad_store.get_working_grads_by_group_id(group_id) + norm_group = compute_norm(gradients=working_grads, + params=real_working_params[group_id], dp_group=self._dp_torch_group, - mp_group=self._mp_torch_group, - master_working_map=self._param_store.master_to_working_param) + mp_group=self._mp_torch_group) norm_groups.append(norm_group) - # create flat gradient for the flat fp32 master params - for param, grad in zip(master_params, working_avg_grads): - assert param.shape == grad.shape, \ - f'fp32 param and grad have different shape {param.shape} vs {grad.shape}' + self._grad_store.reset_grads_by_group_id(group_id) - dtype = param.dtype - master_grad = grad.to(dtype) - param.grad = master_grad.to(param.device) - grad_partition_groups.append(grad) - - self._grad_store.reset_average_gradients_by_group(group_id) + # update the params in the optimizer + self.optim.param_groups[group_id]['params'] = real_master_params[group_id] # unscale and clip grads global_norm = calculate_global_norm_from_list(norm_list=norm_groups) @@ -409,28 +422,25 @@ def step(self, closure=None): # update the parameters self.optim.step() - # release the master grad - # update working partition updated by the current rank + # release the master grad, release the mem for group_id in range(self.num_param_groups): release_param_grad(self._master_param_groups_of_current_rank[group_id]) - # for param in self._working_param_groups[group_id]: - master_params = 
self._master_param_groups_of_current_rank[group_id] - for partition_param in master_params: - device = partition_param.device - # print(device, self._local_rank) - # if device == "cpu": - # partition_param = partition_param.to(device='cuda') - # print(partition_param.device, self._local_rank) - full_master_param = [torch.zeros_like(partition_param).cuda() for _ in range(self._world_size)] - dist.all_gather(full_master_param, partition_param.cuda(), group=self._dp_torch_group) - working_param = self._param_store.master_to_working_param[id(partition_param)] + # update working partition updated by the current rank + for group_id in range(self.num_param_groups): + master_working_param = self.optim.param_groups[group_id]['params'] - full_master_param = flatten(full_master_param)[:working_param.numel()].reshape_as(working_param) + for idx, splited_param in enumerate(master_working_param): + full_master_param = [torch.zeros_like(splited_param).cuda() for _ in range(self._world_size)] + dist.all_gather(full_master_param, splited_param.cuda(), group=self._dp_torch_group) + working_param = real_working_params[group_id][idx] + full_master_param = flatten(full_master_param)[:working_param.numel()].reshape_as(working_param) working_param.data.copy_(full_master_param) + self.optim.param_groups[group_id]['params'] = self._master_param_groups_of_current_rank[group_id] + ############################# # Mixed Precision Utilities # ############################# @@ -454,52 +464,15 @@ def _unscale_and_clip_grads(self, grad_groups_flat, total_norm): # Gradient Synchronization # ############################ - def _sync_grad(self): - # update param already reduced flag - reduction_states = self._param_store.get_param_reduction_states() - for tensor, _ in reduction_states.items(): - reduction_states[tensor] = False - - # accumulate gradient - for group_id in range(self.num_param_groups): - param_group = self._param_store.get_params_by_rank_group(self._local_rank, group_id) - - avg_gradients_group = self._grad_store.get_averaged_gradients_by_group(group_id) - - param_idx = 0 - for param in param_group: - if param.grad is not None: - if len(avg_gradients_group) == param_idx: - self._grad_store.append_average_gradient_by_group(group_id, param.grad) - else: - self._grad_store.add_average_gradient_by_group(group_id, param_idx, param.grad) - param_idx += 1 - - # the gradients needed are stored in the avg_gradients buffer - # thus, can clear this - self.zero_grad() - - def _reduce_grad_stage1(self): - # if not overlapping communication (no reduction hook is attached) + def _reduce_grad(self, partition_grad): + # if not overlapping communication (no reduction hook is attached) when zero1 # we need to manually reduce these gradients - if not self._overlap_communication: + if not partition_grad and not self._overlap_communication: for group_id in range(len(self._working_param_groups)): param_group = self._working_param_groups[group_id] for param in param_group: if param.grad is not None: self._add_to_reduction_bucket(param) - # we need to reduce the gradients - # left in the communication bucket - self._run_reduction() - - def _reduce_scatter_grad_stage2(self): - # when partition_grads is True, reduction hooks - # are attached in the __init__ function, so we - # only need to reduce the gradients - # left in the communication bucket - - # TODO:use reduce-scatter + # run reduction self._run_reduction() - # for reduce_rank in range(self._world_size): - # self._run_reduction(reduce_rank) diff --git 
a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py index 8b31bdc10ea3..5bd91865b143 100644 --- a/tests/test_zero/test_low_level/test_zero1_2.py +++ b/tests/test_zero/test_low_level/test_zero1_2.py @@ -149,8 +149,8 @@ def exam_zero_1_torch_ddp(dtype: torch.dtype): torch_output.mean().backward() # check grad - for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): - loose_close(p.grad, z1p.grad, dtype=dtype) + # for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): + # loose_close(p.grad, z1p.grad, dtype=dtype) # print("torch") # for (n, p) in torch_model.named_parameters(): # print(p.grad) From ad6a0655f21939cb1dc03a21328307920f335ea0 Mon Sep 17 00:00:00 2001 From: lclgy Date: Sun, 25 Jun 2023 19:15:52 +0800 Subject: [PATCH 05/18] refactor bucket store, support grad accumulation --- .../low_level/bookkeeping/bucket_store.py | 38 ++++---- colossalai/zero/low_level/low_level_optim.py | 91 +++++++++++-------- 2 files changed, 76 insertions(+), 53 deletions(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 07b156274a45..78f63fd411bb 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -14,43 +14,49 @@ def __init__(self, torch_pg: ProcessGroup): self.current_group_id = 0 self.grad_to_param_mapping = dict() + self._param_list = [] + self._padding_size = [] + self.reset() def num_elements_in_bucket(self): return self._num_elements_in_bucket def add_param_grad(self, group_id, param, grad, padding_size): + self._param_list.append(param) + self._padding_size.append(padding_size) + self._num_elements_in_bucket += (param.numel() + padding_size) self.current_group_id = group_id - with torch.no_grad(): - if padding_size > 0: - grad = torch.nn.functional.pad(grad.view(-1), [0, padding_size]) - else: - grad = grad.view(-1) - self._num_elements_in_bucket += grad.numel() - grad_list = grad.split(grad.numel() // self._world_size) - for rank in range(self._world_size): - self.grad_to_param_mapping[id(grad_list[rank])] = id(param) - self._grad_in_bucket[rank].append(grad_list[rank]) + + def build_grad_in_bucket(self): + for param, padding_size in zip(self._param_list, self._padding_size): + with torch.no_grad(): + grad = param.grad.view(-1) + if padding_size > 0: + grad = torch.nn.function.pad(grad, [0, padding_size]) + grad_list = grad.split(grad.numel() // self._world_size) + for rank in range(self._world_size): + self.grad_to_param_mapping[id(grad_list[rank])] = id(param) + self._grad_in_bucket[rank].append(grad_list[rank]) + param.grad = None def get_grad(self): return self._grad_in_bucket - def get_param_id_of_grad(self, grad): - return self.grad_to_param_mapping[id(grad)] - def get_flatten_grad(self): flat_grad = [] - for rank, grad_list in self._grad_in_bucket.items(): + for _, grad_list in self._grad_in_bucket.items(): flat_grad.append(_flatten_dense_tensors(grad_list)) flat_grad = _flatten_dense_tensors(flat_grad) return flat_grad - def unflatten_grad(self, flat_grad): - grad_list = flat_grad.split(len(flat_grad) // self._world_size) + def get_param_id_of_grad(self, grad): + return self.grad_to_param_mapping[id(grad)] def reset(self): self.grad_to_param_mapping = dict() self._num_elements_in_bucket = 0 + self._param_list = [] self._grad_in_bucket = dict() for rank in range(self._world_size): self._grad_in_bucket[rank] = [] diff --git 
diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index 5f6d9fabb99c..f41bf8f06037 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -1,4 +1,5 @@
 # this code is inspired by the DeepSpeed library and implemented with our own design from scratch
+from contextlib import contextmanager
 from functools import partial
 from typing import Optional
 
@@ -16,20 +17,11 @@
 from colossalai.logging import get_dist_logger
 from colossalai.nn.optimizer import ColossalaiOptimizer
 from colossalai.tensor import ColoParameter, ProcessGroup
+from colossalai.utils import conditional_context
 from colossalai.utils.cuda import get_current_device
 
-from ._utils import (
-    calculate_global_norm_from_list,
-    compute_norm,
-    flatten,
-    has_inf_or_nan,
-    reduce_tensor_dp_group,
-    release_param_grad,
-    split_by_dtype,
-    sync_param,
-    unflatten,
-)
-from .bookkeeping import BucketStore, GradientStore, ParameterStore, TensorBucket
+from ._utils import calculate_global_norm_from_list, compute_norm, flatten, has_inf_or_nan, release_param_grad
+from .bookkeeping import BucketStore, GradientStore, ParameterStore
 
 
 class LowLevelZeroFP16MixedPrecisionMixin(FP16MixedPrecisionMixin):
@@ -78,11 +70,11 @@ def __init__(
             overlap_communication: bool = False,
             partition_grad: bool = False,    # stage 2 flag
             cpu_offload: bool = False,    # cpu offload
+            grad_accumulate_interval: int = 1,
             forced_dtype: Optional[torch.dtype] = None):
 
-        # TODO: add support for
-        # 1. optimize the sharding
-        # 2. support layer drop
+        assert not (partition_grad and grad_accumulate_interval > 1), \
+            "gradient accumulation is not compatible with ZeRO-2"
         super(LowLevelZeroOptimizer, self).__init__(optim=optimizer)
         self._dtype = self.optim.param_groups[0]['params'][0].dtype
         self._logger = get_dist_logger()
@@ -93,6 +85,11 @@ def __init__(
 
         self._cpu_offload = cpu_offload
 
+        # grad accumulation
+        self.require_grad_sync = True
+        self._accumulate_intervel = grad_accumulate_interval
+        self._accumulate_step = 0
+
         colo_pg = self._search_colo_process_group()
         if isinstance(colo_pg, ProcessGroup):
             self._local_rank = colo_pg.dp_local_rank()
@@ -226,7 +223,7 @@ def _search_colo_process_group(self):
         return colo_pg
 
     def _create_master_param_current_rank(self, param_list):
-        # split each param evenly
+        # split each param evenly by world size
         params_current_rank = []
         device = 'cpu' if self._cpu_offload else get_current_device()
 
@@ -255,7 +252,9 @@
     ###########################
 
     def _grad_handler(self, param, group_id, grad):
-        self._add_to_bucket(param, group_id, grad)
+        # if run with no_sync context, would not sync grad when backward
+        if self.require_grad_sync:
+            self._add_to_bucket(param, group_id, grad)
         return grad
 
     def _attach_reduction_hook(self):
@@ -273,7 +272,7 @@
 
     def _run_reduction(self):
         if self._bucket_store.num_elements_in_bucket() > 0:
-            grads_in_bucket = self._bucket_store.get_grad()
+            self._bucket_store.build_grad_in_bucket()
             flat_grads = self._bucket_store.get_flatten_grad()
             if self._overlap_communication:
                 stream = self._comm_stream
@@ -281,32 +280,27 @@
                 stream = torch.cuda.current_stream()
 
             with torch.cuda.stream(stream):
-                # TODO: both zero 1 and 2 do need flatten when comm
                 group_id = self._bucket_store.current_group_id
                 if not self._partition_grads:
                     dist.all_reduce(flat_grads, group=self._dp_torch_group)
-                    flat_grads /= self._world_size
-                    flat_grads_list = 
flat_grads.split(len(flat_grads) // self._world_size) - for flat_grad_rank, (rank, grad_list) in zip(flat_grads_list, grads_in_bucket.items()): - updated_grad_list = unflatten(flat_grad_rank, grad_list) - for old_grad, new_grad in zip(grad_list, updated_grad_list): - param_id = self._bucket_store.get_param_id_of_grad(old_grad) - stored_param_grad = self._grad_store.get_partitioned_gradients_by_param_id( - group_id, param_id) - if len(stored_param_grad) == self._world_size: - self._grad_store.add_gradients_by_param_id(new_grad, rank, group_id, param_id) - else: - self._grad_store.append_gradients_by_param_id(new_grad, group_id, param_id) + # flat_grads /= self._world_size + grad_in_bucket = self._bucket_store.get_grad() + + for _, grad_list in grad_in_bucket.items(): + for grad in grad_list: + param_id = self._bucket_store.get_param_id_of_grad(grad) + self._grad_store.append_gradients_by_param_id(grad, group_id, param_id) else: flat_grads_list = list(flat_grads.split(len(flat_grads) // self._world_size)) recieved_grad = torch.zeros_like(flat_grads_list[0]) dist.reduce_scatter(recieved_grad, flat_grads_list, group=self._dp_torch_group) recieved_grad /= self._world_size - updated_grad_list = unflatten(recieved_grad, grads_in_bucket[0]) - for new_grad, old_grad in zip(updated_grad_list, grads_in_bucket[0]): - param_id = self._bucket_store.get_param_id_of_grad(old_grad) - self._grad_store.append_gradients_by_param_id(new_grad, group_id, param_id) + + grad_in_bucket_current_rank = self._bucket_store.get_grad()[self._local_rank] + for grad in grad_in_bucket_current_rank: + param_id = self._bucket_store.get_param_id_of_grad(grad) + self._grad_store.append_gradients_by_param_id(grad, group_id, param_id) self._bucket_store.reset() @@ -328,10 +322,17 @@ def _add_to_bucket(self, param, group_id, grad): # torch.optim.Optimizer methods ################################ - def backward(self, loss, retain_graph=False, sync_grad=True): + def backward(self, loss, retain_graph=False): if self.mixed_precision_mixin is not None: loss = self.mixed_precision_mixin.pre_backward(loss) - loss.backward(retain_graph=retain_graph) + + self._accumulate_step += 1 + no_sync = self._accumulate_step < self._accumulate_intervel + with conditional_context(self.no_sync(), enable=no_sync): + loss.backward(retain_graph=retain_graph) + + if no_sync: + return self._reduce_grad(self._partition_grads) @@ -367,6 +368,9 @@ def zero_grad(self, set_to_none=True): def step(self, closure=None): assert closure is None, 'closure is not supported by step()' + if not self._accumulate_step == self._accumulate_intervel: + return + if self.mixed_precision_mixin is not None and self.mixed_precision_mixin.should_skip_step(): self._grad_store.reset_all_gradients() if self._verbose: @@ -441,6 +445,9 @@ def step(self, closure=None): self.optim.param_groups[group_id]['params'] = self._master_param_groups_of_current_rank[group_id] + # reset accumulate step + self._accumulate_step = 0 + ############################# # Mixed Precision Utilities # ############################# @@ -476,3 +483,13 @@ def _reduce_grad(self, partition_grad): # run reduction self._run_reduction() + + # this context comes from pytorch DDP + @contextmanager + def no_sync(self): + old_require_grad_sync = self.require_grad_sync + self.require_grad_sync = False + try: + yield + finally: + self.require_grad_sync = old_require_grad_sync From a0a2e285c8d25ef298b4cef834043671dea33a68 Mon Sep 17 00:00:00 2001 From: lclgy Date: Mon, 26 Jun 2023 13:59:07 +0800 Subject: [PATCH 06/18] fix and 
update unit test of zero and ddp --- colossalai/zero/low_level/_utils.py | 2 +- .../low_level/bookkeeping/bucket_store.py | 2 +- colossalai/zero/low_level/low_level_optim.py | 17 ++++++-- .../test_zero/test_low_level/test_zero1_2.py | 43 +++++++++---------- 4 files changed, 36 insertions(+), 28 deletions(-) diff --git a/colossalai/zero/low_level/_utils.py b/colossalai/zero/low_level/_utils.py index 218f7603bc54..a9e552ebdabc 100644 --- a/colossalai/zero/low_level/_utils.py +++ b/colossalai/zero/low_level/_utils.py @@ -253,7 +253,7 @@ def compute_norm(gradients, params, dp_group, mp_group, norm_type=2): return total_norm -def sync_param(flat_tensor, tensor_list): +def sync_tensor(flat_tensor, tensor_list): """ Synchronize the flattened tensor and unflattened tensor list. When a list of tensor are flattened with `torch._utils._unflatten_dense_tensors`, diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 78f63fd411bb..0abc94efb06a 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -1,5 +1,5 @@ import torch -from torch._utils import _flatten_dense_tensors +from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors from torch.distributed import ProcessGroup from .base_store import BaseStore diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index f41bf8f06037..aa5a5594670c 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -20,7 +20,14 @@ from colossalai.utils import conditional_context from colossalai.utils.cuda import get_current_device -from ._utils import calculate_global_norm_from_list, compute_norm, flatten, has_inf_or_nan, release_param_grad +from ._utils import ( + calculate_global_norm_from_list, + compute_norm, + flatten, + has_inf_or_nan, + release_param_grad, + sync_tensor, +) from .bookkeeping import BucketStore, GradientStore, ParameterStore @@ -283,10 +290,13 @@ def _run_reduction(self): group_id = self._bucket_store.current_group_id if not self._partition_grads: dist.all_reduce(flat_grads, group=self._dp_torch_group) - # flat_grads /= self._world_size + flat_grads /= self._world_size + + flat_grads_per_rank = flat_grads.split(flat_grads.numel() // self._world_size) grad_in_bucket = self._bucket_store.get_grad() - for _, grad_list in grad_in_bucket.items(): + for rank, grad_list in grad_in_bucket.items(): + sync_tensor(flat_grads_per_rank[rank], grad_list) for grad in grad_list: param_id = self._bucket_store.get_param_id_of_grad(grad) self._grad_store.append_gradients_by_param_id(grad, group_id, param_id) @@ -298,6 +308,7 @@ def _run_reduction(self): recieved_grad /= self._world_size grad_in_bucket_current_rank = self._bucket_store.get_grad()[self._local_rank] + sync_tensor(recieved_grad, grad_in_bucket_current_rank) for grad in grad_in_bucket_current_rank: param_id = self._bucket_store.get_param_id_of_grad(grad) self._grad_store.append_gradients_by_param_id(grad, group_id, param_id) diff --git a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py index 5bd91865b143..276105550841 100644 --- a/tests/test_zero/test_low_level/test_zero1_2.py +++ b/tests/test_zero/test_low_level/test_zero1_2.py @@ -3,6 +3,7 @@ import pytest import torch import torch.nn as nn +from common import split_ddp_grad from torch.nn.parallel import DistributedDataParallel as DDP from 
torch.testing import assert_close @@ -16,8 +17,8 @@ class MlpModel(nn.Module): def __init__(self): super(MlpModel, self).__init__() - self.linear1 = nn.Linear(32, 64) - self.linear2 = nn.Linear(64, 32) + self.linear1 = nn.Linear(128, 256) + self.linear2 = nn.Linear(256, 512) def forward(self, x): x = self.linear1(x) @@ -72,23 +73,21 @@ def exam_zero_1_2(): initial_scale=128) # create data seed_all(2001 + local_rank) - input_data = torch.randn(32, 32).cuda() + input_data = torch.randn(32, 128).cuda() zero1_output = zero1_model(input_data) zero2_output = zero2_model(input_data) assert torch.equal(zero1_output, zero2_output) # zero-dp backward - zero1_optimizer.backward(zero1_output.mean().float(), sync_grad=False) - zero2_optimizer.backward(zero2_output.mean().float(), sync_grad=False) + zero1_optimizer.backward(zero1_output.mean().float()) + zero2_optimizer.backward(zero2_output.mean().float()) - # for (n, z1p), z2p in zip(zero1_model.named_parameters(), zero2_model.parameters()): - # if z2p.grad is not None: - # # print(local_rank, n, z1p.shape, torch.max(z2p.grad), torch.max(torch.abs(z1p.grad - z2p.grad))) - # assert torch.equal(z1p.grad, z2p.grad) - - # zero1_optimizer._sync_grad() - # zero2_optimizer._sync_grad() + # check grad + z1g_list = zero1_optimizer._grad_store.get_working_grads_by_group_id(0) + z2g_list = zero2_optimizer._grad_store.get_working_grads_by_group_id(0) + for z1g, z2g in zip(z1g_list, z2g_list): + assert torch.equal(z1g, z2g) # step zero1_optimizer.step() @@ -100,7 +99,7 @@ def exam_zero_1_2(): @parameterize('dtype', [torch.float16, torch.bfloat16]) -def exam_zero_1_torch_ddp(dtype: torch.dtype): +def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): """ In this test, two pairs of model and optimizers are created. 1. zero: use sharded optimizer and fp16 parameters @@ -133,7 +132,7 @@ def exam_zero_1_torch_ddp(dtype: torch.dtype): seed_all(1453 + local_rank) # create - input_data = torch.rand(32, 32).cuda() + input_data = torch.rand(32, 128).cuda() # zero-dp forward zero_output = zero_model(input_data.to(dtype)) @@ -143,20 +142,19 @@ def exam_zero_1_torch_ddp(dtype: torch.dtype): loose_close(zero_output, torch_output, dtype=dtype) # zero-dp backward - zero_optimizer.backward(zero_output.mean().float(), sync_grad=False) + zero_optimizer.backward(zero_output.mean().float()) # torch-ddp backward torch_output.mean().backward() # check grad - # for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): - # loose_close(p.grad, z1p.grad, dtype=dtype) - # print("torch") - # for (n, p) in torch_model.named_parameters(): - # print(p.grad) + for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): + zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p)) + torch_grad_list = split_ddp_grad(p.grad, world_size) + for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): + loose_close(zero_grad, torch_grad, dtype=dtype) # zero-dp step - # zero_optimizer._sync_grad() zero_optimizer.step() # torch ddp step @@ -164,14 +162,13 @@ def exam_zero_1_torch_ddp(dtype: torch.dtype): # check updated param for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): - # print(n, torch.max(torch.abs(p.data - z1p.data))) loose_close(p.data, z1p.data, dtype=dtype) def run_dist(rank, world_size, port): colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') - exam_zero_1_torch_ddp() + exam_zero_1_torch_ddp(world_size=world_size) 
exam_zero_1_2() From 4e2a436d0bb956e2be58905dab819a24a86b0e1b Mon Sep 17 00:00:00 2001 From: lclgy Date: Mon, 26 Jun 2023 20:02:35 +0800 Subject: [PATCH 07/18] compatible with tp, ga and unit test --- .../low_level/bookkeeping/bucket_store.py | 2 +- colossalai/zero/low_level/low_level_optim.py | 17 ++++--- tests/test_zero/test_low_level/common.py | 11 +++++ .../test_zero/test_low_level/test_grad_acc.py | 48 +++++++++++-------- 4 files changed, 47 insertions(+), 31 deletions(-) create mode 100644 tests/test_zero/test_low_level/common.py diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 0abc94efb06a..1fe58fbf59bd 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -22,7 +22,7 @@ def __init__(self, torch_pg: ProcessGroup): def num_elements_in_bucket(self): return self._num_elements_in_bucket - def add_param_grad(self, group_id, param, grad, padding_size): + def add_param_grad(self, group_id, param, padding_size): self._param_list.append(param) self._padding_size.append(padding_size) self._num_elements_in_bucket += (param.numel() + padding_size) diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index aa5a5594670c..0ba2e365b950 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -242,9 +242,9 @@ def _create_master_param_current_rank(self, param_list): with torch.no_grad(): if padding_size > 0: - padding_param = torch.nn.functional.pad(param.view(-1), [0, padding_size]) + padding_param = torch.nn.functional.pad(param.data.view(-1), [0, padding_size]) else: - padding_param = param.view(-1) + padding_param = param.data.view(-1) splited_params = padding_param.split(param.numel() // self._world_size) offset += splited_params[0].numel() @@ -261,7 +261,7 @@ def _create_master_param_current_rank(self, param_list): def _grad_handler(self, param, group_id, grad): # if run with no_sync context, would not sync grad when backward if self.require_grad_sync: - self._add_to_bucket(param, group_id, grad) + self._add_to_bucket(param, group_id) return grad def _attach_reduction_hook(self): @@ -315,7 +315,7 @@ def _run_reduction(self): self._bucket_store.reset() - def _add_to_bucket(self, param, group_id, grad): + def _add_to_bucket(self, param, group_id): param_size = param.numel() # check if the bucket is full @@ -327,7 +327,7 @@ def _add_to_bucket(self, param, group_id, grad): self._run_reduction() padding_size = self._param_store.get_param_padding_size(param) - self._bucket_store.add_param_grad(group_id, param, grad, padding_size) + self._bucket_store.add_param_grad(group_id, param, padding_size) ################################ # torch.optim.Optimizer methods @@ -434,7 +434,6 @@ def step(self, closure=None): # unscale and clip grads global_norm = calculate_global_norm_from_list(norm_list=norm_groups) self._unscale_and_clip_grads(grad_partition_groups, global_norm) - # update the parameters self.optim.step() @@ -449,9 +448,9 @@ def step(self, closure=None): for idx, splited_param in enumerate(master_working_param): full_master_param = [torch.zeros_like(splited_param).cuda() for _ in range(self._world_size)] dist.all_gather(full_master_param, splited_param.cuda(), group=self._dp_torch_group) - working_param = real_working_params[group_id][idx] - full_master_param = flatten(full_master_param)[:working_param.numel()].reshape_as(working_param) + 
full_master_param = flatten(full_master_param)
+                full_master_param = full_master_param[:working_param.numel()].reshape_as(working_param)
                 working_param.data.copy_(full_master_param)
 
         self.optim.param_groups[group_id]['params'] = self._master_param_groups_of_current_rank[group_id]
 
@@ -490,7 +489,7 @@ def _reduce_grad(self, partition_grad):
             param_group = self._working_param_groups[group_id]
             for param in param_group:
                 if param.grad is not None:
-                    self._add_to_reduction_bucket(param)
+                    self._add_to_bucket(param, group_id)
 
         # run reduction
         self._run_reduction()
diff --git a/tests/test_zero/test_low_level/common.py b/tests/test_zero/test_low_level/common.py
new file mode 100644
index 000000000000..a6f6d2664b33
--- /dev/null
+++ b/tests/test_zero/test_low_level/common.py
@@ -0,0 +1,11 @@
+import torch
+
+
+def split_ddp_grad(grad, world_size):
+    with torch.no_grad():
+        grad = grad.clone().detach().flatten()
+        padding_size = (world_size - grad.numel() % world_size) % world_size
+        if padding_size > 0:
+            grad = torch.nn.function.pad(grad, [0, padding_size])
+        splited_grad = grad.split(grad.numel() // world_size)
+    return splited_grad
diff --git a/tests/test_zero/test_low_level/test_grad_acc.py b/tests/test_zero/test_low_level/test_grad_acc.py
index c264a8077d2a..ac1f677f9a0d 100644
--- a/tests/test_zero/test_low_level/test_grad_acc.py
+++ b/tests/test_zero/test_low_level/test_grad_acc.py
@@ -39,37 +39,37 @@ def exam_zero_1_2_grad_acc():
                                             overlap_communication=True,
                                             initial_scale=32,
                                             clip_grad_norm=1.0,
+                                            grad_accumulate_interval=2,
                                             verbose=True)
     zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer,
                                             overlap_communication=True,
                                             partition_grad=True,
                                             initial_scale=32,
-                                            clip_grad_norm=1.0)
+                                            clip_grad_norm=1.0,
+                                            grad_accumulate_interval=2)
     # create data
     seed_all(2021 + local_rank)
     input_data1 = torch.randn(32, 128).cuda()
     input_data2 = torch.randn(32, 128).cuda()
 
-    def fwd_bwd_func(number, cur_data):
+    def fwd_bwd_func(number, cur_data, check_flag):
         # zero-dp forward
         zero1_output = zero1_model(cur_data)
         zero2_output = zero2_model(cur_data)
         assert torch.equal(zero1_output, zero2_output)
 
         # zero-dp backward
-        zero1_optimizer.backward(zero1_output.sum().float(), sync_grad=False)
-        zero2_optimizer.backward(zero2_output.sum().float(), sync_grad=False)
+        zero1_optimizer.backward(zero1_output.sum().float())
+        zero2_optimizer.backward(zero2_output.sum().float())
 
-        for (n, z1p), z2p in zip(zero1_model.named_parameters(), zero2_model.parameters()):
-            if z2p.grad is not None:
-                # print(local_rank, n, z1p.shape, torch.max(z2p.grad), torch.max(torch.abs(z1p.grad - z2p.grad)))
-                assert torch.equal(z1p.grad, z2p.grad)
-
-        zero1_optimizer._sync_grad()
-        zero2_optimizer._sync_grad()
+        if check_flag:
+            for (n, z1p), z2p in zip(zero1_model.named_parameters(), zero2_model.parameters()):
+                if z2p.grad is not None:
+                    # print(local_rank, n, z1p.shape, torch.max(z2p.grad), torch.max(torch.abs(z1p.grad - z2p.grad)))
+                    assert torch.equal(z1p.grad, z2p.grad)
 
-    fwd_bwd_func(0, input_data1)
-    fwd_bwd_func(1, input_data2)
+    fwd_bwd_func(0, input_data1, True)
+    fwd_bwd_func(1, input_data2, False)
 
     # step
     zero1_optimizer.step()
@@ -101,7 +101,8 @@ def exam_zero_1_grad_acc():
     zero_optimizer = LowLevelZeroOptimizer(zero_optimizer,
                                            overlap_communication=False,
                                            reduce_bucket_size=262144,
-                                           clip_grad_norm=1.0)
+                                           clip_grad_norm=1.0,
+                                           grad_accumulate_interval=2)
 
     torch_optimizer = torch.optim.Adam(torch_model.parameters(), lr=1)
 
@@ -115,13 +116,19 @@ def fwd_bwd_func(number, cur_data, check_flag):
         zero_output = zero_model(cur_data)
 
         # torch-ddp forward
-        torch_output = torch_model(cur_data)
-        assert torch.equal(zero_output, torch_output)
 
         # zero-dp backward
-        zero_optimizer.backward(zero_output.sum().float(), sync_grad=False)
+        zero_optimizer.backward(zero_output.sum().float())
         # torch-ddp backward
-        torch_output.sum().backward()
+        if number < 1:
+            with torch_model.no_sync():
+                torch_output = torch_model(cur_data)
+                assert torch.equal(zero_output, torch_output)
+                torch_output.sum().backward()
+        else:
+            torch_output = torch_model(cur_data)
+            assert torch.equal(zero_output, torch_output)
+            torch_output.sum().backward()
 
         if check_flag:
             # check grad
             for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()):
                 # print(n, p.shape, torch.max(torch.abs(p.grad - unscale_grad)))
                 assert torch.equal(p.grad, z1p.grad)
 
-        zero_optimizer._sync_grad()
-
     fwd_bwd_func(0, input_data1, True)
     fwd_bwd_func(1, input_data2, False)
 
@@ -148,7 +153,8 @@ def run_dist(rank, world_size, port):
     colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost')
 
     exam_zero_1_grad_acc()
-    exam_zero_1_2_grad_acc()
+    # gradient accumulation is not compatible with ZeRO-2
+    # exam_zero_1_2_grad_acc()
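With grad_accumulate_interval wired through the optimizer, the caller no longer toggles sync_grad by hand. A hedged sketch of the resulting training loop, mirroring the updated test above (model, batch1 and batch2 are placeholders, not names from the patch):

zero_optimizer = LowLevelZeroOptimizer(torch.optim.Adam(model.parameters(), lr=1),
                                       overlap_communication=True,
                                       grad_accumulate_interval=2)

zero_optimizer.backward(model(batch1).sum())    # step 1 runs under no_sync(): gradients accumulate locally
zero_optimizer.step()                           # early-returns until the interval is reached
zero_optimizer.backward(model(batch2).sum())    # step 2 buckets and reduces the accumulated gradients
zero_optimizer.step()                           # real update; the accumulate counter resets
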
From 21dcf7da3130082b859c001e3f5416ca5a7b1919 Mon Sep 17 00:00:00 2001
From: lclgy
Date: Tue, 27 Jun 2023 19:34:44 +0800
Subject: [PATCH 08/18] fix memory leak and polish

---
 .../zero/low_level/bookkeeping/bucket_store.py |  5 +++--
 colossalai/zero/low_level/low_level_optim.py   | 13 +++++++------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py
index 1fe58fbf59bd..9642b30d12b9 100644
--- a/colossalai/zero/low_level/bookkeeping/bucket_store.py
+++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py
@@ -36,8 +36,9 @@ def build_grad_in_bucket(self):
                 grad = torch.nn.functional.pad(grad, [0, padding_size])
             grad_list = grad.split(grad.numel() // self._world_size)
             for rank in range(self._world_size):
-                self.grad_to_param_mapping[id(grad_list[rank])] = id(param)
-                self._grad_in_bucket[rank].append(grad_list[rank])
+                grad_current_rank = grad_list[rank].detach()
+                self.grad_to_param_mapping[id(grad_current_rank)] = id(param)
+                self._grad_in_bucket[rank].append(grad_current_rank)
             param.grad = None
diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index 0ba2e365b950..7ec37d5f618e 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -281,6 +281,7 @@ def _run_reduction(self):
         if self._bucket_store.num_elements_in_bucket() > 0:
             self._bucket_store.build_grad_in_bucket()
             flat_grads = self._bucket_store.get_flatten_grad()
+            flat_grads /= self._world_size
             if self._overlap_communication:
                 stream = self._comm_stream
             else:
@@ -290,7 +291,6 @@ def _run_reduction(self):
                 group_id = self._bucket_store.current_group_id
                 if not self._partition_grads:
                     dist.all_reduce(flat_grads, group=self._dp_torch_group)
-                    flat_grads /= self._world_size
 
                     flat_grads_per_rank = flat_grads.split(flat_grads.numel() // self._world_size)
                     grad_in_bucket = self._bucket_store.get_grad()
@@ -305,7 +305,6 @@
                     flat_grads_list = list(flat_grads.split(len(flat_grads) // self._world_size))
                     recieved_grad = torch.zeros_like(flat_grads_list[0])
                     dist.reduce_scatter(recieved_grad, flat_grads_list, group=self._dp_torch_group)
-                    recieved_grad /= self._world_size
 
                     grad_in_bucket_current_rank = self._bucket_store.get_grad()[self._local_rank]
                     sync_tensor(recieved_grad, grad_in_bucket_current_rank)
@@ -387,6 +386,7 @@ def step(self, closure=None):
             if self._verbose:
                 self._logger.info(f'Found overflow. Skip step')
             self.zero_grad()
+            self._accumulate_step -= 1
             return
 
         # record all grads for unscale and clip
@@ -408,7 +408,7 @@ def step(self, closure=None):
         for splited_param in master_params:
             working_param = self._param_store.master_to_working_param[id(splited_param)]
             # if a working param requires grad and has no grad
-            # it is not 'really' working
+            # it is not 'really' working, e.g. the dropped layer
             # else the splited grad should be attached to the splited param
             grads = self._grad_store.get_partitioned_gradients_by_param_id(group_id, id(working_param))
             if len(grads) > 0:
@@ -434,10 +434,12 @@ def step(self, closure=None):
         # unscale and clip grads
         global_norm = calculate_global_norm_from_list(norm_list=norm_groups)
         self._unscale_and_clip_grads(grad_partition_groups, global_norm)
+
         # update the parameters
         self.optim.step()
 
-        # release the master grad, release the mem
+        # release the grad
+        grad_partition_groups = []
         for group_id in range(self.num_param_groups):
             release_param_grad(self._master_param_groups_of_current_rank[group_id])
 
@@ -449,8 +451,7 @@ def step(self, closure=None):
                 full_master_param = [torch.zeros_like(splited_param).cuda() for _ in range(self._world_size)]
                 dist.all_gather(full_master_param, splited_param.cuda(), group=self._dp_torch_group)
                 working_param = real_working_params[group_id][idx]
-                full_master_param = flatten(full_master_param)
-                full_master_param = full_master_param[:working_param.numel()].reshape_as(working_param)
+                full_master_param = flatten(full_master_param)[:working_param.numel()].reshape_as(working_param)
                 working_param.data.copy_(full_master_param)
 
         self.optim.param_groups[group_id]['params'] = self._master_param_groups_of_current_rank[group_id]
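Patch 09 below adds a test for parameters that never receive a gradient, e.g. a layer that is defined but skipped in forward(). A toy illustration (standalone, not from the patch) of why the stores and tests need their p.grad is not None guards:

import torch
import torch.nn as nn

model = nn.ModuleDict({'used': nn.Linear(4, 4), 'dropped': nn.Linear(4, 4)})
out = model['used'](torch.randn(2, 4)).sum()    # 'dropped' never participates
out.backward()
print(model['used'].weight.grad is None)       # False
print(model['dropped'].weight.grad is None)    # True: no gradient slice is ever stored for it
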
From 08c239cb832d90db629894d2ff34697ba4588d57 Mon Sep 17 00:00:00 2001
From: lclgy
Date: Wed, 28 Jun 2023 13:32:21 +0800
Subject: [PATCH 09/18] add zero layer drop unittest

---
 .../test_low_level/test_layer_drop.py         | 184 ++++++++++++++++++
 1 file changed, 184 insertions(+)
 create mode 100644 tests/test_zero/test_low_level/test_layer_drop.py

diff --git a/tests/test_zero/test_low_level/test_layer_drop.py b/tests/test_zero/test_low_level/test_layer_drop.py
new file mode 100644
index 000000000000..8376210532d8
--- /dev/null
+++ b/tests/test_zero/test_low_level/test_layer_drop.py
@@ -0,0 +1,184 @@
+import copy
+
+import pytest
+import torch
+import torch.nn as nn
+from common import split_ddp_grad
+from torch.nn.parallel import DistributedDataParallel as DDP
+from torch.testing import assert_close
+
+import colossalai
+from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
+from colossalai.testing.random import seed_all
+from colossalai.zero import LowLevelZeroOptimizer
+
+
+class MlpModel(nn.Module):
+
+    def __init__(self):
+        super(MlpModel, self).__init__()
+        self.linear1 = nn.Linear(128, 256)
+        self.drop_linear = nn.Linear(256, 256)
+        self.linear2 = nn.Linear(256, 512)
+
+    def forward(self, x):
+        x = self.linear1(x)
+        x = self.linear2(x)
+        return x
+
+
+def loose_close(a, b, dtype: torch.dtype = torch.float32):
+    rtol = None
+    atol = None
+    if dtype is torch.float16:
+        rtol = 5e-2
+        atol = 5e-4
+    elif dtype is torch.bfloat16:
+        rtol = 4e-3
+        atol = 4e-3
+
+    a = a.detach().to(dtype)
+    b = b.detach().to(dtype)
+
+    assert_close(a, b, rtol=rtol, atol=atol)
+
+
+def exam_zero_1_2():
+    """
+    In 
this test, we want to test whether zero stage 1 and 2 + deliver the same numerical results despite different communication + pattern + + we use these prefixes to differentiate the zero stage + oss: partition optimizer states + pg: partition gradients and optimizer states + + """ + local_rank = torch.distributed.get_rank() + seed_all(2001) + + # create model + zero1_model = MlpModel().cuda() + zero2_model = copy.deepcopy(zero1_model) + + # create optimizer + zero1_optimizer = torch.optim.Adam(zero1_model.parameters(), lr=1) + zero2_optimizer = torch.optim.Adam(zero2_model.parameters(), lr=1) + zero1_optimizer = LowLevelZeroOptimizer(zero1_optimizer, + overlap_communication=True, + initial_scale=128, + verbose=True) + zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer, + overlap_communication=True, + partition_grad=True, + initial_scale=128) + # create data + seed_all(2001 + local_rank) + input_data = torch.randn(32, 128).cuda() + + zero1_output = zero1_model(input_data) + zero2_output = zero2_model(input_data) + assert torch.equal(zero1_output, zero2_output) + + # zero-dp backward + zero1_optimizer.backward(zero1_output.mean().float()) + zero2_optimizer.backward(zero2_output.mean().float()) + + # check grad + z1g_list = zero1_optimizer._grad_store.get_working_grads_by_group_id(0) + z2g_list = zero2_optimizer._grad_store.get_working_grads_by_group_id(0) + for z1g, z2g in zip(z1g_list, z2g_list): + assert torch.equal(z1g, z2g) + + # step + zero1_optimizer.step() + zero2_optimizer.step() + + # check updated param + for z1p, z2p in zip(zero1_model.parameters(), zero2_model.parameters()): + assert torch.equal(z1p.data, z2p.data) + + +@parameterize('dtype', [torch.float16, torch.bfloat16]) +def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): + """ + In this test, two pairs of model and optimizers are created. + 1. zero: use sharded optimizer and fp16 parameters + 2. torch: use torch DDP and fp32 parameters + + We feed these two sets of models with the same input and check if the + differences in model output and updated parameters are within tolerance. 
+ """ + local_rank = torch.distributed.get_rank() + seed_all(1453) + + # create models + torch_model = MlpModel().cuda() + zero_model = copy.deepcopy(torch_model).to(dtype) + + torch_model = DDP(torch_model.cuda(), bucket_cap_mb=0).cuda() + + # create optimizer + zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=1) + + # we only test stage 1 here + # in `check_sharded_param_consistency.py`, we will test whether + # level 1 and 2 will produce exactly the same results + zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, + overlap_communication=True, + initial_scale=1, + reduce_bucket_size=262144) + + torch_optimizer = torch.optim.SGD(torch_model.parameters(), lr=1) + + seed_all(1453 + local_rank) + # create + input_data = torch.rand(32, 128).cuda() + + # zero-dp forward + zero_output = zero_model(input_data.to(dtype)) + + # torch-ddp forward + torch_output = torch_model(input_data) + loose_close(zero_output, torch_output, dtype=dtype) + + # zero-dp backward + zero_optimizer.backward(zero_output.mean().float()) + + # torch-ddp backward + torch_output.mean().backward() + + # check grad + for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): + if p.grad is not None: + zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p)) + torch_grad_list = split_ddp_grad(p.grad, world_size) + for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): + loose_close(zero_grad, torch_grad, dtype=dtype) + + # zero-dp step + zero_optimizer.step() + + # torch ddp step + torch_optimizer.step() + + # check updated param + for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): + loose_close(p.data, z1p.data, dtype=dtype) + + +def run_dist(rank, world_size, port): + colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') + + exam_zero_1_torch_ddp(world_size=world_size) + exam_zero_1_2() + + +@pytest.mark.dist +@rerun_if_address_is_in_use() +def test_zero_1_2(): + spawn(run_dist, 2) + + +if __name__ == '__main__': + test_zero_1_2() From 9cf5f87f0e6f883eae9be62c6734a38f489bc918 Mon Sep 17 00:00:00 2001 From: lclgy Date: Wed, 28 Jun 2023 16:05:00 +0800 Subject: [PATCH 10/18] polish code --- .../low_level/bookkeeping/bucket_store.py | 64 +++++++++++++++-- .../low_level/bookkeeping/gradient_store.py | 69 +++++++++++++++++-- .../low_level/bookkeeping/parameter_store.py | 47 ++++++++----- colossalai/zero/low_level/low_level_optim.py | 3 - 4 files changed, 148 insertions(+), 35 deletions(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 9642b30d12b9..54cc6f9eab84 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -1,5 +1,8 @@ +from typing import Dict + import torch -from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors +from torch import Tensor +from torch._utils import _flatten_dense_tensors from torch.distributed import ProcessGroup from .base_store import BaseStore @@ -12,6 +15,7 @@ def __init__(self, torch_pg: ProcessGroup): # init and reset self.current_group_id = 0 + # mapping gardient slices and parameter self.grad_to_param_mapping = dict() self._param_list = [] @@ -19,16 +23,43 @@ def __init__(self, torch_pg: ProcessGroup): self.reset() - def num_elements_in_bucket(self): + def num_elements_in_bucket(self) -> int: + """ + Return the total number of elements in bucket + + :return: Return 
the total number of elements in bucket + :rtype: int + + """ return self._num_elements_in_bucket - def add_param_grad(self, group_id, param, padding_size): + def add_param_grad(self, group_id: int, param: Tensor, padding_size: int): + """ + Add a param to bucket and record the padding size of a param for gradient padding + :param group_id: The index of a parameter group + :param param: The parameter + :param paddind_size: The padding size of the parameter + :type group_id: int + :type param: Tensor + :type padding_size: int + + """ self._param_list.append(param) self._padding_size.append(padding_size) self._num_elements_in_bucket += (param.numel() + padding_size) self.current_group_id = group_id def build_grad_in_bucket(self): + """ + Orgnize parameters' gradient(padding and split), follows the paramters' splitting method + + Data structure of self._grad_in_bucket: + { + rank0: [grad0_rank0, grad1_rank0, ...] + rank1: [grad1_rank1, grad1_rank1, ...] + } + """ + for param, padding_size in zip(self._param_list, self._padding_size): with torch.no_grad(): grad = param.grad.view(-1) @@ -41,17 +72,38 @@ def build_grad_in_bucket(self): self._grad_in_bucket[rank].append(grad_current_rank) param.grad = None - def get_grad(self): + def get_grad(self) -> Dict: + """ + Return the dictionary of gradients slices, of which the keys are ranks + + :return: Return the dictionary of gradients slices. + :rtype: List[torch.Tensor] or [] + """ return self._grad_in_bucket - def get_flatten_grad(self): + def get_flatten_grad(self) -> Tensor: + """ + Return the flattened gradients slices in the bucket, the data orginization of the flattened tensor: + [grad0_rank0, grad1_rank0, ..., grad_1_rank0, grad1_rank1, ....] + + + :return: Return the flattened gradients slices in the bucket + :rtype: Tensor + """ flat_grad = [] for _, grad_list in self._grad_in_bucket.items(): flat_grad.append(_flatten_dense_tensors(grad_list)) flat_grad = _flatten_dense_tensors(flat_grad) return flat_grad - def get_param_id_of_grad(self, grad): + def get_param_id_of_grad(self, grad: Tensor) -> int: + """ + Return the id of a parameter which the gradient slice belongs to + + :return: Return the id of a parameter which the gradient slice belongs to + :rtype: int + + """ return self.grad_to_param_mapping[id(grad)] def reset(self): diff --git a/colossalai/zero/low_level/bookkeeping/gradient_store.py b/colossalai/zero/low_level/bookkeeping/gradient_store.py index f128fcb275ad..6ebeba635dc1 100644 --- a/colossalai/zero/low_level/bookkeeping/gradient_store.py +++ b/colossalai/zero/low_level/bookkeeping/gradient_store.py @@ -1,5 +1,6 @@ from typing import List +from torch import Tensor from torch._utils import _flatten_dense_tensors from .base_store import BaseStore @@ -7,19 +8,50 @@ class GradientStore(BaseStore): - def __init__(self, *args, partition_grad=False): + def __init__(self, *args, partition_grad: bool = False): super().__init__(*args) - + """ + self._grads_of_params mapping the paramater and its gradient slices + data structure: + { + group_id:{ + param_id: [grad_rank0, grad_rank1, ...] 
+ } + } + """ self._grads_of_params = dict() + # for zero2, it's `param_id: [grad_local_rank]` self._working_index = 0 if partition_grad else self._local_rank - def get_partitioned_gradients_by_param_id(self, group_id, param_id): + def get_partitioned_gradients_by_param_id(self, group_id: int, param_id: int) -> List: + """ + Return list of gradient slices of a specific parameter + :param group_id: The index of a parameter group + :param param_id: The id of a parameter + :type group_id: int + :type param_id: int + + :return: Return the list of gradient slices of a parameter. Each element is a gradient, not a parameter. + :rtype: List[torch.Tensor] or [] + """ if group_id in self._grads_of_params: if param_id in self._grads_of_params[group_id]: return self._grads_of_params[group_id][param_id] + # the param has no grad, for instance, in layer drop return [] - def append_gradients_by_param_id(self, grad, group_id, param_id): + def append_gradients_by_param_id(self, grad: Tensor, group_id: int, param_id: int): + """ + Append a gradient slice to the parameter's gradient slice list + + :param grad: The gradient slice to append to list + :param group_id: The index of a parameter group + :param param_id: The id of a parameter + :type grad: torch.Tensor + :type group_id: int + :type param_id: int + + """ if group_id not in self._grads_of_params: self._grads_of_params[group_id] = dict() if param_id not in self._grads_of_params[group_id]: @@ -27,17 +59,40 @@ def append_gradients_by_param_id(self, grad, group_id, param_id): else: self._grads_of_params[group_id][param_id].append(grad) - def add_gradients_by_param_id(self, grad, grad_idx, group_id, param_id): + def add_gradients_by_param_id(self, grad: Tensor, grad_idx: int, group_id: int, param_id: int): + """ + For old gradient accumulation, not in use now. + + Add a gradient slice on an existing slice of the parameter's gradient + + :param grad: The split gradient to append to list + :param grad_idx: The index of the existing slice + :param group_id: The index of a parameter group + :param param_id: The id of a parameter + :type grad: torch.Tensor + :type grad_idx: int + :type group_id: int + :type param_id: int + + """ self._grads_of_params[group_id][param_id][grad_idx].add_(grad) - def get_working_grads_by_group_id(self, group_id): + def get_working_grads_by_group_id(self, group_id: int) -> List: + """ + Return list of working gradient slices in the group + :param group_id: The index of a parameter group + :type group_id: int + + :return: Return the list working gradient slices in the group. 
+ :rtype: List[torch.Tensor] + """ grad_list = [] for param_grads in self._grads_of_params[group_id].values(): grad_list.append(param_grads[self._working_index]) return grad_list - def reset_grads_by_group_id(self, group_id): + def reset_grads_by_group_id(self, group_id: int): self._grads_of_params[group_id] = dict() def reset_all_gradients(self): diff --git a/colossalai/zero/low_level/bookkeeping/parameter_store.py b/colossalai/zero/low_level/bookkeeping/parameter_store.py index f982d7df2b32..9cfdd734b373 100644 --- a/colossalai/zero/low_level/bookkeeping/parameter_store.py +++ b/colossalai/zero/low_level/bookkeeping/parameter_store.py @@ -1,5 +1,3 @@ -from typing import List - from torch import Tensor from torch.distributed import ProcessGroup @@ -11,32 +9,43 @@ class ParameterStore(BaseStore): def __init__(self, torch_pg: ProcessGroup): super().__init__(torch_pg) + # record the padding size of each param self._padding_map = dict() - self._position_map = dict() - self._marked_params = dict() - self._numel_per_split_param = dict() + # mapping working param and master param self.master_to_working_param = dict() self.working_to_master_param = dict() - def record_param_padding_size(self, param, padding_size): - self._padding_map[id(param)] = padding_size - - def get_param_padding_size(self, param): - return self._padding_map[id(param)] + def record_param_padding_size(self, param: Tensor, padding_size: int): + """ + Record the padding size of a param + :param param: The parameter + :param paddind_size: The padding size of the parameter + :type param: Tensor + :type padding_size: int - def record_offset_in_flatten(self, param, position): - self._position_map[id(param)] = position + """ + self._padding_map[id(param)] = padding_size - def get_offset_in_flatten(self, param): - return self._position_map[id(param)] + def get_param_padding_size(self, param: Tensor) -> int: + """ + Return the padding size of the parameter + :param param: The parameter + :type param: Tensor - def record_numel_per_split_param(self, param, numel): - self._numel_per_split_param[id(param)] = numel + :return: Return the padding size of the parameter + :rtype: int + """ + return self._padding_map[id(param)] - def get_numel_per_split_param(self, param): - return self._numel_per_split_param[id(param)] + def link_master_and_working_param(self, master_param: Tensor, working_param: Tensor): + """ + Mapping master parameter and working parameter + :param master_param: The parameter copy in optimizer + :param working_param: The parameter of the model + :type master_param: Tensor + :type working_param: Tensor - def link_master_and_working_param(self, master_param, working_param): + """ self.master_to_working_param[id(master_param)] = working_param self.working_to_master_param[id(working_param)] = master_param diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 7ec37d5f618e..af274a497243 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -234,10 +234,8 @@ def _create_master_param_current_rank(self, param_list): params_current_rank = [] device = 'cpu' if self._cpu_offload else get_current_device() - offset = 0 for param in reversed(param_list): padding_size = (self._world_size - param.numel() % self._world_size) % self._world_size - self._param_store.record_offset_in_flatten(param, offset) self._param_store.record_param_padding_size(param, padding_size) with torch.no_grad(): @@ -246,7 +244,6 @@ def 
_create_master_param_current_rank(self, param_list): else: padding_param = param.data.view(-1) splited_params = padding_param.split(param.numel() // self._world_size) - offset += splited_params[0].numel() splited_param_current_rank = splited_params[self._local_rank].detach().float().to(device) params_current_rank.append(splited_param_current_rank) From c4e004d57a2a4227d4e33abbd11e0166ef9d96ce Mon Sep 17 00:00:00 2001 From: lclgy Date: Wed, 28 Jun 2023 16:25:07 +0800 Subject: [PATCH 11/18] fix import err in unit test --- tests/test_zero/test_low_level/common.py | 11 -- .../test_low_level/test_layer_drop.py | 184 ------------------ .../test_zero/test_low_level/test_zero1_2.py | 21 +- 3 files changed, 16 insertions(+), 200 deletions(-) delete mode 100644 tests/test_zero/test_low_level/common.py delete mode 100644 tests/test_zero/test_low_level/test_layer_drop.py diff --git a/tests/test_zero/test_low_level/common.py b/tests/test_zero/test_low_level/common.py deleted file mode 100644 index a6f6d2664b33..000000000000 --- a/tests/test_zero/test_low_level/common.py +++ /dev/null @@ -1,11 +0,0 @@ -import torch - - -def split_ddp_grad(grad, world_size): - with torch.no_grad(): - grad = grad.clone().detach().flatten() - padding_size = (world_size - grad.numel() % world_size) % world_size - if padding_size > 0: - grad = torch.nn.function.pad(grad, [0, padding_size]) - splited_grad = grad.split(grad.numel() // world_size) - return splited_grad diff --git a/tests/test_zero/test_low_level/test_layer_drop.py b/tests/test_zero/test_low_level/test_layer_drop.py deleted file mode 100644 index 8376210532d8..000000000000 --- a/tests/test_zero/test_low_level/test_layer_drop.py +++ /dev/null @@ -1,184 +0,0 @@ -import copy - -import pytest -import torch -import torch.nn as nn -from common import split_ddp_grad -from torch.nn.parallel import DistributedDataParallel as DDP -from torch.testing import assert_close - -import colossalai -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn -from colossalai.testing.random import seed_all -from colossalai.zero import LowLevelZeroOptimizer - - -class MlpModel(nn.Module): - - def __init__(self): - super(MlpModel, self).__init__() - self.linear1 = nn.Linear(128, 256) - self.drop_linear = nn.Linear(256, 256) - self.linear2 = nn.Linear(256, 512) - - def forward(self, x): - x = self.linear1(x) - x = self.linear2(x) - return x - - -def loose_close(a, b, dtype: torch.dtype = torch.float32): - rtol = None - atol = None - if dtype is torch.float16: - rtol = 5e-2 - atol = 5e-4 - elif dtype is torch.bfloat16: - rtol = 4e-3 - atol = 4e-3 - - a = a.detach().to(dtype) - b = b.detach().to(dtype) - - assert_close(a, b, rtol=rtol, atol=atol) - - -def exam_zero_1_2(): - """ - In this test, we want to test whether zero stage 1 and 2 - deliver the same numerical results despite different communication - pattern - - we use these prefixes to differentiate the zero stage - oss: partition optimizer states - pg: partition gradients and optimizer states - - """ - local_rank = torch.distributed.get_rank() - seed_all(2001) - - # create model - zero1_model = MlpModel().cuda() - zero2_model = copy.deepcopy(zero1_model) - - # create optimizer - zero1_optimizer = torch.optim.Adam(zero1_model.parameters(), lr=1) - zero2_optimizer = torch.optim.Adam(zero2_model.parameters(), lr=1) - zero1_optimizer = LowLevelZeroOptimizer(zero1_optimizer, - overlap_communication=True, - initial_scale=128, - verbose=True) - zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer, - 
overlap_communication=True, - partition_grad=True, - initial_scale=128) - # create data - seed_all(2001 + local_rank) - input_data = torch.randn(32, 128).cuda() - - zero1_output = zero1_model(input_data) - zero2_output = zero2_model(input_data) - assert torch.equal(zero1_output, zero2_output) - - # zero-dp backward - zero1_optimizer.backward(zero1_output.mean().float()) - zero2_optimizer.backward(zero2_output.mean().float()) - - # check grad - z1g_list = zero1_optimizer._grad_store.get_working_grads_by_group_id(0) - z2g_list = zero2_optimizer._grad_store.get_working_grads_by_group_id(0) - for z1g, z2g in zip(z1g_list, z2g_list): - assert torch.equal(z1g, z2g) - - # step - zero1_optimizer.step() - zero2_optimizer.step() - - # check updated param - for z1p, z2p in zip(zero1_model.parameters(), zero2_model.parameters()): - assert torch.equal(z1p.data, z2p.data) - - -@parameterize('dtype', [torch.float16, torch.bfloat16]) -def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): - """ - In this test, two pairs of model and optimizers are created. - 1. zero: use sharded optimizer and fp16 parameters - 2. torch: use torch DDP and fp32 parameters - - We feed these two sets of models with the same input and check if the - differences in model output and updated parameters are within tolerance. - """ - local_rank = torch.distributed.get_rank() - seed_all(1453) - - # create models - torch_model = MlpModel().cuda() - zero_model = copy.deepcopy(torch_model).to(dtype) - - torch_model = DDP(torch_model.cuda(), bucket_cap_mb=0).cuda() - - # create optimizer - zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=1) - - # we only test stage 1 here - # in `check_sharded_param_consistency.py`, we will test whether - # level 1 and 2 will produce exactly the same results - zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, - overlap_communication=True, - initial_scale=1, - reduce_bucket_size=262144) - - torch_optimizer = torch.optim.SGD(torch_model.parameters(), lr=1) - - seed_all(1453 + local_rank) - # create - input_data = torch.rand(32, 128).cuda() - - # zero-dp forward - zero_output = zero_model(input_data.to(dtype)) - - # torch-ddp forward - torch_output = torch_model(input_data) - loose_close(zero_output, torch_output, dtype=dtype) - - # zero-dp backward - zero_optimizer.backward(zero_output.mean().float()) - - # torch-ddp backward - torch_output.mean().backward() - - # check grad - for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): - if p.grad is not None: - zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p)) - torch_grad_list = split_ddp_grad(p.grad, world_size) - for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): - loose_close(zero_grad, torch_grad, dtype=dtype) - - # zero-dp step - zero_optimizer.step() - - # torch ddp step - torch_optimizer.step() - - # check updated param - for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): - loose_close(p.data, z1p.data, dtype=dtype) - - -def run_dist(rank, world_size, port): - colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') - - exam_zero_1_torch_ddp(world_size=world_size) - exam_zero_1_2() - - -@pytest.mark.dist -@rerun_if_address_is_in_use() -def test_zero_1_2(): - spawn(run_dist, 2) - - -if __name__ == '__main__': - test_zero_1_2() diff --git a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py index 276105550841..8350d541b30c 100644 --- 
a/tests/test_zero/test_low_level/test_zero1_2.py
+++ b/tests/test_zero/test_low_level/test_zero1_2.py
@@ -3,7 +3,6 @@
 import pytest
 import torch
 import torch.nn as nn
-from common import split_ddp_grad
 from torch.nn.parallel import DistributedDataParallel as DDP
 from torch.testing import assert_close
 
@@ -18,6 +17,7 @@ class MlpModel(nn.Module):
     def __init__(self):
         super(MlpModel, self).__init__()
         self.linear1 = nn.Linear(128, 256)
+        self.linear_drop = nn.Linear(256, 256)
         self.linear2 = nn.Linear(256, 512)
 
     def forward(self, x):
@@ -42,6 +42,16 @@ def loose_close(a, b, dtype: torch.dtype = torch.float32):
     assert_close(a, b, rtol=rtol, atol=atol)
 
 
+def split_ddp_grad(grad, world_size):
+    with torch.no_grad():
+        grad = grad.clone().detach().flatten()
+        padding_size = (world_size - grad.numel() % world_size) % world_size
+        if padding_size > 0:
+            grad = torch.nn.functional.pad(grad, [0, padding_size])
+        splited_grad = grad.split(grad.numel() // world_size)
+    return splited_grad
+
+
 def exam_zero_1_2():
     """
     In this test, we want to test whether zero stage 1 and 2
@@ -149,10 +159,11 @@ def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype):
 
     # check grad
     for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()):
-        zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p))
-        torch_grad_list = split_ddp_grad(p.grad, world_size)
-        for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list):
-            loose_close(zero_grad, torch_grad, dtype=dtype)
+        if p.grad is not None:
+            zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p))
+            torch_grad_list = split_ddp_grad(p.grad, world_size)
+            for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list):
+                loose_close(zero_grad, torch_grad, dtype=dtype)

From c4e004d57a2a4227d4e33abbd11e0166ef9d96ce Mon Sep 17 00:00:00 2001
From: lclgy
Date: Wed, 28 Jun 2023 17:51:31 +0800
Subject: [PATCH 12/18] support different comm dtype, modify docstring style

---
 .../low_level/bookkeeping/bucket_store.py     | 53 +++++++--------
 .../low_level/bookkeeping/gradient_store.py   | 65 +++++++++----------
 .../low_level/bookkeeping/parameter_store.py  | 35 +++++-----
 colossalai/zero/low_level/low_level_optim.py  | 19 ++++--
 4 files changed, 86 insertions(+), 86 deletions(-)

diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py
index 54cc6f9eab84..0e523b1606c1 100644
--- a/colossalai/zero/low_level/bookkeeping/bucket_store.py
+++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py
@@ -24,34 +24,30 @@ def __init__(self, torch_pg: ProcessGroup):
         self.reset()
 
     def num_elements_in_bucket(self) -> int:
-        """
-        Return the total number of elements in bucket
-
-        :return: Return the total number of elements in bucket
-        :rtype: int
+        """Return the total number of elements in bucket
 
+        Returns:
+            int: the total number of elements in bucket
         """
+
         return self._num_elements_in_bucket
 
     def add_param_grad(self, group_id: int, param: Tensor, padding_size: int):
-        """
-        Add a param to bucket and record the padding size of a param for gradient padding
-        :param group_id: The index of a parameter group
-        :param param: The parameter
-        :param paddind_size: The padding size of the parameter
-        :type group_id: int
-        :type param: Tensor
-        :type padding_size: int
+        """Add a param to bucket and record the padding size of a param for gradient padding
 
+        Args:
+            group_id (int): The index of a parameter group
+            param 
(Tensor): The parameter + padding_size (int): The padding size of the parameter """ + self._param_list.append(param) self._padding_size.append(padding_size) self._num_elements_in_bucket += (param.numel() + padding_size) self.current_group_id = group_id def build_grad_in_bucket(self): - """ - Orgnize parameters' gradient(padding and split), follows the paramters' splitting method + """Orgnize parameters' gradient(padding and split), follows the paramters' splitting method Data structure of self._grad_in_bucket: { @@ -73,23 +69,22 @@ def build_grad_in_bucket(self): param.grad = None def get_grad(self) -> Dict: - """ - Return the dictionary of gradients slices, of which the keys are ranks + """Return the dictionary of gradients slices, of which the keys are ranks - :return: Return the dictionary of gradients slices. - :rtype: List[torch.Tensor] or [] + Returns: + Dict: The dictionary of gradients slices """ + return self._grad_in_bucket def get_flatten_grad(self) -> Tensor: - """ - Return the flattened gradients slices in the bucket, the data orginization of the flattened tensor: + """Return the flattened gradients slices in the bucket, the data orginization of the flattened tensor: [grad0_rank0, grad1_rank0, ..., grad_1_rank0, grad1_rank1, ....] - - :return: Return the flattened gradients slices in the bucket - :rtype: Tensor + Returns: + Tensor: the flattened gradients slices in the bucket """ + flat_grad = [] for _, grad_list in self._grad_in_bucket.items(): flat_grad.append(_flatten_dense_tensors(grad_list)) @@ -97,13 +92,15 @@ def get_flatten_grad(self) -> Tensor: return flat_grad def get_param_id_of_grad(self, grad: Tensor) -> int: - """ - Return the id of a parameter which the gradient slice belongs to + """Return the id of a parameter which the gradient slice belongs to - :return: Return the id of a parameter which the gradient slice belongs to - :rtype: int + Args: + grad (Tensor): the gradient slice + Returns: + int: the id of a parameter which the gradient slice belongs to """ + return self.grad_to_param_mapping[id(grad)] def reset(self): diff --git a/colossalai/zero/low_level/bookkeeping/gradient_store.py b/colossalai/zero/low_level/bookkeeping/gradient_store.py index 6ebeba635dc1..0b86ec8ca89e 100644 --- a/colossalai/zero/low_level/bookkeeping/gradient_store.py +++ b/colossalai/zero/low_level/bookkeeping/gradient_store.py @@ -24,16 +24,16 @@ def __init__(self, *args, partition_grad: bool = False): self._working_index = 0 if partition_grad else self._local_rank def get_partitioned_gradients_by_param_id(self, group_id: int, param_id: int) -> List: + """Return list of gradient slices of a specific parameter + + Args: + group_id (int): The index of a parameter group + param_id (int): The id of a parameter + + Returns: + List: the list of gradient slices of a parameter. """ - Return list of gradient slices of a specific parameter - :param group_id: The index of a parameter group - :param param_id: The id of a parameter - :type group_id: int - :type param_id: int - - :return: Return the list of gradient slices of a parameter. Each element is a gradient, not a parameter. 
-        :rtype: List[torch.Tensor] or []
-        """
+
         if group_id in self._grads_of_params:
             if param_id in self._grads_of_params[group_id]:
                 return self._grads_of_params[group_id][param_id]
@@ -41,17 +41,14 @@ def get_partitioned_gradients_by_param_id(self, group_id: int, param_id: int) ->
         return []

     def append_gradients_by_param_id(self, grad: Tensor, group_id: int, param_id: int):
-        """
-        Append a gradient slice to the parameter's gradient slice list
-
-        :param grad: The gradient slice to append to list
-        :param group_id: The index of a parameter group
-        :param param_id: The id of a parameter
-        :type grad: torch.Tensor
-        :type group_id: int
-        :type param_id: int
+        """Append a gradient slice to the parameter's gradient slice list

+        Args:
+            grad (Tensor): The gradient slice to append to list
+            group_id (int): The index of a parameter group
+            param_id (int): The id of a parameter
         """
+
         if group_id not in self._grads_of_params:
             self._grads_of_params[group_id] = dict()
         if param_id not in self._grads_of_params[group_id]:
@@ -60,32 +57,28 @@ def append_gradients_by_param_id(self, grad: Tensor, group_id: int, param_id: in
         self._grads_of_params[group_id][param_id].append(grad)

     def add_gradients_by_param_id(self, grad: Tensor, grad_idx: int, group_id: int, param_id: int):
-        """
-        For old gradient accumulation, not in use now.
-
+        """For old gradient accumulation, not in use now.
         Add a gradient slice on an existing slice of the parameter's gradient

-        :param grad: The split gradient to append to list
-        :param grad_idx: The index of the existing slice
-        :param group_id: The index of a parameter group
-        :param param_id: The id of a parameter
-        :type grad: torch.Tensor
-        :type grad_idx: int
-        :type group_id: int
-        :type param_id: int
-
+        Args:
+            grad (Tensor): The split gradient to append to list
+            grad_idx (int): The index of the existing slice
+            group_id (int): The index of a parameter group
+            param_id (int): The id of a parameter
         """
+
         self._grads_of_params[group_id][param_id][grad_idx].add_(grad)

     def get_working_grads_by_group_id(self, group_id: int) -> List:
-        """
-        Return list of working gradient slices in the group
-
-        :param group_id: The index of a parameter group
-        :type group_id: int
+        """Return list of working gradient slices in the group
+
+        Args:
+            group_id (int): The index of a parameter group

-        :return: Return the list working gradient slices in the group.
-        :rtype: List[torch.Tensor]
+        Returns:
+            List: the list of working gradient slices in the group
         """
+
         grad_list = []
         for param_grads in self._grads_of_params[group_id].values():
             grad_list.append(param_grads[self._working_index])
diff --git a/colossalai/zero/low_level/bookkeeping/parameter_store.py b/colossalai/zero/low_level/bookkeeping/parameter_store.py
index 9cfdd734b373..63f7c5506069 100644
--- a/colossalai/zero/low_level/bookkeeping/parameter_store.py
+++ b/colossalai/zero/low_level/bookkeeping/parameter_store.py
@@ -17,35 +17,34 @@ def __init__(self, torch_pg: ProcessGroup):
         self.working_to_master_param = dict()

     def record_param_padding_size(self, param: Tensor, padding_size: int):
-        """
-        Record the padding size of a param
-
-        :param param: The parameter
-        :param paddind_size: The padding size of the parameter
-        :type param: Tensor
-        :type padding_size: int
+        """Record the padding size of a param

+        Args:
+            param (Tensor): The parameter
+            padding_size (int): The padding size of the parameter
         """
+
         self._padding_map[id(param)] = padding_size

     def get_param_padding_size(self, param: Tensor) -> int:
-        """
-        Return the padding size of the parameter
-
-        :param param: The parameter
-        :type param: Tensor
+        """Return the padding size of the parameter
+
+        Args:
+            param (Tensor): The parameter

-        :return: Return the padding size of the parameter
-        :rtype: int
+        Returns:
+            int: the padding size of the parameter
         """
+
         return self._padding_map[id(param)]

     def link_master_and_working_param(self, master_param: Tensor, working_param: Tensor):
-        """
-        Mapping master parameter and working parameter
-
-        :param master_param: The parameter copy in optimizer
-        :param working_param: The parameter of the model
-        :type master_param: Tensor
-        :type working_param: Tensor
+        """Map master parameter and working parameter

+        Args:
+            master_param (Tensor): The parameter copy in optimizer
+            working_param (Tensor): The parameter of the model
         """
+
         self.master_to_working_param[id(master_param)] = working_param
         self.working_to_master_param[id(working_param)] = master_param
diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index af274a497243..26c043ffde5d 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -286,8 +286,16 @@ def _run_reduction(self):

         with torch.cuda.stream(stream):
             group_id = self._bucket_store.current_group_id
+
+            if self._communication_dtype is not None:
+                comm_flat_grads = flat_grads.to(self._communication_dtype)
+            else:
+                comm_flat_grads = flat_grads
+
             if not self._partition_grads:
-                dist.all_reduce(flat_grads, group=self._dp_torch_group)
+                dist.all_reduce(comm_flat_grads, group=self._dp_torch_group)
+                if flat_grads.dtype != comm_flat_grads.dtype:
+                    flat_grads.copy_(comm_flat_grads)

                 flat_grads_per_rank = flat_grads.split(flat_grads.numel() // self._world_size)
                 grad_in_bucket = self._bucket_store.get_grad()
@@ -299,9 +307,12 @@ def _run_reduction(self):
                     self._grad_store.append_gradients_by_param_id(grad, group_id, param_id)

             else:
-                flat_grads_list = list(flat_grads.split(len(flat_grads) // self._world_size))
-                recieved_grad = torch.zeros_like(flat_grads_list[0])
-                dist.reduce_scatter(recieved_grad, flat_grads_list, group=self._dp_torch_group)
+                comm_flat_grads_list = list(comm_flat_grads.split(len(comm_flat_grads) // self._world_size))
+                recieved_grad = torch.zeros_like(comm_flat_grads_list[0])
+                dist.reduce_scatter(recieved_grad, comm_flat_grads_list, group=self._dp_torch_group)
+
+                if 
recieved_grad.dtype != flat_grads.dtype: + recieved_grad = recieved_grad.to(flat_grads.dtype) grad_in_bucket_current_rank = self._bucket_store.get_grad()[self._local_rank] sync_tensor(recieved_grad, grad_in_bucket_current_rank) From b6f7cd2ca798124f366d85723756c3fc82ecfdf1 Mon Sep 17 00:00:00 2001 From: lclgy Date: Wed, 28 Jun 2023 18:15:15 +0800 Subject: [PATCH 13/18] polish code --- colossalai/zero/low_level/low_level_optim.py | 21 ++++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 26c043ffde5d..d525f9eb4b8b 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -287,15 +287,14 @@ def _run_reduction(self): with torch.cuda.stream(stream): group_id = self._bucket_store.current_group_id + grad_dtype = flat_grads.dtype if self._communication_dtype is not None: - comm_flat_grads = flat_grads.to(self._communication_dtype) - else: - comm_flat_grads = flat_grads + flat_grads = flat_grads.to(self._communication_dtype) if not self._partition_grads: - dist.all_reduce(comm_flat_grads, group=self._dp_torch_group) - if flat_grads.dtype != comm_flat_grads.dtype: - flat_grads.copy_(comm_flat_grads) + dist.all_reduce(flat_grads, group=self._dp_torch_group) + if flat_grads.dtype != grad_dtype: + flat_grads = flat_grads.to(grad_dtype) flat_grads_per_rank = flat_grads.split(flat_grads.numel() // self._world_size) grad_in_bucket = self._bucket_store.get_grad() @@ -307,12 +306,12 @@ def _run_reduction(self): self._grad_store.append_gradients_by_param_id(grad, group_id, param_id) else: - comm_flat_grads_list = list(comm_flat_grads.split(len(comm_flat_grads) // self._world_size)) - recieved_grad = torch.zeros_like(comm_flat_grads_list[0]) - dist.reduce_scatter(recieved_grad, comm_flat_grads_list, group=self._dp_torch_group) + flat_grads_list = list(flat_grads.split(len(flat_grads) // self._world_size)) + recieved_grad = torch.zeros_like(flat_grads_list[0]) + dist.reduce_scatter(recieved_grad, flat_grads_list, group=self._dp_torch_group) - if recieved_grad.dtype != flat_grads.dtype: - recieved_grad = recieved_grad.to(flat_grads.dtype) + if recieved_grad.dtype != grad_dtype: + recieved_grad = recieved_grad.to(grad_dtype) grad_in_bucket_current_rank = self._bucket_store.get_grad()[self._local_rank] sync_tensor(recieved_grad, grad_in_bucket_current_rank) From ea4e3c52149888cc99522356438c5e9b44e41c86 Mon Sep 17 00:00:00 2001 From: lclgy Date: Thu, 29 Jun 2023 13:24:26 +0800 Subject: [PATCH 14/18] test padding and fix --- .../zero/low_level/bookkeeping/bucket_store.py | 2 +- colossalai/zero/low_level/low_level_optim.py | 2 +- tests/test_zero/test_low_level/test_zero1_2.py | 14 ++++++++------ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 0e523b1606c1..dd5c5de8a976 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -60,7 +60,7 @@ def build_grad_in_bucket(self): with torch.no_grad(): grad = param.grad.view(-1) if padding_size > 0: - grad = torch.nn.function.pad(grad, [0, padding_size]) + grad = torch.nn.functional.pad(grad, [0, padding_size]) grad_list = grad.split(grad.numel() // self._world_size) for rank in range(self._world_size): grad_current_rank = grad_list[rank].detach() diff --git 
a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index d525f9eb4b8b..8743cab3313f 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -243,7 +243,7 @@ def _create_master_param_current_rank(self, param_list): padding_param = torch.nn.functional.pad(param.data.view(-1), [0, padding_size]) else: padding_param = param.data.view(-1) - splited_params = padding_param.split(param.numel() // self._world_size) + splited_params = padding_param.split(padding_param.numel() // self._world_size) splited_param_current_rank = splited_params[self._local_rank].detach().float().to(device) params_current_rank.append(splited_param_current_rank) diff --git a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py index 8350d541b30c..4879c67a9fc4 100644 --- a/tests/test_zero/test_low_level/test_zero1_2.py +++ b/tests/test_zero/test_low_level/test_zero1_2.py @@ -16,9 +16,9 @@ class MlpModel(nn.Module): def __init__(self): super(MlpModel, self).__init__() - self.linear1 = nn.Linear(128, 256) - self.linear_drop = nn.Linear(256, 256) - self.linear2 = nn.Linear(256, 512) + self.linear1 = nn.Linear(123, 253) + self.linear_drop = nn.Linear(253, 253) + self.linear2 = nn.Linear(253, 512) def forward(self, x): x = self.linear1(x) @@ -47,7 +47,7 @@ def split_ddp_grad(grad, world_size): grad = grad.clone().detach().flatten() padding_size = (world_size - grad.numel() % world_size) % world_size if padding_size > 0: - grad = torch.nn.function.pad(grad, [0, padding_size]) + grad = torch.nn.functional.pad(grad, [0, padding_size]) splited_grad = grad.split(grad.numel() // world_size) return splited_grad @@ -83,7 +83,7 @@ def exam_zero_1_2(): initial_scale=128) # create data seed_all(2001 + local_rank) - input_data = torch.randn(32, 128).cuda() + input_data = torch.randn(32, 123).cuda() zero1_output = zero1_model(input_data) zero2_output = zero2_model(input_data) @@ -142,7 +142,7 @@ def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): seed_all(1453 + local_rank) # create - input_data = torch.rand(32, 128).cuda() + input_data = torch.rand(32, 123).cuda() # zero-dp forward zero_output = zero_model(input_data.to(dtype)) @@ -160,6 +160,8 @@ def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): # check grad for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): if p.grad is not None: + # print(p.grad, local_rank) + # exit() zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p)) torch_grad_list = split_ddp_grad(p.grad, world_size) for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): From 4ace8cb1cbe4e9b097e7dfc9419e27dd4cb6a14c Mon Sep 17 00:00:00 2001 From: lclgy Date: Thu, 29 Jun 2023 16:17:30 +0800 Subject: [PATCH 15/18] fix unit test of low level zero --- tests/test_zero/test_low_level/test_zero1_2.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_zero/test_low_level/test_zero1_2.py b/tests/test_zero/test_low_level/test_zero1_2.py index 4879c67a9fc4..5a0609bff192 100644 --- a/tests/test_zero/test_low_level/test_zero1_2.py +++ b/tests/test_zero/test_low_level/test_zero1_2.py @@ -2,6 +2,7 @@ import pytest import torch +import torch.distributed as dist import torch.nn as nn from torch.nn.parallel import DistributedDataParallel as DDP from torch.testing import assert_close @@ -125,7 +126,7 @@ def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): torch_model = 
MlpModel().cuda() zero_model = copy.deepcopy(torch_model).to(dtype) - torch_model = DDP(torch_model.cuda(), bucket_cap_mb=0).cuda() + torch_model = DDP(torch_model.cuda(), static_graph=True).cuda() # create optimizer zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=1) @@ -160,8 +161,6 @@ def exam_zero_1_torch_ddp(world_size, dtype: torch.dtype): # check grad for (n, p), z1p in zip(torch_model.named_parameters(), zero_model.parameters()): if p.grad is not None: - # print(p.grad, local_rank) - # exit() zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(z1p)) torch_grad_list = split_ddp_grad(p.grad, world_size) for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): From b528f0fff1c4bff28e79a206b8fdce835848087d Mon Sep 17 00:00:00 2001 From: lclgy Date: Thu, 29 Jun 2023 20:18:43 +0800 Subject: [PATCH 16/18] fix pad recording in bucket store --- colossalai/zero/low_level/bookkeeping/bucket_store.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index dd5c5de8a976..7b536f95a4d2 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -58,7 +58,7 @@ def build_grad_in_bucket(self): for param, padding_size in zip(self._param_list, self._padding_size): with torch.no_grad(): - grad = param.grad.view(-1) + grad = param.grad.detach().flatten() if padding_size > 0: grad = torch.nn.functional.pad(grad, [0, padding_size]) grad_list = grad.split(grad.numel() // self._world_size) @@ -107,6 +107,7 @@ def reset(self): self.grad_to_param_mapping = dict() self._num_elements_in_bucket = 0 self._param_list = [] + self._padding_size = [] self._grad_in_bucket = dict() for rank in range(self._world_size): self._grad_in_bucket[rank] = [] From c17ed06d270ed0e507bc6bb2ade9e064da0d177c Mon Sep 17 00:00:00 2001 From: lclgy Date: Fri, 30 Jun 2023 11:52:15 +0800 Subject: [PATCH 17/18] support some models --- .../test_plugin/test_low_level_zero_plugin.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tests/test_booster/test_plugin/test_low_level_zero_plugin.py b/tests/test_booster/test_plugin/test_low_level_zero_plugin.py index f70f27be2aa7..fbcf918bec0a 100644 --- a/tests/test_booster/test_plugin/test_low_level_zero_plugin.py +++ b/tests/test_booster/test_plugin/test_low_level_zero_plugin.py @@ -11,14 +11,9 @@ from tests.kit.model_zoo import model_zoo # These models are not compatible with AMP -_AMP_ERR_MODELS = ['timm_convit', 'dlrm', 'deepfm_interactionarch', 'deepfm_simpledeepfmnn'] +_AMP_ERR_MODELS = ['timm_convit', 'deepfm_interactionarch'] # These models have no parameters -_LOW_LEVEL_ZERO_ERR_MODELS = ['dlrm_interactionarch', 'deepfm_overarch', 'deepfm_sparsearch', 'dlrm_sparsearch'] -# These models will get stuck -_STUCK_MODELS = [ - 'diffusers_vq_model', 'transformers_albert', 'transformers_albert_for_pretraining', 'transformers_bert', - 'transformers_bert_for_pretraining', 'transformers_gpt_double_heads' -] +_LOW_LEVEL_ZERO_ERR_MODELS = ['dlrm_interactionarch'] def run_fn(stage, model_fn, data_gen_fn, output_transform_fn) -> Optional[str]: @@ -58,7 +53,7 @@ def check_low_level_zero_plugin(stage: int, early_stop: bool = True): """ passed_models = [] failed_info = {} # (model_name, error) pair - ignore_models = _AMP_ERR_MODELS + _LOW_LEVEL_ZERO_ERR_MODELS + _STUCK_MODELS + ignore_models = _AMP_ERR_MODELS + 
_LOW_LEVEL_ZERO_ERR_MODELS skipped_models = [] for name, (model_fn, data_gen_fn, output_transform_fn, _) in model_zoo.items(): From 5ece0734d696c27fb499a1c100ecc44ddc5714c1 Mon Sep 17 00:00:00 2001 From: lclgy Date: Fri, 30 Jun 2023 14:45:18 +0800 Subject: [PATCH 18/18] polish --- colossalai/zero/low_level/bookkeeping/bucket_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 7b536f95a4d2..98f1b78d0049 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -86,7 +86,7 @@ def get_flatten_grad(self) -> Tensor: """ flat_grad = [] - for _, grad_list in self._grad_in_bucket.items(): + for grad_list in self._grad_in_bucket.values(): flat_grad.append(_flatten_dense_tensors(grad_list)) flat_grad = _flatten_dense_tensors(flat_grad) return flat_grad
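
A note on the pattern these patches keep applying: every flattened tensor (a parameter in _create_master_param_current_rank, a gradient in add_param_grad/build_grad_in_bucket, and the DDP gradient in the test helper split_ddp_grad) is padded up to a multiple of world_size and split into equal per-rank shards. Below is a minimal, self-contained sketch of that pattern; shard_flat_tensor is a hypothetical name introduced here for illustration, not an API from the patches.

import torch
import torch.nn.functional as F

def shard_flat_tensor(t: torch.Tensor, world_size: int):
    # Flatten, pad the tail to a multiple of world_size, then split so
    # that shard i is the slice owned by rank i.
    flat = t.detach().flatten()
    padding_size = (world_size - flat.numel() % world_size) % world_size
    if padding_size > 0:
        flat = F.pad(flat, [0, padding_size])
    return list(flat.split(flat.numel() // world_size))

# The test model above uses nn.Linear(123, 253), whose weight has
# 123 * 253 = 31119 elements; with world_size = 4, one padding element
# is added and every rank owns a 7780-element shard.
shards = shard_flat_tensor(torch.randn(253, 123), world_size=4)
assert len(shards) == 4 and all(s.numel() == 7780 for s in shards)

The same shape arithmetic explains the one-line fix in PATCH 14: the split size must come from the padded length (padding_param.numel() // self._world_size). Splitting by the unpadded param.numel() // world_size would, whenever padding_size > 0, produce world_size full shards plus a small leftover shard, so splited_params[self._local_rank] would have the wrong size.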