diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py
index 64d6a5395120..7c00f0c450c1 100644
--- a/colossalai/zero/low_level/low_level_optim.py
+++ b/colossalai/zero/low_level/low_level_optim.py
@@ -137,6 +137,9 @@ def __init__(
         for group_id, param_group in enumerate(self.optim.param_groups):
             group_params = list()
             for param in param_group['params']:
+                # skip moe param
+                if hasattr(param, "moe_info"):
+                    continue
                 if param.requires_grad:
                     group_params.append(param)
 
diff --git a/tests/test_moe/moe_utils.py b/tests/test_moe/moe_utils.py
new file mode 100644
index 000000000000..4b067c1ceea9
--- /dev/null
+++ b/tests/test_moe/moe_utils.py
@@ -0,0 +1,41 @@
+import torch.nn as nn
+
+from colossalai.context import MOE_CONTEXT
+from colossalai.nn import CheckpointModule
+from colossalai.nn.layer import MoeModule
+
+
+class MoeModel(nn.Module):
+
+    def __init__(self, checkpoint: bool = False):
+
+        class TestSubModule(CheckpointModule):
+
+            def __init__(self):
+                super().__init__(checkpoint)
+                expert_cls = nn.Linear
+                expert_args_dict = dict(in_features=16, out_features=16)
+                self.moe = MoeModule(dim_model=16,
+                                     num_experts=8,
+                                     use_residual=True,
+                                     expert_cls=expert_cls,
+                                     **expert_args_dict)
+                self.proj = nn.Linear(16, 4)
+
+            def _forward(self, x):
+                x, y = self.moe(x)
+                x = self.proj(x)
+                return x, y
+
+        super().__init__()
+        self.test_embed = nn.Linear(4, 16)
+        self.test_transform = TestSubModule()
+
+    def forward(self, x):
+        MOE_CONTEXT.reset_loss()
+
+        x = self.test_embed(x)
+        x, y = self.test_transform(x)
+
+        MOE_CONTEXT.add_loss(y)
+        return x
diff --git a/tests/test_moe/test_grad_handler.py b/tests/test_moe/test_grad_handler.py
index e7002a75f3f7..a4473bf8eea4 100644
--- a/tests/test_moe/test_grad_handler.py
+++ b/tests/test_moe/test_grad_handler.py
@@ -13,11 +13,10 @@
 
 BATCH_SIZE = 4
 DIM = 16
-CONFIG = dict()
 
 
 def run_test(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     expert_module = nn.Linear
     expert_factor = dict(in_features=DIM, out_features=DIM, device=get_current_device())
 
diff --git a/tests/test_moe/test_kernel.py b/tests/test_moe/test_kernel.py
index 39603c158731..9d11fd9bcd6d 100644
--- a/tests/test_moe/test_kernel.py
+++ b/tests/test_moe/test_kernel.py
@@ -12,7 +12,6 @@
 
 BATCH_SIZE = 16
 NUM_EXPERTS = 4
-CONFIG = dict()
 
 
 def check_equal(tensor_a, tensor_b, atol=1e-06):
@@ -23,7 +22,7 @@ def run_routing(rank, world_size, port, rs=2, hidden_size=128, data_type=torch.f
     # Here we do not need TF32, since it brings absolute error on results
     torch.backends.cuda.matmul.allow_tf32 = False
 
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     local_rank = gpc.get_local_rank(ParallelMode.GLOBAL)
 
     MOE_CONTEXT.setup(42)    # MOE environment initialization
diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py
index 8a0283ba71fc..df0fa164c068 100644
--- a/tests/test_moe/test_moe_checkpoint.py
+++ b/tests/test_moe/test_moe_checkpoint.py
@@ -10,8 +10,7 @@
 from colossalai.testing import rerun_if_address_is_in_use, spawn
 from colossalai.utils import get_current_device
 from colossalai.zero import ColoInitContext
-from tests.test_moe.test_moe_zero_init import MoeModel
-from tests.test_zero.test_legacy.common import CONFIG
+from tests.test_moe.moe_utils import MoeModel
 
 
 def exam_moe_checkpoint():
@@ -34,7 +33,7 @@ def exam_moe_checkpoint():
 
 
 def _run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     MOE_CONTEXT.setup(seed=42)
     exam_moe_checkpoint()
 
diff --git a/tests/test_moe/test_moe_colo_init.py b/tests/test_moe/test_moe_colo_init.py
deleted file mode 100644
index 555338fcf9fc..000000000000
--- a/tests/test_moe/test_moe_colo_init.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import pytest
-import torch
-import torch.distributed as dist
-
-import colossalai
-from colossalai.context import MOE_CONTEXT
-from colossalai.tensor import ColoParameter
-from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
-from colossalai.utils import get_current_device
-from colossalai.zero import ColoInitContext
-from tests.test_moe.test_moe_zero_init import MoeModel
-from tests.test_tensor.common_utils import debug_print
-from tests.test_zero.test_legacy.common import CONFIG
-
-
-@parameterize("init_device_type", ['cpu', 'cuda'])
-def exam_moe_colo_init(init_device_type):
-    world_size = dist.get_world_size()
-
-    if init_device_type == 'cuda':
-        init_device = get_current_device()
-    elif init_device_type == 'cpu':
-        init_device = torch.device("cpu")
-    else:
-        raise NotImplementedError("Unknown device found.")
-
-    with ColoInitContext(device=init_device):
-        model = MoeModel(checkpoint=True)
-
-    for name, param in model.named_parameters():
-        assert isinstance(param, ColoParameter), "parameter `{}` has an init problem".format(name)
-
-        if hasattr(param, "moe_info"):
-            param.set_process_group(param.moe_info.pg)
-
-        if hasattr(param, "moe_info"):
-            assert param.process_group.dp_world_size() == param.moe_info.dp_size
-        else:
-            assert param.process_group.dp_world_size() == world_size
-
-
-def _run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    MOE_CONTEXT.setup(seed=42)
-    exam_moe_colo_init()
-
-
-@pytest.mark.dist
-@pytest.mark.parametrize("world_size", [4])
-@rerun_if_address_is_in_use()
-def test_moe_colo_init(world_size):
-    spawn(_run_dist, world_size)
-
-
-if __name__ == '__main__':
-    test_moe_colo_init(world_size=4)
diff --git a/tests/test_moe/test_moe_group.py b/tests/test_moe/test_moe_group.py
index 6dc3f5f18b6d..d073e0e5c08f 100644
--- a/tests/test_moe/test_moe_group.py
+++ b/tests/test_moe/test_moe_group.py
@@ -11,12 +11,11 @@
 
 D_MODEL = 4
 D_FF = 8
-CONFIG = dict()
 
 
 def run_test(rank, world_size, port):
     world_size = 4
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     expert_module = nn.Linear
     expert_factor = dict(in_features=D_MODEL, out_features=D_FF, device=get_current_device())
 
diff --git a/tests/test_moe/test_moe_zero_fwd_bwd.py b/tests/test_moe/test_moe_zero_fwd_bwd.py
new file mode 100644
index 000000000000..83ec884b1515
--- /dev/null
+++ b/tests/test_moe/test_moe_zero_fwd_bwd.py
@@ -0,0 +1,106 @@
+import pytest
+import torch
+
+import colossalai
+from colossalai.booster import Booster
+from colossalai.booster.plugin import LowLevelZeroPlugin
+from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroModel
+from colossalai.context import MOE_CONTEXT
+from colossalai.engine.gradient_handler import MoeGradientHandler
+from colossalai.nn import MoeLoss
+from colossalai.testing import rerun_if_address_is_in_use, spawn
+from colossalai.testing.random import seed_all
+from tests.test_moe.moe_utils import MoeModel
+
+
+def split_ddp_grad(grad, world_size):
+    with torch.no_grad():
+        grad = grad.clone().detach().flatten()
+        padding_size = (world_size - grad.numel() % world_size) % world_size
+        if padding_size > 0:
+            grad = torch.nn.functional.pad(grad, [0, padding_size])
+        splited_grad = grad.split(grad.numel() // world_size)
+    return splited_grad
+
+
+def run_fwd_bwd(model, data, label, criterion, optimizer, enable_autocast=False):
+    model.train()
+    with torch.cuda.amp.autocast(enabled=enable_autocast):
+        if criterion:
+            y = model(data)
+            loss = criterion(y, label)
+        else:
+            loss = model(data, label)
+        loss = loss.float()
+
+    if isinstance(model, LowLevelZeroModel):
+        optimizer.backward(loss)
+    else:
+        loss.backward()
+    return y
+
+
+def run_zero_test(local_rank, world_size, stage=1):
+    criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)
+
+    zero_model = MoeModel(checkpoint=True)
+    optimizer = torch.optim.Adam(zero_model.parameters())
+    plugin = LowLevelZeroPlugin(stage=stage, precision="fp32")
+    booster = Booster(plugin=plugin)
+    zero_model, optimizer, _, _, _ = booster.boost(zero_model, optimizer)
+
+    torch_model = MoeModel(checkpoint=True)
+    for zero_param, torch_param in zip(zero_model.parameters(), torch_model.parameters()):
+        torch_param.data.copy_(zero_param.data)
+    torch_model = torch_model.cuda()
+    grad_handler = MoeGradientHandler(torch_model)
+
+    # assert zero model
+    assert len(zero_model.module.test_transform.moe.moe_layer.experts.experts) == 8 // MOE_CONTEXT.world_size
+    for (torch_name, torch_param), (zero_name, zero_param) in zip(torch_model.named_parameters(),
+                                                                  zero_model.module.named_parameters()):
+        assert zero_name == torch_name
+        assert torch.allclose(zero_param.data, torch_param.data)
+
+    data = torch.randn(16, 4).cuda()
+    label = torch.randint(0, 4, (16,)).cuda()
+
+    torch_out = run_fwd_bwd(torch_model, data, label, criterion, None)
+    zero_out = run_fwd_bwd(zero_model, data, label, criterion, optimizer)
+    assert torch.allclose(torch_out, zero_out)
+    grad_handler.handle_gradient()
+
+    for (zero_name, zero_param), (torch_name, torch_param) in zip(zero_model.module.named_parameters(),
+                                                                  torch_model.named_parameters()):
+        assert zero_name == torch_name
+        zero_grad_list = optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(zero_param))
+        if hasattr(zero_param, "moe_info"):
+            assert len(zero_grad_list) == 0
+            assert torch.allclose(zero_param.grad, torch_param.grad)
+        else:
+            assert len(zero_grad_list) > 0
+            torch_grad_list = split_ddp_grad(torch_param.grad, world_size)
+            if stage == 2:
+                torch_grad_list = torch_grad_list[local_rank:local_rank + 1]
+            assert len(zero_grad_list) == len(torch_grad_list)
+            for zero_grad, torch_grad in zip(zero_grad_list, torch_grad_list):
+                assert torch.allclose(zero_grad, torch_grad)
+
+
+def run_dist(rank, world_size, port):
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    MOE_CONTEXT.setup(seed=42)
+    seed_all(42 + rank)
+    run_zero_test(rank, world_size, stage=1)
+    run_zero_test(rank, world_size, stage=2)
+
+
+@pytest.mark.dist
+@pytest.mark.parametrize("world_size", [2])
+@rerun_if_address_is_in_use()
+def test_moe_zero_model(world_size):
+    spawn(run_dist, world_size)
+
+
+if __name__ == '__main__':
+    test_moe_zero_model(world_size=2)
diff --git a/tests/test_moe/test_moe_zero_init.py b/tests/test_moe/test_moe_zero_init.py
deleted file mode 100644
index 79722f9f4056..000000000000
--- a/tests/test_moe/test_moe_zero_init.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import pytest
-import torch
-import torch.nn as nn
-
-import colossalai
-from colossalai.context import MOE_CONTEXT
-from colossalai.logging import get_dist_logger
-from colossalai.nn import CheckpointModule
-from colossalai.nn.layer import MoeModule
-from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
-from colossalai.utils import get_current_device
-from colossalai.zero.legacy.init_ctx import ZeroInitContext
-from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
-from tests.test_zero.test_legacy.common import CONFIG
-
-
-class MoeModel(nn.Module):
-
-    def __init__(self, checkpoint: bool = False):
-
-        class TestSubModule(CheckpointModule):
-
-            def __init__(self):
-                super().__init__(checkpoint)
-                expert_cls = nn.Linear
-                expert_args_dict = dict(in_features=16, out_features=16)
-                self.moe = MoeModule(dim_model=16,
-                                     num_experts=8,
-                                     use_residual=True,
-                                     expert_cls=expert_cls,
-                                     **expert_args_dict)
-                self.proj = nn.Linear(16, 4)
-
-            def _forward(self, x):
-                x, y = self.moe(x)
-                x = self.proj(x)
-                return x, y
-
-        super().__init__()
-        self.test_embed = nn.Linear(4, 16)
-        self.test_transform = TestSubModule()
-
-    def forward(self, x):
-        MOE_CONTEXT.reset_loss()
-
-        x = self.test_embed(x)
-        x, y = self.test_transform(x)
-
-        MOE_CONTEXT.add_loss(y)
-        return x
-
-
-@parameterize("init_device_type", ['cpu', 'cuda'])
-@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
-def run_moe_zero_init(init_device_type, shard_strategy_class):
-    logger = get_dist_logger("test_moe_zero_init")
-
-    if init_device_type == 'cuda':
-        init_device = get_current_device()
-    elif init_device_type == 'cpu':
-        init_device = torch.device("cpu")
-    else:
-        raise NotImplementedError("Unknown device found.")
-
-    model_numel_tensor = torch.zeros(1, dtype=torch.int)
-    with ZeroInitContext(target_device=init_device,
-                         shard_strategy=shard_strategy_class(),
-                         shard_param=True,
-                         model_numel_tensor=model_numel_tensor):
-        model = MoeModel(checkpoint=True)
-
-    for name, param in model.named_parameters():
-        assert hasattr(param, 'colo_attr')
-
-        # the parameters in moe experts and its gate should not be sharded
-        if ('experts' in name) or ('gate' in name) or ('residual_combine' in name):
-            assert not param.colo_attr.sharded_data_tensor.is_sharded, "`{}` parameter has problem".format(name)
-        else:
-            assert param.colo_attr.sharded_data_tensor.is_sharded
-
-        # the parameters in moe experts is not replicated
-        if 'experts' in name:
-            assert not param.colo_attr.is_replicated
-        else:
-            assert param.colo_attr.is_replicated
-
-        if param.colo_attr.param_is_sharded:
-            assert param.colo_attr.data_payload.device.type == init_device.type, \
-                f'{param.colo_attr.data_payload.device.type} vs. {init_device.type}'
-        else:
-            assert param.colo_attr.data_payload.device.type == 'cuda'
-
-
-def _run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    MOE_CONTEXT.setup(seed=42)
-    run_moe_zero_init()
-
-
-@pytest.mark.dist
-@pytest.mark.parametrize("world_size", [2, 4])
-@rerun_if_address_is_in_use()
-def test_moe_zero_init(world_size):
-    spawn(_run_dist, world_size)
-
-
-if __name__ == '__main__':
-    test_moe_zero_init(world_size=2)
diff --git a/tests/test_moe/test_moe_zero_model.py b/tests/test_moe/test_moe_zero_model.py
deleted file mode 100644
index ec37967f18c5..000000000000
--- a/tests/test_moe/test_moe_zero_model.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import pytest
-import torch
-
-import colossalai
-from colossalai.context import MOE_CONTEXT
-from colossalai.engine.gradient_handler import MoeGradientHandler
-from colossalai.nn import MoeLoss
-from colossalai.testing import assert_equal_in_group, parameterize, rerun_if_address_is_in_use, spawn
-from colossalai.zero.legacy.init_ctx import ZeroInitContext
-from colossalai.zero.legacy.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
-from colossalai.zero.legacy.sharded_model import ShardedModelV2
-from colossalai.zero.legacy.sharded_model._utils import cast_tensor_to_fp16
-from colossalai.zero.legacy.sharded_model.utils import col_model_deepcopy
-from tests.components_to_test.registry import non_distributed_component_funcs
-from tests.test_moe.test_moe_zero_init import MoeModel
-from tests.test_zero.test_legacy.common import CONFIG, check_grads_padding, run_fwd_bwd
-
-
-@parameterize("enable_autocast", [False])
-@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
-def run_model_test(enable_autocast, shard_strategy_class):
-    shard_strategy = shard_strategy_class()
-
-    get_components_func = non_distributed_component_funcs.get_callable('hanging_param_model')
-    _, train_dataloader, _, optimizer_class, _ = get_components_func()
-    criterion = MoeLoss(aux_weight=0.01, loss_fn=torch.nn.CrossEntropyLoss)
-
-    with ZeroInitContext(target_device=torch.device('cuda', torch.cuda.current_device()),
-                         shard_strategy=shard_strategy,
-                         shard_param=True):
-        zero_model = MoeModel(checkpoint=True)
-    zero_model = ShardedModelV2(zero_model, shard_strategy)
-
-    # check whether parameters are identical in ddp
-    for name, p in zero_model.named_parameters():
-        if not p.colo_attr.param_is_sharded and p.colo_attr.is_replicated:
-            assert_equal_in_group(p.colo_attr.data_payload)
-
-    model = MoeModel(checkpoint=True).half()
-    col_model_deepcopy(zero_model, model)
-    model = model.cuda()
-    grad_handler = MoeGradientHandler(model)
-
-    for i, (data, label) in enumerate(train_dataloader):
-        if i > 5:
-            break
-
-        data, label = cast_tensor_to_fp16(data).cuda(), label.cuda()
-        run_fwd_bwd(model, data, label, criterion, enable_autocast)
-        run_fwd_bwd(zero_model, data, label, criterion, enable_autocast)
-        grad_handler.handle_gradient()
-
-    check_grads_padding(model, zero_model, loose=True)
-
-
-def run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
-    MOE_CONTEXT.setup(seed=42)
-    run_model_test()
-
-
-@pytest.mark.dist
-@pytest.mark.parametrize("world_size", [2])
-@rerun_if_address_is_in_use()
-def test_moe_zero_model(world_size):
-    spawn(run_dist, world_size)
-
-
-if __name__ == '__main__':
-    test_moe_zero_model(world_size=2)
diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py
index efc6e9ddae27..fbd64def9b41 100644
--- a/tests/test_moe/test_moe_zero_optim.py
+++ b/tests/test_moe/test_moe_zero_optim.py
@@ -1,5 +1,6 @@
 import pytest
 import torch
+import torch.distributed as dist
 
 import colossalai
 from colossalai.amp import convert_to_apex_amp
@@ -16,8 +17,31 @@
 from colossalai.zero.legacy.sharded_optim import ShardedOptimizerV2
 from colossalai.zero.low_level._utils import has_inf_or_nan
 from tests.components_to_test.registry import non_distributed_component_funcs
-from tests.test_moe.test_moe_zero_init import MoeModel
-from tests.test_zero.test_legacy.common import CONFIG, check_sharded_model_params
+from tests.test_moe.moe_utils import MoeModel
+
+
+def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool:
+    if loose:
+        return torch.allclose(tensor_a, tensor_b, atol=1e-2, rtol=1e-3)
+    return torch.allclose(tensor_a, tensor_b)
+
+
+def check_sharded_model_params(model, zero_model, loose=False, reuse_fp16_shard=False):
+    rank = dist.get_rank()
+    for (name, p), (zero_name, zero_p) in zip(model.named_parameters(), zero_model.named_parameters()):
+        if zero_p.colo_attr.param_is_sharded:
+            zero_p = zero_p.colo_attr.data_payload.to(p.device).float()
+            chunks = torch.flatten(p).chunk(dist.get_world_size())
+            if rank >= len(chunks):
+                continue
+            p = chunks[rank].float()
+            if zero_p.size(0) > p.size(0):
+                zero_p = zero_p[:p.size(0)]
+        else:
+            zero_p = zero_p.colo_attr.data_payload.to(p.device)
+
+        assert p.dtype == zero_p.dtype, "Parameter `{}`:\n{} vs {}".format(name, p.dtype, zero_p.dtype)
+        assert allclose(p, zero_p, loose=loose), f'{p} vs {zero_p}'
 
 
 def _run_step(model, optimizer, data, label, criterion, grad_handler):
@@ -103,7 +127,7 @@ def _run_test_sharded_optim_v2(cpu_offload,
 
 
 def _run_dist(rank, world_size, port):
-    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
+    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
     MOE_CONTEXT.setup(seed=42)
     _run_test_sharded_optim_v2()