From b6ea9e70fb1e4b59591f09fccd5138f5cb8017e0 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Wed, 12 Jun 2024 05:18:29 +0000 Subject: [PATCH 1/5] [moe refactor] update unit test with the refactored ZeRO and remove useless test --- .../zero/low_level/low_level_strategy.py | 4 +- tests/test_moe/test_moe_zero_fwd_bwd.py | 107 --------------- tests/test_moe/test_moe_zero_fwd_bwd_optim.py | 62 ++++----- tests/test_moe/test_moe_zero_optim.py | 125 ------------------ 4 files changed, 34 insertions(+), 264 deletions(-) delete mode 100644 tests/test_moe/test_moe_zero_fwd_bwd.py delete mode 100644 tests/test_moe/test_moe_zero_optim.py diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index 16effac9c80a..7298ef543eae 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -66,7 +66,9 @@ def __init__( # it will not manage the tensors used by mixed precision training self._param_store = ParameterStore(process_group) self._grad_store = GradientStore(process_group, partition_grad=partition_grad) - self._bucket_store = BucketStore(process_group) + self._bucket_store = BucketStore( + process_group, reduce_bucket_size=reduce_bucket_size, overlap_communication=overlap_communication + ) # working and master params for mixed precision training group_params = [] diff --git a/tests/test_moe/test_moe_zero_fwd_bwd.py b/tests/test_moe/test_moe_zero_fwd_bwd.py deleted file mode 100644 index c0722881bfcd..000000000000 --- a/tests/test_moe/test_moe_zero_fwd_bwd.py +++ /dev/null @@ -1,107 +0,0 @@ -import pytest -import torch -import torch.distributed as dist -from torch.nn.parallel import DistributedDataParallel as DDP - -import colossalai -from colossalai.moe.manager import MOE_MANAGER -from colossalai.tensor.moe_tensor.api import is_moe_tensor -from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.testing.random import seed_all -from colossalai.zero.low_level.low_level_optim import LowLevelZeroOptimizer -from colossalai.zero.low_level.low_level_strategy import LowLevelOptStrategy, MoeZeroStrategy -from tests.test_moe.moe_utils import MoeModel, delete_moe_info, loose_close, sync_local_from_ep - - -def run_zero_test(local_rank): - dp_size = world_size = dist.get_world_size() - assert world_size >= 4, f"{world_size=}: at least 4 processes are required for this test (ep=2, moe_dp=2)" - criterion = torch.nn.CrossEntropyLoss() - - ep_size = 2 - extra_dp_size = world_size // ep_size - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel="EP", mode="fixed", fixed_dp_size=extra_dp_size, fixed_ep_size=ep_size, fixed_pp_size=1) - - zero_model = MoeModel().bfloat16().cuda() - - dp_group = dist.group.WORLD - ep_group = MOE_MANAGER.parallel_info_dict[ep_size].ep_group - moe_extra_dp_group = MOE_MANAGER.parallel_info_dict[ep_size].dp_group - - zero_params = list(filter(lambda x: not is_moe_tensor(x), zero_model.parameters())) - moe_params = list(filter(lambda x: is_moe_tensor(x), zero_model.parameters())) - print(f"{len(zero_params)=}, {len(moe_params)=}") - lr = 1e-3 - zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=lr) - zero_optimizer.param_groups.clear() - zero_optimizer.add_param_group({"params": zero_params}) - zero_optimizer.add_param_group({"params": moe_params}) - - strategies = [ - LowLevelOptStrategy( - param_group=zero_optimizer.param_groups[0], - process_group=dp_group, - overlap_communication=False, - partition_grad=True, - ), - MoeZeroStrategy( - param_group=zero_optimizer.param_groups[1], - process_group=moe_extra_dp_group, - overlap_communication=True, - partition_grad=False, - ), - ] - zero_optimizer = LowLevelZeroOptimizer( - zero_optimizer, - strategies, - ) - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel=None) - ddp_model = DDP(MoeModel().bfloat16().cuda(), static_graph=True) - delete_moe_info(ddp_model) - torch_optim = torch.optim.SGD(ddp_model.parameters(), lr=lr) - sync_local_from_ep(ddp_model, zero_model) - - seed_all(42 + local_rank) - data = torch.randn(16, 4).bfloat16().cuda() - label = torch.randint(0, 4, (16,)).cuda() - - ddp_model.train() - zero_model.train() - ddp_out = criterion(ddp_model(data), label).float() - zero_out = criterion(zero_model(data), label).float() - assert torch.allclose(ddp_out, zero_out) - print(f"{local_rank=} {ddp_out.mean()=}") - - ddp_out.backward() - zero_optimizer.backward(zero_out) - - for (zero_name, zero_param), (ddp_name, ddp_param) in zip( - zero_model.named_parameters(), ddp_model.named_parameters() - ): - torch_grad = ddp_param.grad - zero_grad = zero_optimizer.get_param_grad(zero_param) - if is_moe_tensor(zero_param): - moe_grad_list = [torch.empty_like(zero_grad) for _ in range(ep_size)] - dist.all_gather(moe_grad_list, zero_grad, group=ep_group) - zero_grad = torch.cat(moe_grad_list, dim=0) - loose_close(torch_grad, zero_grad, dtype=torch_grad.dtype) - - -def run_dist(rank, world_size, port, stage): - colossalai.launch(config=dict(), rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl") - run_zero_test(rank, stage=stage) - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) -@rerun_if_address_is_in_use() -def test_moe_zero_model(world_size): - spawn(run_dist, world_size) - - -if __name__ == "__main__": - test_moe_zero_model(world_size=4) diff --git a/tests/test_moe/test_moe_zero_fwd_bwd_optim.py b/tests/test_moe/test_moe_zero_fwd_bwd_optim.py index 7dcd3d19a734..126ddc6fea65 100644 --- a/tests/test_moe/test_moe_zero_fwd_bwd_optim.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd_optim.py @@ -14,6 +14,7 @@ from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn from colossalai.testing.random import seed_all from colossalai.zero import LowLevelZeroOptimizer +from colossalai.zero.low_level.low_level_strategy import LowLevelOptStrategy, MoeZeroStrategy from tests.test_moe.moe_utils import loose_close tokens, n_experts = 7, 4 @@ -59,14 +60,30 @@ def run_zero_with_original_model(world_size, master_weights: bool, dtype: torch. zero_model = EPMixtralSparseMoeBlock.from_native_module(zero_model, ep_group=plugin.ep_group) zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=1) + zero_params = list(filter(lambda x: not is_moe_tensor(x), zero_model.parameters())) + moe_params = list(filter(lambda x: is_moe_tensor(x), zero_model.parameters())) + zero_optimizer.param_groups.clear() + zero_optimizer.add_param_group({"params": zero_params}) + zero_optimizer.add_param_group({"params": moe_params}) + strategies = [ + LowLevelOptStrategy( + param_group=zero_optimizer.param_groups[0], + process_group=plugin.global_dp_group, + overlap_communication=False, + partition_grad=(stage == 2), + ), + MoeZeroStrategy( + param_group=zero_optimizer.param_groups[1], + process_group=plugin.moe_dp_group, + overlap_communication=True, + partition_grad=(stage == 2), + ), + ] zero_optimizer = LowLevelZeroOptimizer( zero_optimizer, - overlap_communication=True, - initial_scale=1, - reduce_bucket_size=1024 * 1024, + strategies, master_weights=master_weights, - moe_extra_dp_process_group=plugin.moe_dp_group, - partition_grad=(stage == 2), + initial_scale=1, ) ori_optimizer = torch.optim.SGD(ori_model.parameters(), lr=1) @@ -89,34 +106,17 @@ def run_zero_with_original_model(world_size, master_weights: bool, dtype: torch. # check grad name_to_p = {n: p for n, p in ori_model.module.named_parameters()} - for n, p in zero_model.named_parameters(): - if is_moe_tensor(p): # moe param - if p.grad is None: - """ - For fixed input seed, the test input may cause a certain expert not to be routed to, - so its gradient is None instead of a tensor, which may lead to a potential bug. - TODO(haze188) fix later - """ - p.grad = torch.zeros_like(p) - continue - dist.all_reduce( - p.grad, group=plugin.moe_dp_group - ) # TODO(haze188) bug fix: this step should be finished by zero - p.grad = ( - p.grad / plugin.moe_dp_group.size() - ) # moe param scaling amoung the moe dp group, not the WORLD group. - loose_close(p.grad, name_to_p[n].grad, dtype=dtype) + zero_grad = zero_optimizer.get_param_grad(p) + if p.grad is None: + """ + For fixed input seed, the test input may cause a certain expert not to be routed to, + so its gradient is None instead of a tensor, which may lead to a potential bug. + """ + # TODO(haze188) fix later + p.grad = torch.zeros_like(p) continue - else: - zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(p)) - assert len(zero_grad_list) != 0 - ori_grad_list = split_grad(name_to_p[n].grad, world_size) - if stage == 2: - # Zero2 splits the gradient, and each rank holds the corresponding part - ori_grad_list = ori_grad_list[rank : rank + 1] - for zero_grad, torch_grad in zip(zero_grad_list, ori_grad_list): - loose_close(zero_grad, torch_grad, dtype=dtype) + loose_close(zero_grad, name_to_p[n].grad, dtype=dtype) # zero-dp step zero_optimizer.step() diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py deleted file mode 100644 index 3bbd90fd6aac..000000000000 --- a/tests/test_moe/test_moe_zero_optim.py +++ /dev/null @@ -1,125 +0,0 @@ -import pytest -import torch -import torch.distributed as dist -from torch.nn.parallel import DistributedDataParallel as DDP - -import colossalai -from colossalai.moe.manager import MOE_MANAGER -from colossalai.tensor.moe_tensor.api import is_moe_tensor -from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.testing.random import seed_all -from colossalai.zero.low_level.low_level_optim import LowLevelZeroOptimizer -from colossalai.zero.low_level.low_level_strategy import LowLevelOptStrategy, MoeZeroStrategy -from tests.test_moe.moe_utils import MoeModel, delete_moe_info, loose_close, sync_local_from_ep - - -def run_zero_test(local_rank): - dp_size = world_size = dist.get_world_size() - assert world_size >= 4, f"{world_size=}: at least 4 processes are required for this test (ep=2, moe_dp=2)" - criterion = torch.nn.CrossEntropyLoss() - - ep_size = 2 - extra_dp_size = world_size // ep_size - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel="EP", mode="fixed", fixed_dp_size=extra_dp_size, fixed_ep_size=ep_size, fixed_pp_size=1) - - zero_model = MoeModel().bfloat16().cuda() - - dp_group = dist.group.WORLD - ep_group = MOE_MANAGER.parallel_info_dict[ep_size].ep_group - moe_extra_dp_group = MOE_MANAGER.parallel_info_dict[ep_size].dp_group - - zero_params = list(filter(lambda x: not is_moe_tensor(x), zero_model.parameters())) - moe_params = list(filter(lambda x: is_moe_tensor(x), zero_model.parameters())) - print(f"{len(zero_params)=}, {len(moe_params)=}") - lr = 1e-3 - zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=lr) - zero_optimizer.param_groups.clear() - zero_optimizer.add_param_group({"params": zero_params}) - zero_optimizer.add_param_group({"params": moe_params}) - - strategies = [ - LowLevelOptStrategy( - param_group=zero_optimizer.param_groups[0], - process_group=dp_group, - overlap_communication=False, - partition_grad=True, - ), - MoeZeroStrategy( - param_group=zero_optimizer.param_groups[1], - process_group=moe_extra_dp_group, - overlap_communication=True, - partition_grad=False, - ), - ] - zero_optimizer = LowLevelZeroOptimizer( - zero_optimizer, - strategies, - ) - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel=None) - ddp_model = DDP(MoeModel().bfloat16().cuda(), static_graph=True) - delete_moe_info(ddp_model) - torch_optim = torch.optim.SGD(ddp_model.parameters(), lr=lr) - sync_local_from_ep(ddp_model, zero_model) - - seed_all(42 + local_rank) - data = torch.randn(16, 4).bfloat16().cuda() - label = torch.randint(0, 4, (16,)).cuda() - - ddp_model.train() - zero_model.train() - ddp_out = criterion(ddp_model(data), label).float() - zero_out = criterion(zero_model(data), label).float() - assert torch.allclose(ddp_out, zero_out) - print(f"{local_rank=} {ddp_out.mean()=}") - - ddp_out.backward() - zero_optimizer.backward(zero_out) - - for (zero_name, zero_param), (ddp_name, ddp_param) in zip( - zero_model.named_parameters(), ddp_model.named_parameters() - ): - torch_grad = ddp_param.grad - zero_grad = zero_optimizer.get_param_grad(zero_param) - if is_moe_tensor(zero_param): - moe_grad_list = [torch.empty_like(zero_grad) for _ in range(ep_size)] - dist.all_gather(moe_grad_list, zero_grad, group=ep_group) - zero_grad = torch.cat(moe_grad_list, dim=0) - loose_close(torch_grad, zero_grad, dtype=torch_grad.dtype) - - torch_optim.step() - zero_optimizer.step() - - for (zero_name, zero_param), (ddp_name, ddp_param) in zip( - zero_model.named_parameters(), ddp_model.named_parameters() - ): - if is_moe_tensor(zero_param): - moe_param_list = [torch.empty_like(zero_param) for _ in range(ep_size)] - dist.all_gather(moe_param_list, zero_param, group=ep_group) - zero_param = torch.cat(moe_param_list, dim=0) - assert ddp_param.dtype == zero_param.dtype - ddp_param.numel() // dp_size - loose_close( - ddp_param, - zero_param, - dtype=ddp_param.dtype, - ) - - -def run_dist(rank, world_size, port, stage): - colossalai.launch(config=dict(), rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl") - run_zero_test(rank, stage=stage) - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) -@rerun_if_address_is_in_use() -def test_moe_zero_model(world_size): - spawn(run_dist, world_size) - - -if __name__ == "__main__": - test_moe_zero_model(world_size=4) From ec997003c891c3566b4c8d0d79b63bf8dffd9f15 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Wed, 12 Jun 2024 06:52:37 +0000 Subject: [PATCH 2/5] move moe checkpoint to checkpoint folder and exchange global axis to class member --- .../plugin/moe_hybrid_parallel_plugin.py | 32 ++++++++++--------- colossalai/checkpoint_io/__init__.py | 9 +++++- .../moe_checkpoint.py} | 0 colossalai/moe/__init__.py | 2 -- 4 files changed, 25 insertions(+), 18 deletions(-) rename colossalai/{moe/checkpoint.py => checkpoint_io/moe_checkpoint.py} (100%) diff --git a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py index 94deb6befeb5..caa0f30da52d 100644 --- a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py @@ -30,8 +30,6 @@ from colossalai.shardformer.policies.base_policy import Policy from colossalai.zero.low_level import LowLevelZeroOptimizer -PP_AXIS, DP_AXIS, EP_AXIS, TP_AXIS = 0, 1, 2, 3 - class HybridParallelZeroOptimizer(LowLevelZeroOptimizer): def __init__( @@ -185,7 +183,6 @@ def __init__( custom_policy: Policy = None, checkpoint_io: Optional[MoECheckpointIO] = None, ) -> None: - global DP_AXIS, EP_AXIS world_size = dist.get_world_size() assert tp_size == 1, "Tensor parallel is not supported in MoE yet" assert ( @@ -224,28 +221,30 @@ def __init__( self.moe_dp_size = self.dp_size // self.ep_size self.use_ep_inside = use_ep_inside if self.use_ep_inside: + self.pp_axis, self.dp_axis, self.ep_axis, self.tp_axis = 0, 1, 2, 3 self.pg_mesh = ProcessGroupMesh(self.pp_size, self.moe_dp_size, ep_size, tp_size) - self.moe_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) - self.ep_group = self.pg_mesh.get_group_along_axis(EP_AXIS) + self.moe_dp_group = self.pg_mesh.get_group_along_axis(self.dp_axis) + self.ep_group = self.pg_mesh.get_group_along_axis(self.ep_axis) if dist.get_rank() == 0: print(f"MoE Parallel: pp {self.pp_size}, outer_dp {self.moe_dp_size}, inner_ep {ep_size}, tp {tp_size}") else: warnings.warn("Using ep outside dp (cross-node) is strongly discouraged due to communication costs.") + self.pp_axis, self.dp_axis, self.ep_axis, self.tp_axis = 0, 2, 1, 3 self.pg_mesh = ProcessGroupMesh(self.pp_size, ep_size, self.moe_dp_size, tp_size) - EP_AXIS = 1 - DP_AXIS = 2 - self.moe_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) - self.ep_group = self.pg_mesh.get_group_along_axis(EP_AXIS) + self.moe_dp_group = self.pg_mesh.get_group_along_axis(self.dp_axis) + self.ep_group = self.pg_mesh.get_group_along_axis(self.ep_axis) if dist.get_rank() == 0: print(f"MoE Parallel: pp {self.pp_size}, outer_ep {ep_size}, inner_dp {self.moe_dp_size}, tp {tp_size}") if dist.get_rank() == 0: print(f"Non-MoE Parameter Parallel: pp {self.pp_size}, dp {self.dp_size}, tp {tp_size}") - self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) # TODO: support custom tp size for mixtral lm head - self.global_dp_group = self.pg_mesh.get_group_along_axis((DP_AXIS, EP_AXIS)) - self.pp_group = self.pg_mesh.get_group_along_axis(PP_AXIS) + self.tp_group = self.pg_mesh.get_group_along_axis( + self.tp_axis + ) # TODO: support custom tp size for mixtral lm head + self.global_dp_group = self.pg_mesh.get_group_along_axis((self.dp_axis, self.ep_axis)) + self.pp_group = self.pg_mesh.get_group_along_axis(self.pp_axis) # TODO: Currently moe only support partially sequence parallel - self.sp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) + self.sp_group = self.pg_mesh.get_group_along_axis(self.tp_axis) self.custom_policy = custom_policy self.stage_manager = None @@ -257,7 +256,7 @@ def __init__( num_microbatches is not None or microbatch_size is not None ), "num_microbatches or microbatch_size must be specified when using pipeline parallelism" assert self.zero_stage <= 1, "zero stage must be 0 or 1 when using pipeline parallelism" - self.stage_manager = PipelineStageManager(self.pg_mesh, PP_AXIS) + self.stage_manager = PipelineStageManager(self.pg_mesh, self.pp_axis) self.schedule = OneForwardOneBackwardSchedule( self.stage_manager, num_microbatches=num_microbatches, microbatch_size=microbatch_size ) @@ -329,7 +328,10 @@ def prepare_dataloader( """ _kwargs = kwargs.copy() sampler = DistributedSampler( - dataset, num_replicas=self.pg_mesh.size(DP_AXIS), rank=self.pg_mesh.coordinate(DP_AXIS), shuffle=shuffle + dataset, + num_replicas=self.pg_mesh.size(self.dp_axis), + rank=self.pg_mesh.coordinate(self.dp_axis), + shuffle=shuffle, ) # Deterministic dataloader diff --git a/colossalai/checkpoint_io/__init__.py b/colossalai/checkpoint_io/__init__.py index 19b61730bded..ef37534fe01a 100644 --- a/colossalai/checkpoint_io/__init__.py +++ b/colossalai/checkpoint_io/__init__.py @@ -2,5 +2,12 @@ from .general_checkpoint_io import GeneralCheckpointIO from .hybrid_parallel_checkpoint_io import HybridParallelCheckpointIO from .index_file import CheckpointIndexFile +from .moe_checkpoint import MoECheckpointIO -__all__ = ["CheckpointIO", "CheckpointIndexFile", "GeneralCheckpointIO", "HybridParallelCheckpointIO"] +__all__ = [ + "CheckpointIO", + "CheckpointIndexFile", + "GeneralCheckpointIO", + "HybridParallelCheckpointIO", + "MoECheckpointIO", +] diff --git a/colossalai/moe/checkpoint.py b/colossalai/checkpoint_io/moe_checkpoint.py similarity index 100% rename from colossalai/moe/checkpoint.py rename to colossalai/checkpoint_io/moe_checkpoint.py diff --git a/colossalai/moe/__init__.py b/colossalai/moe/__init__.py index 2708764d89bd..0623d19efd5f 100644 --- a/colossalai/moe/__init__.py +++ b/colossalai/moe/__init__.py @@ -1,7 +1,5 @@ -from .checkpoint import MoECheckpointIO from .manager import MOE_MANAGER __all__ = [ - "MoECheckpointIO", "MOE_MANAGER", ] From 64fc0f7f09742ebec9fbba45ef75a2aec8fc264b Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Fri, 14 Jun 2024 09:48:03 +0000 Subject: [PATCH 3/5] update moe hybrid parallel plugin with newest version of zero & fix zero working/master params bug --- .../plugin/moe_hybrid_parallel_plugin.py | 44 ++++++++++++++----- colossalai/zero/low_level/__init__.py | 3 +- colossalai/zero/low_level/low_level_optim.py | 9 ++-- .../zero/low_level/low_level_strategy.py | 2 +- tests/test_moe/test_moe_checkpoint.py | 2 +- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py index caa0f30da52d..c451665297e8 100644 --- a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py @@ -21,17 +21,18 @@ get_param_info, init_pipeline_optimizer, ) +from colossalai.checkpoint_io import MoECheckpointIO from colossalai.cluster import ProcessGroupMesh from colossalai.interface import ModelWrapper, OptimizerWrapper -from colossalai.moe import MoECheckpointIO from colossalai.pipeline.schedule import OneForwardOneBackwardSchedule from colossalai.pipeline.stage_manager import PipelineStageManager from colossalai.shardformer import ShardConfig from colossalai.shardformer.policies.base_policy import Policy -from colossalai.zero.low_level import LowLevelZeroOptimizer +from colossalai.tensor.moe_tensor.api import is_moe_tensor +from colossalai.zero.low_level import LowLevelOptStrategy, LowLevelZeroOptimizer, MoeZeroStrategy -class HybridParallelZeroOptimizer(LowLevelZeroOptimizer): +class MoeHybridParallelZeroOptimizer(LowLevelZeroOptimizer): def __init__( self, optimizer: Optimizer, @@ -66,8 +67,36 @@ def __init__( self.pp_pg = pp_process_group if use_pipeline: init_pipeline_optimizer(optimizer, model) + + zero_params = list(filter(lambda x: not is_moe_tensor(x), model.parameters())) + moe_params = list(filter(lambda x: is_moe_tensor(x), model.parameters())) + + optimizer.param_groups.clear() + optimizer.add_param_group({"params": zero_params}) + optimizer.add_param_group({"params": moe_params}) + strategies = [ + LowLevelOptStrategy( + param_group=optimizer.param_groups[0], + process_group=dp_process_group, + reduce_bucket_size=reduce_bucket_size, + communication_dtype=communication_dtype, + overlap_communication=overlap_communication, + partition_grad=partition_grad, + cpu_offload=cpu_offload, + ), + MoeZeroStrategy( + param_group=optimizer.param_groups[1], + process_group=moe_extra_dp_process_group, + reduce_bucket_size=reduce_bucket_size, + communication_dtype=communication_dtype, + overlap_communication=overlap_communication, + partition_grad=partition_grad, + cpu_offload=cpu_offload, + ), + ] super().__init__( optimizer=optimizer, + group_strategies=strategies, initial_scale=initial_scale, min_scale=min_scale, growth_factor=growth_factor, @@ -77,14 +106,7 @@ def __init__( max_scale=max_scale, clip_grad_norm=clip_grad_norm, verbose=verbose, - reduce_bucket_size=reduce_bucket_size, - communication_dtype=communication_dtype, - overlap_communication=overlap_communication, - partition_grad=partition_grad, - cpu_offload=cpu_offload, - dp_process_group=dp_process_group, forced_dtype=forced_dtype, - moe_extra_dp_process_group=moe_extra_dp_process_group, ) @@ -411,7 +433,7 @@ def configure( else: assert self.dp_size > 1, "Please use Zero when data parallel size is greater than 1." assert self.precision != "fp32", "Please set precision to 'fp16' or 'bf16' when using ZeRO." - optimizer = HybridParallelZeroOptimizer( + optimizer = MoeHybridParallelZeroOptimizer( optimizer, model, use_pipeline=self.enable_pipeline_parallelism, diff --git a/colossalai/zero/low_level/__init__.py b/colossalai/zero/low_level/__init__.py index 270a6a6a4786..7e4702dfd38c 100644 --- a/colossalai/zero/low_level/__init__.py +++ b/colossalai/zero/low_level/__init__.py @@ -1,3 +1,4 @@ from .low_level_optim import LowLevelZeroOptimizer +from .low_level_strategy import LowLevelOptStrategy, LowLevelOptStrategyBase, MoeZeroStrategy -__all__ = ["LowLevelZeroOptimizer"] +__all__ = ["LowLevelZeroOptimizer", "LowLevelOptStrategy", "MoeZeroStrategy", "LowLevelOptStrategyBase"] diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 29903cb09219..46bb7fe0d229 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -86,11 +86,11 @@ def __init__( elif len(self.optim.param_groups) > 1 and group_strategies is None: raise ValueError("group_strategies must be provided when the optimizer has multiple param groups") - self.masterparam2strategy: Dict[torch.nn.Parameter, LowLevelOptStrategyBase] = {} + self.workingparam2strategy: Dict[torch.nn.Parameter, LowLevelOptStrategyBase] = {} for grp, strategy in zip(self.optim.param_groups, group_strategies): assert grp["params"] is strategy.param_group["params"], "param groups should be in the same order" for param in strategy.working_param_group: - self.masterparam2strategy[param] = strategy + self.workingparam2strategy[param] = strategy self._group_strategies = group_strategies # initialize mixed precision mixin @@ -265,8 +265,9 @@ def update_master_params(self, model: nn.Module) -> None: Args: model (nn.Module): The model to update master params """ - for master_param in model.parameters(): - strategy = self.masterparam2strategy[master_param] + for working_param in model.parameters(): + strategy = self.workingparam2strategy[working_param] + master_param = strategy.working2master(working_param=working_param) strategy.update_master_param(master_param) def get_working_to_master_map(self) -> Dict[int, torch.Tensor]: diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index d469e859d833..2fba512e5a80 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -306,7 +306,7 @@ def update_master_param(self, master_param): padding_size = self.get_param_padding_size(working_param) if padding_size > 0: working_param = torch.nn.functional.pad(working_param, [0, padding_size]) - master_param.copy_(working_param.chunk(self._world_size)[self._local_rank]) + master_param.copy_(working_param.flatten().chunk(self._world_size)[self._local_rank]) def get_grad_norm(self, norm_type: int = 2) -> float: r""" diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py index 3a3930fbc622..86f2d2909475 100644 --- a/tests/test_moe/test_moe_checkpoint.py +++ b/tests/test_moe/test_moe_checkpoint.py @@ -11,7 +11,7 @@ import colossalai from colossalai.booster import Booster from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin -from colossalai.moe import MoECheckpointIO +from colossalai.checkpoint_io import MoECheckpointIO from colossalai.tensor.moe_tensor.api import is_moe_tensor from colossalai.testing.utils import spawn From 8b277cc17dd613c57c5d1bccd73e89029cea9f19 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Fri, 14 Jun 2024 09:55:41 +0000 Subject: [PATCH 4/5] fix zero unit test --- colossalai/zero/low_level/low_level_optim.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 46bb7fe0d229..bcbc7561dcd6 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -139,9 +139,9 @@ def backward(self, loss, retain_graph=False): # another way of doing this is to reassign tensor.grad, however this won't apply for zero-2 # since the shape doesn't match - def get_param_grad(self, master_param): - strategy = self.masterparam2strategy[master_param] - return strategy.get_param_grad(master_param) + def get_param_grad(self, working_param): + strategy = self.workingparam2strategy[working_param] + return strategy.get_param_grad(working_param) def _unscale_and_clip_grads(self, grad_groups_flat, total_norm): # compute combined scale factor for this group From ed42193da2c4b47ae6cb7c1108cbfc15f0cf2e6d Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Fri, 14 Jun 2024 10:10:15 +0000 Subject: [PATCH 5/5] Add an assertion to prevent users from using it incorrectly --- colossalai/booster/plugin/moe_hybrid_parallel_plugin.py | 3 +++ colossalai/zero/low_level/low_level_strategy.py | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py index c451665297e8..8ba68270e514 100644 --- a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py @@ -68,6 +68,9 @@ def __init__( if use_pipeline: init_pipeline_optimizer(optimizer, model) + assert ( + len(optimizer.param_groups) == 1 + ), "Currently only one parameter group is supported, and we will support multiple groups later." zero_params = list(filter(lambda x: not is_moe_tensor(x), model.parameters())) moe_params = list(filter(lambda x: is_moe_tensor(x), model.parameters())) diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index 2fba512e5a80..359e608d334b 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -304,9 +304,10 @@ def state_dict(self, optim: torch.optim.Optimizer) -> Dict: def update_master_param(self, master_param): working_param = self.master2working(master_param) padding_size = self.get_param_padding_size(working_param) + working_param = working_param.data.view(-1) if padding_size > 0: working_param = torch.nn.functional.pad(working_param, [0, padding_size]) - master_param.copy_(working_param.flatten().chunk(self._world_size)[self._local_rank]) + master_param.copy_(working_param.chunk(self._world_size)[self._local_rank]) def get_grad_norm(self, norm_type: int = 2) -> float: r"""