From c66650ae60fce33fe529ebbe5a6b4b7fc49163e3 Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:16:23 +0800 Subject: [PATCH 01/10] [example] low level optim no GPC --- colossalai/zero/sharded_optim/_utils.py | 15 +++--- .../sharded_optim/bookkeeping/base_store.py | 9 ++-- .../sharded_optim/bookkeeping/bucket_store.py | 7 ++- .../bookkeeping/parameter_store.py | 6 ++- .../zero/sharded_optim/low_level_optim.py | 51 +++++++++++-------- .../language/gpt/gemini/train_gpt_demo.py | 14 +++-- 6 files changed, 57 insertions(+), 45 deletions(-) diff --git a/colossalai/zero/sharded_optim/_utils.py b/colossalai/zero/sharded_optim/_utils.py index 9a839a5705c3..cac361f8cb07 100644 --- a/colossalai/zero/sharded_optim/_utils.py +++ b/colossalai/zero/sharded_optim/_utils.py @@ -5,8 +5,7 @@ from torch._six import inf from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc +from colossalai.tensor import ProcessGroup from colossalai.utils import is_model_parallel_parameter @@ -101,7 +100,7 @@ def split_half_float_double(tensor_list): return buckets -def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode.DATA): +def reduce_tensor(tensor, dtype=None, dst_rank=None, pg: ProcessGroup = ProcessGroup()): """ Reduce the tensor in the data parallel process group @@ -114,7 +113,7 @@ def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode. :type tensor: torch.Tensor :type dtype: torch.dtype, optional :type dst_rank: int, optional - :type parallel_mode: ParallelMode, optional + :type pg: ProcessGroup, optional """ # use the original dtype if dtype is None: @@ -126,8 +125,8 @@ def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode. else: tensor_to_reduce = tensor - world_size = gpc.get_world_size(parallel_mode) - group = gpc.get_group(parallel_mode) + group = pg.dp_process_group() + world_size = pg.dp_world_size() tensor_to_reduce.div_(world_size) # if rank is None, all reduce will be used @@ -137,13 +136,13 @@ def reduce_tensor(tensor, dtype=None, dst_rank=None, parallel_mode=ParallelMode. 
if use_all_reduce: dist.all_reduce(tensor_to_reduce, group=group) else: - ranks_in_group = gpc.get_ranks_in_group(parallel_mode) + ranks_in_group = pg.dp_rank_list() global_rank = ranks_in_group[dst_rank] dist.reduce(tensor=tensor_to_reduce, dst=global_rank, group=group) # recover the original dtype if tensor.dtype != dtype and tensor is not tensor_to_reduce: - local_rank = gpc.get_local_rank(parallel_mode) + local_rank = pg.dp_local_rank() if use_all_reduce or dst_rank == local_rank: tensor.copy_(tensor_to_reduce) diff --git a/colossalai/zero/sharded_optim/bookkeeping/base_store.py b/colossalai/zero/sharded_optim/bookkeeping/base_store.py index d4436acaa4bf..87b6793a8a9c 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/base_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/base_store.py @@ -1,12 +1,11 @@ -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc +from colossalai.tensor import ProcessGroup class BaseStore: - def __init__(self, dp_parallel_mode=ParallelMode.DATA): - self._world_size = gpc.get_world_size(dp_parallel_mode) - self._local_rank = gpc.get_local_rank(dp_parallel_mode) + def __init__(self, pg: ProcessGroup): + self._world_size = pg.dp_world_size() + self._local_rank = pg.get_ranks_in_dp() @property def world_size(self): diff --git a/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py b/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py index 0f2b1bb88b58..39cd4423690e 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py @@ -1,13 +1,12 @@ -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc +from colossalai.tensor import ProcessGroup from .base_store import BaseStore class BucketStore(BaseStore): - def __init__(self, dp_parallel_mode): - super().__init__(dp_parallel_mode) + def __init__(self, pg: ProcessGroup): + super().__init__(pg) self._grads = dict() self._params = dict() self._num_elements_in_bucket = dict() diff --git a/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py b/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py index 09ebaaf9938c..a2869b41e4c6 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py @@ -2,13 +2,15 @@ from torch import Tensor +from colossalai.tensor import ProcessGroup + from .base_store import BaseStore class ParameterStore(BaseStore): - def __init__(self, dp_paralle_mode): - super().__init__(dp_paralle_mode) + def __init__(self, pg: ProcessGroup): + super().__init__(pg) # param partitioning data structures self._fp16_param_to_rank = dict() self._rank_groupid_to_fp16_param_list = dict() diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py index c437ac54939c..1956031d6c13 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -10,6 +10,7 @@ from colossalai.core import global_context as gpc from colossalai.logging import get_dist_logger from colossalai.nn.optimizer import ColossalaiOptimizer +from colossalai.tensor import ProcessGroup from colossalai.utils.cuda import get_current_device from ._utils import ( @@ -33,7 +34,7 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer): def __init__( self, optimizer: Optimizer, - + pg: ProcessGroup = ProcessGroup(), # grad scaler config initial_scale=2**16, min_scale=1, @@ -54,11 +55,11 
@@ def __init__( # stage 2 partition_grad=False, - dp_parallel_mode=ParallelMode.DATA, - mp_parallel_mode=ParallelMode.MODEL, + # dp_parallel_mode=ParallelMode.DATA, + # mp_parallel_mode=ParallelMode.MODEL, # cpu offload - cpu_offload=False, + cpu_offload=False, # forced dtype forced_dtype=None): @@ -75,21 +76,29 @@ def __init__( # stage 2 self._partition_grads = partition_grad - + self._pg = pg # cpu_offload self._cpu_offload = cpu_offload + # old # get process groups - self._dp_parallel_mode = dp_parallel_mode - self._mp_parallel_mode = mp_parallel_mode - self._local_rank = gpc.get_local_rank(dp_parallel_mode) - self._world_size = gpc.get_world_size(dp_parallel_mode) - - self._dp_group = gpc.get_group(dp_parallel_mode) - if gpc.is_initialized(mp_parallel_mode) and gpc.get_world_size(mp_parallel_mode) > 1: - self._mp_group = gpc.get_group(mp_parallel_mode) - else: - self._mp_group = None + # self._dp_parallel_mode = dp_parallel_mode + # self._mp_parallel_mode = mp_parallel_mode + # self._local_rank = gpc.get_local_rank(dp_parallel_mode) + # self._world_size = gpc.get_world_size(dp_parallel_mode) + + print(f'pg rank {pg._rank}') + self._local_rank = pg.dp_local_rank() + self._world_size = pg.dp_world_size() + self._dp_group = pg.dp_process_group() + self._mp_group = pg.tp_process_group() + + # old + # self._dp_group = gpc.get_group(dp_parallel_mode) + # if gpc.is_initialized(mp_parallel_mode) and gpc.get_world_size(mp_parallel_mode) > 1: + # self._mp_group = gpc.get_group(mp_parallel_mode) + # else: + # self._mp_group = None # fp16 and fp32 params for mixed precision training self._fp16_param_groups = dict() @@ -126,9 +135,9 @@ def __init__( # ParameterStore will manage the tensor buffers used for zero # it will not manage the tensors used by mixed precision training - self._param_store = ParameterStore(self._dp_parallel_mode) - self._grad_store = GradientStore(self._dp_parallel_mode) - self._bucket_store = BucketStore(self._dp_parallel_mode) + self._param_store = ParameterStore(pg) + self._grad_store = GradientStore(pg) + self._bucket_store = BucketStore(pg) # iterate over the param group in the optimizer # partition these param groups for data parallel training @@ -223,9 +232,7 @@ def _partition_param_list(self, param_list): numel_per_rank[rank_to_go] += param.numel() if self._verbose: - self._logger.info(f'Number of elements on ranks: {numel_per_rank}', - ranks=[0], - parallel_mode=self._dp_parallel_mode) + self._logger.info(f'Number of elements on ranks: {numel_per_rank}', ranks=[0]) return params_per_rank def _sanity_checks(self): @@ -374,7 +381,7 @@ def _reduce_and_copy(self, bucket: TensorBucket, reduce_rank): reduced_flat = reduce_tensor(tensor=flat, dtype=self._communication_dtype, dst_rank=reduce_rank, - parallel_mode=self._dp_parallel_mode) + pg=self._pg) # update the reduced tensor if reduce_rank is None or reduce_rank == self._local_rank: diff --git a/examples/language/gpt/gemini/train_gpt_demo.py b/examples/language/gpt/gemini/train_gpt_demo.py index 29f8c8ef1215..2a139073e563 100644 --- a/examples/language/gpt/gemini/train_gpt_demo.py +++ b/examples/language/gpt/gemini/train_gpt_demo.py @@ -288,12 +288,18 @@ def main(): from torch.distributed.optim import ZeroRedundancyOptimizer optimizer = ZeroRedundancyOptimizer(model.parameters(), optimizer_class=torch.optim.Adam, lr=0.01) elif args.distplan.startswith("zero"): + pg = ProcessGroup() partition_flag = args.distplan == "zero2" + print(f'RANK {torch.distributed.get_rank()} {pg._rank}') optimizer = 
torch.optim.Adam(model.parameters(), lr=0.01) - optimizer = LowLevelZeroOptimizer(optimizer, - overlap_communication=True, - partition_grad=partition_flag, - verbose=True) + + optimizer = LowLevelZeroOptimizer( + optimizer, + pg=pg, + overlap_communication=True, + partition_grad=partition_flag, + verbose=True, + ) # model is shared after TP numel = get_model_size(model) From fbba9da0f3158f2435f0ef2f045ccbee2c521ce9 Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:17:04 +0800 Subject: [PATCH 02/10] polish code --- colossalai/zero/sharded_optim/low_level_optim.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py index 1956031d6c13..04ef7840b491 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -1,13 +1,10 @@ from functools import partial -from itertools import groupby import torch import torch.distributed as dist from torch.optim import Optimizer from colossalai.amp.naive_amp.grad_scaler import DynamicGradScaler -from colossalai.context import ParallelMode -from colossalai.core import global_context as gpc from colossalai.logging import get_dist_logger from colossalai.nn.optimizer import ColossalaiOptimizer from colossalai.tensor import ProcessGroup From 688f4543c80e8587b7a3a305b1d43cc15cfd900f Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:20:05 +0800 Subject: [PATCH 03/10] polish code --- examples/language/gpt/gemini/train_gpt_demo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/language/gpt/gemini/train_gpt_demo.py b/examples/language/gpt/gemini/train_gpt_demo.py index 707fb0a11ead..6cfbfb88d53a 100644 --- a/examples/language/gpt/gemini/train_gpt_demo.py +++ b/examples/language/gpt/gemini/train_gpt_demo.py @@ -291,7 +291,7 @@ def main(): optimizer = ZeroRedundancyOptimizer(model.parameters(), optimizer_class=torch.optim.Adam, lr=0.01) elif args.distplan.startswith("zero"): pg = ProcessGroup() - partition_flag = args.distplan == "zero2" + partition_flag = (args.distplan == "zero2") print(f'RANK {torch.distributed.get_rank()} {pg._rank}') optimizer = torch.optim.Adam(model.parameters(), lr=0.01) From 01f884b691b44c5ec4509391fbadef8d99ac1e4e Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:23:17 +0800 Subject: [PATCH 04/10] polish code --- colossalai/zero/sharded_optim/low_level_optim.py | 15 --------------- examples/language/gpt/gemini/train_gpt_demo.py | 1 - 2 files changed, 16 deletions(-) diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py index 04ef7840b491..9debf1e28015 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -77,26 +77,11 @@ def __init__( # cpu_offload self._cpu_offload = cpu_offload - # old - # get process groups - # self._dp_parallel_mode = dp_parallel_mode - # self._mp_parallel_mode = mp_parallel_mode - # self._local_rank = gpc.get_local_rank(dp_parallel_mode) - # self._world_size = gpc.get_world_size(dp_parallel_mode) - - print(f'pg rank {pg._rank}') self._local_rank = pg.dp_local_rank() self._world_size = pg.dp_world_size() self._dp_group = pg.dp_process_group() self._mp_group = pg.tp_process_group() - # old - # self._dp_group = gpc.get_group(dp_parallel_mode) - # if gpc.is_initialized(mp_parallel_mode) and gpc.get_world_size(mp_parallel_mode) > 1: - # self._mp_group = 
gpc.get_group(mp_parallel_mode) - # else: - # self._mp_group = None - # fp16 and fp32 params for mixed precision training self._fp16_param_groups = dict() self._fp32_flat_param_groups_of_current_rank = dict() diff --git a/examples/language/gpt/gemini/train_gpt_demo.py b/examples/language/gpt/gemini/train_gpt_demo.py index 6cfbfb88d53a..28e7169fca9a 100644 --- a/examples/language/gpt/gemini/train_gpt_demo.py +++ b/examples/language/gpt/gemini/train_gpt_demo.py @@ -292,7 +292,6 @@ def main(): elif args.distplan.startswith("zero"): pg = ProcessGroup() partition_flag = (args.distplan == "zero2") - print(f'RANK {torch.distributed.get_rank()} {pg._rank}') optimizer = torch.optim.Adam(model.parameters(), lr=0.01) optimizer = LowLevelZeroOptimizer( From c68a39080b95946c8a5666f1b5c69d0635e08179 Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:28:27 +0800 Subject: [PATCH 05/10] polish code --- colossalai/zero/sharded_optim/bookkeeping/base_store.py | 2 +- examples/language/gpt/gemini/train_gpt_demo.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/colossalai/zero/sharded_optim/bookkeeping/base_store.py b/colossalai/zero/sharded_optim/bookkeeping/base_store.py index 87b6793a8a9c..18e580a2241a 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/base_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/base_store.py @@ -5,7 +5,7 @@ class BaseStore: def __init__(self, pg: ProcessGroup): self._world_size = pg.dp_world_size() - self._local_rank = pg.get_ranks_in_dp() + self._local_rank = pg.dp_local_rank() @property def world_size(self): diff --git a/examples/language/gpt/gemini/train_gpt_demo.py b/examples/language/gpt/gemini/train_gpt_demo.py index 28e7169fca9a..6b5b8e2e262f 100644 --- a/examples/language/gpt/gemini/train_gpt_demo.py +++ b/examples/language/gpt/gemini/train_gpt_demo.py @@ -291,6 +291,7 @@ def main(): optimizer = ZeroRedundancyOptimizer(model.parameters(), optimizer_class=torch.optim.Adam, lr=0.01) elif args.distplan.startswith("zero"): pg = ProcessGroup() + model = model.half() partition_flag = (args.distplan == "zero2") optimizer = torch.optim.Adam(model.parameters(), lr=0.01) From 34ce4bfcfc61e364d9fe30c926aeb57b1465b79d Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:30:36 +0800 Subject: [PATCH 06/10] rename reduce_tensor -> reduce_tensor_dp_group --- colossalai/zero/sharded_optim/_utils.py | 2 +- colossalai/zero/sharded_optim/low_level_optim.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/colossalai/zero/sharded_optim/_utils.py b/colossalai/zero/sharded_optim/_utils.py index cac361f8cb07..4a5eeca35ffd 100644 --- a/colossalai/zero/sharded_optim/_utils.py +++ b/colossalai/zero/sharded_optim/_utils.py @@ -100,7 +100,7 @@ def split_half_float_double(tensor_list): return buckets -def reduce_tensor(tensor, dtype=None, dst_rank=None, pg: ProcessGroup = ProcessGroup()): +def reduce_tensor_dp_group(tensor, dtype=None, dst_rank=None, pg: ProcessGroup = ProcessGroup()): """ Reduce the tensor in the data parallel process group diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py index 9debf1e28015..d81b0461c4cd 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -360,10 +360,10 @@ def _reduce_and_copy(self, bucket: TensorBucket, reduce_rank): with torch.cuda.stream(stream): flat = bucket.flatten() - reduced_flat = reduce_tensor(tensor=flat, - dtype=self._communication_dtype, 
- dst_rank=reduce_rank, - pg=self._pg) + reduced_flat = reduce_tensor_dp_group(tensor=flat, + dtype=self._communication_dtype, + dst_rank=reduce_rank, + pg=self._pg) # update the reduced tensor if reduce_rank is None or reduce_rank == self._local_rank: From 6e91080a25bb048157bc2cf5cc15ca31fa3089e3 Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 14:39:05 +0800 Subject: [PATCH 07/10] polish code --- colossalai/zero/sharded_optim/low_level_optim.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py index d81b0461c4cd..a727a938e812 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -16,7 +16,7 @@ flatten, get_grad_accumulate_object, has_inf_or_nan, - reduce_tensor, + reduce_tensor_dp_group, release_param_grad, split_half_float_double, sync_param, From 6f571b0cf16de5d8214e29bc61830eb9f169fe75 Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 15:00:06 +0800 Subject: [PATCH 08/10] polish code --- tests/test_zero/low_level_zero/test_grad_acc.py | 7 ++++++- tests/test_zero/low_level_zero/test_zero1_2.py | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/test_zero/low_level_zero/test_grad_acc.py b/tests/test_zero/low_level_zero/test_grad_acc.py index c23b3a3e8fd8..713978068644 100644 --- a/tests/test_zero/low_level_zero/test_grad_acc.py +++ b/tests/test_zero/low_level_zero/test_grad_acc.py @@ -9,6 +9,7 @@ from torch.testing import assert_close import colossalai +from colossalai.tensor import ProcessGroup from colossalai.testing.random import seed_all from colossalai.utils import free_port from colossalai.zero import LowLevelZeroOptimizer @@ -34,16 +35,18 @@ def exam_zero_1_2_grad_acc(): # create model zero1_model = TestModel().cuda() zero2_model = copy.deepcopy(zero1_model) - + pg = ProcessGroup() # create optimizer zero1_optimizer = torch.optim.Adam(zero1_model.parameters(), lr=1) zero2_optimizer = torch.optim.Adam(zero2_model.parameters(), lr=1) zero1_optimizer = LowLevelZeroOptimizer(zero1_optimizer, + pg=pg, overlap_communication=True, initial_scale=32, clip_grad_norm=1.0, verbose=True) zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer, + pg=pg, overlap_communication=True, partition_grad=True, initial_scale=32, @@ -101,7 +104,9 @@ def exam_zero_1_grad_acc(): # we only test stage 1 here # in `check_sharded_param_consistency.py`, we will test whether # level 1 and 2 will produce exactly the same results + pg = ProcessGroup() zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, + pg=pg, overlap_communication=False, initial_scale=grad_scale, reduce_bucket_size=262144, diff --git a/tests/test_zero/low_level_zero/test_zero1_2.py b/tests/test_zero/low_level_zero/test_zero1_2.py index b02d3a6a4486..6924827fe4b4 100644 --- a/tests/test_zero/low_level_zero/test_zero1_2.py +++ b/tests/test_zero/low_level_zero/test_zero1_2.py @@ -9,6 +9,7 @@ from torch.testing import assert_close import colossalai +from colossalai.tensor import ProcessGroup from colossalai.testing.random import seed_all from colossalai.utils import free_port from colossalai.zero import LowLevelZeroOptimizer @@ -58,14 +59,17 @@ def exam_zero_1_2(): zero1_model = TestModel().cuda() zero2_model = copy.deepcopy(zero1_model) + pg = ProcessGroup() # create optimizer zero1_optimizer = torch.optim.Adam(zero1_model.parameters(), lr=1) zero2_optimizer = torch.optim.Adam(zero2_model.parameters(), lr=1) 
zero1_optimizer = LowLevelZeroOptimizer(zero1_optimizer, + pg=pg, overlap_communication=True, initial_scale=128, verbose=True) zero2_optimizer = LowLevelZeroOptimizer(zero2_optimizer, + pg=pg, overlap_communication=True, partition_grad=True, initial_scale=128) @@ -127,7 +131,9 @@ def exam_zero_1_torch_ddp(): # we only test stage 1 here # in `check_sharded_param_consistency.py`, we will test whether # level 1 and 2 will produce exactly the same results + pg = ProcessGroup() zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, + pg=pg, overlap_communication=True, initial_scale=1, reduce_bucket_size=262144) From 698a08e9d7f6da77bfa709092354995877076224 Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 15:57:44 +0800 Subject: [PATCH 09/10] support both gpc and ProcessGroup --- colossalai/zero/sharded_optim/_utils.py | 24 ++++++++++--- .../sharded_optim/bookkeeping/base_store.py | 14 ++++++-- .../sharded_optim/bookkeeping/bucket_store.py | 4 ++- .../bookkeeping/parameter_store.py | 4 +-- .../zero/sharded_optim/low_level_optim.py | 36 +++++++++++++++---- .../language/gpt/gemini/train_gpt_demo.py | 1 + .../test_zero/low_level_zero/test_grad_acc.py | 5 +-- 7 files changed, 68 insertions(+), 20 deletions(-) diff --git a/colossalai/zero/sharded_optim/_utils.py b/colossalai/zero/sharded_optim/_utils.py index 4a5eeca35ffd..7369f8a2edde 100644 --- a/colossalai/zero/sharded_optim/_utils.py +++ b/colossalai/zero/sharded_optim/_utils.py @@ -1,10 +1,13 @@ import math +from typing import Optional import torch import torch.distributed as dist from torch._six import inf from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors +from colossalai.context import ParallelMode +from colossalai.core import global_context as gpc from colossalai.tensor import ProcessGroup from colossalai.utils import is_model_parallel_parameter @@ -100,7 +103,7 @@ def split_half_float_double(tensor_list): return buckets -def reduce_tensor_dp_group(tensor, dtype=None, dst_rank=None, pg: ProcessGroup = ProcessGroup()): +def reduce_tensor_dp_group(tensor, dtype=None, dst_rank=None, pg: Optional[ProcessGroup] = None): """ Reduce the tensor in the data parallel process group @@ -125,8 +128,13 @@ def reduce_tensor_dp_group(tensor, dtype=None, dst_rank=None, pg: ProcessGroup = else: tensor_to_reduce = tensor - group = pg.dp_process_group() - world_size = pg.dp_world_size() + if isinstance(pg, ProcessGroup): + group = pg.dp_process_group() + world_size = pg.dp_world_size() + else: + world_size = gpc.get_world_size(ParallelMode.DATA) + group = gpc.get_group(ParallelMode.DATA) + tensor_to_reduce.div_(world_size) # if rank is None, all reduce will be used @@ -136,13 +144,19 @@ def reduce_tensor_dp_group(tensor, dtype=None, dst_rank=None, pg: ProcessGroup = if use_all_reduce: dist.all_reduce(tensor_to_reduce, group=group) else: - ranks_in_group = pg.dp_rank_list() + if pg is not None: + ranks_in_group = pg.dp_rank_list() + else: + ranks_in_group = gpc.get_ranks_in_group(ParallelMode.DATA) global_rank = ranks_in_group[dst_rank] dist.reduce(tensor=tensor_to_reduce, dst=global_rank, group=group) # recover the original dtype if tensor.dtype != dtype and tensor is not tensor_to_reduce: - local_rank = pg.dp_local_rank() + if pg is not None: + local_rank = pg.dp_local_rank() + else: + local_rank = gpc.get_local_rank(ParallelMode.DATA) if use_all_reduce or dst_rank == local_rank: tensor.copy_(tensor_to_reduce) diff --git a/colossalai/zero/sharded_optim/bookkeeping/base_store.py 
b/colossalai/zero/sharded_optim/bookkeeping/base_store.py index 18e580a2241a..759b46a5a4ca 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/base_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/base_store.py @@ -1,11 +1,19 @@ +from typing import Optional + +from colossalai.context import ParallelMode +from colossalai.core import global_context as gpc from colossalai.tensor import ProcessGroup class BaseStore: - def __init__(self, pg: ProcessGroup): - self._world_size = pg.dp_world_size() - self._local_rank = pg.dp_local_rank() + def __init__(self, pg: Optional[ProcessGroup] = None): + if pg is None: + self._world_size = gpc.get_world_size(ParallelMode.DATA) + self._local_rank = gpc.get_local_rank(ParallelMode.DATA) + else: + self._world_size = pg.dp_world_size() + self._local_rank = pg.dp_local_rank() @property def world_size(self): diff --git a/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py b/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py index 39cd4423690e..aba61624e46e 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/bucket_store.py @@ -1,3 +1,5 @@ +from typing import Optional + from colossalai.tensor import ProcessGroup from .base_store import BaseStore @@ -5,7 +7,7 @@ class BucketStore(BaseStore): - def __init__(self, pg: ProcessGroup): + def __init__(self, pg: Optional[ProcessGroup] = None): super().__init__(pg) self._grads = dict() self._params = dict() diff --git a/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py b/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py index a2869b41e4c6..c22186abee0f 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/parameter_store.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from torch import Tensor @@ -9,7 +9,7 @@ class ParameterStore(BaseStore): - def __init__(self, pg: ProcessGroup): + def __init__(self, pg: Optional[ProcessGroup] = None): super().__init__(pg) # param partitioning data structures self._fp16_param_to_rank = dict() diff --git a/colossalai/zero/sharded_optim/low_level_optim.py b/colossalai/zero/sharded_optim/low_level_optim.py index a727a938e812..1393a0df4791 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -1,10 +1,13 @@ from functools import partial +from typing import Optional import torch import torch.distributed as dist from torch.optim import Optimizer from colossalai.amp.naive_amp.grad_scaler import DynamicGradScaler +from colossalai.context import ParallelMode +from colossalai.core import global_context as gpc from colossalai.logging import get_dist_logger from colossalai.nn.optimizer import ColossalaiOptimizer from colossalai.tensor import ProcessGroup @@ -31,7 +34,7 @@ class LowLevelZeroOptimizer(ColossalaiOptimizer): def __init__( self, optimizer: Optimizer, - pg: ProcessGroup = ProcessGroup(), + pg: Optional[ProcessGroup] = None, # grad scaler config initial_scale=2**16, min_scale=1, @@ -73,14 +76,33 @@ def __init__( # stage 2 self._partition_grads = partition_grad - self._pg = pg - # cpu_offload + self._cpu_offload = cpu_offload - self._local_rank = pg.dp_local_rank() - self._world_size = pg.dp_world_size() - self._dp_group = pg.dp_process_group() - self._mp_group = pg.tp_process_group() + if isinstance(pg, ProcessGroup): + self._pg = pg + self._local_rank = pg.dp_local_rank() + self._world_size = pg.dp_world_size() + 
self._dp_group = pg.dp_process_group() + if pg.tp_world_size == 1: + self._mp_group = None + else: + self._mp_group = pg.tp_process_group() + elif pg is None: + self._pg = None + dp_parallel_mode = ParallelMode.DATA + mp_parallel_mode = ParallelMode.MODEL + + self._dp_parallel_mode = dp_parallel_mode + self._mp_parallel_mode = mp_parallel_mode + self._local_rank = gpc.get_local_rank(dp_parallel_mode) + self._world_size = gpc.get_world_size(dp_parallel_mode) + + self._dp_group = gpc.get_group(dp_parallel_mode) + if gpc.is_initialized(mp_parallel_mode) and gpc.get_world_size(mp_parallel_mode) > 1: + self._mp_group = gpc.get_group(mp_parallel_mode) + else: + self._mp_group = None # fp16 and fp32 params for mixed precision training self._fp16_param_groups = dict() diff --git a/examples/language/gpt/gemini/train_gpt_demo.py b/examples/language/gpt/gemini/train_gpt_demo.py index 6b5b8e2e262f..7bec980f95bd 100644 --- a/examples/language/gpt/gemini/train_gpt_demo.py +++ b/examples/language/gpt/gemini/train_gpt_demo.py @@ -298,6 +298,7 @@ def main(): optimizer = LowLevelZeroOptimizer( optimizer, pg=pg, + reduce_bucket_size=12 * 1024 * 1024, overlap_communication=True, partition_grad=partition_flag, verbose=True, diff --git a/tests/test_zero/low_level_zero/test_grad_acc.py b/tests/test_zero/low_level_zero/test_grad_acc.py index 713978068644..003f7a24b9ae 100644 --- a/tests/test_zero/low_level_zero/test_grad_acc.py +++ b/tests/test_zero/low_level_zero/test_grad_acc.py @@ -95,6 +95,7 @@ def exam_zero_1_grad_acc(): zero_model = TestModel() torch_model = copy.deepcopy(zero_model) + seed_all(2008) zero_model = zero_model.cuda() torch_model = DDP(torch_model.cuda(), bucket_cap_mb=0) @@ -104,7 +105,7 @@ def exam_zero_1_grad_acc(): # we only test stage 1 here # in `check_sharded_param_consistency.py`, we will test whether # level 1 and 2 will produce exactly the same results - pg = ProcessGroup() + pg = None #ProcessGroup() zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, pg=pg, overlap_communication=False, @@ -163,7 +164,7 @@ def run_dist(rank, world_size, port): @pytest.mark.dist def test_grad_accumulation(): - world_size = 2 + world_size = 4 run_func = partial(run_dist, world_size=world_size, port=free_port()) mp.spawn(run_func, nprocs=world_size) From ab726091f5176b9965709f98246a034b9f2acf1b Mon Sep 17 00:00:00 2001 From: jiaruifang Date: Thu, 12 Jan 2023 18:06:43 +0800 Subject: [PATCH 10/10] pass unittests --- .../sharded_optim/bookkeeping/base_store.py | 8 +++--- .../zero/sharded_optim/low_level_optim.py | 28 ++++++++++--------- .../test_zero/low_level_zero/test_grad_acc.py | 9 +++--- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/colossalai/zero/sharded_optim/bookkeeping/base_store.py b/colossalai/zero/sharded_optim/bookkeeping/base_store.py index 759b46a5a4ca..3623ed1f048c 100644 --- a/colossalai/zero/sharded_optim/bookkeeping/base_store.py +++ b/colossalai/zero/sharded_optim/bookkeeping/base_store.py @@ -8,12 +8,12 @@ class BaseStore: def __init__(self, pg: Optional[ProcessGroup] = None): - if pg is None: - self._world_size = gpc.get_world_size(ParallelMode.DATA) - self._local_rank = gpc.get_local_rank(ParallelMode.DATA) - else: + if isinstance(pg, ProcessGroup): self._world_size = pg.dp_world_size() self._local_rank = pg.dp_local_rank() + else: + self._world_size = gpc.get_world_size(ParallelMode.DATA) + self._local_rank = gpc.get_local_rank(ParallelMode.DATA) @property def world_size(self): diff --git a/colossalai/zero/sharded_optim/low_level_optim.py 
b/colossalai/zero/sharded_optim/low_level_optim.py index 1393a0df4791..e372eaa50be4 100644 --- a/colossalai/zero/sharded_optim/low_level_optim.py +++ b/colossalai/zero/sharded_optim/low_level_optim.py @@ -55,11 +55,8 @@ def __init__( # stage 2 partition_grad=False, - # dp_parallel_mode=ParallelMode.DATA, - # mp_parallel_mode=ParallelMode.MODEL, - # cpu offload - cpu_offload=False, + cpu_offload=False, # forced dtype forced_dtype=None): @@ -79,17 +76,16 @@ def __init__( self._cpu_offload = cpu_offload + self._pg = pg if isinstance(pg, ProcessGroup): - self._pg = pg self._local_rank = pg.dp_local_rank() self._world_size = pg.dp_world_size() self._dp_group = pg.dp_process_group() - if pg.tp_world_size == 1: - self._mp_group = None - else: + if pg.tp_world_size() > 1: self._mp_group = pg.tp_process_group() + else: + self._mp_group = None elif pg is None: - self._pg = None dp_parallel_mode = ParallelMode.DATA mp_parallel_mode = ParallelMode.MODEL @@ -103,7 +99,8 @@ def __init__( self._mp_group = gpc.get_group(mp_parallel_mode) else: self._mp_group = None - + else: + raise TypeError(f"pg should be None or a ProcesGroup") # fp16 and fp32 params for mixed precision training self._fp16_param_groups = dict() self._fp32_flat_param_groups_of_current_rank = dict() @@ -139,9 +136,14 @@ def __init__( # ParameterStore will manage the tensor buffers used for zero # it will not manage the tensors used by mixed precision training - self._param_store = ParameterStore(pg) - self._grad_store = GradientStore(pg) - self._bucket_store = BucketStore(pg) + if self._pg is not None: + self._param_store = ParameterStore(self._pg) + self._grad_store = GradientStore(self._pg) + self._bucket_store = BucketStore(self._pg) + else: + self._param_store = ParameterStore(self._dp_parallel_mode) + self._grad_store = GradientStore(self._dp_parallel_mode) + self._bucket_store = BucketStore(self._dp_parallel_mode) # iterate over the param group in the optimizer # partition these param groups for data parallel training diff --git a/tests/test_zero/low_level_zero/test_grad_acc.py b/tests/test_zero/low_level_zero/test_grad_acc.py index 003f7a24b9ae..a0d1ac531485 100644 --- a/tests/test_zero/low_level_zero/test_grad_acc.py +++ b/tests/test_zero/low_level_zero/test_grad_acc.py @@ -86,7 +86,7 @@ def fwd_bwd_func(number, cur_data): assert torch.equal(z1p.data, z2p.data) -def exam_zero_1_grad_acc(): +def exam_zero_1_grad_acc(use_pg=True): local_rank = torch.distributed.get_rank() grad_scale = 32 seed_all(2008) @@ -105,7 +105,7 @@ def exam_zero_1_grad_acc(): # we only test stage 1 here # in `check_sharded_param_consistency.py`, we will test whether # level 1 and 2 will produce exactly the same results - pg = None #ProcessGroup() + pg = ProcessGroup() if use_pg else None #ProcessGroup() zero_optimizer = LowLevelZeroOptimizer(zero_optimizer, pg=pg, overlap_communication=False, @@ -158,13 +158,14 @@ def fwd_bwd_func(number, cur_data, check_flag): def run_dist(rank, world_size, port): colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') - exam_zero_1_grad_acc() + exam_zero_1_grad_acc(True) + exam_zero_1_grad_acc(False) # exam_zero_1_2_grad_acc() @pytest.mark.dist def test_grad_accumulation(): - world_size = 4 + world_size = 2 run_func = partial(run_dist, world_size=world_size, port=free_port()) mp.spawn(run_func, nprocs=world_size)
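
For reference, below is a minimal usage sketch of the constructor signature that results from this series, pieced together only from the call sites shown above (the gemini train_gpt_demo.py changes and the low-level-zero tests). The toy model, tensor sizes, world size, and learning rate are placeholders and not taken from the patches; the imports, the launch pattern, and the LowLevelZeroOptimizer keyword arguments are.

from functools import partial

import torch
import torch.multiprocessing as mp

import colossalai
from colossalai.tensor import ProcessGroup
from colossalai.utils import free_port
from colossalai.zero import LowLevelZeroOptimizer


def run_demo(rank, world_size, port):
    # same launch pattern as tests/test_zero/low_level_zero/*
    colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost')

    # placeholder fp16 model; the gemini demo calls model.half() before building the optimizer
    model = torch.nn.Linear(128, 128).cuda().half()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

    # new path introduced by this series: pass an explicit ProcessGroup
    pg = ProcessGroup()
    optimizer = LowLevelZeroOptimizer(optimizer,
                                      pg=pg,
                                      reduce_bucket_size=12 * 1024 * 1024,
                                      overlap_communication=True,
                                      partition_grad=True,    # True -> ZeRO stage 2, False -> stage 1
                                      verbose=True)

    # legacy path kept by PATCH 09/10: leaving pg=None (the default) falls back to
    # gpc and ParallelMode.DATA inside the optimizer and its bookkeeping stores


if __name__ == '__main__':
    world_size = 2
    run_func = partial(run_demo, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)

Keeping pg optional means existing gpc-initialized scripts keep working unchanged, while new ProcessGroup-based code (as in the gemini GPT demo) opts in explicitly.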