From 798bab0dd35ef134e160e6359b43e8d0d6e600e9 Mon Sep 17 00:00:00 2001 From: oahzxl Date: Thu, 3 Aug 2023 15:21:37 +0800 Subject: [PATCH 01/15] fix test files --- tests/test_moe/test_moe_checkpoint.py | 12 +++++-- tests/test_moe/test_moe_colo_init.py | 3 +- tests/test_moe/test_moe_zero_init.py | 12 +++++-- tests/test_moe/test_moe_zero_model.py | 51 ++++++++++++++++++++++++++- tests/test_moe/test_moe_zero_optim.py | 32 ++++++++++++++++- 5 files changed, 102 insertions(+), 8 deletions(-) diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py index 8a0283ba71fc..43c1772b315b 100644 --- a/tests/test_moe/test_moe_checkpoint.py +++ b/tests/test_moe/test_moe_checkpoint.py @@ -11,8 +11,16 @@ from colossalai.utils import get_current_device from colossalai.zero import ColoInitContext from tests.test_moe.test_moe_zero_init import MoeModel -from tests.test_zero.test_legacy.common import CONFIG - +CONFIG = dict(fp16=dict(mode=None,), + zero=dict(level=3, + verbose=False, + offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), + offload_param_config=dict(device='cpu', + pin_memory=True, + buffer_count=5, + buffer_size=1e8, + max_in_cpu=1e9)), + parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) def exam_moe_checkpoint(): with ColoInitContext(device=get_current_device()): diff --git a/tests/test_moe/test_moe_colo_init.py b/tests/test_moe/test_moe_colo_init.py index 555338fcf9fc..a4ac52be99de 100644 --- a/tests/test_moe/test_moe_colo_init.py +++ b/tests/test_moe/test_moe_colo_init.py @@ -10,7 +10,6 @@ from colossalai.zero import ColoInitContext from tests.test_moe.test_moe_zero_init import MoeModel from tests.test_tensor.common_utils import debug_print -from tests.test_zero.test_legacy.common import CONFIG @parameterize("init_device_type", ['cpu', 'cuda']) @@ -40,7 +39,7 @@ def exam_moe_colo_init(init_device_type): def _run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(parallel=dict(data=2)), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') MOE_CONTEXT.setup(seed=42) exam_moe_colo_init() diff --git a/tests/test_moe/test_moe_zero_init.py b/tests/test_moe/test_moe_zero_init.py index 79722f9f4056..20d533cea024 100644 --- a/tests/test_moe/test_moe_zero_init.py +++ b/tests/test_moe/test_moe_zero_init.py @@ -11,8 +11,16 @@ from colossalai.utils import get_current_device from colossalai.zero.legacy.init_ctx import ZeroInitContext from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy -from tests.test_zero.test_legacy.common import CONFIG - +CONFIG = dict(fp16=dict(mode=None,), + zero=dict(level=3, + verbose=False, + offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), + offload_param_config=dict(device='cpu', + pin_memory=True, + buffer_count=5, + buffer_size=1e8, + max_in_cpu=1e9)), + parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) class MoeModel(nn.Module): diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_model.py index ec37967f18c5..e975b1896299 100644 --- a/tests/test_moe/test_moe_zero_model.py +++ b/tests/test_moe/test_moe_zero_model.py @@ -1,5 +1,6 @@ import pytest import torch +import torch.distributed as dist import colossalai from colossalai.context import MOE_CONTEXT @@ -13,7 +14,55 @@ from colossalai.zero.legacy.sharded_model.utils import col_model_deepcopy from tests.components_to_test.registry import non_distributed_component_funcs from tests.test_moe.test_moe_zero_init import MoeModel -from tests.test_zero.test_legacy.common import CONFIG, check_grads_padding, run_fwd_bwd + +def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: + if loose: + return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) + return torch.allclose(tensor_a, tensor_b) + +CONFIG = dict(fp16=dict(mode=None,), + zero=dict(level=3, + verbose=False, + offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), + offload_param_config=dict(device='cpu', + pin_memory=True, + buffer_count=5, + buffer_size=1e8, + max_in_cpu=1e9)), + parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) + +def check_grads_padding(model, zero_model, loose=False): + rank = dist.get_rank() + for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()): + # zero_grad = zero_p.grad.clone().to(p.device) + if zero_p.colo_attr.is_replicated: + zero_grad = zero_p.colo_attr.grad_payload.clone().to(p.device) + chunks = torch.flatten(p.grad).chunk(dist.get_world_size()) + if rank >= len(chunks): + continue + grad = chunks[rank].float() + if zero_grad.size(0) > grad.size(0): + zero_grad = zero_grad[:grad.size(0)] + else: + zero_grad = zero_p.colo_attr.grad_payload + grad = p.grad.to(zero_grad.dtype) + + assert grad.dtype == zero_grad.dtype + assert allclose(grad, zero_grad, loose=loose), f'diff: {grad - zero_grad}' + +def run_fwd_bwd(model, data, label, criterion, enable_autocast=False): + model.train() + with torch.cuda.amp.autocast(enabled=enable_autocast): + if criterion: + y = model(data) + loss = criterion(y, label) + else: + loss = model(data, label) + loss = loss.float() + if isinstance(model, ShardedModelV2): + model.backward(loss) + else: + loss.backward() @parameterize("enable_autocast", [False]) diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py index efc6e9ddae27..f2f3f0e6456d 100644 --- a/tests/test_moe/test_moe_zero_optim.py +++ b/tests/test_moe/test_moe_zero_optim.py @@ -1,5 +1,6 @@ import pytest import torch +import torch.distributed as dist import colossalai from colossalai.amp import convert_to_apex_amp @@ -17,7 +18,36 @@ from colossalai.zero.low_level._utils import has_inf_or_nan from tests.components_to_test.registry import non_distributed_component_funcs from tests.test_moe.test_moe_zero_init import MoeModel -from tests.test_zero.test_legacy.common import CONFIG, check_sharded_model_params +def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: + if loose: + return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) + return torch.allclose(tensor_a, tensor_b) +def check_sharded_model_params(model, zero_model, loose=False, reuse_fp16_shard=False): + rank = dist.get_rank() + for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()): + if zero_p.colo_attr.param_is_sharded: + zero_p = zero_p.colo_attr.data_payload.to(p.device).float() + chunks = torch.flatten(p).chunk(dist.get_world_size()) + if rank >= len(chunks): + continue + p = chunks[rank].float() + if zero_p.size(0) > p.size(0): + zero_p = zero_p[:p.size(0)] + else: + zero_p = zero_p.colo_attr.data_payload.to(p.device) + + assert p.dtype == zero_p.dtype, "Parameter `{}`:\n{} vs {}".format(name, p.dtype, zero_p.dtype) + assert allclose(p, zero_p, loose=loose), f'{p} vs {zero_p}' +CONFIG = dict(fp16=dict(mode=None,), + zero=dict(level=3, + verbose=False, + offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), + offload_param_config=dict(device='cpu', + pin_memory=True, + buffer_count=5, + buffer_size=1e8, + max_in_cpu=1e9)), + parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) def _run_step(model, optimizer, data, label, criterion, grad_handler): From 9dafe1329efc77afa1658c6d8fdecce78093db1b Mon Sep 17 00:00:00 2001 From: oahzxl Date: Thu, 3 Aug 2023 17:39:36 +0800 Subject: [PATCH 02/15] new file --- tests/test_moe/test_moe_zero_init_new.py | 116 +++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 tests/test_moe/test_moe_zero_init_new.py diff --git a/tests/test_moe/test_moe_zero_init_new.py b/tests/test_moe/test_moe_zero_init_new.py new file mode 100644 index 000000000000..20d533cea024 --- /dev/null +++ b/tests/test_moe/test_moe_zero_init_new.py @@ -0,0 +1,116 @@ +import pytest +import torch +import torch.nn as nn + +import colossalai +from colossalai.context import MOE_CONTEXT +from colossalai.logging import get_dist_logger +from colossalai.nn import CheckpointModule +from colossalai.nn.layer import MoeModule +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn +from colossalai.utils import get_current_device +from colossalai.zero.legacy.init_ctx import ZeroInitContext +from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy +CONFIG = dict(fp16=dict(mode=None,), + zero=dict(level=3, + verbose=False, + offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), + offload_param_config=dict(device='cpu', + pin_memory=True, + buffer_count=5, + buffer_size=1e8, + max_in_cpu=1e9)), + parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) + +class MoeModel(nn.Module): + + def __init__(self, checkpoint: bool = False): + + class TestSubModule(CheckpointModule): + + def __init__(self): + super().__init__(checkpoint) + expert_cls = nn.Linear + expert_args_dict = dict(in_features=16, out_features=16) + self.moe = MoeModule(dim_model=16, + num_experts=8, + use_residual=True, + expert_cls=expert_cls, + **expert_args_dict) + self.proj = nn.Linear(16, 4) + + def _forward(self, x): + x, y = self.moe(x) + x = self.proj(x) + return x, y + + super().__init__() + self.test_embed = nn.Linear(4, 16) + self.test_transform = TestSubModule() + + def forward(self, x): + MOE_CONTEXT.reset_loss() + + x = self.test_embed(x) + x, y = self.test_transform(x) + + MOE_CONTEXT.add_loss(y) + return x + + +@parameterize("init_device_type", ['cpu', 'cuda']) +@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy]) +def run_moe_zero_init(init_device_type, shard_strategy_class): + logger = get_dist_logger("test_moe_zero_init") + + if init_device_type == 'cuda': + init_device = get_current_device() + elif init_device_type == 'cpu': + init_device = torch.device("cpu") + else: + raise NotImplementedError("Unknown device found.") + + model_numel_tensor = torch.zeros(1, dtype=torch.int) + with ZeroInitContext(target_device=init_device, + shard_strategy=shard_strategy_class(), + shard_param=True, + model_numel_tensor=model_numel_tensor): + model = MoeModel(checkpoint=True) + + for name, param in model.named_parameters(): + assert hasattr(param, 'colo_attr') + + # the parameters in moe experts and its gate should not be sharded + if ('experts' in name) or ('gate' in name) or ('residual_combine' in name): + assert not param.colo_attr.sharded_data_tensor.is_sharded, "`{}` parameter has problem".format(name) + else: + assert param.colo_attr.sharded_data_tensor.is_sharded + + # the parameters in moe experts is not replicated + if 'experts' in name: + assert not param.colo_attr.is_replicated + else: + assert param.colo_attr.is_replicated + + if param.colo_attr.param_is_sharded: + assert param.colo_attr.data_payload.device.type == init_device.type, \ + f'{param.colo_attr.data_payload.device.type} vs. {init_device.type}' + else: + assert param.colo_attr.data_payload.device.type == 'cuda' + + +def _run_dist(rank, world_size, port): + colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + MOE_CONTEXT.setup(seed=42) + run_moe_zero_init() + + +@pytest.mark.dist +@pytest.mark.parametrize("world_size", [2, 4]) +@rerun_if_address_is_in_use() +def test_moe_zero_init(world_size): + spawn(_run_dist, world_size) + + +if __name__ == '__main__': + test_moe_zero_init(world_size=2) From a041c7c20cfda21f9dd067e558f01a986432a91f Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 4 Aug 2023 16:20:14 +0800 Subject: [PATCH 03/15] add new --- tests/test_moe/test_moe_zero_init_new.py | 116 --------------------- tests/test_moe/test_moe_zero_model_new.py | 119 ++++++++++++++++++++++ 2 files changed, 119 insertions(+), 116 deletions(-) delete mode 100644 tests/test_moe/test_moe_zero_init_new.py create mode 100644 tests/test_moe/test_moe_zero_model_new.py diff --git a/tests/test_moe/test_moe_zero_init_new.py b/tests/test_moe/test_moe_zero_init_new.py deleted file mode 100644 index 20d533cea024..000000000000 --- a/tests/test_moe/test_moe_zero_init_new.py +++ /dev/null @@ -1,116 +0,0 @@ -import pytest -import torch -import torch.nn as nn - -import colossalai -from colossalai.context import MOE_CONTEXT -from colossalai.logging import get_dist_logger -from colossalai.nn import CheckpointModule -from colossalai.nn.layer import MoeModule -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn -from colossalai.utils import get_current_device -from colossalai.zero.legacy.init_ctx import ZeroInitContext -from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy -CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=3, - verbose=False, - offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), - offload_param_config=dict(device='cpu', - pin_memory=True, - buffer_count=5, - buffer_size=1e8, - max_in_cpu=1e9)), - parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) - -class MoeModel(nn.Module): - - def __init__(self, checkpoint: bool = False): - - class TestSubModule(CheckpointModule): - - def __init__(self): - super().__init__(checkpoint) - expert_cls = nn.Linear - expert_args_dict = dict(in_features=16, out_features=16) - self.moe = MoeModule(dim_model=16, - num_experts=8, - use_residual=True, - expert_cls=expert_cls, - **expert_args_dict) - self.proj = nn.Linear(16, 4) - - def _forward(self, x): - x, y = self.moe(x) - x = self.proj(x) - return x, y - - super().__init__() - self.test_embed = nn.Linear(4, 16) - self.test_transform = TestSubModule() - - def forward(self, x): - MOE_CONTEXT.reset_loss() - - x = self.test_embed(x) - x, y = self.test_transform(x) - - MOE_CONTEXT.add_loss(y) - return x - - -@parameterize("init_device_type", ['cpu', 'cuda']) -@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy]) -def run_moe_zero_init(init_device_type, shard_strategy_class): - logger = get_dist_logger("test_moe_zero_init") - - if init_device_type == 'cuda': - init_device = get_current_device() - elif init_device_type == 'cpu': - init_device = torch.device("cpu") - else: - raise NotImplementedError("Unknown device found.") - - model_numel_tensor = torch.zeros(1, dtype=torch.int) - with ZeroInitContext(target_device=init_device, - shard_strategy=shard_strategy_class(), - shard_param=True, - model_numel_tensor=model_numel_tensor): - model = MoeModel(checkpoint=True) - - for name, param in model.named_parameters(): - assert hasattr(param, 'colo_attr') - - # the parameters in moe experts and its gate should not be sharded - if ('experts' in name) or ('gate' in name) or ('residual_combine' in name): - assert not param.colo_attr.sharded_data_tensor.is_sharded, "`{}` parameter has problem".format(name) - else: - assert param.colo_attr.sharded_data_tensor.is_sharded - - # the parameters in moe experts is not replicated - if 'experts' in name: - assert not param.colo_attr.is_replicated - else: - assert param.colo_attr.is_replicated - - if param.colo_attr.param_is_sharded: - assert param.colo_attr.data_payload.device.type == init_device.type, \ - f'{param.colo_attr.data_payload.device.type} vs. {init_device.type}' - else: - assert param.colo_attr.data_payload.device.type == 'cuda' - - -def _run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - MOE_CONTEXT.setup(seed=42) - run_moe_zero_init() - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [2, 4]) -@rerun_if_address_is_in_use() -def test_moe_zero_init(world_size): - spawn(_run_dist, world_size) - - -if __name__ == '__main__': - test_moe_zero_init(world_size=2) diff --git a/tests/test_moe/test_moe_zero_model_new.py b/tests/test_moe/test_moe_zero_model_new.py new file mode 100644 index 000000000000..e975b1896299 --- /dev/null +++ b/tests/test_moe/test_moe_zero_model_new.py @@ -0,0 +1,119 @@ +import pytest +import torch +import torch.distributed as dist + +import colossalai +from colossalai.context import MOE_CONTEXT +from colossalai.engine.gradient_handler import MoeGradientHandler +from colossalai.nn import MoeLoss +from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use, spawn +from colossalai.zero.legacy.init_ctx import ZeroInitContext +from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy +from colossalai.zero.legacy.sharded_model import ShardedModelV2 +from colossalai.zero.legacy.sharded_model._utils import cast_tensor_to_fp16 +from colossalai.zero.legacy.sharded_model.utils import col_model_deepcopy +from tests.components_to_test.registry import non_distributed_component_funcs +from tests.test_moe.test_moe_zero_init import MoeModel + +def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: + if loose: + return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) + return torch.allclose(tensor_a, tensor_b) + +CONFIG = dict(fp16=dict(mode=None,), + zero=dict(level=3, + verbose=False, + offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), + offload_param_config=dict(device='cpu', + pin_memory=True, + buffer_count=5, + buffer_size=1e8, + max_in_cpu=1e9)), + parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) + +def check_grads_padding(model, zero_model, loose=False): + rank = dist.get_rank() + for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()): + # zero_grad = zero_p.grad.clone().to(p.device) + if zero_p.colo_attr.is_replicated: + zero_grad = zero_p.colo_attr.grad_payload.clone().to(p.device) + chunks = torch.flatten(p.grad).chunk(dist.get_world_size()) + if rank >= len(chunks): + continue + grad = chunks[rank].float() + if zero_grad.size(0) > grad.size(0): + zero_grad = zero_grad[:grad.size(0)] + else: + zero_grad = zero_p.colo_attr.grad_payload + grad = p.grad.to(zero_grad.dtype) + + assert grad.dtype == zero_grad.dtype + assert allclose(grad, zero_grad, loose=loose), f'diff: {grad - zero_grad}' + +def run_fwd_bwd(model, data, label, criterion, enable_autocast=False): + model.train() + with torch.cuda.amp.autocast(enabled=enable_autocast): + if criterion: + y = model(data) + loss = criterion(y, label) + else: + loss = model(data, label) + loss = loss.float() + if isinstance(model, ShardedModelV2): + model.backward(loss) + else: + loss.backward() + + +@parameterize("enable_autocast", [False]) +@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy]) +def run_model_test(enable_autocast, shard_strategy_class): + shard_strategy = shard_strategy_class() + + get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model') + _, train_dataloader, _, optimizer_class, _ = get_components_func() + criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss) + + with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()), + shard_strategy=shard_strategy, + shard_param=True): + zero_model = MoeModel(checkpoint=True) + zero_model = ShardedModelV2(zero_model, shard_strategy) + + # check whether parameters are identical in ddp + for name, p in zero_model.named_parameters(): + if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated: + assert_equal_in_group(p.colo_attr.data_payload) + + model = MoeModel(checkpoint=True).half() + col_model_deepcopy(zero_model, model) + model = model.cuda() + grad_handler = MoeGradientHandler(model) + + for i, (data, label) in enumerate(train_dataloader): + if i > 5: + break + + data, label = cast_tensor_to_fp16(data).cuda(), label.cuda() + run_fwd_bwd(model, data, label, criterion, enable_autocast) + run_fwd_bwd(zero_model, data, label, criterion, enable_autocast) + grad_handler.handle_gradient() + + check_grads_padding(model, zero_model, loose=True) + + +def run_dist(rank, world_size, port): + colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + MOE_CONTEXT.setup(seed=42) + run_model_test() + + +@pytest.mark.dist +@pytest.mark.parametrize("world_size", [2]) +@rerun_if_address_is_in_use() +def test_moe_zero_model(world_size): + spawn(run_dist, world_size) + + +if __name__ == '__main__': + test_moe_zero_model(world_size=2) From cde1a02c9d5a1f124780815d9a6fd41e244b79da Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 15:44:42 +0800 Subject: [PATCH 04/15] fix zero --- .../low_level/bookkeeping/bucket_store.py | 55 ++++++++++++------- colossalai/zero/low_level/low_level_optim.py | 9 +++ 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/colossalai/zero/low_level/bookkeeping/bucket_store.py b/colossalai/zero/low_level/bookkeeping/bucket_store.py index 98f1b78d0049..8b7b1c119251 100644 --- a/colossalai/zero/low_level/bookkeeping/bucket_store.py +++ b/colossalai/zero/low_level/bookkeeping/bucket_store.py @@ -13,15 +13,20 @@ class BucketStore(BaseStore): def __init__(self, torch_pg: ProcessGroup): super().__init__(torch_pg) - # init and reset + # init self.current_group_id = 0 + self._num_elements_in_bucket = 0 # mapping gardient slices and parameter self.grad_to_param_mapping = dict() + self._grad_in_bucket = dict() self._param_list = [] self._padding_size = [] + for rank in range(self._world_size): + self._grad_in_bucket[rank] = [] - self.reset() + # offset_list records number of tensors in the bucket before each reduction + self.offset_list = [0] def num_elements_in_bucket(self) -> int: """Return the total number of elements in bucket @@ -32,6 +37,12 @@ def num_elements_in_bucket(self) -> int: return self._num_elements_in_bucket + def reset_num_elements_in_bucket(self): + """Set the number of elements in bucket to zero. + """ + + self._num_elements_in_bucket = 0 + def add_param_grad(self, group_id: int, param: Tensor, padding_size: int): """Add a param to bucket and record the padding size of a param for gradient padding @@ -46,28 +57,32 @@ def add_param_grad(self, group_id: int, param: Tensor, padding_size: int): self._num_elements_in_bucket += (param.numel() + padding_size) self.current_group_id = group_id + # number of tensors in current bucket + self.offset_list[-1] += 1 + def build_grad_in_bucket(self): """Orgnize parameters' gradient(padding and split), follows the paramters' splitting method Data structure of self._grad_in_bucket: { rank0: [grad0_rank0, grad1_rank0, ...] - rank1: [grad1_rank1, grad1_rank1, ...] + rank1: [grad0_rank1, grad1_rank1, ...] } """ - for param, padding_size in zip(self._param_list, self._padding_size): - with torch.no_grad(): - grad = param.grad.detach().flatten() - if padding_size > 0: - grad = torch.nn.functional.pad(grad, [0, padding_size]) - grad_list = grad.split(grad.numel() // self._world_size) - for rank in range(self._world_size): - grad_current_rank = grad_list[rank].detach() - self.grad_to_param_mapping[id(grad_current_rank)] = id(param) - self._grad_in_bucket[rank].append(grad_current_rank) + grad = param.grad.clone().detach().flatten() + if padding_size > 0: + with torch.no_grad(): + grad = torch.nn.functional.pad(grad.view(-1), [0, padding_size]) + grad_list = grad.split(grad.numel() // self._world_size) + for rank in range(self._world_size): + grad_current_rank = grad_list[rank].clone().detach() + self.grad_to_param_mapping[id(grad_current_rank)] = id(param) + self._grad_in_bucket[rank].append(grad_current_rank) param.grad = None + self.offset_list.append(0) + def get_grad(self) -> Dict: """Return the dictionary of gradients slices, of which the keys are ranks @@ -104,10 +119,12 @@ def get_param_id_of_grad(self, grad: Tensor) -> int: return self.grad_to_param_mapping[id(grad)] def reset(self): - self.grad_to_param_mapping = dict() - self._num_elements_in_bucket = 0 - self._param_list = [] - self._padding_size = [] - self._grad_in_bucket = dict() + """Reset the bucket storage after reduction, only release the tensors have been reduced + """ + cur_offset = self.offset_list.pop(0) + self._param_list = self._param_list[cur_offset:] + self._padding_size = self._padding_size[cur_offset:] + for _ in range(cur_offset): + del self.grad_to_param_mapping[next(iter(self.grad_to_param_mapping))] for rank in range(self._world_size): - self._grad_in_bucket[rank] = [] + self._grad_in_bucket[rank] = self._grad_in_bucket[rank][cur_offset:] \ No newline at end of file diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 2b3f50ed4fd4..64d6a5395120 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -242,10 +242,19 @@ def _attach_reduction_hook(self): def _run_reduction(self): if self._bucket_store.num_elements_in_bucket() > 0: self._bucket_store.build_grad_in_bucket() + flat_grads = self._bucket_store.get_flatten_grad() flat_grads /= self._world_size + + # ready to add other tensors to bucket + self._bucket_store.reset_num_elements_in_bucket() + if self._overlap_communication: stream = self._comm_stream + # in case of the memory being reused in the default stream + flat_grads.record_stream(stream) + # waiting for ops in the default stream finishing + stream.wait_stream(torch.cuda.current_stream()) else: stream = torch.cuda.current_stream() From 6958d848c7438f5976bb4fdbf4897deed8a2258f Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:24:44 +0800 Subject: [PATCH 05/15] update moe tests for forward and backward --- colossalai/zero/low_level/low_level_optim.py | 3 + tests/test_moe/test_moe_zero_init.py | 64 ++------- tests/test_moe/test_moe_zero_model.py | 132 +++++++++---------- tests/test_moe/test_moe_zero_model_new.py | 119 ----------------- 4 files changed, 76 insertions(+), 242 deletions(-) delete mode 100644 tests/test_moe/test_moe_zero_model_new.py diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 64d6a5395120..7c00f0c450c1 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -137,6 +137,9 @@ def __init__( for group_id, param_group in enumerate(self.optim.param_groups): group_params = list() for param in param_group['params']: + # skip moe param + if hasattr(param, "moe_info"): + continue if param.requires_grad: group_params.append(param) diff --git a/tests/test_moe/test_moe_zero_init.py b/tests/test_moe/test_moe_zero_init.py index 20d533cea024..adf11ea0ca53 100644 --- a/tests/test_moe/test_moe_zero_init.py +++ b/tests/test_moe/test_moe_zero_init.py @@ -1,27 +1,19 @@ import pytest -import torch import torch.nn as nn import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import LowLevelZeroPlugin from colossalai.context import MOE_CONTEXT -from colossalai.logging import get_dist_logger from colossalai.nn import CheckpointModule from colossalai.nn.layer import MoeModule -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn -from colossalai.utils import get_current_device -from colossalai.zero.legacy.init_ctx import ZeroInitContext -from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy +from colossalai.testing import rerun_if_address_is_in_use, spawn + CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=3, - verbose=False, - offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), - offload_param_config=dict(device='cpu', - pin_memory=True, - buffer_count=5, - buffer_size=1e8, - max_in_cpu=1e9)), + zero=dict(level=2), parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) + class MoeModel(nn.Module): def __init__(self, checkpoint: bool = False): @@ -58,45 +50,17 @@ def forward(self, x): return x -@parameterize("init_device_type", ['cpu', 'cuda']) -@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy]) -def run_moe_zero_init(init_device_type, shard_strategy_class): - logger = get_dist_logger("test_moe_zero_init") - - if init_device_type == 'cuda': - init_device = get_current_device() - elif init_device_type == 'cpu': - init_device = torch.device("cpu") - else: - raise NotImplementedError("Unknown device found.") +def run_moe_zero_init(): + model = MoeModel(checkpoint=True) + plugin = LowLevelZeroPlugin(initial_scale=2**5) + booster = Booster(plugin=plugin) + model = booster.boost(model)[0] - model_numel_tensor = torch.zeros(1, dtype=torch.int) - with ZeroInitContext(target_device=init_device, - shard_strategy=shard_strategy_class(), - shard_param=True, - model_numel_tensor=model_numel_tensor): - model = MoeModel(checkpoint=True) + # assert local expert number + assert len(model.module.test_transform.moe.moe_layer.experts.experts) == 8 // MOE_CONTEXT.world_size for name, param in model.named_parameters(): - assert hasattr(param, 'colo_attr') - - # the parameters in moe experts and its gate should not be sharded - if ('experts' in name) or ('gate' in name) or ('residual_combine' in name): - assert not param.colo_attr.sharded_data_tensor.is_sharded, "`{}` parameter has problem".format(name) - else: - assert param.colo_attr.sharded_data_tensor.is_sharded - - # the parameters in moe experts is not replicated - if 'experts' in name: - assert not param.colo_attr.is_replicated - else: - assert param.colo_attr.is_replicated - - if param.colo_attr.param_is_sharded: - assert param.colo_attr.data_payload.device.type == init_device.type, \ - f'{param.colo_attr.data_payload.device.type} vs. {init_device.type}' - else: - assert param.colo_attr.data_payload.device.type == 'cuda' + print(name, param) def _run_dist(rank, world_size, port): diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_model.py index e975b1896299..ab09d68505f7 100644 --- a/tests/test_moe/test_moe_zero_model.py +++ b/tests/test_moe/test_moe_zero_model.py @@ -1,56 +1,38 @@ import pytest import torch -import torch.distributed as dist import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import LowLevelZeroPlugin +from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroModel from colossalai.context import MOE_CONTEXT from colossalai.engine.gradient_handler import MoeGradientHandler from colossalai.nn import MoeLoss -from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use, spawn -from colossalai.zero.legacy.init_ctx import ZeroInitContext -from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy -from colossalai.zero.legacy.sharded_model import ShardedModelV2 -from colossalai.zero.legacy.sharded_model._utils import cast_tensor_to_fp16 -from colossalai.zero.legacy.sharded_model.utils import col_model_deepcopy -from tests.components_to_test.registry import non_distributed_component_funcs +from colossalai.testing import rerun_if_address_is_in_use, spawn +from colossalai.testing.random import seed_all from tests.test_moe.test_moe_zero_init import MoeModel + def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: if loose: return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) return torch.allclose(tensor_a, tensor_b) -CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=3, - verbose=False, - offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), - offload_param_config=dict(device='cpu', - pin_memory=True, - buffer_count=5, - buffer_size=1e8, - max_in_cpu=1e9)), - parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) - -def check_grads_padding(model, zero_model, loose=False): - rank = dist.get_rank() - for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()): - # zero_grad = zero_p.grad.clone().to(p.device) - if zero_p.colo_attr.is_replicated: - zero_grad = zero_p.colo_attr.grad_payload.clone().to(p.device) - chunks = torch.flatten(p.grad).chunk(dist.get_world_size()) - if rank >= len(chunks): - continue - grad = chunks[rank].float() - if zero_grad.size(0) > grad.size(0): - zero_grad = zero_grad[:grad.size(0)] - else: - zero_grad = zero_p.colo_attr.grad_payload - grad = p.grad.to(zero_grad.dtype) - assert grad.dtype == zero_grad.dtype - assert allclose(grad, zero_grad, loose=loose), f'diff: {grad - zero_grad}' +CONFIG = dict(zero=dict(level=2), parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) + + +def split_ddp_grad(grad, world_size): + with torch.no_grad(): + grad = grad.clone().detach().flatten() + padding_size = (world_size - grad.numel() % world_size) % world_size + if padding_size > 0: + grad = torch.nn.functional.pad(grad, [0, padding_size]) + splited_grad = grad.split(grad.numel() // world_size) + return splited_grad -def run_fwd_bwd(model, data, label, criterion, enable_autocast=False): + +def run_fwd_bwd(model, data, label, criterion, optimizer, enable_autocast=False): model.train() with torch.cuda.amp.autocast(enabled=enable_autocast): if criterion: @@ -59,53 +41,57 @@ def run_fwd_bwd(model, data, label, criterion, enable_autocast=False): else: loss = model(data, label) loss = loss.float() - if isinstance(model, ShardedModelV2): - model.backward(loss) + + if isinstance(model, LowLevelZeroModel): + optimizer.backward(loss) else: loss.backward() -@parameterize("enable_autocast", [False]) -@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy]) -def run_model_test(enable_autocast, shard_strategy_class): - shard_strategy = shard_strategy_class() - - get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model') - _, train_dataloader, _, optimizer_class, _ = get_components_func() +def run_zero_test(local_rank, world_size, stage=1): criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss) - with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()), - shard_strategy=shard_strategy, - shard_param=True): - zero_model = MoeModel(checkpoint=True) - zero_model = ShardedModelV2(zero_model, shard_strategy) - - # check whether parameters are identical in ddp - for name, p in zero_model.named_parameters(): - if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated: - assert_equal_in_group(p.colo_attr.data_payload) - - model = MoeModel(checkpoint=True).half() - col_model_deepcopy(zero_model, model) - model = model.cuda() - grad_handler = MoeGradientHandler(model) - - for i, (data, label) in enumerate(train_dataloader): - if i > 5: - break - - data, label = cast_tensor_to_fp16(data).cuda(), label.cuda() - run_fwd_bwd(model, data, label, criterion, enable_autocast) - run_fwd_bwd(zero_model, data, label, criterion, enable_autocast) - grad_handler.handle_gradient() - - check_grads_padding(model, zero_model, loose=True) + zero_model = MoeModel(checkpoint=True) + optimizer = torch.optim.Adam(zero_model.parameters()) + plugin = LowLevelZeroPlugin(stage=stage, precision="fp32") + booster = Booster(plugin=plugin) + zero_model, optimizer, _, _, _ = booster.boost(zero_model, optimizer) + + torch_model = MoeModel(checkpoint=True) + for zero_param, torch_param in zip(zero_model.parameters(), torch_model.parameters()): + torch_param.data.copy_(zero_param.data) + torch_model = torch_model.cuda() + grad_handler = MoeGradientHandler(torch_model) + + data = torch.randn(16, 4).cuda() + label = torch.randint(0, 4, (16,)).cuda() + + run_fwd_bwd(torch_model, data, label, criterion, None) + run_fwd_bwd(zero_model, data, label, criterion, optimizer) + grad_handler.handle_gradient() + + for (zero_name, zero_param), (torch_name, torch_param) in zip(zero_model.module.named_parameters(), + torch_model.named_parameters()): + assert zero_name == torch_name + zero_grad_list = optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(zero_param)) + if hasattr(zero_param, "moe_info"): + assert len(zero_grad_list) == 0 + assert torch.allclose(zero_grad, torch_grad) + else: + assert len(zero_grad_list) > 0 + torch_grad_list = split_ddp_grad(torch_param.grad, world_size) + if stage == 2: + torch_grad_list = torch_grad_list[local_rank:local_rank + 1] + for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): + assert torch.allclose(zero_grad, torch_grad) def run_dist(rank, world_size, port): colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') MOE_CONTEXT.setup(seed=42) - run_model_test() + seed_all(42 + rank) + run_zero_test(rank, world_size, stage=1) + run_zero_test(rank, world_size, stage=2) @pytest.mark.dist diff --git a/tests/test_moe/test_moe_zero_model_new.py b/tests/test_moe/test_moe_zero_model_new.py deleted file mode 100644 index e975b1896299..000000000000 --- a/tests/test_moe/test_moe_zero_model_new.py +++ /dev/null @@ -1,119 +0,0 @@ -import pytest -import torch -import torch.distributed as dist - -import colossalai -from colossalai.context import MOE_CONTEXT -from colossalai.engine.gradient_handler import MoeGradientHandler -from colossalai.nn import MoeLoss -from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use, spawn -from colossalai.zero.legacy.init_ctx import ZeroInitContext -from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy -from colossalai.zero.legacy.sharded_model import ShardedModelV2 -from colossalai.zero.legacy.sharded_model._utils import cast_tensor_to_fp16 -from colossalai.zero.legacy.sharded_model.utils import col_model_deepcopy -from tests.components_to_test.registry import non_distributed_component_funcs -from tests.test_moe.test_moe_zero_init import MoeModel - -def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: - if loose: - return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) - return torch.allclose(tensor_a, tensor_b) - -CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=3, - verbose=False, - offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), - offload_param_config=dict(device='cpu', - pin_memory=True, - buffer_count=5, - buffer_size=1e8, - max_in_cpu=1e9)), - parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) - -def check_grads_padding(model, zero_model, loose=False): - rank = dist.get_rank() - for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()): - # zero_grad = zero_p.grad.clone().to(p.device) - if zero_p.colo_attr.is_replicated: - zero_grad = zero_p.colo_attr.grad_payload.clone().to(p.device) - chunks = torch.flatten(p.grad).chunk(dist.get_world_size()) - if rank >= len(chunks): - continue - grad = chunks[rank].float() - if zero_grad.size(0) > grad.size(0): - zero_grad = zero_grad[:grad.size(0)] - else: - zero_grad = zero_p.colo_attr.grad_payload - grad = p.grad.to(zero_grad.dtype) - - assert grad.dtype == zero_grad.dtype - assert allclose(grad, zero_grad, loose=loose), f'diff: {grad - zero_grad}' - -def run_fwd_bwd(model, data, label, criterion, enable_autocast=False): - model.train() - with torch.cuda.amp.autocast(enabled=enable_autocast): - if criterion: - y = model(data) - loss = criterion(y, label) - else: - loss = model(data, label) - loss = loss.float() - if isinstance(model, ShardedModelV2): - model.backward(loss) - else: - loss.backward() - - -@parameterize("enable_autocast", [False]) -@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy]) -def run_model_test(enable_autocast, shard_strategy_class): - shard_strategy = shard_strategy_class() - - get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model') - _, train_dataloader, _, optimizer_class, _ = get_components_func() - criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss) - - with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()), - shard_strategy=shard_strategy, - shard_param=True): - zero_model = MoeModel(checkpoint=True) - zero_model = ShardedModelV2(zero_model, shard_strategy) - - # check whether parameters are identical in ddp - for name, p in zero_model.named_parameters(): - if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated: - assert_equal_in_group(p.colo_attr.data_payload) - - model = MoeModel(checkpoint=True).half() - col_model_deepcopy(zero_model, model) - model = model.cuda() - grad_handler = MoeGradientHandler(model) - - for i, (data, label) in enumerate(train_dataloader): - if i > 5: - break - - data, label = cast_tensor_to_fp16(data).cuda(), label.cuda() - run_fwd_bwd(model, data, label, criterion, enable_autocast) - run_fwd_bwd(zero_model, data, label, criterion, enable_autocast) - grad_handler.handle_gradient() - - check_grads_padding(model, zero_model, loose=True) - - -def run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - MOE_CONTEXT.setup(seed=42) - run_model_test() - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [2]) -@rerun_if_address_is_in_use() -def test_moe_zero_model(world_size): - spawn(run_dist, world_size) - - -if __name__ == '__main__': - test_moe_zero_model(world_size=2) From 9155aaab86b5b058cc148919c47d47d1c397e492 Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:30:33 +0800 Subject: [PATCH 06/15] remove useless test --- tests/test_moe/test_moe_colo_init.py | 55 ---------------------------- 1 file changed, 55 deletions(-) delete mode 100644 tests/test_moe/test_moe_colo_init.py diff --git a/tests/test_moe/test_moe_colo_init.py b/tests/test_moe/test_moe_colo_init.py deleted file mode 100644 index a4ac52be99de..000000000000 --- a/tests/test_moe/test_moe_colo_init.py +++ /dev/null @@ -1,55 +0,0 @@ -import pytest -import torch -import torch.distributed as dist - -import colossalai -from colossalai.context import MOE_CONTEXT -from colossalai.tensor import ColoParameter -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn -from colossalai.utils import get_current_device -from colossalai.zero import ColoInitContext -from tests.test_moe.test_moe_zero_init import MoeModel -from tests.test_tensor.common_utils import debug_print - - -@parameterize("init_device_type", ['cpu', 'cuda']) -def exam_moe_colo_init(init_device_type): - world_size = dist.get_world_size() - - if init_device_type == 'cuda': - init_device = get_current_device() - elif init_device_type == 'cpu': - init_device = torch.device("cpu") - else: - raise NotImplementedError("Unknown device found.") - - with ColoInitContext(device=init_device): - model = MoeModel(checkpoint=True) - - for name, param in model.named_parameters(): - assert isinstance(param, ColoParameter), "parameter `{}` has an init problem".format(name) - - if hasattr(param, "moe_info"): - param.set_process_group(param.moe_info.pg) - - if hasattr(param, "moe_info"): - assert param.process_group.dp_world_size() == param.moe_info.dp_size - else: - assert param.process_group.dp_world_size() == world_size - - -def _run_dist(rank, world_size, port): - colossalai.launch(config=dict(parallel=dict(data=2)), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - MOE_CONTEXT.setup(seed=42) - exam_moe_colo_init() - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) -@rerun_if_address_is_in_use() -def test_moe_colo_init(world_size): - spawn(_run_dist, world_size) - - -if __name__ == '__main__': - test_moe_colo_init(world_size=4) From 26e54930cf08a7cf36a9b445c23fdd6d8f026215 Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:31:26 +0800 Subject: [PATCH 07/15] remove print --- tests/test_moe/test_moe_zero_init.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_moe/test_moe_zero_init.py b/tests/test_moe/test_moe_zero_init.py index adf11ea0ca53..5b3ec58475fa 100644 --- a/tests/test_moe/test_moe_zero_init.py +++ b/tests/test_moe/test_moe_zero_init.py @@ -59,8 +59,8 @@ def run_moe_zero_init(): # assert local expert number assert len(model.module.test_transform.moe.moe_layer.experts.experts) == 8 // MOE_CONTEXT.world_size - for name, param in model.named_parameters(): - print(name, param) + # for name, param in model.named_parameters(): + # print(name, param) def _run_dist(rank, world_size, port): From 3c51c4845a5b9a86aec0ed4710268055901d4c5d Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:36:27 +0800 Subject: [PATCH 08/15] moe --- tests/test_moe/test_moe_zero_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_model.py index ab09d68505f7..0c017004c8b6 100644 --- a/tests/test_moe/test_moe_zero_model.py +++ b/tests/test_moe/test_moe_zero_model.py @@ -64,7 +64,7 @@ def run_zero_test(local_rank, world_size, stage=1): grad_handler = MoeGradientHandler(torch_model) data = torch.randn(16, 4).cuda() - label = torch.randint(0, 4, (16,)).cuda() + label = torch.randint(0, 4, (16,)).cuda() run_fwd_bwd(torch_model, data, label, criterion, None) run_fwd_bwd(zero_model, data, label, criterion, optimizer) From 8a02a4273ae478e225ee65c477069d7073d55bac Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:37:48 +0800 Subject: [PATCH 09/15] code style --- tests/test_moe/test_moe_zero_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_model.py index 0c017004c8b6..ab09d68505f7 100644 --- a/tests/test_moe/test_moe_zero_model.py +++ b/tests/test_moe/test_moe_zero_model.py @@ -64,7 +64,7 @@ def run_zero_test(local_rank, world_size, stage=1): grad_handler = MoeGradientHandler(torch_model) data = torch.randn(16, 4).cuda() - label = torch.randint(0, 4, (16,)).cuda() + label = torch.randint(0, 4, (16,)).cuda() run_fwd_bwd(torch_model, data, label, criterion, None) run_fwd_bwd(zero_model, data, label, criterion, optimizer) From 16759bd5922bff6ec70f9ab44a303b940003c178 Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:39:32 +0800 Subject: [PATCH 10/15] code style --- tests/test_moe/test_moe_checkpoint.py | 2 ++ tests/test_moe/test_moe_zero_optim.py | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py index 43c1772b315b..79e4741e8a18 100644 --- a/tests/test_moe/test_moe_checkpoint.py +++ b/tests/test_moe/test_moe_checkpoint.py @@ -11,6 +11,7 @@ from colossalai.utils import get_current_device from colossalai.zero import ColoInitContext from tests.test_moe.test_moe_zero_init import MoeModel + CONFIG = dict(fp16=dict(mode=None,), zero=dict(level=3, verbose=False, @@ -22,6 +23,7 @@ max_in_cpu=1e9)), parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) + def exam_moe_checkpoint(): with ColoInitContext(device=get_current_device()): model = MoeModel(checkpoint=True) diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py index f2f3f0e6456d..9aa53732148a 100644 --- a/tests/test_moe/test_moe_zero_optim.py +++ b/tests/test_moe/test_moe_zero_optim.py @@ -18,10 +18,14 @@ from colossalai.zero.low_level._utils import has_inf_or_nan from tests.components_to_test.registry import non_distributed_component_funcs from tests.test_moe.test_moe_zero_init import MoeModel + + def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: if loose: return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) return torch.allclose(tensor_a, tensor_b) + + def check_sharded_model_params(model, zero_model, loose=False, reuse_fp16_shard=False): rank = dist.get_rank() for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()): @@ -38,6 +42,8 @@ def check_sharded_model_params(model, zero_model, loose=False, reuse_fp16_shard= assert p.dtype == zero_p.dtype, "Parameter `{}`:\n{} vs {}".format(name, p.dtype, zero_p.dtype) assert allclose(p, zero_p, loose=loose), f'{p} vs {zero_p}' + + CONFIG = dict(fp16=dict(mode=None,), zero=dict(level=3, verbose=False, From f89e71a26b972143c1ae25221d8e75862e5bd7aa Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 16:47:27 +0800 Subject: [PATCH 11/15] rename --- .../{test_moe_zero_model.py => test_moe_zero_12.py} | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) rename tests/test_moe/{test_moe_zero_model.py => test_moe_zero_12.py} (94%) diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_12.py similarity index 94% rename from tests/test_moe/test_moe_zero_model.py rename to tests/test_moe/test_moe_zero_12.py index ab09d68505f7..a357a5b958a8 100644 --- a/tests/test_moe/test_moe_zero_model.py +++ b/tests/test_moe/test_moe_zero_12.py @@ -46,6 +46,7 @@ def run_fwd_bwd(model, data, label, criterion, optimizer, enable_autocast=False) optimizer.backward(loss) else: loss.backward() + return y def run_zero_test(local_rank, world_size, stage=1): @@ -66,8 +67,9 @@ def run_zero_test(local_rank, world_size, stage=1): data = torch.randn(16, 4).cuda() label = torch.randint(0, 4, (16,)).cuda() - run_fwd_bwd(torch_model, data, label, criterion, None) - run_fwd_bwd(zero_model, data, label, criterion, optimizer) + torch_out = run_fwd_bwd(torch_model, data, label, criterion, None) + zero_out = run_fwd_bwd(zero_model, data, label, criterion, optimizer) + assert torch.allclose(torch_out, zero_out) grad_handler.handle_gradient() for (zero_name, zero_param), (torch_name, torch_param) in zip(zero_model.module.named_parameters(), From e8128e1da306a593c46d406b6ffa77adc5d7d8ae Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 17:44:30 +0800 Subject: [PATCH 12/15] rename --- .../test_moe/{test_moe_zero_12.py => test_moe_zero_fwd_bwd.py} | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) rename tests/test_moe/{test_moe_zero_12.py => test_moe_zero_fwd_bwd.py} (96%) diff --git a/tests/test_moe/test_moe_zero_12.py b/tests/test_moe/test_moe_zero_fwd_bwd.py similarity index 96% rename from tests/test_moe/test_moe_zero_12.py rename to tests/test_moe/test_moe_zero_fwd_bwd.py index a357a5b958a8..0c4f9b9af634 100644 --- a/tests/test_moe/test_moe_zero_12.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd.py @@ -78,12 +78,13 @@ def run_zero_test(local_rank, world_size, stage=1): zero_grad_list = optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(zero_param)) if hasattr(zero_param, "moe_info"): assert len(zero_grad_list) == 0 - assert torch.allclose(zero_grad, torch_grad) + assert torch.allclose(zero_param.grad, torch_param.grad) else: assert len(zero_grad_list) > 0 torch_grad_list = split_ddp_grad(torch_param.grad, world_size) if stage == 2: torch_grad_list = torch_grad_list[local_rank:local_rank + 1] + assert len(zero_grad_list) == len(torch_grad_list) for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list): assert torch.allclose(zero_grad, torch_grad) From eeb20b68ae894261b4d9de7a899dc500f385a847 Mon Sep 17 00:00:00 2001 From: oahzxl Date: Fri, 11 Aug 2023 17:50:33 +0800 Subject: [PATCH 13/15] remove useless func --- tests/test_moe/test_moe_zero_fwd_bwd.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/test_moe/test_moe_zero_fwd_bwd.py b/tests/test_moe/test_moe_zero_fwd_bwd.py index 0c4f9b9af634..c5e4d0bc96ee 100644 --- a/tests/test_moe/test_moe_zero_fwd_bwd.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd.py @@ -12,13 +12,6 @@ from colossalai.testing.random import seed_all from tests.test_moe.test_moe_zero_init import MoeModel - -def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: - if loose: - return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3) - return torch.allclose(tensor_a, tensor_b) - - CONFIG = dict(zero=dict(level=2), parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) From a50d4667ff4a618534318308af5ac983dee1eb09 Mon Sep 17 00:00:00 2001 From: oahzxl Date: Mon, 14 Aug 2023 10:06:13 +0800 Subject: [PATCH 14/15] update param check --- tests/test_moe/test_moe_zero_fwd_bwd.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_moe/test_moe_zero_fwd_bwd.py b/tests/test_moe/test_moe_zero_fwd_bwd.py index c5e4d0bc96ee..7f2ef4007f03 100644 --- a/tests/test_moe/test_moe_zero_fwd_bwd.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd.py @@ -57,6 +57,11 @@ def run_zero_test(local_rank, world_size, stage=1): torch_model = torch_model.cuda() grad_handler = MoeGradientHandler(torch_model) + for (torch_name, torch_param), (zero_name, zero_param) in zip(torch_model.named_parameters(), + zero_model.module.named_parameters()): + assert zero_name == torch_name + assert torch.allclose(zero_param.data, torch_param.data) + data = torch.randn(16, 4).cuda() label = torch.randint(0, 4, (16,)).cuda() From cdbf1dc8d63520f6236f3ebf7d3de511bc9141fb Mon Sep 17 00:00:00 2001 From: oahzxl Date: Mon, 14 Aug 2023 14:30:50 +0800 Subject: [PATCH 15/15] update utils and config --- .../{test_moe_zero_init.py => moe_utils.py} | 39 ------------------- tests/test_moe/test_grad_handler.py | 3 +- tests/test_moe/test_kernel.py | 3 +- tests/test_moe/test_moe_checkpoint.py | 15 +------ tests/test_moe/test_moe_group.py | 3 +- tests/test_moe/test_moe_zero_fwd_bwd.py | 8 ++-- tests/test_moe/test_moe_zero_optim.py | 16 +------- 7 files changed, 11 insertions(+), 76 deletions(-) rename tests/test_moe/{test_moe_zero_init.py => moe_utils.py} (50%) diff --git a/tests/test_moe/test_moe_zero_init.py b/tests/test_moe/moe_utils.py similarity index 50% rename from tests/test_moe/test_moe_zero_init.py rename to tests/test_moe/moe_utils.py index 5b3ec58475fa..4b067c1ceea9 100644 --- a/tests/test_moe/test_moe_zero_init.py +++ b/tests/test_moe/moe_utils.py @@ -1,17 +1,8 @@ -import pytest import torch.nn as nn -import colossalai -from colossalai.booster import Booster -from colossalai.booster.plugin import LowLevelZeroPlugin from colossalai.context import MOE_CONTEXT from colossalai.nn import CheckpointModule from colossalai.nn.layer import MoeModule -from colossalai.testing import rerun_if_address_is_in_use, spawn - -CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=2), - parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) class MoeModel(nn.Module): @@ -48,33 +39,3 @@ def forward(self, x): MOE_CONTEXT.add_loss(y) return x - - -def run_moe_zero_init(): - model = MoeModel(checkpoint=True) - plugin = LowLevelZeroPlugin(initial_scale=2**5) - booster = Booster(plugin=plugin) - model = booster.boost(model)[0] - - # assert local expert number - assert len(model.module.test_transform.moe.moe_layer.experts.experts) == 8 // MOE_CONTEXT.world_size - - # for name, param in model.named_parameters(): - # print(name, param) - - -def _run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - MOE_CONTEXT.setup(seed=42) - run_moe_zero_init() - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [2, 4]) -@rerun_if_address_is_in_use() -def test_moe_zero_init(world_size): - spawn(_run_dist, world_size) - - -if __name__ == '__main__': - test_moe_zero_init(world_size=2) diff --git a/tests/test_moe/test_grad_handler.py b/tests/test_moe/test_grad_handler.py index e7002a75f3f7..a4473bf8eea4 100644 --- a/tests/test_moe/test_grad_handler.py +++ b/tests/test_moe/test_grad_handler.py @@ -13,11 +13,10 @@ BATCH_SIZE = 4 DIM = 16 -CONFIG = dict() def run_test(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') expert_module = nn.Linear expert_factor = dict(in_features=DIM, out_features=DIM, device=get_current_device()) diff --git a/tests/test_moe/test_kernel.py b/tests/test_moe/test_kernel.py index 39603c158731..9d11fd9bcd6d 100644 --- a/tests/test_moe/test_kernel.py +++ b/tests/test_moe/test_kernel.py @@ -12,7 +12,6 @@ BATCH_SIZE = 16 NUM_EXPERTS = 4 -CONFIG = dict() def check_equal(tensor_a, tensor_b, atol=1e-06): @@ -23,7 +22,7 @@ def run_routing(rank, world_size, port, rs=2, hidden_size=128, data_type=torch.f # Here we do not need TF32, since it brings absolute error on results torch.backends.cuda.matmul.allow_tf32 = False - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') local_rank = gpc.get_local_rank(ParallelMode.GLOBAL) MOE_CONTEXT.setup(42) # MOE environment initialization diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py index 79e4741e8a18..df0fa164c068 100644 --- a/tests/test_moe/test_moe_checkpoint.py +++ b/tests/test_moe/test_moe_checkpoint.py @@ -10,18 +10,7 @@ from colossalai.testing import rerun_if_address_is_in_use, spawn from colossalai.utils import get_current_device from colossalai.zero import ColoInitContext -from tests.test_moe.test_moe_zero_init import MoeModel - -CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=3, - verbose=False, - offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), - offload_param_config=dict(device='cpu', - pin_memory=True, - buffer_count=5, - buffer_size=1e8, - max_in_cpu=1e9)), - parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) +from tests.test_moe.moe_utils import MoeModel def exam_moe_checkpoint(): @@ -44,7 +33,7 @@ def exam_moe_checkpoint(): def _run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') MOE_CONTEXT.setup(seed=42) exam_moe_checkpoint() diff --git a/tests/test_moe/test_moe_group.py b/tests/test_moe/test_moe_group.py index 6dc3f5f18b6d..d073e0e5c08f 100644 --- a/tests/test_moe/test_moe_group.py +++ b/tests/test_moe/test_moe_group.py @@ -11,12 +11,11 @@ D_MODEL = 4 D_FF = 8 -CONFIG = dict() def run_test(rank, world_size, port): world_size = 4 - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') expert_module = nn.Linear expert_factor = dict(in_features=D_MODEL, out_features=D_FF, device=get_current_device()) diff --git a/tests/test_moe/test_moe_zero_fwd_bwd.py b/tests/test_moe/test_moe_zero_fwd_bwd.py index 7f2ef4007f03..83ec884b1515 100644 --- a/tests/test_moe/test_moe_zero_fwd_bwd.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd.py @@ -10,9 +10,7 @@ from colossalai.nn import MoeLoss from colossalai.testing import rerun_if_address_is_in_use, spawn from colossalai.testing.random import seed_all -from tests.test_moe.test_moe_zero_init import MoeModel - -CONFIG = dict(zero=dict(level=2), parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) +from tests.test_moe.moe_utils import MoeModel def split_ddp_grad(grad, world_size): @@ -57,6 +55,8 @@ def run_zero_test(local_rank, world_size, stage=1): torch_model = torch_model.cuda() grad_handler = MoeGradientHandler(torch_model) + # assert zero model + assert len(zero_model.module.test_transform.moe.moe_layer.experts.experts) == 8 // MOE_CONTEXT.world_size for (torch_name, torch_param), (zero_name, zero_param) in zip(torch_model.named_parameters(), zero_model.module.named_parameters()): assert zero_name == torch_name @@ -88,7 +88,7 @@ def run_zero_test(local_rank, world_size, stage=1): def run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') MOE_CONTEXT.setup(seed=42) seed_all(42 + rank) run_zero_test(rank, world_size, stage=1) diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py index 9aa53732148a..fbd64def9b41 100644 --- a/tests/test_moe/test_moe_zero_optim.py +++ b/tests/test_moe/test_moe_zero_optim.py @@ -17,7 +17,7 @@ from colossalai.zero.legacy.sharded_optim import ShardedOptimizerV2 from colossalai.zero.low_level._utils import has_inf_or_nan from tests.components_to_test.registry import non_distributed_component_funcs -from tests.test_moe.test_moe_zero_init import MoeModel +from tests.test_moe.moe_utils import MoeModel def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool: @@ -44,18 +44,6 @@ def check_sharded_model_params(model, zero_model, loose=False, reuse_fp16_shard= assert allclose(p, zero_p, loose=loose), f'{p} vs {zero_p}' -CONFIG = dict(fp16=dict(mode=None,), - zero=dict(level=3, - verbose=False, - offload_optimizer_config=dict(device='cpu', pin_memory=True, buffer_count=5, fast_init=False), - offload_param_config=dict(device='cpu', - pin_memory=True, - buffer_count=5, - buffer_size=1e8, - max_in_cpu=1e9)), - parallel=dict(pipeline=dict(size=1), tensor=dict(size=1, mode=None))) - - def _run_step(model, optimizer, data, label, criterion, grad_handler): model.train() optimizer.zero_grad() @@ -139,7 +127,7 @@ def _run_test_sharded_optim_v2(cpu_offload, def _run_dist(rank, world_size, port): - colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') MOE_CONTEXT.setup(seed=42) _run_test_sharded_optim_v2()