From 36086927e1f6320badaedc5ebc44aa4c5b7c5fb2 Mon Sep 17 00:00:00 2001 From: HELSON Date: Thu, 14 Jul 2022 16:37:20 +0800 Subject: [PATCH 01/14] [hotfix] fix ColoTensor GPT2 unitest (#1309) --- tests/test_tensor/test_gpt2.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/tests/test_tensor/test_gpt2.py b/tests/test_tensor/test_gpt2.py index 3c6a3a43f197..ad1ee5d58a21 100644 --- a/tests/test_tensor/test_gpt2.py +++ b/tests/test_tensor/test_gpt2.py @@ -12,7 +12,7 @@ from colossalai.utils.cuda import get_current_device from colossalai.utils import free_port from colossalai.utils.model.colo_init_context import ColoInitContext -from colossalai.tensor import ShardSpec, ComputePattern, ComputeSpec, DistSpecManager, ProcessGroup +from colossalai.tensor import ShardSpec, ComputePattern, ComputeSpec, DistSpecManager, ProcessGroup, ColoTensor, ColoTensorSpec from colossalai.nn.parallel.data_parallel import ColoDDP from colossalai.core import global_context as gpc from colossalai.context.parallel_mode import ParallelMode @@ -21,18 +21,20 @@ def init_1d_row_spec(model, pg: ProcessGroup): tensor_spec = (ShardSpec([0], [pg.tp_world_size()]), ComputeSpec(ComputePattern.TP1D)) - with DistSpecManager.no_grad(): - for n, p in model.named_parameters(): - if 'weight' in n and 'ln' not in n: - p.set_tensor_spec(*tensor_spec) + + for n, p in model.named_parameters(): + p.set_process_group(pg) + if 'weight' in n and 'ln' not in n: + p.set_tensor_spec(*tensor_spec) def init_1d_col_spec(model, pg: ProcessGroup): spec = (ShardSpec([-1], [pg.tp_world_size()]), ComputeSpec(ComputePattern.TP1D)) - with DistSpecManager.no_grad(): - for n, p in model.named_parameters(): - if 'ln' not in n and ('weight' in n or 'bias' in n): - p.set_tensor_spec(*spec) + + for n, p in model.named_parameters(): + p.set_process_group(pg) + if 'ln' not in n and ('weight' in n or 'bias' in n): + p.set_tensor_spec(*spec) def check_param_equal(model, torch_model, pg: 
ProcessGroup): @@ -48,6 +50,7 @@ def check_grad_equal(model, torch_model, pg: ProcessGroup): def run_gpt(init_spec_func, use_ddp): + set_seed(13234) world_size = torch.distributed.get_world_size() pg = ProcessGroup(dp_degree=(2 if (use_ddp and world_size >= 2) else 1)) get_components_func = non_distributed_component_funcs.get_callable('gpt2') @@ -67,14 +70,16 @@ def run_gpt(init_spec_func, use_ddp): model = ColoDDP(model, process_group=pg) for torch_p, p in zip(torch_model.parameters(), model.parameters()): torch_p.data.copy_(p) + init_spec_func(model, pg) check_param_equal(model, torch_model, pg) model.train() torch_model.train() - set_seed(pg.tp_local_rank()) + torch.distributed.barrier() for i, (input_ids, attn_mask) in enumerate(train_dataloader): - logits = model(input_ids, attn_mask) + colo_input = ColoTensor.from_torch_tensor(input_ids, ColoTensorSpec(pg)) + logits = model(colo_input, attn_mask) torch_logits = torch_model(input_ids, attn_mask) assert tensor_equal(torch_logits, logits), f"{torch_logits - logits}" loss = criterion(logits, input_ids) @@ -95,14 +100,13 @@ def run_dist(rank, world_size, port, use_ddp): tp_world_size = world_size // 2 if use_ddp else world_size config = dict(parallel=dict(tensor=dict(mode="1d", size=tp_world_size),)) colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - # run_gpt(init_1d_row_spec, use_ddp) + run_gpt(init_1d_row_spec, use_ddp) run_gpt(init_1d_col_spec, use_ddp) @pytest.mark.dist -@pytest.mark.skip("under development") @pytest.mark.parametrize('world_size', [1, 4]) -@pytest.mark.parametrize('use_ddp', [False, True]) +@pytest.mark.parametrize('use_ddp', [False]) @rerun_if_address_is_in_use() def test_gpt(world_size, use_ddp): run_func = partial(run_dist, world_size=world_size, port=free_port(), use_ddp=use_ddp) @@ -110,4 +114,4 @@ def test_gpt(world_size, use_ddp): if __name__ == '__main__': - test_gpt(4, True) + test_gpt(4, False) From 
9f10524313d92fd06e241de200149282c5f4d3e4 Mon Sep 17 00:00:00 2001 From: Jiarui Fang Date: Thu, 14 Jul 2022 16:37:33 +0800 Subject: [PATCH 02/14] [Optimizer] polish the init method of ColoOptimizer (#1310) --- colossalai/nn/optimizer/colo_optimizer.py | 16 ++++------------ colossalai/tensor/process_group.py | 9 +++++++-- tests/test_tensor/test_model.py | 6 +++--- tests/test_utils/test_colo_checkpoint.py | 2 +- 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/colossalai/nn/optimizer/colo_optimizer.py b/colossalai/nn/optimizer/colo_optimizer.py index 52c641594fa1..72ac916823ef 100644 --- a/colossalai/nn/optimizer/colo_optimizer.py +++ b/colossalai/nn/optimizer/colo_optimizer.py @@ -24,12 +24,7 @@ def __init__(self, named_params: Mapping[str, Union[Tensor, ColoTensor]], optimi **optimizer_kwargs: the key-word arguments to initialize the optimizer. """ - tensors: List[Tensor] = [] - for value in named_params.values(): - tensors.append(value) - - self.named_params = named_params - self._optim = optimizer_class(tensors, *optimizer_args, **optimizer_kwargs) + self._optim = optimizer_class([p for n, p in named_params], *optimizer_args, **optimizer_kwargs) self.param_groups = self._optim.param_groups self.state = self._optim.state @@ -68,8 +63,7 @@ def state_dict(self) -> Dict[str, Any]: Returned state and param_groups will contain parameter keys instead of parameter indices like torch.optim.Optimizer. """ - # TODO: implement state_dict - raise NotImplementedError("ColoOptimizer state_dict not implemented yet!") + return self._optim.state_dict() def load_state_dict(self, state_dict: Mapping[str, Any]): r"""Loads the ColoOptimizer state. @@ -78,11 +72,9 @@ def load_state_dict(self, state_dict: Mapping[str, Any]): state_dict (dict): ColoOptimizer state. Should be an object returned from a call to :meth:`state_dict`. 
""" - # TODO: implement load_state_dict - raise NotImplementedError("ColoOptimizer load_state_dict not implemented yet!") + self._optim.load_state_dict(state_dict) def add_param_group(self, param_group: Any): r"""Add a new param group """ - # TODO: implement add_param_group - raise NotImplementedError("ColoOptimizer add_param_group not implemented yet!") + self._optim.add_param_group(param_group) diff --git a/colossalai/tensor/process_group.py b/colossalai/tensor/process_group.py index 1624638c4117..f6330c2b1996 100644 --- a/colossalai/tensor/process_group.py +++ b/colossalai/tensor/process_group.py @@ -48,6 +48,7 @@ def __init__(self, tp_degree: Optional[int] = None, dp_degree: Optional[int] = None) -> None: if not torch.distributed.is_initialized(): + self.is_init = False return assert torch.distributed.is_initialized(), f"ProcessGroup must be used after distributed initialized" @@ -96,6 +97,7 @@ def __init__(self, self._has_cpu_groups = False PYTORCHPGDICT_.get(self._tp_rank_list, 'nccl') PYTORCHPGDICT_.get(self._dp_rank_list, 'nccl') + self.is_init = True def set_cpu_groups(self): if self.has_cpu_groups: @@ -110,8 +112,11 @@ def has_cpu_groups(self): return self._has_cpu_groups def __repr__(self): - return "ProcessGroup:\n\tRank: {}, World size: {}, DP degree: {}, TP degree: {}\n\tRanks in group: {}".\ - format(self._rank, self._world_size, self._dp_degree, self._tp_degree, self._rank_list) + if self.is_init: + return "ProcessGroup:\n\tRank: {}, World size: {}, DP degree: {}, TP degree: {}\n\tRanks in group: {}".\ + format(self._rank, self._world_size, self._dp_degree, self._tp_degree, self._rank_list) + else: + return "ProcessGroup not initialized" def __eq__(self, obj: 'ProcessGroup') -> bool: if not isinstance(obj, ProcessGroup): diff --git a/tests/test_tensor/test_model.py b/tests/test_tensor/test_model.py index 34a376891f85..ee5edae2c578 100644 --- a/tests/test_tensor/test_model.py +++ b/tests/test_tensor/test_model.py @@ -33,7 +33,7 @@ def 
run_1d_hybrid_tp(model_name): if rank == 0: model_torch = model_builder(checkpoint=True) model_torch = model_torch.cuda() - optimizer_torch = ColoOptimizer(dict(model_torch.named_parameters()), torch.optim.SGD, lr=0.1) + optimizer_torch = ColoOptimizer(model_torch.named_parameters(), torch.optim.SGD, lr=0.1) # Make two models have the same init params for p1, p2 in zip(model.parameters(), model_torch.parameters()): @@ -80,7 +80,7 @@ def run_1d_hybrid_tp(model_name): if rank == 0: model_torch.train() - colo_optimizer = ColoOptimizer(dict(model.named_parameters()), torch.optim.SGD, lr=0.1) + colo_optimizer = ColoOptimizer(model.named_parameters(), torch.optim.SGD, lr=0.1) for i, (data, label) in enumerate(train_dataloader): @@ -170,7 +170,7 @@ def test_colo_optimizer(): with ColoInitContext(lazy_memory_allocate=False, device=get_current_device()): model = model_builder(checkpoint=True) - colo_optimizer = ColoOptimizer(dict(model.named_parameters()), torch.optim.SGD, lr=0.1) + colo_optimizer = ColoOptimizer(model.named_parameters(), torch.optim.SGD, lr=0.1) for i, (data, label) in enumerate(train_dataloader): colo_optimizer.zero_grad() data = data.to(get_current_device()) diff --git a/tests/test_utils/test_colo_checkpoint.py b/tests/test_utils/test_colo_checkpoint.py index 4557cfa2805c..13f54aefe8be 100644 --- a/tests/test_utils/test_colo_checkpoint.py +++ b/tests/test_utils/test_colo_checkpoint.py @@ -117,7 +117,7 @@ def _run_checkpoint(model_name, init_spec_func, use_ddp, use_mp_reload, test_sch model_reload = model_reload.cuda() model_reload.train() - colo_optimizer = ColoOptimizer(dict(model.named_parameters()), torch.optim.SGD, lr=0.1) + colo_optimizer = ColoOptimizer(model.named_parameters(), torch.optim.SGD, lr=0.1) for i, (data, label) in enumerate(train_dataloader): From c9c37dcc4dfeb58eb72cf24b9ea5cfd9b593fd99 Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Thu, 14 Jul 2022 16:45:17 +0800 Subject: [PATCH 03/14] [workflow] updated pytorch compatibility test 
(#1311) --- .github/workflows/compatibility_test.yml | 44 ++++++++++++++---------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/.github/workflows/compatibility_test.yml b/.github/workflows/compatibility_test.yml index 08aa5f1a945e..a6e8f5ea6c46 100644 --- a/.github/workflows/compatibility_test.yml +++ b/.github/workflows/compatibility_test.yml @@ -3,18 +3,14 @@ name: Compatibility Test on: workflow_dispatch: inputs: - version: - type: choice - description: version for testing - default: 'all' + torch_version: + type: string + description: torch version, separated by comma + required: true + cuda_version: + type: string + description: cuda version, separated by comma required: true - options: - - all - - pytorch-cuda:1.9.0-11.1.1 # python 3.8 - - pytorch-cuda:1.8.1-11.1.1 # python 3.8 - - pytorch-cuda:1.7.1-11.0.3 # python 3.8 - - pytorch-cuda:1.6.0-10.2 # python 3.6 - jobs: matrix_preparation: @@ -24,12 +20,25 @@ jobs: matrix: ${{ steps.set-matrix.outputs.matrix }} steps: - id: set-matrix + env: + TORCH_VERSIONS: ${{ inputs.torch_version }} + CUDA_VERSIONS: ${{ inputs.cuda_version }} run: | - [ "${{github.event.inputs.version}}" != "" ] && matrix="[\"frankleeeee/${{github.event.inputs.version}}\"]" - [ "${{github.event.inputs.version}}" == "" ] || [ "${{github.event.inputs.version}}" == "all" ] && \ - matrix="[\"frankleeeee/pytorch-cuda:1.9.0-11.1.1\", \"frankleeeee/pytorch-cuda:1.8.1-11.1.1\", \"frankleeeee/pytorch-cuda:1.7.1-11.0.3\", \"frankleeeee/pytorch-cuda:1.6.0-10.2\"]" - echo $matrix - echo "::set-output name=matrix::{\"container\":$(echo $matrix)}" + IFS=',' + DOCKER_IMAGE=() + + for tv in $TORCH_VERSIONS + do + for cv in $CUDA_VERSIONS + do + DOCKER_IMAGE+=("\"hpcaitech/pytorch-cuda:${tv}-${cv}\"") + done + done + + container=$( IFS=',' ; echo "${DOCKER_IMAGE[*]}" ) + container="[${container}]" + echo "$container" + echo "::set-output name=matrix::{\"container\":$(echo "$container")}" build: name: Test for PyTorch Compatibility @@ 
-46,14 +55,13 @@ jobs: steps: - name: Install dependencies run: | - pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple pip install -U pip setuptools wheel --user - uses: actions/checkout@v2 - name: Install Colossal-AI run: | pip install -r requirements/requirements.txt - pip install -r requirements/requirements-test.txt pip install -v --no-cache-dir . + pip install -r requirements/requirements-test.txt - name: Unit Testing run: | PYTHONPATH=$PWD pytest tests From 85f933b58b74a892130e679f554a6ead76b456e9 Mon Sep 17 00:00:00 2001 From: Jiarui Fang Date: Thu, 14 Jul 2022 16:57:48 +0800 Subject: [PATCH 04/14] [Optimizer] Remove useless ColoOptimizer (#1312) --- colossalai/nn/optimizer/__init__.py | 4 +- colossalai/nn/optimizer/colo_optimizer.py | 80 ----------------------- colossalai/tensor/colo_parameter.py | 1 - tests/test_tensor/test_model.py | 9 +-- tests/test_utils/test_colo_checkpoint.py | 4 +- 5 files changed, 8 insertions(+), 90 deletions(-) delete mode 100644 colossalai/nn/optimizer/colo_optimizer.py diff --git a/colossalai/nn/optimizer/__init__.py b/colossalai/nn/optimizer/__init__.py index f9a2bc98f3a1..14cb01c24a66 100644 --- a/colossalai/nn/optimizer/__init__.py +++ b/colossalai/nn/optimizer/__init__.py @@ -7,9 +7,7 @@ from .lars import Lars from .cpu_adam import CPUAdam from .hybrid_adam import HybridAdam -from .colo_optimizer import ColoOptimizer __all__ = [ - 'ColossalaiOptimizer', 'FusedLAMB', 'FusedAdam', 'FusedSGD', 'Lamb', 'Lars', 'CPUAdam', 'HybridAdam', - 'CPU_ADAM_CNT', 'ColoOptimizer' + 'ColossalaiOptimizer', 'FusedLAMB', 'FusedAdam', 'FusedSGD', 'Lamb', 'Lars', 'CPUAdam', 'HybridAdam', 'CPU_ADAM_CNT' ] diff --git a/colossalai/nn/optimizer/colo_optimizer.py b/colossalai/nn/optimizer/colo_optimizer.py deleted file mode 100644 index 72ac916823ef..000000000000 --- a/colossalai/nn/optimizer/colo_optimizer.py +++ /dev/null @@ -1,80 +0,0 @@ -from typing import List, Union, Mapping, Dict, Any - -import torch.optim as optim -from 
torch import Tensor -from colossalai.tensor.colo_tensor import ColoTensor - - -class ColoOptimizer(optim.Optimizer): - - def __init__(self, named_params: Mapping[str, Union[Tensor, ColoTensor]], optimizer_class, *optimizer_args, - **optimizer_kwargs): - """ - ColoOptimizer collects all tensors in type of ColoTensor and torch.Tensor, - then use these tensors as ``params`` for optimizers - - Args: - named_params (Dict[str, Union[Tensor, ShardedTensor]]) : a Dict - of parameters, where key is the parameter key, value is either - Tensor or ColoTensor. This usually used in - conjunction with model.named_parameters(), the same as PyTorch. - optimizer_class (torch.optim.Optimizer): the Optimizer to use - locally, i.e. torch.optim.SGD, torch.optim.Adagrad, etc. - *optimizer_args: the arguments to initialize the optimizer. - **optimizer_kwargs: the key-word arguments to initialize the optimizer. - - """ - self._optim = optimizer_class([p for n, p in named_params], *optimizer_args, **optimizer_kwargs) - self.param_groups = self._optim.param_groups - self.state = self._optim.state - - def zero_grad(self, set_to_none: bool = False): # type: ignore[override] - r"""Sets the gradients of all optimized :class:`torch.Tensor` s to zero. - - Args: - set_to_none (bool): instead of setting to zero, set the grads to None. - This will in general have lower memory footprint, and can modestly improve performance. - However, it changes certain behaviors. For example: - 1. When the user tries to access a gradient and perform manual ops on it, - a None attribute or a Tensor full of 0s will behave differently. - 2. If the user requests ``zero_grad(set_to_none=True)`` followed by a backward pass, ``.grad``\ s - are guaranteed to be None for params that did not receive a gradient. - 3. ``torch.optim`` optimizers have a different behavior if the gradient is 0 or None - (in one case it does the step with a gradient of 0 and in the other it skips - the step altogether). 
- """ - self._optim.zero_grad(set_to_none) - - def step(self, closure=None): - r"""Performs a single optimization step (parameter update). - - Args: - closure (callable): A closure that reevaluates the model and - returns the loss. Optional for most optimizers. - - .. note:: - Unless otherwise specified, this function should not modify the - ``.grad`` field of the parameters. - """ - self._optim.step(closure) - - def state_dict(self) -> Dict[str, Any]: - """ - Returned state and param_groups will contain parameter keys - instead of parameter indices like torch.optim.Optimizer. - """ - return self._optim.state_dict() - - def load_state_dict(self, state_dict: Mapping[str, Any]): - r"""Loads the ColoOptimizer state. - - Args: - state_dict (dict): ColoOptimizer state. Should be an object returned - from a call to :meth:`state_dict`. - """ - self._optim.load_state_dict(state_dict) - - def add_param_group(self, param_group: Any): - r"""Add a new param group - """ - self._optim.add_param_group(param_group) diff --git a/colossalai/tensor/colo_parameter.py b/colossalai/tensor/colo_parameter.py index 8963d2194348..17c3265165c7 100644 --- a/colossalai/tensor/colo_parameter.py +++ b/colossalai/tensor/colo_parameter.py @@ -1,7 +1,6 @@ import torch from typing import Optional -from copy import copy from colossalai.tensor.colo_tensor import ColoTensor from colossalai.tensor.const import TensorType diff --git a/tests/test_tensor/test_model.py b/tests/test_tensor/test_model.py index ee5edae2c578..a442f6ad7789 100644 --- a/tests/test_tensor/test_model.py +++ b/tests/test_tensor/test_model.py @@ -12,7 +12,7 @@ from colossalai.utils import free_port from colossalai.utils.model.colo_init_context import ColoInitContext from colossalai.tensor import ColoTensor, ProcessGroup -from colossalai.nn.optimizer import ColoOptimizer +from colossalai.nn.optimizer import ColossalaiOptimizer from tests.components_to_test.registry import non_distributed_component_funcs from _utils import 
split_param_row_tp1d, split_param_col_tp1d @@ -33,7 +33,8 @@ def run_1d_hybrid_tp(model_name): if rank == 0: model_torch = model_builder(checkpoint=True) model_torch = model_torch.cuda() - optimizer_torch = ColoOptimizer(model_torch.named_parameters(), torch.optim.SGD, lr=0.1) + + optimizer_torch = ColossalaiOptimizer(torch.optim.SGD(model_torch.parameters(), lr=0.1)) # Make two models have the same init params for p1, p2 in zip(model.parameters(), model_torch.parameters()): @@ -80,7 +81,7 @@ def run_1d_hybrid_tp(model_name): if rank == 0: model_torch.train() - colo_optimizer = ColoOptimizer(model.named_parameters(), torch.optim.SGD, lr=0.1) + colo_optimizer = ColossalaiOptimizer(torch.optim.SGD(model.parameters(), lr=0.1)) for i, (data, label) in enumerate(train_dataloader): @@ -170,7 +171,7 @@ def test_colo_optimizer(): with ColoInitContext(lazy_memory_allocate=False, device=get_current_device()): model = model_builder(checkpoint=True) - colo_optimizer = ColoOptimizer(model.named_parameters(), torch.optim.SGD, lr=0.1) + colo_optimizer = ColossalaiOptimizer(torch.optim.SGD(model.parameters(), lr=0.1)) for i, (data, label) in enumerate(train_dataloader): colo_optimizer.zero_grad() data = data.to(get_current_device()) diff --git a/tests/test_utils/test_colo_checkpoint.py b/tests/test_utils/test_colo_checkpoint.py index 13f54aefe8be..edc463b0dbea 100644 --- a/tests/test_utils/test_colo_checkpoint.py +++ b/tests/test_utils/test_colo_checkpoint.py @@ -18,7 +18,7 @@ from colossalai.tensor import ComputePattern, ComputeSpec, ColoTensor, ShardSpec, ProcessGroup, DistSpecManager, ReplicaSpec from colossalai.nn.parallel.data_parallel import ColoDDP from colossalai.utils.checkpoint import save_checkpoint, load_checkpoint -from colossalai.nn.optimizer import ColoOptimizer +from colossalai.nn.optimizer import ColossalaiOptimizer from tests.components_to_test.registry import non_distributed_component_funcs @@ -117,7 +117,7 @@ def _run_checkpoint(model_name, init_spec_func, 
use_ddp, use_mp_reload, test_sch model_reload = model_reload.cuda() model_reload.train() - colo_optimizer = ColoOptimizer(model.named_parameters(), torch.optim.SGD, lr=0.1) + colo_optimizer = ColossalaiOptimizer(torch.optim.SGD(model.named_parameters(), r=0.1)) for i, (data, label) in enumerate(train_dataloader): From 7c70bfbefa8484011eeb704cea1470449873e5e3 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 14 Jul 2022 17:31:13 +0800 Subject: [PATCH 05/14] [hotfix] fix PipelineSharedModuleGradientHandler (#1314) --- .../_pipeline_parallel_gradient_handler.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/colossalai/engine/gradient_handler/_pipeline_parallel_gradient_handler.py b/colossalai/engine/gradient_handler/_pipeline_parallel_gradient_handler.py index 31a3dccee4df..83f5c00cf2af 100644 --- a/colossalai/engine/gradient_handler/_pipeline_parallel_gradient_handler.py +++ b/colossalai/engine/gradient_handler/_pipeline_parallel_gradient_handler.py @@ -33,14 +33,19 @@ def handle_gradient(self): # Pack the buckets. for param in self._model.parameters(): group = getattr(param, 'pipeline_shared_module_pg', None) - if param.requires_grad and param.grad is not None and group is not None: + if param.requires_grad and group is not None and ( + (hasattr(param, 'colo_attr') and not param.colo_attr.saved_grad.is_null()) + or param.grad is not None): tp = param.data.type() buckets[group][tp].append(param) # For each bucket, all-reduce and copy all-reduced grads. 
for group, group_buckets in buckets.items(): for tp, bucket in group_buckets.items(): - grads = [param.grad.data for param in bucket] + grads = [ + param.colo_attr.grad_payload if hasattr(param, 'colo_attr') else param.grad.data + for param in bucket + ] coalesced = _flatten_dense_tensors(grads).to(torch.cuda.current_device()) dist.all_reduce(coalesced, op=dist.ReduceOp.SUM, group=group) for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)): From efdc240f1fba00efd670cea357b7f04ef976ed0c Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Thu, 14 Jul 2022 17:44:43 +0800 Subject: [PATCH 06/14] [workflow] disable SHM for compatibility CI on rtx3080 (#1315) --- .github/workflows/compatibility_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/compatibility_test.yml b/.github/workflows/compatibility_test.yml index a6e8f5ea6c46..acc84edbe3f9 100644 --- a/.github/workflows/compatibility_test.yml +++ b/.github/workflows/compatibility_test.yml @@ -67,3 +67,4 @@ jobs: PYTHONPATH=$PWD pytest tests env: DATA: /data/scratch/cifar-10 + NCCL_SHM_DISABLE: 1 From 869cf3d3b8d4c3cf56edda715c414ea7b852dc12 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 15 Jul 2022 09:38:26 +0800 Subject: [PATCH 07/14] Automated submodule synchronization (#1319) Co-authored-by: github-actions --- inference | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference b/inference index e03c10a809fa..7e0faccb8037 160000 --- a/inference +++ b/inference @@ -1 +1 @@ -Subproject commit e03c10a809fabe75904d2ec932bc8b7db961bc28 +Subproject commit 7e0faccb8037d6a5bc0da43b974ac149b00d562e From 7c2634f4b3c8ae5eeb1b89ad1c8530233dee5c92 Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Fri, 15 Jul 2022 09:40:58 +0800 Subject: [PATCH 08/14] [workflow] updated release bdist workflow (#1318) * [workflow] updated release bdist workflow * polish workflow * polish workflow --- 
.github/workflows/build.yml | 2 +- .github/workflows/release_bdist.yml | 41 ++++++++++++------- .../scripts/build_colossalai_wheel.py | 37 ++++++++--------- 3 files changed, 46 insertions(+), 34 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 07452f4f398d..fa00a6a72c7c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,7 +14,7 @@ jobs: contains( github.event.pull_request.labels.*.name, 'Run Build and Test') runs-on: [self-hosted, gpu] container: - image: hpcaitech/pytorch-cuda:1.10.1-11.3.0 + image: hpcaitech/pytorch-cuda:1.12.0-11.3.0 options: --gpus all --rm -v /data/scratch/cifar-10:/data/scratch/cifar-10 timeout-minutes: 40 steps: diff --git a/.github/workflows/release_bdist.yml b/.github/workflows/release_bdist.yml index 2e5233d0a29c..7d5f9e731743 100644 --- a/.github/workflows/release_bdist.yml +++ b/.github/workflows/release_bdist.yml @@ -3,16 +3,15 @@ name: Release bdist wheel on: workflow_dispatch: inputs: + torch_version: + type: string + description: torch version, separated by comma + required: true + default: "all" cuda_version: - type: choice - description: CUDA Version - default: 'all' + type: string + description: cuda version, separated by comma required: true - options: - - all - - "11.3" - - "11.1" - - "10.2" github_ref: type: string description: Branch or Tag @@ -27,12 +26,24 @@ jobs: matrix: ${{ steps.set-matrix.outputs.matrix }} steps: - id: set-matrix + env: + TORCH_VERSIONS: ${{ inputs.torch_version }} + CUDA_VERSIONS: ${{ inputs.cuda_version }} run: | - [ "${{github.event.inputs.cuda_version}}" != "" ] && matrix="[\"hpcaitech/cuda-conda:${{github.event.inputs.cuda_version}}\"]" - [ "${{github.event.inputs.cuda_version}}" == "" ] || [ "${{github.event.inputs.cuda_version}}" == "all" ] && \ - matrix="[\"hpcaitech/cuda-conda:11.3\", \"hpcaitech/cuda-conda:11.1\", \"hpcaitech/cuda-conda:10.2\"]" - echo $matrix - echo "::set-output name=matrix::{\"container\":$(echo 
$matrix)}" + echo $TORCH_VERSIONS + echo $CUDA_VERSIONS + IFS=',' + DOCKER_IMAGE=() + + for cv in $CUDA_VERSIONS + do + DOCKER_IMAGE+=("\"hpcaitech/cuda-conda:${cv}\"") + done + + container=$( IFS=',' ; echo "${DOCKER_IMAGE[*]}" ) + container="[${container}]" + echo "$container" + echo "::set-output name=matrix::{\"container\":$(echo "$container")}" build: name: Release bdist wheels @@ -62,7 +73,9 @@ jobs: - name: Build bdist wheel run: | pip install beautifulsoup4 requests packaging - python ./build_colossalai_wheel.py + python ./build_colossalai_wheel.py --torch_version $TORCH_VERSIONS + env: + TORCH_VERSIONS: ${{ inputs.torch_version }} - name: 🚀 Deploy uses: garygrossgarten/github-action-scp@release with: diff --git a/.github/workflows/scripts/build_colossalai_wheel.py b/.github/workflows/scripts/build_colossalai_wheel.py index dcedca73790c..2d33238e25de 100644 --- a/.github/workflows/scripts/build_colossalai_wheel.py +++ b/.github/workflows/scripts/build_colossalai_wheel.py @@ -15,6 +15,7 @@ def parse_args(): parser = argparse.ArgumentParser() + parser.add_argument('--torch_version', type=str) parser.add_argument('--nightly', action='store_true', help='whether this build is for nightly release, if True, will only build on the latest PyTorch version and Python 3.8') return parser.parse_args() @@ -81,29 +82,27 @@ def main(): args = parse_args() wheel_info = all_wheel_info() - if args.nightly: - latest_torch_version = list(wheel_info.keys()) + # filter wheels on condition + all_torch_versions = list(wheel_info.keys()) + def _compare_version(a, b): + if version.parse(a) > version.parse(b): + return 1 + else: + return -1 - def _compare_version(a, b): - if version.parse(a) > version.parse(b): - return 1 - else: - return -1 + all_torch_versions.sort(key=cmp_to_key(_compare_version)) - latest_torch_version.sort(key=cmp_to_key(_compare_version)) - + if args.nightly: # only keep the latest version - for key in latest_torch_version[:-1]: + for key in 
all_torch_versions[:-1]: wheel_info.pop(key) - - # we only keep python 3.8 for nightly release - for torch_version, cuda_versioned_info in wheel_info.items(): - for cuda_version, python_versioned_info in cuda_versioned_info.items(): - python_versions = list(python_versioned_info.keys()) - - for key in python_versions: - if key != '3.8': - python_versioned_info.pop(key) + elif args.torch_version != 'all': + torch_versions = args.torch_version.split(',') + # only keep the torch versions specified + for key in all_torch_versions: + if key not in torch_versions: + wheel_info.pop(key) + build_colossalai(wheel_info) if __name__ == '__main__': From 9e4c6449b0ae314a9e2b6668895a64f52be48abb Mon Sep 17 00:00:00 2001 From: Jiarui Fang Date: Fri, 15 Jul 2022 09:52:55 +0800 Subject: [PATCH 09/14] [checkpoint] add ColoOptimizer checkpointing (#1316) --- .../nn/optimizer/colossalai_optimizer.py | 3 -- .../utils/checkpoint/module_checkpoint.py | 42 ++++++++++++++++-- tests/test_utils/test_colo_checkpoint.py | 44 +++++++++++++++---- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/colossalai/nn/optimizer/colossalai_optimizer.py b/colossalai/nn/optimizer/colossalai_optimizer.py index fb0c43903509..34f5a9541975 100644 --- a/colossalai/nn/optimizer/colossalai_optimizer.py +++ b/colossalai/nn/optimizer/colossalai_optimizer.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - import torch import torch.nn as nn from torch import Tensor diff --git a/colossalai/utils/checkpoint/module_checkpoint.py b/colossalai/utils/checkpoint/module_checkpoint.py index 119d719b2c33..81370ad0fff5 100644 --- a/colossalai/utils/checkpoint/module_checkpoint.py +++ b/colossalai/utils/checkpoint/module_checkpoint.py @@ -1,12 +1,15 @@ import torch import torch.distributed as dist from colossalai.tensor import ColoTensor, DistSpecManager +from colossalai.nn.optimizer import ColossalaiOptimizer +from copy import copy +from typing import Optional def save_checkpoint(dire: str, 
epoch: int, model: torch.nn.Module, - optimizer: torch.optim.Optimizer = None, + optimizer: Optional[ColossalaiOptimizer] = None, lr_scheduler: torch.optim.lr_scheduler._LRScheduler = None, *args, **kwargs): @@ -16,7 +19,7 @@ def save_checkpoint(dire: str, dire (str): directory to save the checkpoint files. epoch (int): the number of epoch model (torch.nn.Module): a torch module initialized by ColoInitContext - optimizer (torch.optim.Optimizer, optional): optimizers. Defaults to None. + optimizer (ColossalaiOptimizer, optional): optimizers. Defaults to None. lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional): lr schedule. Defaults to None. """ @@ -41,11 +44,21 @@ def save_checkpoint(dire: str, # delete the new dict del new_dict + optim_state_copy = copy(optimizer.state_dict()) + for k, v in optim_state_copy['state'].items(): + for n, t in v.items(): + if isinstance(t, ColoTensor): + t.to_replicate_() + if dist.get_rank() == 0: + model_state = {'epoch': epoch, 'optim': optim_state_copy} + torch.save(model_state, dire + '/epoch_{}_optim.pth'.format(epoch)) + del optim_state_copy + def load_checkpoint(dire, epoch: int, model: torch.nn.Module, - optimizer: torch.optim.Optimizer = None, + optimizer: Optional[ColossalaiOptimizer] = None, lr_scheduler: torch.optim.lr_scheduler._LRScheduler = None, *args, **kwargs): @@ -56,7 +69,7 @@ def load_checkpoint(dire, epoch (int): _description_ rank (int): _description_ model (torch.nn.Module): _description_ - optimizer (torch.optim.Optimizer, optional): _description_. Defaults to None. + optimizer (ColossalaiOptimizer, optional): _description_. Defaults to None. lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional): _description_. Defaults to None. 
""" @@ -74,3 +87,24 @@ def load_checkpoint(dire, for k, v in model.state_dict().items(): if isinstance(v, ColoTensor): v.set_tensor_spec(*mapping[k]) + + del mapping + mapping = dict() + + for k, v in optimizer.state_dict()['state'].items(): + for n, t in v.items(): + if isinstance(t, ColoTensor): + mapping[(k, n)] = (t.dist_spec, t.compute_spec) + t.to_replicate_() + + colo_checkpoint = torch.load(dire + '/epoch_{}_optim.pth'.format(epoch)) + optimizer.load_state_dict(colo_checkpoint['optim']) + + for k, v in optimizer.state_dict()['state'].items(): + for n, t in v.items(): + if isinstance(t, ColoTensor): + # skip key not in mapping. + # For Adam, if it dose not execute step() once, there will be not exp_avg and exp_avg_sq in optimizer + if (k, n) not in mapping: + continue + t.set_tensor_spec(*mapping[(k, n)]) diff --git a/tests/test_utils/test_colo_checkpoint.py b/tests/test_utils/test_colo_checkpoint.py index edc463b0dbea..524a39be1d9d 100644 --- a/tests/test_utils/test_colo_checkpoint.py +++ b/tests/test_utils/test_colo_checkpoint.py @@ -77,6 +77,18 @@ def remove(path): raise ValueError("file {} is not a file or dir.".format(path)) +def compare_optims(optim1, optim2): + state1 = optim1.state_dict()['state'] + state2 = optim2.state_dict()['state'] + for k, p1 in state1.items(): + if k not in state2: + continue + p2 = state2[k] + if isinstance(p1, ColoTensor): + assert isinstance(p2, ColoTensor) + assert torch.allclose(p1.to_replicate_(), p2.to_replicate_(), rtol=1e-3, atol=1e-1) + + def _run_checkpoint(model_name, init_spec_func, use_ddp, use_mp_reload, test_scheduler, pg): get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, train_dataloader, test_dataloader, optimizer_class, criterion = get_components_func() @@ -117,7 +129,10 @@ def _run_checkpoint(model_name, init_spec_func, use_ddp, use_mp_reload, test_sch model_reload = model_reload.cuda() model_reload.train() - colo_optimizer = 
ColossalaiOptimizer(torch.optim.SGD(model.named_parameters(), r=0.1)) + opt_class = torch.optim.Adam + colo_optimizer = ColossalaiOptimizer(opt_class(model.parameters(), lr=0.1)) + colo_optimizer_reload = ColossalaiOptimizer(opt_class(model_reload.parameters(), lr=0.1)) + run_reload = False for i, (data, label) in enumerate(train_dataloader): @@ -130,22 +145,35 @@ def _run_checkpoint(model_name, init_spec_func, use_ddp, use_mp_reload, test_sch # Bcast rank0 data to all processes if criterion: output = model(data) + output_reload = model_reload(data) loss = criterion(output, label) + loss_reload = criterion(output_reload, label) else: - output = model(data, label) - loss = output + loss = model(data, label) + loss_reload = model_reload(data, label) loss.backward() - colo_optimizer.step() + loss_reload.backward() + + if run_reload: + colo_optimizer_reload.zero_grad() + if criterion: + output_reload = model_reload(data) + loss_reload = criterion(output_reload, label) + else: + loss_reload = model_reload(data, label) + loss_reload.backward() + colo_optimizer_reload.step() if i > 2: break if not os.path.isdir('./checkpoint') and rank == 0: os.mkdir('./checkpoint') - save_checkpoint('./checkpoint', 0, model, None, None) + save_checkpoint('./checkpoint', 0, model, colo_optimizer, None) + dist.barrier() + load_checkpoint('./checkpoint', 0, model_reload, colo_optimizer_reload, None) dist.barrier() - load_checkpoint('./checkpoint', 0, model_reload, None, None) # Since model is sharded, we merge them before param checking. 
for p in model.parameters(): @@ -155,7 +183,7 @@ def _run_checkpoint(model_name, init_spec_func, use_ddp, use_mp_reload, test_sch p.to_replicate_() check_param_equal(model, model_reload) - + compare_optims(colo_optimizer, colo_optimizer_reload) if rank == 0: remove('./checkpoint') @@ -163,7 +191,7 @@ def _run_checkpoint(model_name, init_spec_func, use_ddp, use_mp_reload, test_sch def run_dist(rank, world_size, port, use_ddp, use_mp_reload, test_scheduler): colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') pg = ProcessGroup(tp_degree=world_size) - for model_name in ['bert', 'simple_net']: + for model_name in ['simple_net', 'bert']: _run_checkpoint(model_name, init_1d_row_for_linear_weight_spec, use_ddp, From 1b41686461d65e58c430e65678a76c55874a83ce Mon Sep 17 00:00:00 2001 From: HELSON Date: Fri, 15 Jul 2022 14:02:32 +0800 Subject: [PATCH 10/14] [hotfix] fix unit test test_module_spec (#1321) --- colossalai/nn/parallel/layers/module_utils.py | 3 ++- colossalai/tensor/colo_tensor.py | 24 ++++++++++--------- tests/test_tensor/test_module_spec.py | 24 +++++++++++-------- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/colossalai/nn/parallel/layers/module_utils.py b/colossalai/nn/parallel/layers/module_utils.py index 09969b4ccc1c..38d128cc705e 100644 --- a/colossalai/nn/parallel/layers/module_utils.py +++ b/colossalai/nn/parallel/layers/module_utils.py @@ -88,7 +88,7 @@ def init_colo_module(module: torch.nn.Module, compute_pattern = compute_spec.compute_pattern if is_colo_module(module): # for each param - # set DistSpec and ComputeSpec + # set its process_group, dist_spec and compute_spec colo_module = get_colo_module(module) colo_module.register(compute_pattern, pg) if not colo_module.has_compute_pattern_with_mode(compute_pattern, mode=mode): @@ -101,6 +101,7 @@ def init_colo_module(module: torch.nn.Module, continue param = module.get_parameter(param_name) if isinstance(param, ColoParameter): + 
param.set_process_group(pg) param.set_dist_spec(dist_spec) param.compute_spec = compute_spec for mod in param.shared_param_modules: diff --git a/colossalai/tensor/colo_tensor.py b/colossalai/tensor/colo_tensor.py index 31aedebd3e00..f5f0b2505613 100644 --- a/colossalai/tensor/colo_tensor.py +++ b/colossalai/tensor/colo_tensor.py @@ -18,7 +18,7 @@ def _get_my_nowrap_functions() -> Set[Callable]: Tensor._base.__get__, Tensor.grad.__get__, Tensor._grad.__get__, - Tensor.data.__get__, # make .data returns torch.Tensor rather than ColoTensor + Tensor.data.__get__, # make .data returns torch.Tensor rather than ColoTensor } @@ -121,11 +121,13 @@ def set_process_group(self, pg: ProcessGroup): RuntimeError: """ assert isinstance(pg, ProcessGroup), f"pg as type {type(pg)} is invalid" - if self.process_group.tp_world_size() != 1: - raise RuntimeError("can not set_process_group on a ColoTensor whose process_group has tp world group") - - if self.dist_spec.placement.value != 'r': - raise RuntimeError("can not set_process_group on a ColoTensor whose dist spec is not REPLICATE") + # if the new pg is the same as the old pg, just returns + if self.process_group == pg: + return + assert self.process_group.tp_world_size() == 1, \ + "Can not set_process_group on a ColoTensor whose process_group has tp world group" + assert self.dist_spec.placement.value == 'r', \ + "Can not set_process_group on a ColoTensor whose dist spec is not REPLICATE" self.process_group = pg @@ -290,17 +292,17 @@ def size_global(self, args: Optional[int] = None): def is_replicate(self): return self.dist_spec.placement == DistPlacementPattern.REPLICATE \ - or (len(self.dist_spec.num_partitions) == 1 - and self.dist_spec.num_partitions[0] == 1) \ - or (self.process_group.tp_world_size() == 1) + or (len(self.dist_spec.num_partitions) == 1 + and self.dist_spec.num_partitions[0] == 1) \ + or (self.process_group.tp_world_size() == 1) def is_shard_1dcol(self): return self.dist_spec.placement == 
DistPlacementPattern.SHARD \ - and len(self.dist_spec.dims) == 1 and self.dist_spec.dims[0] == -1 + and len(self.dist_spec.dims) == 1 and self.dist_spec.dims[0] == -1 def is_shard_1drow(self): return self.dist_spec.placement == DistPlacementPattern.SHARD \ - and len(self.dist_spec.dims) == 1 and self.dist_spec.dims[0] == 0 + and len(self.dist_spec.dims) == 1 and self.dist_spec.dims[0] == 0 def is_sharded(self): return self.dist_spec.placement == DistPlacementPattern.SHARD diff --git a/tests/test_tensor/test_module_spec.py b/tests/test_tensor/test_module_spec.py index a33af9c3ca54..b51d9df42b03 100644 --- a/tests/test_tensor/test_module_spec.py +++ b/tests/test_tensor/test_module_spec.py @@ -1,11 +1,11 @@ -from copy import copy +from copy import deepcopy import pytest from functools import partial import torch import torch.multiprocessing as mp -from colossalai.tensor import ColoTensorSpec, ComputePattern, ComputeSpec, ShardSpec, ReplicaSpec +from colossalai.tensor import ColoTensor, ComputePattern, ComputeSpec, ShardSpec, ColoTensorSpec from colossalai.nn.parallel.layers import init_colo_module, check_colo_module from _utils import tensor_equal, tensor_shard_equal, set_seed @@ -112,21 +112,25 @@ def run_linear_with_spec(mode): with ColoInitContext(device=get_current_device()): model = torch.nn.Linear(4, 8) - model_handy = copy(model) + model_handy = deepcopy(model) world_size = torch.distributed.get_world_size() pg = ProcessGroup(tp_degree=world_size) compute_spec = ComputeSpec(ComputePattern.TP1D) init_colo_module(model, compute_spec, pg=pg, recursive=True, mode=mode) x = torch.rand(2, 4).cuda() + colo_x = ColoTensor.from_torch_tensor(x, ColoTensorSpec(pg)) + out = model(x) - colo_out = model_handy(x) + colo_out = model_handy(colo_x) assert tensor_equal(out, colo_out) + grad = torch.rand_like(out) out.backward(grad) colo_out.backward(grad) - assert tensor_shard_equal(model.weight.grad, model_handy.weight.grad, pg.tp_local_rank(), pg.tp_world_size()) - assert 
tensor_shard_equal(model.bias.grad, model_handy.bias.grad, pg.tp_local_rank(), pg.tp_world_size()) + + assert tensor_shard_equal(model_handy.weight.grad, model.weight.grad, pg.tp_local_rank(), pg.tp_world_size()) + assert tensor_shard_equal(model_handy.bias.grad, model.bias.grad, pg.tp_local_rank(), pg.tp_world_size()) def run_check_shared_param(): @@ -196,7 +200,7 @@ def run_dist_check(rank, world_size, port): @pytest.mark.dist @pytest.mark.parametrize('world_size', [1, 4]) -@pytest.mark.skip("under development lazy init ColoParameter in Context") +@pytest.mark.skip("for higher testing speed") @rerun_if_address_is_in_use() def test_module_linear_1d(world_size): run_func = partial(run_dist, world_size=world_size, port=free_port()) @@ -205,7 +209,7 @@ def test_module_linear_1d(world_size): @pytest.mark.dist @pytest.mark.parametrize('world_size', [1, 4]) -@pytest.mark.skip("under development lazy init ColoParameter in Context") +@pytest.mark.skip("for higher testing speed") @rerun_if_address_is_in_use() def test_module_model(world_size): run_func = partial(run_dist_model, world_size=world_size, port=free_port()) @@ -214,7 +218,7 @@ def test_module_model(world_size): @pytest.mark.dist @pytest.mark.parametrize('world_size', [1, 2]) -@pytest.mark.skip("under development lazy init ColoParameter in Context") +@pytest.mark.skip("for higher testing speed") @rerun_if_address_is_in_use() def test_module_check(world_size): run_func = partial(run_dist_check, world_size=world_size, port=free_port()) @@ -222,4 +226,4 @@ def test_module_check(world_size): if __name__ == '__main__': - test_module_check(2) + test_module_linear_1d(4) From ca2d3f284fafe692f96150f337bee848110f593b Mon Sep 17 00:00:00 2001 From: XYE <92607131+Itok2000u@users.noreply.github.com> Date: Fri, 15 Jul 2022 14:37:58 +0800 Subject: [PATCH 11/14] [fx] Add unit test and fix bugs for transform_mlp_pass (#1299) * add test and fix bugs * add functions back * add comments --- colossalai/fx/passes/shard_1d_pass.py | 
79 +++++++++++++++++------- tests/test_fx/test_transform_mlp_pass.py | 59 ++++++++++++++++++ 2 files changed, 114 insertions(+), 24 deletions(-) create mode 100644 tests/test_fx/test_transform_mlp_pass.py diff --git a/colossalai/fx/passes/shard_1d_pass.py b/colossalai/fx/passes/shard_1d_pass.py index 49a823076b85..44449ff8e1b9 100644 --- a/colossalai/fx/passes/shard_1d_pass.py +++ b/colossalai/fx/passes/shard_1d_pass.py @@ -1,59 +1,90 @@ import torch -from colossalai.tensor import ColoTensorSpec, distspec, ProcessGroup, ComputeSpec, ComputePattern, ShardSpec +import operator +import colossalai +ELEMENTWISE_MODULE_OP = [torch.nn.Dropout, torch.nn.ReLU, torch.nn.Conv1d, torch.nn.Conv2d, torch.nn.Conv3d, torch.nn.MaxPool1d, torch.nn.MaxPool2d, torch.nn.AvgPool1d, torch.nn.AvgPool2d] +ELEMENTWISE_FUNC_OP = [torch.add, operator.add, torch.abs, torch.cos, torch.exp, torch.mul, operator.mul, operator.floordiv, operator.truediv, operator.neg, torch.multiply, torch.nn.functional.relu, torch.nn.functional.dropout, torch.nn.functional.conv1d, torch.nn.functional.conv2d, torch.nn.functional.conv3d, torch.nn.functional.avg_pool1d, torch.nn.functional.avg_pool2d, torch.nn.functional.avg_pool3d, torch.nn.functional.max_pool1d, torch.nn.functional.max_pool2d, torch.nn.functional.max_pool3d] -def weight_split(weight: torch.Tensor, dim: int) -> torch.nn.parameter.Parameter: +def weight_split(weight: torch.nn.parameter.Parameter, dim: int, col_normal: bool) -> torch.nn.parameter.Parameter: """weight_split split a nn.Parameter Args: weight (torch.nn.parameter.Parameter): a torch Parameter instance dim (int): the dimension to be sharded along with - + col_normal(bool): col shard with gather or not Returns: _type_: _description_ """ - # Append a Tensor spec to target_module.weight.shard - # Convert to ColoTensor: colo_tensor = ColoTensor.from_torch_tensor(tensor, spec) - assert isinstance(weight, torch.Tensor), \ - f'The type of the input tensor should be torch.nn.parameter' \ - f'Your 
Input tensor is {type(weight)}' - - # FIXME() I initialized a PG for this tensor. Only has TP comm group. - # we only consider the TP-only caes. - world_size = torch.distributed.get_world_size() - pg = ProcessGroup(tp_degree=world_size) - - spec = ColoTensorSpec(pg, ShardSpec([dim], [pg.tp_world_size()]), ComputeSpec(ComputePattern.TP1D)) - # As you has constructed a Spec, why not directly convert the tensor to ColoTensor. - setattr(weight, "fx_attr", spec) + if col_normal: + setattr(weight, "fx_attr", (dim, "SHARD", "TP", "col_normal")) + else: + setattr(weight, "fx_attr", (dim, "SHARD", "TP", "col_needs_many_outputs")) return weight - - def column_shard_linear_pass(gm: torch.fx.GraphModule): + # Split all the linear module with column shard. Currently for testing only. mod_graph = gm.graph for node in mod_graph.nodes: if node.op == "call_module": target_module = node.graph.owning_module.get_submodule(node.target) if isinstance(target_module, torch.nn.Linear): - target_module.weight = weight_split(target_module.weight, dim=0) + target_module.weight = weight_split(target_module.weight, dim=0, col_normal=False) if target_module.bias is not None: - target_module.bias.data = weight_split(target_module.bias.data, dim=0) + target_module.bias.data = weight_split(target_module.bias.data, dim=0, col_normal=False) gm.recompile() return gm def row_shard_linear_pass(gm: torch.fx.GraphModule): + # Split all the linear module with row shard. Currently for testing only. mod_graph = gm.graph for node in mod_graph.nodes: if node.op == "call_module": target_module = node.graph.owning_module.get_submodule(node.target) if isinstance(target_module, torch.nn.Linear): - target_module.weight = weight_split(target_module.weight, dim=-1) + target_module.weight = weight_split(target_module.weight, dim=-1, col_normal=False) gm.recompile() return gm - -#TODO: add elementwise op process pass, then we can try to use column and row mixed strategy. 
+def transform_mlp_pass(gm: torch.fx.GraphModule): + #TODO: Needs to handle special cases, like x = linear(x) + linear(x) + mod_graph = gm.graph + col_shard = True + element_op = [] + all_linear_name = [] + linear_name = [] + # Get the name of element wise module(torch.nn.ReLU) + # Get the name of all the linear modules and repeated linear modules + for name, func in gm.named_children(): + if not isinstance(func, torch.nn.Linear): + for i in ELEMENTWISE_MODULE_OP: + if isinstance(func, i): + element_op.append(name) + break + else: + if name in all_linear_name: + if name in linear_name: + linear_name.remove(name) + else: + all_linear_name.append(name) + linear_name.append(name) + # If the linear modules is called multiple times, set the dist spec as col shard + # If the module is element wise or the function/method is element wise, remains col_shard + for node in mod_graph.nodes: + if node.target in linear_name: + target_module = node.graph.owning_module.get_submodule(node.target) + dim = 0 if col_shard else -1 + target_module.weight = weight_split(target_module.weight, dim=dim, col_normal=False) + col_shard = not col_shard + elif node.target in all_linear_name: + target_module = node.graph.owning_module.get_submodule(node.target) + dim = 0 if col_shard else -1 + target_module.weight = weight_split(target_module.weight, dim=dim, col_normal=True) + col_shard = not col_shard + else: + if node.target not in element_op and all(node.target != i for i in ELEMENTWISE_FUNC_OP): + col_shard = True + gm.recompile() + return gm \ No newline at end of file diff --git a/tests/test_fx/test_transform_mlp_pass.py b/tests/test_fx/test_transform_mlp_pass.py new file mode 100644 index 000000000000..202c8ce0e138 --- /dev/null +++ b/tests/test_fx/test_transform_mlp_pass.py @@ -0,0 +1,59 @@ +import torch +import torch.nn as nn +import pytest +import colossalai +from colossalai.fx import ColoTracer +from colossalai.fx.passes.shard_1d_pass import transform_mlp_pass +CONFIG = 
dict(parallel=dict(tensor=dict(size=2, mode='1d'))) + +class MLP(torch.nn.Module): + + def __init__(self, dim: int): + super().__init__() + self.linear1 = torch.nn.Linear(dim, dim) + self.linear2 = torch.nn.Linear(dim, dim) + self.linear3 = torch.nn.Linear(dim, dim) + self.linear4 = torch.nn.Linear(dim, dim) + self.dropout = torch.nn.Dropout() + self.relu = torch.nn.ReLU() + + def forward(self, x): + x = self.relu(self.linear1(x)) + x = self.dropout(self.relu(self.linear2(x))) + x = self.linear3(x) + x = torch.nn.functional.relu(self.linear4(x)) + return x + +def test_out_acc(): + model = MLP(16).cuda() + model.eval() + input_tensor = torch.rand(2, 16).cuda() + output = model(input_tensor) + tracer = ColoTracer() + graph = tracer.trace(model, meta_args={'x': torch.randn((2, 16), device="meta")}) + gm = torch.fx.GraphModule(model, graph, model.__class__.__name__) + splitted_gm = transform_mlp_pass(gm) + new_output = splitted_gm(input_tensor) + assert output.equal(new_output) + +def test_linear_acc(): + input_tensor = torch.rand(2, 16).cuda() + model = MLP(16).cuda() + tracer = ColoTracer() + graph = tracer.trace(model, meta_args={'x': torch.randn((2, 16), device="meta")}) + gm = torch.fx.GraphModule(model, graph, model.__class__.__name__) + splitted_gm = transform_mlp_pass(gm) + col_shard = True + for node in splitted_gm.graph.nodes: + if node.op == "call_module" and isinstance(node.graph.owning_module.get_submodule(node.target), torch.nn.Linear): + target_module = node.graph.owning_module.get_submodule(node.target) + dim = 0 if col_shard else -1 + assert target_module.weight.fx_attr == (dim, "SHARD", "TP", "col_needs_many_outputs") + col_shard = not col_shard + +if __name__ == "__main__": + torch.manual_seed(1) + torch.cuda.manual_seed(1) + # colossalai.launch_from_torch(config=CONFIG) + test_out_acc() + test_linear_acc() From e8acf55e8bd53b88f48e183ef360c8691a027e9a Mon Sep 17 00:00:00 2001 From: YuliangLiu0306 <72588413+YuliangLiu0306@users.noreply.github.com> 
Date: Fri, 15 Jul 2022 14:54:26 +0800
Subject: [PATCH 12/14] [fx] add balanced policy v2 (#1251)

* [CLI] add CLI launcher

* Revert "[CLI] add CLI launcher"

This reverts commit df7e6506d4500af6a9220ef7fe4d3c7b1daebd4c.

* [fx] add balanced policy v2

* add unittest
---
 .../fx/passes/adding_split_node_pass.py       | 34 ++++++++++++++++++-
 colossalai/fx/passes/meta_info_prop.py        | 21 ++++++++++--
 tests/test_fx/test_pipeline_passes.py         |  4 ++-
 3 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/colossalai/fx/passes/adding_split_node_pass.py b/colossalai/fx/passes/adding_split_node_pass.py
index 3a3e5ddbf179..e2ea6ec70b9d 100644
--- a/colossalai/fx/passes/adding_split_node_pass.py
+++ b/colossalai/fx/passes/adding_split_node_pass.py
@@ -10,7 +10,9 @@ def pipe_split():
 
 
 def balanced_split_pass(gm: torch.fx.GraphModule, pp_size: int):
-    # TODO(lyl): balanced policy V2, split module by node size(weight+bias+output)
+    """
+    In balanced_split_pass, we split module by the size of parameters(weights+bias).
+    """
     mod_graph = gm.graph
     total_param_amount = 0
     for param in mod_graph.owning_module.parameters():
@@ -39,6 +41,36 @@ def balanced_split_pass(gm: torch.fx.GraphModule, pp_size: int):
     return gm
 
 
+def balanced_split_pass_v2(gm: torch.fx.GraphModule, pp_size: int):
+    """
+    In balanced_split_pass_v2, we split module by the size of nodes(weights+bias+outputs).
+    """
+    mod_graph = gm.graph
+    # To use balanced_split_pass_v2, we need to run meta_info_prop interpreter first.
+    # If nodes don't have meta info, this pass will fall back to normal balanced split pass.
+ check_node = list(mod_graph.nodes)[0] + if 'tensor_meta' not in check_node.meta: + return balanced_split_pass(gm, pp_size) + + total_element_size = 0 + for node in mod_graph.nodes: + total_element_size += node.node_size + + partition_size = total_element_size // pp_size + accumulate_node_size = 0 + for node in mod_graph.nodes: + if pp_size <= 1: + break + accumulate_node_size += node.node_size + if accumulate_node_size >= partition_size: + accumulate_node_size = 0 + pp_size -= 1 + with mod_graph.inserting_after(node): + split_node = mod_graph.create_node('call_function', pipe_split) + gm.recompile() + return gm + + def uniform_split_pass(gm: torch.fx.GraphModule, pp_size: int): mod_graph = gm.graph valid_children_size = 0 diff --git a/colossalai/fx/passes/meta_info_prop.py b/colossalai/fx/passes/meta_info_prop.py index 0eb7f32f4fca..4033cd72b801 100644 --- a/colossalai/fx/passes/meta_info_prop.py +++ b/colossalai/fx/passes/meta_info_prop.py @@ -67,7 +67,6 @@ class MetaInfoProp(torch.fx.Interpreter): def run_node(self, n: Node) -> Any: result = super().run_node(n) - found_tensor = False def extract_tensor_meta(obj): @@ -83,7 +82,25 @@ def extract_tensor_meta(obj): n.meta['tensor_meta'] = meta else: n.meta['tensor_meta'] = TensorMetadata(None, None, False, None, 0) - + # counting the total size of node outputs + total_node_size = 0 + if isinstance(n.meta['tensor_meta'], TensorMetadata): + total_node_size += n.meta['tensor_meta'].numel + else: + for element in n.meta['tensor_meta']: + assert isinstance( + element, TensorMetadata + ), f"``n.meta['tensor_meta']`` should be either TensorMetadata or a tuple of TensorMetadata." 
+ total_node_size += element.numel + # counting the total size of parameters + total_param_size = 0 + if n.op == 'call_module': + target_module = n.graph.owning_module.get_submodule(n.target) + for param in target_module.parameters(): + total_param_size += param.numel() + + total_node_size += total_param_size + n.node_size = total_node_size n.meta['type'] = type(result) return result diff --git a/tests/test_fx/test_pipeline_passes.py b/tests/test_fx/test_pipeline_passes.py index 54619d25ccb8..4d9e63d0d5a7 100644 --- a/tests/test_fx/test_pipeline_passes.py +++ b/tests/test_fx/test_pipeline_passes.py @@ -4,7 +4,8 @@ import colossalai.nn as col_nn from torch.fx import symbolic_trace from colossalai.fx.passes.adding_split_node_pass import split_with_split_nodes_pass, balanced_split_pass, \ - uniform_split_pass + uniform_split_pass, balanced_split_pass_v2 + import pytest MODEL_DIM = 16 @@ -43,6 +44,7 @@ def test_pipeline_passes(): model = MLP(MODEL_DIM) data = torch.rand(BATCH_SIZE, MODEL_DIM) pipeline_pass_test_helper(model, data, balanced_split_pass) + pipeline_pass_test_helper(model, data, balanced_split_pass_v2) pipeline_pass_test_helper(model, data, uniform_split_pass) From 4d5dbf48a60b66ce8715623876620956cb15f08f Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Fri, 15 Jul 2022 15:00:02 +0800 Subject: [PATCH 13/14] [workflow] fixed trigger condition for 8-gpu unit test (#1323) --- .github/workflows/build_gpu_8.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_gpu_8.yml b/.github/workflows/build_gpu_8.yml index 4e1219be292a..1e320be52465 100644 --- a/.github/workflows/build_gpu_8.yml +++ b/.github/workflows/build_gpu_8.yml @@ -30,7 +30,7 @@ jobs: - name: Unit Testing run: | gpu_used=$(nvidia-smi -i 0 --query-gpu=memory.used --format=csv,noheader,nounits) - [ "$gpu_used" -gt "100" ] && PYTHONPATH=$PWD pytest tests + [ "$gpu_used" -le "100" ] && PYTHONPATH=$PWD pytest tests env: DATA: /data/scratch/cifar-10 \ No newline at 
end of file From 659a7407387754b70061ead5f98611a5bfeac060 Mon Sep 17 00:00:00 2001 From: Frank Lee Date: Fri, 15 Jul 2022 17:20:17 +0800 Subject: [PATCH 14/14] [workflow] roll back to use torch 1.11 for unit testing (#1325) --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fa00a6a72c7c..25b89c6b79e8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,7 +14,7 @@ jobs: contains( github.event.pull_request.labels.*.name, 'Run Build and Test') runs-on: [self-hosted, gpu] container: - image: hpcaitech/pytorch-cuda:1.12.0-11.3.0 + image: hpcaitech/pytorch-cuda:1.11.0-11.3.0 options: --gpus all --rm -v /data/scratch/cifar-10:/data/scratch/cifar-10 timeout-minutes: 40 steps: