From b6ea9e70fb1e4b59591f09fccd5138f5cb8017e0 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Wed, 12 Jun 2024 05:18:29 +0000 Subject: [PATCH 1/9] [moe refactor] update unit test with the refactored ZeRO and remove useless test --- .../zero/low_level/low_level_strategy.py | 4 +- tests/test_moe/test_moe_zero_fwd_bwd.py | 107 --------------- tests/test_moe/test_moe_zero_fwd_bwd_optim.py | 62 ++++----- tests/test_moe/test_moe_zero_optim.py | 125 ------------------ 4 files changed, 34 insertions(+), 264 deletions(-) delete mode 100644 tests/test_moe/test_moe_zero_fwd_bwd.py delete mode 100644 tests/test_moe/test_moe_zero_optim.py diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index 16effac9c80a..7298ef543eae 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -66,7 +66,9 @@ def __init__( # it will not manage the tensors used by mixed precision training self._param_store = ParameterStore(process_group) self._grad_store = GradientStore(process_group, partition_grad=partition_grad) - self._bucket_store = BucketStore(process_group) + self._bucket_store = BucketStore( + process_group, reduce_bucket_size=reduce_bucket_size, overlap_communication=overlap_communication + ) # working and master params for mixed precision training group_params = [] diff --git a/tests/test_moe/test_moe_zero_fwd_bwd.py b/tests/test_moe/test_moe_zero_fwd_bwd.py deleted file mode 100644 index c0722881bfcd..000000000000 --- a/tests/test_moe/test_moe_zero_fwd_bwd.py +++ /dev/null @@ -1,107 +0,0 @@ -import pytest -import torch -import torch.distributed as dist -from torch.nn.parallel import DistributedDataParallel as DDP - -import colossalai -from colossalai.moe.manager import MOE_MANAGER -from colossalai.tensor.moe_tensor.api import is_moe_tensor -from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.testing.random import seed_all -from colossalai.zero.low_level.low_level_optim import LowLevelZeroOptimizer -from colossalai.zero.low_level.low_level_strategy import LowLevelOptStrategy, MoeZeroStrategy -from tests.test_moe.moe_utils import MoeModel, delete_moe_info, loose_close, sync_local_from_ep - - -def run_zero_test(local_rank): - dp_size = world_size = dist.get_world_size() - assert world_size >= 4, f"{world_size=}: at least 4 processes are required for this test (ep=2, moe_dp=2)" - criterion = torch.nn.CrossEntropyLoss() - - ep_size = 2 - extra_dp_size = world_size // ep_size - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel="EP", mode="fixed", fixed_dp_size=extra_dp_size, fixed_ep_size=ep_size, fixed_pp_size=1) - - zero_model = MoeModel().bfloat16().cuda() - - dp_group = dist.group.WORLD - ep_group = MOE_MANAGER.parallel_info_dict[ep_size].ep_group - moe_extra_dp_group = MOE_MANAGER.parallel_info_dict[ep_size].dp_group - - zero_params = list(filter(lambda x: not is_moe_tensor(x), zero_model.parameters())) - moe_params = list(filter(lambda x: is_moe_tensor(x), zero_model.parameters())) - print(f"{len(zero_params)=}, {len(moe_params)=}") - lr = 1e-3 - zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=lr) - zero_optimizer.param_groups.clear() - zero_optimizer.add_param_group({"params": zero_params}) - zero_optimizer.add_param_group({"params": moe_params}) - - strategies = [ - LowLevelOptStrategy( - param_group=zero_optimizer.param_groups[0], - process_group=dp_group, - overlap_communication=False, - partition_grad=True, - ), - 
MoeZeroStrategy( - param_group=zero_optimizer.param_groups[1], - process_group=moe_extra_dp_group, - overlap_communication=True, - partition_grad=False, - ), - ] - zero_optimizer = LowLevelZeroOptimizer( - zero_optimizer, - strategies, - ) - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel=None) - ddp_model = DDP(MoeModel().bfloat16().cuda(), static_graph=True) - delete_moe_info(ddp_model) - torch_optim = torch.optim.SGD(ddp_model.parameters(), lr=lr) - sync_local_from_ep(ddp_model, zero_model) - - seed_all(42 + local_rank) - data = torch.randn(16, 4).bfloat16().cuda() - label = torch.randint(0, 4, (16,)).cuda() - - ddp_model.train() - zero_model.train() - ddp_out = criterion(ddp_model(data), label).float() - zero_out = criterion(zero_model(data), label).float() - assert torch.allclose(ddp_out, zero_out) - print(f"{local_rank=} {ddp_out.mean()=}") - - ddp_out.backward() - zero_optimizer.backward(zero_out) - - for (zero_name, zero_param), (ddp_name, ddp_param) in zip( - zero_model.named_parameters(), ddp_model.named_parameters() - ): - torch_grad = ddp_param.grad - zero_grad = zero_optimizer.get_param_grad(zero_param) - if is_moe_tensor(zero_param): - moe_grad_list = [torch.empty_like(zero_grad) for _ in range(ep_size)] - dist.all_gather(moe_grad_list, zero_grad, group=ep_group) - zero_grad = torch.cat(moe_grad_list, dim=0) - loose_close(torch_grad, zero_grad, dtype=torch_grad.dtype) - - -def run_dist(rank, world_size, port, stage): - colossalai.launch(config=dict(), rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl") - run_zero_test(rank, stage=stage) - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) -@rerun_if_address_is_in_use() -def test_moe_zero_model(world_size): - spawn(run_dist, world_size) - - -if __name__ == "__main__": - test_moe_zero_model(world_size=4) diff --git a/tests/test_moe/test_moe_zero_fwd_bwd_optim.py b/tests/test_moe/test_moe_zero_fwd_bwd_optim.py index 7dcd3d19a734..126ddc6fea65 100644 --- a/tests/test_moe/test_moe_zero_fwd_bwd_optim.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd_optim.py @@ -14,6 +14,7 @@ from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn from colossalai.testing.random import seed_all from colossalai.zero import LowLevelZeroOptimizer +from colossalai.zero.low_level.low_level_strategy import LowLevelOptStrategy, MoeZeroStrategy from tests.test_moe.moe_utils import loose_close tokens, n_experts = 7, 4 @@ -59,14 +60,30 @@ def run_zero_with_original_model(world_size, master_weights: bool, dtype: torch. 
zero_model = EPMixtralSparseMoeBlock.from_native_module(zero_model, ep_group=plugin.ep_group) zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=1) + zero_params = list(filter(lambda x: not is_moe_tensor(x), zero_model.parameters())) + moe_params = list(filter(lambda x: is_moe_tensor(x), zero_model.parameters())) + zero_optimizer.param_groups.clear() + zero_optimizer.add_param_group({"params": zero_params}) + zero_optimizer.add_param_group({"params": moe_params}) + strategies = [ + LowLevelOptStrategy( + param_group=zero_optimizer.param_groups[0], + process_group=plugin.global_dp_group, + overlap_communication=False, + partition_grad=(stage == 2), + ), + MoeZeroStrategy( + param_group=zero_optimizer.param_groups[1], + process_group=plugin.moe_dp_group, + overlap_communication=True, + partition_grad=(stage == 2), + ), + ] zero_optimizer = LowLevelZeroOptimizer( zero_optimizer, - overlap_communication=True, - initial_scale=1, - reduce_bucket_size=1024 * 1024, + strategies, master_weights=master_weights, - moe_extra_dp_process_group=plugin.moe_dp_group, - partition_grad=(stage == 2), + initial_scale=1, ) ori_optimizer = torch.optim.SGD(ori_model.parameters(), lr=1) @@ -89,34 +106,17 @@ def run_zero_with_original_model(world_size, master_weights: bool, dtype: torch. # check grad name_to_p = {n: p for n, p in ori_model.module.named_parameters()} - for n, p in zero_model.named_parameters(): - if is_moe_tensor(p): # moe param - if p.grad is None: - """ - For fixed input seed, the test input may cause a certain expert not to be routed to, - so its gradient is None instead of a tensor, which may lead to a potential bug. - TODO(haze188) fix later - """ - p.grad = torch.zeros_like(p) - continue - dist.all_reduce( - p.grad, group=plugin.moe_dp_group - ) # TODO(haze188) bug fix: this step should be finished by zero - p.grad = ( - p.grad / plugin.moe_dp_group.size() - ) # moe param scaling amoung the moe dp group, not the WORLD group. - loose_close(p.grad, name_to_p[n].grad, dtype=dtype) + zero_grad = zero_optimizer.get_param_grad(p) + if p.grad is None: + """ + For fixed input seed, the test input may cause a certain expert not to be routed to, + so its gradient is None instead of a tensor, which may lead to a potential bug. 
+ """ + # TODO(haze188) fix later + p.grad = torch.zeros_like(p) continue - else: - zero_grad_list = zero_optimizer._grad_store.get_partitioned_gradients_by_param_id(0, id(p)) - assert len(zero_grad_list) != 0 - ori_grad_list = split_grad(name_to_p[n].grad, world_size) - if stage == 2: - # Zero2 splits the gradient, and each rank holds the corresponding part - ori_grad_list = ori_grad_list[rank : rank + 1] - for zero_grad, torch_grad in zip(zero_grad_list, ori_grad_list): - loose_close(zero_grad, torch_grad, dtype=dtype) + loose_close(zero_grad, name_to_p[n].grad, dtype=dtype) # zero-dp step zero_optimizer.step() diff --git a/tests/test_moe/test_moe_zero_optim.py b/tests/test_moe/test_moe_zero_optim.py deleted file mode 100644 index 3bbd90fd6aac..000000000000 --- a/tests/test_moe/test_moe_zero_optim.py +++ /dev/null @@ -1,125 +0,0 @@ -import pytest -import torch -import torch.distributed as dist -from torch.nn.parallel import DistributedDataParallel as DDP - -import colossalai -from colossalai.moe.manager import MOE_MANAGER -from colossalai.tensor.moe_tensor.api import is_moe_tensor -from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.testing.random import seed_all -from colossalai.zero.low_level.low_level_optim import LowLevelZeroOptimizer -from colossalai.zero.low_level.low_level_strategy import LowLevelOptStrategy, MoeZeroStrategy -from tests.test_moe.moe_utils import MoeModel, delete_moe_info, loose_close, sync_local_from_ep - - -def run_zero_test(local_rank): - dp_size = world_size = dist.get_world_size() - assert world_size >= 4, f"{world_size=}: at least 4 processes are required for this test (ep=2, moe_dp=2)" - criterion = torch.nn.CrossEntropyLoss() - - ep_size = 2 - extra_dp_size = world_size // ep_size - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel="EP", mode="fixed", fixed_dp_size=extra_dp_size, fixed_ep_size=ep_size, fixed_pp_size=1) - - zero_model = MoeModel().bfloat16().cuda() - - dp_group = dist.group.WORLD - ep_group = MOE_MANAGER.parallel_info_dict[ep_size].ep_group - moe_extra_dp_group = MOE_MANAGER.parallel_info_dict[ep_size].dp_group - - zero_params = list(filter(lambda x: not is_moe_tensor(x), zero_model.parameters())) - moe_params = list(filter(lambda x: is_moe_tensor(x), zero_model.parameters())) - print(f"{len(zero_params)=}, {len(moe_params)=}") - lr = 1e-3 - zero_optimizer = torch.optim.SGD(zero_model.parameters(), lr=lr) - zero_optimizer.param_groups.clear() - zero_optimizer.add_param_group({"params": zero_params}) - zero_optimizer.add_param_group({"params": moe_params}) - - strategies = [ - LowLevelOptStrategy( - param_group=zero_optimizer.param_groups[0], - process_group=dp_group, - overlap_communication=False, - partition_grad=True, - ), - MoeZeroStrategy( - param_group=zero_optimizer.param_groups[1], - process_group=moe_extra_dp_group, - overlap_communication=True, - partition_grad=False, - ), - ] - zero_optimizer = LowLevelZeroOptimizer( - zero_optimizer, - strategies, - ) - - MOE_MANAGER.__init__() - MOE_MANAGER.setup(parallel=None) - ddp_model = DDP(MoeModel().bfloat16().cuda(), static_graph=True) - delete_moe_info(ddp_model) - torch_optim = torch.optim.SGD(ddp_model.parameters(), lr=lr) - sync_local_from_ep(ddp_model, zero_model) - - seed_all(42 + local_rank) - data = torch.randn(16, 4).bfloat16().cuda() - label = torch.randint(0, 4, (16,)).cuda() - - ddp_model.train() - zero_model.train() - ddp_out = criterion(ddp_model(data), label).float() - zero_out = criterion(zero_model(data), label).float() - assert 
torch.allclose(ddp_out, zero_out) - print(f"{local_rank=} {ddp_out.mean()=}") - - ddp_out.backward() - zero_optimizer.backward(zero_out) - - for (zero_name, zero_param), (ddp_name, ddp_param) in zip( - zero_model.named_parameters(), ddp_model.named_parameters() - ): - torch_grad = ddp_param.grad - zero_grad = zero_optimizer.get_param_grad(zero_param) - if is_moe_tensor(zero_param): - moe_grad_list = [torch.empty_like(zero_grad) for _ in range(ep_size)] - dist.all_gather(moe_grad_list, zero_grad, group=ep_group) - zero_grad = torch.cat(moe_grad_list, dim=0) - loose_close(torch_grad, zero_grad, dtype=torch_grad.dtype) - - torch_optim.step() - zero_optimizer.step() - - for (zero_name, zero_param), (ddp_name, ddp_param) in zip( - zero_model.named_parameters(), ddp_model.named_parameters() - ): - if is_moe_tensor(zero_param): - moe_param_list = [torch.empty_like(zero_param) for _ in range(ep_size)] - dist.all_gather(moe_param_list, zero_param, group=ep_group) - zero_param = torch.cat(moe_param_list, dim=0) - assert ddp_param.dtype == zero_param.dtype - ddp_param.numel() // dp_size - loose_close( - ddp_param, - zero_param, - dtype=ddp_param.dtype, - ) - - -def run_dist(rank, world_size, port, stage): - colossalai.launch(config=dict(), rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl") - run_zero_test(rank, stage=stage) - - -@pytest.mark.dist -@pytest.mark.parametrize("world_size", [4]) -@rerun_if_address_is_in_use() -def test_moe_zero_model(world_size): - spawn(run_dist, world_size) - - -if __name__ == "__main__": - test_moe_zero_model(world_size=4) From ec997003c891c3566b4c8d0d79b63bf8dffd9f15 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Wed, 12 Jun 2024 06:52:37 +0000 Subject: [PATCH 2/9] move moe checkpoint to checkpoint folder and exchange global axis to class member --- .../plugin/moe_hybrid_parallel_plugin.py | 32 ++++++++++--------- colossalai/checkpoint_io/__init__.py | 9 +++++- .../moe_checkpoint.py} | 0 colossalai/moe/__init__.py | 2 -- 4 files changed, 25 insertions(+), 18 deletions(-) rename colossalai/{moe/checkpoint.py => checkpoint_io/moe_checkpoint.py} (100%) diff --git a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py index 94deb6befeb5..caa0f30da52d 100644 --- a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py @@ -30,8 +30,6 @@ from colossalai.shardformer.policies.base_policy import Policy from colossalai.zero.low_level import LowLevelZeroOptimizer -PP_AXIS, DP_AXIS, EP_AXIS, TP_AXIS = 0, 1, 2, 3 - class HybridParallelZeroOptimizer(LowLevelZeroOptimizer): def __init__( @@ -185,7 +183,6 @@ def __init__( custom_policy: Policy = None, checkpoint_io: Optional[MoECheckpointIO] = None, ) -> None: - global DP_AXIS, EP_AXIS world_size = dist.get_world_size() assert tp_size == 1, "Tensor parallel is not supported in MoE yet" assert ( @@ -224,28 +221,30 @@ def __init__( self.moe_dp_size = self.dp_size // self.ep_size self.use_ep_inside = use_ep_inside if self.use_ep_inside: + self.pp_axis, self.dp_axis, self.ep_axis, self.tp_axis = 0, 1, 2, 3 self.pg_mesh = ProcessGroupMesh(self.pp_size, self.moe_dp_size, ep_size, tp_size) - self.moe_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) - self.ep_group = self.pg_mesh.get_group_along_axis(EP_AXIS) + self.moe_dp_group = self.pg_mesh.get_group_along_axis(self.dp_axis) + self.ep_group = self.pg_mesh.get_group_along_axis(self.ep_axis) if dist.get_rank() == 0: 
print(f"MoE Parallel: pp {self.pp_size}, outer_dp {self.moe_dp_size}, inner_ep {ep_size}, tp {tp_size}") else: warnings.warn("Using ep outside dp (cross-node) is strongly discouraged due to communication costs.") + self.pp_axis, self.dp_axis, self.ep_axis, self.tp_axis = 0, 2, 1, 3 self.pg_mesh = ProcessGroupMesh(self.pp_size, ep_size, self.moe_dp_size, tp_size) - EP_AXIS = 1 - DP_AXIS = 2 - self.moe_dp_group = self.pg_mesh.get_group_along_axis(DP_AXIS) - self.ep_group = self.pg_mesh.get_group_along_axis(EP_AXIS) + self.moe_dp_group = self.pg_mesh.get_group_along_axis(self.dp_axis) + self.ep_group = self.pg_mesh.get_group_along_axis(self.ep_axis) if dist.get_rank() == 0: print(f"MoE Parallel: pp {self.pp_size}, outer_ep {ep_size}, inner_dp {self.moe_dp_size}, tp {tp_size}") if dist.get_rank() == 0: print(f"Non-MoE Parameter Parallel: pp {self.pp_size}, dp {self.dp_size}, tp {tp_size}") - self.tp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) # TODO: support custom tp size for mixtral lm head - self.global_dp_group = self.pg_mesh.get_group_along_axis((DP_AXIS, EP_AXIS)) - self.pp_group = self.pg_mesh.get_group_along_axis(PP_AXIS) + self.tp_group = self.pg_mesh.get_group_along_axis( + self.tp_axis + ) # TODO: support custom tp size for mixtral lm head + self.global_dp_group = self.pg_mesh.get_group_along_axis((self.dp_axis, self.ep_axis)) + self.pp_group = self.pg_mesh.get_group_along_axis(self.pp_axis) # TODO: Currently moe only support partially sequence parallel - self.sp_group = self.pg_mesh.get_group_along_axis(TP_AXIS) + self.sp_group = self.pg_mesh.get_group_along_axis(self.tp_axis) self.custom_policy = custom_policy self.stage_manager = None @@ -257,7 +256,7 @@ def __init__( num_microbatches is not None or microbatch_size is not None ), "num_microbatches or microbatch_size must be specified when using pipeline parallelism" assert self.zero_stage <= 1, "zero stage must be 0 or 1 when using pipeline parallelism" - self.stage_manager = PipelineStageManager(self.pg_mesh, PP_AXIS) + self.stage_manager = PipelineStageManager(self.pg_mesh, self.pp_axis) self.schedule = OneForwardOneBackwardSchedule( self.stage_manager, num_microbatches=num_microbatches, microbatch_size=microbatch_size ) @@ -329,7 +328,10 @@ def prepare_dataloader( """ _kwargs = kwargs.copy() sampler = DistributedSampler( - dataset, num_replicas=self.pg_mesh.size(DP_AXIS), rank=self.pg_mesh.coordinate(DP_AXIS), shuffle=shuffle + dataset, + num_replicas=self.pg_mesh.size(self.dp_axis), + rank=self.pg_mesh.coordinate(self.dp_axis), + shuffle=shuffle, ) # Deterministic dataloader diff --git a/colossalai/checkpoint_io/__init__.py b/colossalai/checkpoint_io/__init__.py index 19b61730bded..ef37534fe01a 100644 --- a/colossalai/checkpoint_io/__init__.py +++ b/colossalai/checkpoint_io/__init__.py @@ -2,5 +2,12 @@ from .general_checkpoint_io import GeneralCheckpointIO from .hybrid_parallel_checkpoint_io import HybridParallelCheckpointIO from .index_file import CheckpointIndexFile +from .moe_checkpoint import MoECheckpointIO -__all__ = ["CheckpointIO", "CheckpointIndexFile", "GeneralCheckpointIO", "HybridParallelCheckpointIO"] +__all__ = [ + "CheckpointIO", + "CheckpointIndexFile", + "GeneralCheckpointIO", + "HybridParallelCheckpointIO", + "MoECheckpointIO", +] diff --git a/colossalai/moe/checkpoint.py b/colossalai/checkpoint_io/moe_checkpoint.py similarity index 100% rename from colossalai/moe/checkpoint.py rename to colossalai/checkpoint_io/moe_checkpoint.py diff --git a/colossalai/moe/__init__.py b/colossalai/moe/__init__.py 
index 2708764d89bd..0623d19efd5f 100644 --- a/colossalai/moe/__init__.py +++ b/colossalai/moe/__init__.py @@ -1,7 +1,5 @@ -from .checkpoint import MoECheckpointIO from .manager import MOE_MANAGER __all__ = [ - "MoECheckpointIO", "MOE_MANAGER", ] From 64fc0f7f09742ebec9fbba45ef75a2aec8fc264b Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Fri, 14 Jun 2024 09:48:03 +0000 Subject: [PATCH 3/9] update moe hybrid parallel plugin with newest version of zero & fix zero working/master params bug --- .../plugin/moe_hybrid_parallel_plugin.py | 44 ++++++++++++++----- colossalai/zero/low_level/__init__.py | 3 +- colossalai/zero/low_level/low_level_optim.py | 9 ++-- .../zero/low_level/low_level_strategy.py | 2 +- tests/test_moe/test_moe_checkpoint.py | 2 +- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py index caa0f30da52d..c451665297e8 100644 --- a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py @@ -21,17 +21,18 @@ get_param_info, init_pipeline_optimizer, ) +from colossalai.checkpoint_io import MoECheckpointIO from colossalai.cluster import ProcessGroupMesh from colossalai.interface import ModelWrapper, OptimizerWrapper -from colossalai.moe import MoECheckpointIO from colossalai.pipeline.schedule import OneForwardOneBackwardSchedule from colossalai.pipeline.stage_manager import PipelineStageManager from colossalai.shardformer import ShardConfig from colossalai.shardformer.policies.base_policy import Policy -from colossalai.zero.low_level import LowLevelZeroOptimizer +from colossalai.tensor.moe_tensor.api import is_moe_tensor +from colossalai.zero.low_level import LowLevelOptStrategy, LowLevelZeroOptimizer, MoeZeroStrategy -class HybridParallelZeroOptimizer(LowLevelZeroOptimizer): +class MoeHybridParallelZeroOptimizer(LowLevelZeroOptimizer): def __init__( self, optimizer: Optimizer, @@ -66,8 +67,36 @@ def __init__( self.pp_pg = pp_process_group if use_pipeline: init_pipeline_optimizer(optimizer, model) + + zero_params = list(filter(lambda x: not is_moe_tensor(x), model.parameters())) + moe_params = list(filter(lambda x: is_moe_tensor(x), model.parameters())) + + optimizer.param_groups.clear() + optimizer.add_param_group({"params": zero_params}) + optimizer.add_param_group({"params": moe_params}) + strategies = [ + LowLevelOptStrategy( + param_group=optimizer.param_groups[0], + process_group=dp_process_group, + reduce_bucket_size=reduce_bucket_size, + communication_dtype=communication_dtype, + overlap_communication=overlap_communication, + partition_grad=partition_grad, + cpu_offload=cpu_offload, + ), + MoeZeroStrategy( + param_group=optimizer.param_groups[1], + process_group=moe_extra_dp_process_group, + reduce_bucket_size=reduce_bucket_size, + communication_dtype=communication_dtype, + overlap_communication=overlap_communication, + partition_grad=partition_grad, + cpu_offload=cpu_offload, + ), + ] super().__init__( optimizer=optimizer, + group_strategies=strategies, initial_scale=initial_scale, min_scale=min_scale, growth_factor=growth_factor, @@ -77,14 +106,7 @@ def __init__( max_scale=max_scale, clip_grad_norm=clip_grad_norm, verbose=verbose, - reduce_bucket_size=reduce_bucket_size, - communication_dtype=communication_dtype, - overlap_communication=overlap_communication, - partition_grad=partition_grad, - cpu_offload=cpu_offload, - dp_process_group=dp_process_group, 
forced_dtype=forced_dtype, - moe_extra_dp_process_group=moe_extra_dp_process_group, ) @@ -411,7 +433,7 @@ def configure( else: assert self.dp_size > 1, "Please use Zero when data parallel size is greater than 1." assert self.precision != "fp32", "Please set precision to 'fp16' or 'bf16' when using ZeRO." - optimizer = HybridParallelZeroOptimizer( + optimizer = MoeHybridParallelZeroOptimizer( optimizer, model, use_pipeline=self.enable_pipeline_parallelism, diff --git a/colossalai/zero/low_level/__init__.py b/colossalai/zero/low_level/__init__.py index 270a6a6a4786..7e4702dfd38c 100644 --- a/colossalai/zero/low_level/__init__.py +++ b/colossalai/zero/low_level/__init__.py @@ -1,3 +1,4 @@ from .low_level_optim import LowLevelZeroOptimizer +from .low_level_strategy import LowLevelOptStrategy, LowLevelOptStrategyBase, MoeZeroStrategy -__all__ = ["LowLevelZeroOptimizer"] +__all__ = ["LowLevelZeroOptimizer", "LowLevelOptStrategy", "MoeZeroStrategy", "LowLevelOptStrategyBase"] diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 29903cb09219..46bb7fe0d229 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -86,11 +86,11 @@ def __init__( elif len(self.optim.param_groups) > 1 and group_strategies is None: raise ValueError("group_strategies must be provided when the optimizer has multiple param groups") - self.masterparam2strategy: Dict[torch.nn.Parameter, LowLevelOptStrategyBase] = {} + self.workingparam2strategy: Dict[torch.nn.Parameter, LowLevelOptStrategyBase] = {} for grp, strategy in zip(self.optim.param_groups, group_strategies): assert grp["params"] is strategy.param_group["params"], "param groups should be in the same order" for param in strategy.working_param_group: - self.masterparam2strategy[param] = strategy + self.workingparam2strategy[param] = strategy self._group_strategies = group_strategies # initialize mixed precision mixin @@ -265,8 +265,9 @@ def update_master_params(self, model: nn.Module) -> None: Args: model (nn.Module): The model to update master params """ - for master_param in model.parameters(): - strategy = self.masterparam2strategy[master_param] + for working_param in model.parameters(): + strategy = self.workingparam2strategy[working_param] + master_param = strategy.working2master(working_param=working_param) strategy.update_master_param(master_param) def get_working_to_master_map(self) -> Dict[int, torch.Tensor]: diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index d469e859d833..2fba512e5a80 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -306,7 +306,7 @@ def update_master_param(self, master_param): padding_size = self.get_param_padding_size(working_param) if padding_size > 0: working_param = torch.nn.functional.pad(working_param, [0, padding_size]) - master_param.copy_(working_param.chunk(self._world_size)[self._local_rank]) + master_param.copy_(working_param.flatten().chunk(self._world_size)[self._local_rank]) def get_grad_norm(self, norm_type: int = 2) -> float: r""" diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py index 3a3930fbc622..86f2d2909475 100644 --- a/tests/test_moe/test_moe_checkpoint.py +++ b/tests/test_moe/test_moe_checkpoint.py @@ -11,7 +11,7 @@ import colossalai from colossalai.booster import Booster from colossalai.booster.plugin.moe_hybrid_parallel_plugin import 
MoeHybridParallelPlugin -from colossalai.moe import MoECheckpointIO +from colossalai.checkpoint_io import MoECheckpointIO from colossalai.tensor.moe_tensor.api import is_moe_tensor from colossalai.testing.utils import spawn From 8b277cc17dd613c57c5d1bccd73e89029cea9f19 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Fri, 14 Jun 2024 09:55:41 +0000 Subject: [PATCH 4/9] fix zero unit test --- colossalai/zero/low_level/low_level_optim.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/colossalai/zero/low_level/low_level_optim.py b/colossalai/zero/low_level/low_level_optim.py index 46bb7fe0d229..bcbc7561dcd6 100644 --- a/colossalai/zero/low_level/low_level_optim.py +++ b/colossalai/zero/low_level/low_level_optim.py @@ -139,9 +139,9 @@ def backward(self, loss, retain_graph=False): # another way of doing this is to reassign tensor.grad, however this won't apply for zero-2 # since the shape doesn't match - def get_param_grad(self, master_param): - strategy = self.masterparam2strategy[master_param] - return strategy.get_param_grad(master_param) + def get_param_grad(self, working_param): + strategy = self.workingparam2strategy[working_param] + return strategy.get_param_grad(working_param) def _unscale_and_clip_grads(self, grad_groups_flat, total_norm): # compute combined scale factor for this group From ed42193da2c4b47ae6cb7c1108cbfc15f0cf2e6d Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Fri, 14 Jun 2024 10:10:15 +0000 Subject: [PATCH 5/9] Add an assertion to prevent users from using it incorrectly --- colossalai/booster/plugin/moe_hybrid_parallel_plugin.py | 3 +++ colossalai/zero/low_level/low_level_strategy.py | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py index c451665297e8..8ba68270e514 100644 --- a/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/moe_hybrid_parallel_plugin.py @@ -68,6 +68,9 @@ def __init__( if use_pipeline: init_pipeline_optimizer(optimizer, model) + assert ( + len(optimizer.param_groups) == 1 + ), "Currently only one parameter group is supported, and we will support multiple groups later." 
zero_params = list(filter(lambda x: not is_moe_tensor(x), model.parameters())) moe_params = list(filter(lambda x: is_moe_tensor(x), model.parameters())) diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index 2fba512e5a80..359e608d334b 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -304,9 +304,10 @@ def state_dict(self, optim: torch.optim.Optimizer) -> Dict: def update_master_param(self, master_param): working_param = self.master2working(master_param) padding_size = self.get_param_padding_size(working_param) + working_param = working_param.data.view(-1) if padding_size > 0: working_param = torch.nn.functional.pad(working_param, [0, padding_size]) - master_param.copy_(working_param.flatten().chunk(self._world_size)[self._local_rank]) + master_param.copy_(working_param.chunk(self._world_size)[self._local_rank]) def get_grad_norm(self, norm_type: int = 2) -> float: r""" From 419d25e841f2cd5603074975dacfc6ad7cce0ac0 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Mon, 17 Jun 2024 03:17:19 +0000 Subject: [PATCH 6/9] Modify function parameter names to resolve compatibility issues --- .../zero/low_level/low_level_strategy.py | 44 +++++++++---------- tests/test_moe/test_moe_zero_fwd_bwd_optim.py | 4 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/colossalai/zero/low_level/low_level_strategy.py b/colossalai/zero/low_level/low_level_strategy.py index 359e608d334b..1d01494654a3 100644 --- a/colossalai/zero/low_level/low_level_strategy.py +++ b/colossalai/zero/low_level/low_level_strategy.py @@ -34,7 +34,7 @@ class LowLevelOptStrategyBase(ABC): def __init__( self, param_group, - process_group, + dp_process_group, master_weights, partition_grad, cpu_offload, @@ -46,14 +46,14 @@ def __init__( self.param_group = param_group self._dtype = self.param_group["params"][0].dtype - if process_group is None: # if process_group is none, convert to default explicitly - process_group = dist.group.WORLD + if dp_process_group is None: # if dp_process_group is none, convert to default explicitly + dp_process_group = dist.group.WORLD - self.process_group = process_group + self.dp_process_group = dp_process_group - # if process_group is none, will use the default one - self._local_rank = dist.get_rank(group=self.process_group) - self._world_size = dist.get_world_size(group=self.process_group) + # if dp_process_group is none, will use the default one + self._local_rank = dist.get_rank(group=self.dp_process_group) + self._world_size = dist.get_world_size(group=self.dp_process_group) # master weights copy self._master_weights = master_weights @@ -65,9 +65,9 @@ def __init__( # ParameterStore will manage the tensor buffers used for zero # it will not manage the tensors used by mixed precision training - self._param_store = ParameterStore(process_group) - self._grad_store = GradientStore(process_group, partition_grad=partition_grad) - self._bucket_store = BucketStore(process_group, reduce_bucket_size=reduce_bucket_size) + self._param_store = ParameterStore(dp_process_group) + self._grad_store = GradientStore(dp_process_group, partition_grad=partition_grad) + self._bucket_store = BucketStore(dp_process_group, reduce_bucket_size=reduce_bucket_size) # working and master params for mixed precision training group_params = [] @@ -224,7 +224,7 @@ def _run_reduction(self): flat_grads = flat_grads.to(self._communication_dtype) if not self._partition_grad: - 
dist.all_reduce(flat_grads, group=self.process_group) + dist.all_reduce(flat_grads, group=self.dp_process_group) if flat_grads.dtype != grad_dtype: flat_grads = flat_grads.to(grad_dtype) @@ -234,7 +234,7 @@ def _run_reduction(self): else: flat_grads_list = list(flat_grads.split(len(flat_grads) // self._world_size)) recieved_grad = torch.zeros_like(flat_grads_list[0]) - dist.reduce_scatter(recieved_grad, flat_grads_list, group=self.process_group) + dist.reduce_scatter(recieved_grad, flat_grads_list, group=self.dp_process_group) if recieved_grad.dtype != grad_dtype: recieved_grad = recieved_grad.to(grad_dtype) @@ -294,7 +294,7 @@ def state_dict(self, optim: torch.optim.Optimizer) -> Dict: gather_tensor = [ torch.zeros(v.shape, device=device, dtype=v.dtype) for _ in range(self._world_size) ] - dist.all_gather(gather_tensor, v, group=self.process_group) + dist.all_gather(gather_tensor, v, group=self.dp_process_group) param_state = ( torch.stack(gather_tensor).view(-1)[: working_param.numel()].reshape_as(working_param).cpu() ) @@ -328,7 +328,7 @@ def get_grad_norm(self, norm_type: int = 2) -> float: total_norm_cuda = torch.tensor( [float(total_norm)], device=get_accelerator().get_current_device(), dtype=torch.float ) - dist.all_reduce(total_norm_cuda, op=torch.distributed.ReduceOp.MAX, group=self.process_group) + dist.all_reduce(total_norm_cuda, op=torch.distributed.ReduceOp.MAX, group=self.dp_process_group) total_norm = total_norm_cuda.item() else: @@ -342,7 +342,7 @@ def get_grad_norm(self, norm_type: int = 2) -> float: [float(total_norm_exponentiated)], device=get_accelerator().get_current_device(), dtype=torch.float ) torch.distributed.all_reduce( - total_norm_exponentiated_cuda, op=torch.distributed.ReduceOp.SUM, group=self.process_group + total_norm_exponentiated_cuda, op=torch.distributed.ReduceOp.SUM, group=self.dp_process_group ) total_norm = total_norm_exponentiated_cuda.item() ** (1.0 / norm_type) @@ -381,7 +381,7 @@ def get_param_grad(self, param): return None if self._partition_grad: tensor_list = [torch.empty_like(grad_maybe_partial[0]) for _ in range(self._world_size)] - dist.all_gather(tensor_list, grad_maybe_partial[0], group=self.process_group) + dist.all_gather(tensor_list, grad_maybe_partial[0], group=self.dp_process_group) grad_flat = torch.cat(tensor_list, dim=0) else: grad_flat = torch.cat(grad_maybe_partial, dim=0) @@ -420,7 +420,7 @@ class LowLevelOptStrategy(LowLevelOptStrategyBase): def __init__( self, param_group: Dict[str, Any], # from optimizer.param_groups - process_group: Optional[ProcessGroup] = None, # the dp pg for comm + dp_process_group: Optional[ProcessGroup] = None, # the dp pg for comm reduce_bucket_size: int = 1024 * 1024, # communication communication_dtype: Optional[torch.dtype] = None, overlap_communication: bool = False, @@ -430,7 +430,7 @@ def __init__( ): super().__init__( param_group=param_group, - process_group=process_group, + dp_process_group=dp_process_group, cpu_offload=cpu_offload, partition_grad=partition_grad, master_weights=master_weights, @@ -516,7 +516,7 @@ def post_step(self): all_splited_param = [ torch.zeros(master_param.shape, device=device, dtype=self._dtype) for _ in range(self._world_size) ] - dist.all_gather(all_splited_param, master_param.to(device).to(self._dtype), group=self.process_group) + dist.all_gather(all_splited_param, master_param.to(device).to(self._dtype), group=self.dp_process_group) working_param.data.copy_(flatten(all_splited_param)[: working_param.numel()].reshape_as(working_param)) # restore tmp values @@ 
-535,7 +535,7 @@ def __init__( overlap_communication: bool = False, partition_grad: bool = False, # stage 2 flag cpu_offload: bool = False, # cpu offload - process_group: Optional[ProcessGroup] = None, # the dp pg for comm + dp_process_group: Optional[ProcessGroup] = None, # the dp pg for comm master_weights: bool = True, # master weights ): for param in param_group["params"]: @@ -544,7 +544,7 @@ def __init__( super().__init__( param_group=param_group, - process_group=process_group, + dp_process_group=dp_process_group, cpu_offload=cpu_offload, partition_grad=partition_grad, master_weights=master_weights, @@ -556,6 +556,6 @@ def __init__( # def get_param_grad(self, param): # TODO @botbw: discuss whether it's intuitive to return grad of divided of full moe tensor # moe_partial_grad = super().get_param_grad(param) # moe_grad_list = [torch.empty_like(moe_partial_grad) for _ in range(self._world_size)] - # dist.all_gather(moe_grad_list, moe_partial_grad, group=self.process_group) + # dist.all_gather(moe_grad_list, moe_partial_grad, group=self.dp_process_group) # moe_grad = torch.cat(moe_grad_list, dim=0).reshape(param.shape[0] * self._world_size, *param.shape[1:]) # return moe_grad diff --git a/tests/test_moe/test_moe_zero_fwd_bwd_optim.py b/tests/test_moe/test_moe_zero_fwd_bwd_optim.py index 126ddc6fea65..e4f288bf956f 100644 --- a/tests/test_moe/test_moe_zero_fwd_bwd_optim.py +++ b/tests/test_moe/test_moe_zero_fwd_bwd_optim.py @@ -68,13 +68,13 @@ def run_zero_with_original_model(world_size, master_weights: bool, dtype: torch. strategies = [ LowLevelOptStrategy( param_group=zero_optimizer.param_groups[0], - process_group=plugin.global_dp_group, + dp_process_group=plugin.global_dp_group, overlap_communication=False, partition_grad=(stage == 2), ), MoeZeroStrategy( param_group=zero_optimizer.param_groups[1], - process_group=plugin.moe_dp_group, + dp_process_group=plugin.moe_dp_group, overlap_communication=True, partition_grad=(stage == 2), ), From 3364ac958c8c1fb339ef98208f6c702c7dab695a Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Mon, 17 Jun 2024 05:04:23 +0000 Subject: [PATCH 7/9] remove useless code: MoECheckpoint --- applications/ColossalMoE/infer.py | 2 -- applications/ColossalMoE/train.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/applications/ColossalMoE/infer.py b/applications/ColossalMoE/infer.py index 99c1418bca77..6023e304db0a 100644 --- a/applications/ColossalMoE/infer.py +++ b/applications/ColossalMoE/infer.py @@ -9,7 +9,6 @@ from colossalai.booster import Booster from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin from colossalai.cluster import DistCoordinator -from colossalai.moe.checkpoint import MoECheckpointIO def parse_args(): @@ -69,7 +68,6 @@ def main(): ep_size=ep_size, zero_stage=1, precision=args.precision, - checkpoint_io=MoECheckpointIO, enable_fused_normalization=args.use_layernorm_kernel, enable_jit_fused=args.use_kernel, ) diff --git a/applications/ColossalMoE/train.py b/applications/ColossalMoE/train.py index 7cdf02844dfa..9cd810e5a711 100644 --- a/applications/ColossalMoE/train.py +++ b/applications/ColossalMoE/train.py @@ -12,7 +12,6 @@ from colossalai.booster import Booster from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin from colossalai.cluster import DistCoordinator -from colossalai.moe.checkpoint import MoECheckpointIO from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR from colossalai.nn.optimizer import HybridAdam from colossalai.utils 
import get_current_device @@ -158,7 +157,6 @@ def main(): enable_jit_fused=args.use_kernel, precision=args.precision, zero_stage=args.zero_stage, - checkpoint_io=MoECheckpointIO, ) else: From f7298bc5ca2cc47a64bef427d720539e75262d38 Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Mon, 17 Jun 2024 05:05:09 +0000 Subject: [PATCH 8/9] update github workflow config file --- .github/workflows/build_on_schedule.yml | 2 +- .github/workflows/compatiblity_test_on_dispatch.yml | 2 +- .github/workflows/compatiblity_test_on_pr.yml | 2 +- .github/workflows/compatiblity_test_on_schedule.yml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build_on_schedule.yml b/.github/workflows/build_on_schedule.yml index 4d4f2614c458..fc6424503fbc 100644 --- a/.github/workflows/build_on_schedule.yml +++ b/.github/workflows/build_on_schedule.yml @@ -13,7 +13,7 @@ jobs: runs-on: [self-hosted, gpu] container: image: hpcaitech/pytorch-cuda:2.1.0-12.1.0 - options: --gpus all --rm -v /dev/shm -v /data/scratch/llama-tiny:/data/scratch/llama-tiny + options: --gpus all --rm -v /dev/shm -v /data/scratch/:/data/scratch/ timeout-minutes: 90 steps: - name: Check GPU Availability # ensure all GPUs have enough memory diff --git a/.github/workflows/compatiblity_test_on_dispatch.yml b/.github/workflows/compatiblity_test_on_dispatch.yml index bc8b257aea2e..3da8b5e77df9 100644 --- a/.github/workflows/compatiblity_test_on_dispatch.yml +++ b/.github/workflows/compatiblity_test_on_dispatch.yml @@ -50,7 +50,7 @@ jobs: matrix: ${{fromJson(needs.matrix_preparation.outputs.matrix)}} container: image: ${{ matrix.container }} - options: --gpus all --rm -v /dev/shm -v /data/scratch/cifar-10:/data/scratch/cifar-10 -v /data/scratch/llama-tiny:/data/scratch/llama-tiny + options: --gpus all --rm -v /dev/shm -v /data/scratch/:/data/scratch/ timeout-minutes: 120 steps: - name: Install dependencies diff --git a/.github/workflows/compatiblity_test_on_pr.yml b/.github/workflows/compatiblity_test_on_pr.yml index e9cb6ccd569e..10ac0e128dc6 100644 --- a/.github/workflows/compatiblity_test_on_pr.yml +++ b/.github/workflows/compatiblity_test_on_pr.yml @@ -41,7 +41,7 @@ jobs: matrix: ${{fromJson(needs.matrix_preparation.outputs.matrix)}} container: image: ${{ matrix.container }} - options: --gpus all --rm -v /dev/shm -v /data/scratch/cifar-10:/data/scratch/cifar-10 -v /data/scratch/llama-tiny:/data/scratch/llama-tiny + options: --gpus all --rm -v /dev/shm -v /data/scratch/:/data/scratch/ timeout-minutes: 120 concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-run-test-${{ matrix.container }} diff --git a/.github/workflows/compatiblity_test_on_schedule.yml b/.github/workflows/compatiblity_test_on_schedule.yml index a0b60557b3de..84ea7e28d967 100644 --- a/.github/workflows/compatiblity_test_on_schedule.yml +++ b/.github/workflows/compatiblity_test_on_schedule.yml @@ -38,7 +38,7 @@ jobs: matrix: ${{fromJson(needs.matrix_preparation.outputs.matrix)}} container: image: ${{ matrix.container }} - options: --gpus all --rm -v /dev/shm -v /data/scratch/cifar-10:/data/scratch/cifar-10 -v /data/scratch/llama-tiny:/data/scratch/llama-tiny + options: --gpus all --rm -v /dev/shm -v /data/scratch/:/data/scratch/ timeout-minutes: 120 steps: - name: Install dependencies From e6839fbc13c8924b2e830190e3b7b631f48e521c Mon Sep 17 00:00:00 2001 From: genghaozhe <939857490@qq.com> Date: Mon, 17 Jun 2024 05:36:27 +0000 Subject: [PATCH 9/9] fix typo --- 
colossalai/booster/plugin/low_level_zero_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/colossalai/booster/plugin/low_level_zero_plugin.py b/colossalai/booster/plugin/low_level_zero_plugin.py index 4196a10ba9f6..7b5aec2aa405 100644 --- a/colossalai/booster/plugin/low_level_zero_plugin.py +++ b/colossalai/booster/plugin/low_level_zero_plugin.py @@ -448,7 +448,7 @@ def configure( if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): optimizer: LowLevelZeroOptimizer = LowLevelZeroOptimizer( - optimizer, **self.zero_optim_kwargs, verbose=self.verbose + optimizer, **zero_optim_kwargs, verbose=self.verbose ) # inject update_master_params model.update_master_params = MethodType(optimizer.update_master_params, model)
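
For reference: the change that ties this series together is that LowLevelZeroOptimizer is now built from one strategy object per optimizer param group (LowLevelOptStrategy for dense parameters, MoeZeroStrategy for expert parameters) rather than from MoE-specific keyword arguments. The following is a minimal construction sketch, condensed from the updated tests/test_moe/test_moe_zero_fwd_bwd_optim.py above, not part of the patch itself. It assumes a distributed environment already launched via colossalai.launch and a MoeHybridParallelPlugin instance `plugin` exposing global_dp_group and moe_dp_group, as in that test; the helper name build_moe_zero_optimizer is illustrative only.

import torch

from colossalai.tensor.moe_tensor.api import is_moe_tensor
from colossalai.zero.low_level import LowLevelOptStrategy, LowLevelZeroOptimizer, MoeZeroStrategy


def build_moe_zero_optimizer(model, plugin, stage=1, lr=1e-3):
    # Illustrative sketch based on the updated test in this PR, assuming a launched
    # distributed environment. Non-MoE params are reduced over the global DP group;
    # MoE params only over the smaller MoE DP group.
    dense_params = [p for p in model.parameters() if not is_moe_tensor(p)]
    moe_params = [p for p in model.parameters() if is_moe_tensor(p)]

    # Rebuild the inner optimizer with one param group per strategy,
    # mirroring the test: dense params first, MoE params second.
    inner_optim = torch.optim.SGD(model.parameters(), lr=lr)
    inner_optim.param_groups.clear()
    inner_optim.add_param_group({"params": dense_params})
    inner_optim.add_param_group({"params": moe_params})

    # One strategy per param group, passed in the same order as the param groups.
    strategies = [
        LowLevelOptStrategy(
            param_group=inner_optim.param_groups[0],
            dp_process_group=plugin.global_dp_group,
            overlap_communication=False,
            partition_grad=(stage == 2),
        ),
        MoeZeroStrategy(
            param_group=inner_optim.param_groups[1],
            dp_process_group=plugin.moe_dp_group,
            overlap_communication=True,
            partition_grad=(stage == 2),
        ),
    ]
    return LowLevelZeroOptimizer(inner_optim, strategies, initial_scale=1)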