2 changes: 1 addition & 1 deletion colossalai/context/__init__.py
@@ -1,6 +1,6 @@
from .config import Config, ConfigException
from .moe_context import MOE_CONTEXT
from .parallel_context import ParallelContext
from .parallel_mode import ParallelMode
from .moe_context import MOE_CONTEXT
from .process_group_initializer import *
from .random import *
66 changes: 40 additions & 26 deletions colossalai/context/moe_context.py
@@ -3,9 +3,29 @@
import torch
import torch.distributed as dist

from colossalai.context.parallel_mode import ParallelMode
from colossalai.context.singleton_meta import SingletonMeta
from colossalai.tensor.moe_tensor.api import get_moe_info
from colossalai.tensor.moe_tensor.moe_info import MoeParallelInfo
from colossalai.tensor import ProcessGroup


def _check_sanity():
from colossalai.core import global_context as gpc
if gpc.tensor_parallel_size > 1 or gpc.pipeline_parallel_size > 1:
raise NotImplementedError("Moe is not compatible with tensor or "
"pipeline parallel at present.")


class MoeParallelInfo:
"""Moe parallelism information, storing parallel sizes and groups.
"""

def __init__(self, ep_size: int, dp_size: int):
_check_sanity()
self.ep_size = ep_size
self.dp_size = dp_size
self.pg = ProcessGroup(tp_degree=ep_size, dp_degree=dp_size)
self.ep_group = self.pg.tp_process_group()
self.dp_group = self.pg.dp_process_group()


class MoeContext(metaclass=SingletonMeta):
@@ -14,15 +34,13 @@ class MoeContext(metaclass=SingletonMeta):
"""

def __init__(self):
self.world_size = None
self.world_size = 1
# Users may want to set maximum expert parallel size smaller than the world size
# since very low bandwidth across nodes may constrain the performance of MoE
# When we have a maximum expert parallel size, we have a minimum data parallel size naturally
self.max_ep_size = None
self.min_dp_size = None
self.router_aux_loss = []
self.router_z_loss = []
self.parallel = None
self.max_ep_size = 1
self.min_dp_size = 1
self.aux_loss = None
self.use_kernel_optim = True

self.has_setup = False
Expand All @@ -36,14 +54,18 @@ def parallel_info_dict(self):
def is_initialized(self):
return self.has_setup

def setup(self, seed: int, use_kernel_optim: bool = True, max_ep_size: int = 8, parallel: bool = None):
def setup(self, seed: int, use_kernel_optim: bool = True):
assert not self.is_initialized, "MoE distributed context shouldn't be set up again"
_check_sanity()
assert torch.cuda.is_available(), "MoE requires to enable CUDA first"

self.world_size = dist.get_world_size()
self.max_ep_size = min(max_ep_size, dist.get_world_size())

from colossalai.core import global_context as gpc
self.max_ep_size = gpc.config.get('max_ep_size', self.world_size)
assert self.world_size % self.max_ep_size == 0, \
"Maximum expert parallel size must be a factor of the number of GPUs"
self.min_dp_size = self.world_size // self.max_ep_size
self.parallel = parallel

# Enabling kernel optimization may raise error in some cases
# Users can close kernel optimization manually
@@ -53,7 +75,7 @@ def setup(self, seed: int, use_kernel_optim: bool = True, max_ep_size: int = 8,
moe_set_seed(seed)
self.has_setup = True

def get_info(self, num_experts: int, use_tp: bool = False) -> Tuple[int, MoeParallelInfo]:
def get_info(self, num_experts: int) -> Tuple[int, MoeParallelInfo]:
"""Calculate the Data Parallel Group and Expert Parallel Group.

Parameters
@@ -82,34 +104,26 @@ def get_info(self, num_experts: int, use_tp: bool = False) -> Tuple[int, MoePara
ep_size = self.max_ep_size // dp_size

# Calculate the number of experts for each GPU
if use_tp:
num_local_experts = num_experts
else:
num_local_experts = 1 if lt_flag else num_experts // self.max_ep_size
num_local_experts = 1 if lt_flag else num_experts // self.max_ep_size

# Don't forget to multiply minimum data parallel size
dp_size *= self.min_dp_size
if not (ep_size in self.parallel_info_dict):
self.parallel_info_dict[ep_size] = get_moe_info(ep_size, dp_size)
self.parallel_info_dict[ep_size] = MoeParallelInfo(ep_size, dp_size)

return num_local_experts, self.parallel_info_dict[ep_size]

def set_kernel_not_use(self):
self.use_kernel_optim = False

def reset_loss(self):
self.router_aux_loss, self.router_z_loss = [], []
self.aux_loss = 0

def add_loss(self, aux_loss: float = 0., z_loss: float = 0.):
self.router_aux_loss.append(aux_loss)
self.router_z_loss.append(z_loss)
def add_loss(self, loss):
self.aux_loss += loss

def get_loss(self):
cur_loss = self.router_aux_loss, self.router_z_loss
return cur_loss

def get_parallel(self):
return self.parallel
return self.aux_loss


MOE_CONTEXT = MoeContext()
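
As a reading aid (not part of the diff): the size bookkeeping visible in the version of `MoeContext` shown above can be restated as a small standalone sketch. Only relationships that actually appear in the diff are used; the collapsed body of `get_info` is not reproduced, and the concrete numbers are illustrative.

```python
# Illustrative arithmetic only -- mirrors the size relations visible in MoeContext above,
# not a drop-in implementation (the collapsed part of get_info is assumed, not copied).
world_size = 8                                   # dist.get_world_size()
max_ep_size = 4                                  # gpc.config.get('max_ep_size', world_size) in setup()
assert world_size % max_ep_size == 0, "max_ep_size must divide the number of GPUs"
min_dp_size = world_size // max_ep_size          # -> 2

num_experts = 8                                  # >= max_ep_size, so each EP rank holds a shard of experts
num_local_experts = num_experts // max_ep_size   # -> 2 experts per expert-parallel rank
dp_size = 1                                      # assumed starting value when every rank gets >= 1 expert
ep_size = max_ep_size // dp_size                 # -> 4
dp_size *= min_dp_size                           # -> 2 data-parallel replicas of each expert group
assert ep_size * dp_size == world_size
```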
3 changes: 1 addition & 2 deletions colossalai/context/random/__init__.py
@@ -3,7 +3,6 @@
get_current_mode,
get_seeds,
get_states,
moe_set_seed,
reset_seeds,
seed,
set_mode,
@@ -14,5 +13,5 @@

__all__ = [
'seed', 'set_mode', 'with_seed', 'add_seed', 'get_seeds', 'get_states', 'get_current_mode', 'set_seed_states',
'sync_states', 'moe_set_seed', 'reset_seeds'
'sync_states', 'reset_seeds'
]
11 changes: 1 addition & 10 deletions colossalai/context/random/_helper.py
@@ -7,8 +7,8 @@
import torch.cuda
from torch import Tensor

from .seed_manager import SeedManager
from ..parallel_mode import ParallelMode
from .seed_manager import SeedManager

_SEED_MANAGER = SeedManager()

@@ -159,14 +159,5 @@ def wrapper(*args, **kwargs):
return wrapper


def moe_set_seed(seed):
if torch.cuda.is_available():
from colossalai.core import global_context as gpc
global_rank = gpc.get_global_rank()
diff_seed = seed + global_rank
add_seed(ParallelMode.TENSOR, diff_seed, True)
print(f"moe seed condition: {global_rank} with tensor seed {diff_seed}", flush=True)


def reset_seeds():
_SEED_MANAGER.reset()
11 changes: 0 additions & 11 deletions colossalai/initialize.py
@@ -19,7 +19,6 @@
from colossalai.amp.naive_amp import NaiveAMPModel
from colossalai.builder.builder import build_gradient_handler
from colossalai.context import Config, ConfigException, ParallelMode
from colossalai.context.moe_context import MOE_CONTEXT
from colossalai.core import global_context as gpc
from colossalai.engine import Engine
from colossalai.engine.gradient_accumulation import accumulate_gradient
@@ -32,7 +31,6 @@
from colossalai.logging import get_dist_logger
from colossalai.nn.optimizer.colossalai_optimizer import ColossalaiOptimizer
from colossalai.utils import get_current_device, is_using_ddp, is_using_pp, is_using_sequence, sync_model_param
from colossalai.utils.moe import sync_moe_model_param
from colossalai.zero.legacy import ShardedOptimizerV2, convert_to_zero_v2
from colossalai.zero.legacy.gemini.ophooks import BaseOpHook

@@ -306,8 +304,6 @@ def initialize(model: nn.Module,
if not use_zero:
if is_using_sequence():
sync_model_param(model, ParallelMode.SEQUENCE_DP)
elif MOE_CONTEXT.is_initialized:
sync_moe_model_param(model)
elif is_using_ddp():
sync_model_param(model, ParallelMode.DATA)
else:
@@ -359,13 +355,6 @@
"Training with zero is detected, ZeROGradientHandler is automatically "
"added even though not specified in the configuration",
ranks=[0])
elif is_using_ddp() and MOE_CONTEXT.is_initialized:
gradient_handler_cfg = [dict(type='MoeGradientHandler')]
if verbose:
logger.info(
"Data parallel training is detected with moe parallel, MoeGradientHandler is automatically "
"added even though not specified in the configuration",
ranks=[0])
elif is_using_sequence():
model = DDP(model,
process_group=gpc.get_group(ParallelMode.SEQUENCE_DP),
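
For context on the branch removed above: `initialize()` used to inject a MoE gradient handler automatically when DDP training and an initialized `MOE_CONTEXT` were detected. After this change, a setup that still relies on that synchronization would presumably have to request the handler explicitly in its config; the fragment below is hypothetical and only mirrors what the deleted branch used to add, assuming the handler class remains registered elsewhere.

```python
# Hypothetical config fragment -- equivalent to what the removed elif branch injected automatically.
gradient_handler = [dict(type='MoeGradientHandler')]
```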
10 changes: 10 additions & 0 deletions colossalai/moe/__init__.py
@@ -0,0 +1,10 @@
from .checkpoint import MoeCheckpintIO
from .experts import EPMLPExperts, TPMLPExperts, build_ffn_experts
from .layers import SparseMLP
from .routers import MoeRouter, Top1Router, Top2Router
from .utils import NormalNoiseGenerator, UniformNoiseGenerator

__all__ = [
'EPMLPExperts', 'TPMLPExperts', 'Top1Router', 'Top2Router', 'NormalNoiseGenerator', 'UniformNoiseGenerator',
'SparseMLP', 'MoeRouter', 'MoeCheckpintIO', 'build_ffn_experts'
]
File renamed without changes.
File renamed without changes.
44 changes: 22 additions & 22 deletions colossalai/nn/layer/moe/experts.py → colossalai/moe/experts.py
@@ -1,15 +1,14 @@
import math
from copy import deepcopy
from contextlib import nullcontext

import torch
import torch.distributed as dist
import torch.nn as nn

from colossalai.context import ParallelMode, seed
from colossalai.context.moe_context import MOE_CONTEXT
from colossalai.nn.layer.moe._operation import MoeInGradScaler, MoeOutGradScaler
from colossalai.nn.layer.moe.utils import get_activation
from colossalai.tensor.moe_tensor.api import get_dp_group, get_ep_group, get_ep_size, set_moe_tensor_info
from colossalai.moe._operation import MoeInGradScaler, MoeOutGradScaler
from colossalai.moe.manager import MOE_MANAGER
from colossalai.moe.utils import get_activation
from colossalai.shardformer.layer.utils import Randomizer
from colossalai.tensor.moe_tensor.api import get_ep_size, set_moe_tensor_info


class BaseMLPExperts(nn.Module):
@@ -35,13 +34,13 @@ def __init__(

# get expert parallel info
if expert_parallel is not None:
self.num_local_experts, self.moe_info = MOE_CONTEXT.get_info(
self.num_local_experts, self.moe_info = MOE_MANAGER.get_info(
num_experts, use_tp=True if expert_parallel == "TP" else False)
# get settings for different parallel
if expert_parallel == "TP":
assert intermediate_size % MOE_CONTEXT.max_ep_size == 0, \
assert intermediate_size % MOE_MANAGER.max_ep_size == 0, \
"intermediate_size should be divide by maximum expert parallel size"
intermediate_size = intermediate_size // MOE_CONTEXT.max_ep_size
intermediate_size = intermediate_size // MOE_MANAGER.max_ep_size
num_experts = self.num_total_experts
else:
num_experts = self.num_local_experts
@@ -57,14 +56,18 @@ def __init__(
self.wi = nn.Parameter(torch.empty(num_experts, hidden_size, intermediate_size))
self.wo = nn.Parameter(torch.empty(num_experts, intermediate_size, hidden_size))

# expert param should be different
if expert_parallel is not None:
with seed(ParallelMode.TENSOR):
if gated:
nn.init.trunc_normal_(self.wi_gate, std=math.sqrt(0.1 / hidden_size))
nn.init.trunc_normal_(self.wi_up, std=math.sqrt(0.1 / hidden_size))
else:
nn.init.trunc_normal_(self.wi, std=math.sqrt(0.1 / hidden_size))
nn.init.trunc_normal_(self.wo, std=math.sqrt(0.1 / intermediate_size))
seed_ctx = Randomizer(MOE_MANAGER.seed).fork_rng(enable_cpu=True)
else:
seed_ctx = nullcontext()
with seed_ctx:
if gated:
nn.init.trunc_normal_(self.wi_gate, std=math.sqrt(0.1 / hidden_size))
nn.init.trunc_normal_(self.wi_up, std=math.sqrt(0.1 / hidden_size))
else:
nn.init.trunc_normal_(self.wi, std=math.sqrt(0.1 / hidden_size))
nn.init.trunc_normal_(self.wo, std=math.sqrt(0.1 / intermediate_size))

self.act = get_activation(activation)
self.drop = nn.Dropout(p=drop_rate)
@@ -88,10 +91,7 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: # inputs [g, e, c, h]
else:
x = torch.bmm(x, self.wi)
x = self.act(x)

if self.expert_parallel is not None:
with seed(ParallelMode.TENSOR):
x = self.drop(x)
x = self.drop(x)
x = torch.bmm(x, self.wo)

x = x.reshape(inshape)
@@ -143,7 +143,7 @@ def get_expert_class(name: str) -> BaseMLPExperts:


def build_ffn_experts(num_experts: int, d_model: int, d_ff: int, activation=None, drop_rate: float = 0):
mep_size = MOE_CONTEXT.max_ep_size
mep_size = MOE_MANAGER.max_ep_size
if num_experts % mep_size == 0 or mep_size % num_experts == 0:
return EPMLPExperts(num_experts, d_model, d_ff, activation, drop_rate)
elif d_ff % mep_size == 0:
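
The `seed_ctx` change in `BaseMLPExperts.__init__` above makes expert weight initialization rank-dependent, so expert-parallel ranks do not all start from identical experts. Below is a minimal stand-alone illustration of that idea using plain PyTorch in place of ColossalAI's `Randomizer`; the seed offset and tensor sizes are invented for the example.

```python
# Illustrative only: per-rank seeding for expert-parallel weight init,
# a plain-PyTorch stand-in for Randomizer(MOE_MANAGER.seed).fork_rng().
import torch


def init_local_expert(ep_rank: int, hidden: int = 4, inter: int = 8) -> torch.Tensor:
    with torch.random.fork_rng(devices=[]):        # restore the CPU RNG state afterwards
        torch.manual_seed(1234 + ep_rank)          # different seed per EP rank -> different experts
        w = torch.empty(hidden, inter)
        torch.nn.init.trunc_normal_(w, std=(0.1 / hidden) ** 0.5)
    return w


assert not torch.equal(init_local_expert(0), init_local_expert(1))   # ranks differ
assert torch.equal(init_local_expert(0), init_local_expert(0))       # each rank is reproducible
```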
61 changes: 6 additions & 55 deletions colossalai/nn/layer/moe/layers.py → colossalai/moe/layers.py
@@ -5,18 +5,11 @@
import torch.nn as nn
import torch.nn.functional as F

from colossalai.context.moe_context import MOE_CONTEXT
from colossalai.nn.layer.moe._operation import (
COL_MOE_KERNEL_FLAG,
AllGather,
AllToAll,
MoeCombine,
MoeDispatch,
ReduceScatter,
)
from colossalai.nn.layer.moe.experts import BaseMLPExperts, get_expert_class
from colossalai.nn.layer.moe.routers import MoeRouter, get_router_cls
from colossalai.nn.layer.moe.utils import get_noise_generator
from colossalai.moe._operation import COL_MOE_KERNEL_FLAG, AllGather, AllToAll, MoeCombine, MoeDispatch, ReduceScatter
from colossalai.moe.experts import BaseMLPExperts, get_expert_class
from colossalai.moe.manager import MOE_MANAGER
from colossalai.moe.routers import MoeRouter, get_router_cls
from colossalai.moe.utils import get_noise_generator
from colossalai.tensor.moe_tensor.api import get_ep_group, get_ep_size


@@ -65,7 +58,7 @@ def __init__(self,
super().__init__()
self.hidden_size = hidden_size
self.num_experts = num_experts
self.use_kernel = True if COL_MOE_KERNEL_FLAG and MOE_CONTEXT.use_kernel_optim else False
self.use_kernel = True if COL_MOE_KERNEL_FLAG and MOE_MANAGER.use_kernel_optim else False
self.expert_parallel = expert_parallel
assert expert_parallel in ["EP", "TP", None], f"Unsupported expert parallel type {expert_parallel}"

@@ -156,45 +149,3 @@ def _tp_process(self, dispatch_data: torch.Tensor) -> torch.Tensor:
expert_out = self.experts(expert_in)
expert_out = ReduceScatter.apply(expert_out, self.ep_group)
return expert_out


class MoeModule(SparseMLP):
"""
For other dependency
"""

def __init__(self,
num_experts: int,
top_k: int = 1,
capacity_factor_train: float = 1.25,
capacity_factor_eval: float = 2.0,
min_capacity: int = 4,
noisy_policy: Optional[str] = None,
drop_tks: bool = True,
expert_parallel: str = "EP",
hidden_size: int = 2048,
intermediate_size: int = 2048,
activation: str = None):
super().__init__(num_experts, top_k, capacity_factor_train, capacity_factor_eval, min_capacity, noisy_policy,
drop_tks, expert_parallel, hidden_size, intermediate_size, activation)


class MoeLayer(SparseMLP):
"""
For other dependency
"""

def __init__(self,
num_experts: int,
top_k: int = 1,
capacity_factor_train: float = 1.25,
capacity_factor_eval: float = 2.0,
min_capacity: int = 4,
noisy_policy: Optional[str] = None,
drop_tks: bool = True,
expert_parallel: str = "EP",
hidden_size: int = 2048,
intermediate_size: int = 2048,
activation: str = None):
super().__init__(num_experts, top_k, capacity_factor_train, capacity_factor_eval, min_capacity, noisy_policy,
drop_tks, expert_parallel, hidden_size, intermediate_size, activation)
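
The deleted `MoeModule` and `MoeLayer` wrappers above forwarded their arguments unchanged to `SparseMLP`, so their parameter list doubles as a reference for constructing `SparseMLP` directly. A minimal construction sketch under that assumption follows; the sizes and activation name are illustrative, and the distributed / MoE-manager setup that must happen beforehand is elided.

```python
# Minimal sketch, assuming SparseMLP accepts the argument list the deleted wrappers forwarded.
# Distributed initialization and MoE manager setup are elided.
import torch
from colossalai.moe import SparseMLP

mlp = SparseMLP(
    num_experts=8,
    top_k=2,
    capacity_factor_train=1.25,
    capacity_factor_eval=2.0,
    min_capacity=4,
    noisy_policy=None,
    drop_tks=True,
    expert_parallel="EP",
    hidden_size=2048,
    intermediate_size=2048,
    activation="gelu",                 # illustrative; any name accepted by get_activation
)
tokens = torch.randn(4, 16, 2048)      # (batch, seq_len, hidden_size) -- illustrative shape
out = mlp(tokens)                      # forward pass; auxiliary-loss handling is not shown in this diff
```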