diff --git a/colossalai/amp/apex_amp/apex_amp.py b/colossalai/amp/apex_amp/apex_amp.py index ba603ca0975c..acc051181562 100644 --- a/colossalai/amp/apex_amp/apex_amp.py +++ b/colossalai/amp/apex_amp/apex_amp.py @@ -11,7 +11,7 @@ from torch import Tensor from colossalai.interface import OptimizerWrapper -from colossalai.utils import clip_grad_norm_fp32 +from colossalai.legacy.utils import clip_grad_norm_fp32 class ApexAMPOptimizer(OptimizerWrapper): diff --git a/colossalai/amp/naive_amp/__init__.py b/colossalai/amp/naive_amp/__init__.py index 5b2f71d3ced7..b1aaf8b4d268 100644 --- a/colossalai/amp/naive_amp/__init__.py +++ b/colossalai/amp/naive_amp/__init__.py @@ -3,7 +3,7 @@ import torch.nn as nn from torch.optim import Optimizer -from colossalai.utils import is_no_pp_or_last_stage +from colossalai.legacy.utils import is_no_pp_or_last_stage from ._fp16_optimizer import FP16Optimizer from .grad_scaler import ConstantGradScaler, DynamicGradScaler diff --git a/colossalai/amp/naive_amp/_fp16_optimizer.py b/colossalai/amp/naive_amp/_fp16_optimizer.py index e4699f92b944..44b0606ddf5c 100644 --- a/colossalai/amp/naive_amp/_fp16_optimizer.py +++ b/colossalai/amp/naive_amp/_fp16_optimizer.py @@ -9,8 +9,9 @@ from colossalai.context import ParallelMode from colossalai.core import global_context as gpc from colossalai.kernel.op_builder import FusedOptimBuilder +from colossalai.legacy.utils import clip_grad_norm_fp32, copy_tensor_parallel_attributes from colossalai.logging import get_dist_logger -from colossalai.utils import clip_grad_norm_fp32, copy_tensor_parallel_attributes, multi_tensor_applier +from colossalai.utils import multi_tensor_applier from ._utils import has_inf_or_nan, zero_gard_by_list from .grad_scaler import BaseGradScaler diff --git a/colossalai/amp/torch_amp/torch_amp.py b/colossalai/amp/torch_amp/torch_amp.py index 452b3d8a00fc..c45a5956a205 100644 --- a/colossalai/amp/torch_amp/torch_amp.py +++ b/colossalai/amp/torch_amp/torch_amp.py @@ -8,7 +8,7 @@ from torch.optim import Optimizer from colossalai.interface import OptimizerWrapper -from colossalai.utils import clip_grad_norm_fp32 +from colossalai.legacy.utils import clip_grad_norm_fp32 from ._grad_scaler import GradScaler diff --git a/colossalai/initialize.py b/colossalai/initialize.py index 510c4555cc9b..52b8010d98c1 100644 --- a/colossalai/initialize.py +++ b/colossalai/initialize.py @@ -30,10 +30,11 @@ PipelineSchedule, get_tensor_shape, ) +from colossalai.legacy.utils import is_using_ddp, is_using_pp, is_using_sequence, sync_model_param from colossalai.legacy.zero import ShardedOptimizerV2, convert_to_zero_v2 from colossalai.legacy.zero.gemini.ophooks import BaseOpHook from colossalai.logging import get_dist_logger -from colossalai.utils import get_current_device, is_using_ddp, is_using_pp, is_using_sequence, sync_model_param +from colossalai.utils import get_current_device from colossalai.utils.moe import sync_moe_model_param diff --git a/colossalai/legacy/engine/schedule/_pipeline_schedule.py b/colossalai/legacy/engine/schedule/_pipeline_schedule.py index 227729501be2..56a496f33f15 100644 --- a/colossalai/legacy/engine/schedule/_pipeline_schedule.py +++ b/colossalai/legacy/engine/schedule/_pipeline_schedule.py @@ -10,8 +10,8 @@ from colossalai.amp.naive_amp import NaiveAMPModel from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc +from colossalai.legacy.utils import switch_virtual_pipeline_parallel_rank from colossalai.logging import get_dist_logger -from 
colossalai.utils import switch_virtual_pipeline_parallel_rank from colossalai.utils.cuda import get_current_device from ._base_schedule import BaseSchedule diff --git a/colossalai/legacy/nn/layer/parallel_1d/layers.py b/colossalai/legacy/nn/layer/parallel_1d/layers.py index c0a169c1596f..1f6ed0e0c61b 100644 --- a/colossalai/legacy/nn/layer/parallel_1d/layers.py +++ b/colossalai/legacy/nn/layer/parallel_1d/layers.py @@ -16,12 +16,12 @@ from colossalai.kernel import LayerNorm from colossalai.legacy.communication import broadcast from colossalai.legacy.registry import LAYERS -from colossalai.nn import init as init -from colossalai.utils.checkpointing import ( +from colossalai.legacy.utils.checkpointing import ( broadcast_state_dict, gather_tensor_parallel_state_dict, partition_tensor_parallel_state_dict, ) +from colossalai.nn import init as init from colossalai.utils.cuda import get_current_device from ..base_layer import ParallelLayer diff --git a/colossalai/legacy/nn/layer/parallel_2d/layers.py b/colossalai/legacy/nn/layer/parallel_2d/layers.py index b458d15c78e7..ba0e1196954d 100644 --- a/colossalai/legacy/nn/layer/parallel_2d/layers.py +++ b/colossalai/legacy/nn/layer/parallel_2d/layers.py @@ -13,8 +13,11 @@ from colossalai.global_variables import tensor_parallel_env as env from colossalai.legacy.communication import broadcast from colossalai.legacy.registry import LAYERS +from colossalai.legacy.utils.checkpointing import ( + gather_tensor_parallel_state_dict, + partition_tensor_parallel_state_dict, +) from colossalai.nn import init as init -from colossalai.utils.checkpointing import gather_tensor_parallel_state_dict, partition_tensor_parallel_state_dict from colossalai.utils.cuda import get_current_device from ..base_layer import ParallelLayer diff --git a/colossalai/legacy/nn/layer/parallel_2p5d/layers.py b/colossalai/legacy/nn/layer/parallel_2p5d/layers.py index 04acc2bb0f4c..9c38d72a2461 100644 --- a/colossalai/legacy/nn/layer/parallel_2p5d/layers.py +++ b/colossalai/legacy/nn/layer/parallel_2p5d/layers.py @@ -13,12 +13,12 @@ from colossalai.global_variables import tensor_parallel_env as env from colossalai.legacy.communication import broadcast from colossalai.legacy.registry import LAYERS -from colossalai.nn import init as init -from colossalai.utils.checkpointing import ( +from colossalai.legacy.utils.checkpointing import ( broadcast_state_dict, gather_tensor_parallel_state_dict, partition_tensor_parallel_state_dict, ) +from colossalai.nn import init as init from colossalai.utils.cuda import get_current_device from ..base_layer import ParallelLayer diff --git a/colossalai/legacy/nn/layer/parallel_3d/layers.py b/colossalai/legacy/nn/layer/parallel_3d/layers.py index b815a842ca52..5f235a27e5fe 100644 --- a/colossalai/legacy/nn/layer/parallel_3d/layers.py +++ b/colossalai/legacy/nn/layer/parallel_3d/layers.py @@ -15,12 +15,12 @@ from colossalai.legacy.communication import all_reduce, broadcast from colossalai.legacy.nn.layer.base_layer import ParallelLayer from colossalai.legacy.registry import LAYERS -from colossalai.nn import init as init -from colossalai.utils.checkpointing import ( +from colossalai.legacy.utils.checkpointing import ( broadcast_state_dict, gather_tensor_parallel_state_dict, partition_tensor_parallel_state_dict, ) +from colossalai.nn import init as init from colossalai.utils.cuda import get_current_device from ..utils import divide, set_tensor_parallel_attribute_by_partition, to_2tuple diff --git a/colossalai/legacy/nn/layer/utils/common.py 
b/colossalai/legacy/nn/layer/utils/common.py index d8f3ad2a7eca..0f5f964df65f 100644 --- a/colossalai/legacy/nn/layer/utils/common.py +++ b/colossalai/legacy/nn/layer/utils/common.py @@ -10,7 +10,7 @@ from colossalai.constants import IS_TENSOR_PARALLEL, NUM_PARTITIONS from colossalai.global_variables import tensor_parallel_env as env -from colossalai.utils import checkpoint +from colossalai.legacy.utils import checkpoint class CheckpointModule(nn.Module): diff --git a/colossalai/legacy/trainer/_trainer.py b/colossalai/legacy/trainer/_trainer.py index 1847e56222a1..1cb99fcc90ed 100644 --- a/colossalai/legacy/trainer/_trainer.py +++ b/colossalai/legacy/trainer/_trainer.py @@ -6,8 +6,9 @@ from colossalai.legacy.engine import Engine from colossalai.legacy.trainer.hooks import BaseHook +from colossalai.legacy.utils import is_dp_rank_0, is_no_pp_or_last_stage, is_tp_rank_0 from colossalai.logging import DistributedLogger -from colossalai.utils import MultiTimer, is_dp_rank_0, is_no_pp_or_last_stage, is_tp_rank_0 +from colossalai.utils import MultiTimer class Trainer: diff --git a/colossalai/legacy/trainer/hooks/_checkpoint_hook.py b/colossalai/legacy/trainer/hooks/_checkpoint_hook.py index 6b150d29139f..cda10030bf65 100644 --- a/colossalai/legacy/trainer/hooks/_checkpoint_hook.py +++ b/colossalai/legacy/trainer/hooks/_checkpoint_hook.py @@ -4,8 +4,8 @@ from colossalai.legacy.registry import HOOKS from colossalai.legacy.trainer.hooks import BaseHook +from colossalai.legacy.utils.checkpointing import save_checkpoint from colossalai.logging import get_dist_logger -from colossalai.utils.checkpointing import save_checkpoint from ._lr_scheduler_hook import LRSchedulerHook diff --git a/colossalai/legacy/trainer/hooks/_log_hook.py b/colossalai/legacy/trainer/hooks/_log_hook.py index 7d9ad19aa9e9..839f1450c257 100644 --- a/colossalai/legacy/trainer/hooks/_log_hook.py +++ b/colossalai/legacy/trainer/hooks/_log_hook.py @@ -9,8 +9,9 @@ from colossalai.core import global_context as gpc from colossalai.legacy.registry import HOOKS from colossalai.legacy.trainer.hooks._metric_hook import ThroughputMetric +from colossalai.legacy.utils import is_dp_rank_0, is_no_pp_or_last_stage, is_tp_rank_0, report_memory_usage from colossalai.logging import DistributedLogger -from colossalai.utils import MultiTimer, is_dp_rank_0, is_no_pp_or_last_stage, is_tp_rank_0, report_memory_usage +from colossalai.utils import MultiTimer from ._base_hook import BaseHook from ._commons_ import _format_number diff --git a/colossalai/legacy/trainer/hooks/_metric_hook.py b/colossalai/legacy/trainer/hooks/_metric_hook.py index f1bd19387cb5..95b8a6abf15e 100644 --- a/colossalai/legacy/trainer/hooks/_metric_hook.py +++ b/colossalai/legacy/trainer/hooks/_metric_hook.py @@ -11,7 +11,8 @@ from colossalai.core import global_context as gpc from colossalai.legacy.communication import all_reduce from colossalai.legacy.registry import HOOKS -from colossalai.utils import get_current_device, is_no_pp_or_last_stage +from colossalai.legacy.utils import is_no_pp_or_last_stage +from colossalai.utils import get_current_device from ._base_hook import BaseHook from ._commons_ import _format_number diff --git a/colossalai/legacy/utils/__init__.py b/colossalai/legacy/utils/__init__.py new file mode 100644 index 000000000000..ae358f8bebcb --- /dev/null +++ b/colossalai/legacy/utils/__init__.py @@ -0,0 +1,53 @@ +from .checkpointing import load_checkpoint, save_checkpoint +from .common import ( + clip_grad_norm_fp32, + copy_tensor_parallel_attributes, + 
count_zeros_fp32, + is_dp_rank_0, + is_model_parallel_parameter, + is_no_pp_or_last_stage, + is_tp_rank_0, + is_using_ddp, + is_using_pp, + is_using_sequence, + param_is_not_tensor_parallel_duplicate, + print_rank_0, + switch_virtual_pipeline_parallel_rank, + sync_model_param, +) +from .data_sampler import DataParallelSampler, get_dataloader +from .memory import ( + colo_device_memory_capacity, + colo_device_memory_used, + colo_get_cpu_memory_capacity, + colo_set_cpu_memory_capacity, + colo_set_process_memory_fraction, + report_memory_usage, +) + +__all__ = [ + 'DataParallelSampler', + 'get_dataloader', + 'save_checkpoint', + 'load_checkpoint', + 'colo_device_memory_capacity', + 'colo_device_memory_used', + 'colo_get_cpu_memory_capacity', + 'colo_set_cpu_memory_capacity', + 'colo_set_process_memory_fraction', + 'report_memory_usage', + 'clip_grad_norm_fp32', + 'copy_tensor_parallel_attributes', + 'count_zeros_fp32', + 'is_dp_rank_0', + 'is_model_parallel_parameter', + 'is_no_pp_or_last_stage', + 'is_tp_rank_0', + 'is_using_ddp', + 'is_using_pp', + 'is_using_sequence', + 'param_is_not_tensor_parallel_duplicate', + 'print_rank_0', + 'switch_virtual_pipeline_parallel_rank', + 'sync_model_param', +] diff --git a/colossalai/utils/activation_checkpoint.py b/colossalai/legacy/utils/activation_checkpoint.py similarity index 95% rename from colossalai/utils/activation_checkpoint.py rename to colossalai/legacy/utils/activation_checkpoint.py index fa9ed827a8a7..7fcaa73f4f83 100644 --- a/colossalai/utils/activation_checkpoint.py +++ b/colossalai/legacy/utils/activation_checkpoint.py @@ -1,13 +1,13 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- +import weakref + import torch from torch.utils.checkpoint import check_backward_validity, detach_variable -from colossalai.context.random import get_states, get_current_mode, set_seed_states, set_mode, sync_states -from .cuda import get_current_device - -import weakref +from colossalai.context.random import get_current_mode, get_states, set_mode, set_seed_states, sync_states +from colossalai.utils import get_current_device def copy_to_device(obj, device): @@ -143,7 +143,7 @@ def checkpoint(function, activation_offload, *args, use_reentrant: bool = True): Args: function: Describe the forward pass function. It should know how to handle the input tuples. 
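The new colossalai/legacy/utils/__init__.py above gathers every relocated helper behind a single import root, so call sites only change their package prefix. A minimal sketch of how downstream code could tolerate both layouts during the transition (the try/except fallback is illustrative, not part of this patch):

try:
    # Post-refactor layout introduced by this PR.
    from colossalai.legacy.utils import clip_grad_norm_fp32, sync_model_param
except ImportError:
    # Pre-refactor releases still expose the helpers at the old flat path.
    from colossalai.utils import clip_grad_norm_fp32, sync_model_param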
- activation_offload: The variable to check whether we should offload activation to cpu + activation_offload: The variable to check whether we should offload activation to cpu args (list): Tuple containing the parameters of the function use_reentrant: Bool type to check if we need to use_reentrant, if use_reentrant=False, there might be more flexibility for user to define there checkpoint function @@ -227,12 +227,12 @@ def inner_unpack(packed): # rerun forward, the inner_pack will store all the activations in storage if has_autocast_in_fwd: with torch.enable_grad(), \ - torch.cuda.amp.autocast(), \ - torch.autograd.graph.saved_tensors_hooks(inner_pack, inner_unpack): + torch.cuda.amp.autocast(), \ + torch.autograd.graph.saved_tensors_hooks(inner_pack, inner_unpack): _unused = function(*args) else: with torch.enable_grad(), \ - torch.autograd.graph.saved_tensors_hooks(inner_pack, inner_unpack): + torch.autograd.graph.saved_tensors_hooks(inner_pack, inner_unpack): _unused = function(*args) if x not in storage: diff --git a/colossalai/legacy/utils/checkpoint/__init__.py b/colossalai/legacy/utils/checkpoint/__init__.py new file mode 100644 index 000000000000..558a956b31ac --- /dev/null +++ b/colossalai/legacy/utils/checkpoint/__init__.py @@ -0,0 +1,3 @@ +from .module_checkpoint import load_checkpoint, save_checkpoint + +__all__ = ['save_checkpoint', 'load_checkpoint'] diff --git a/colossalai/utils/checkpoint/module_checkpoint.py b/colossalai/legacy/utils/checkpoint/module_checkpoint.py similarity index 98% rename from colossalai/utils/checkpoint/module_checkpoint.py rename to colossalai/legacy/utils/checkpoint/module_checkpoint.py index ee8773e5059c..9bd2907abf9d 100644 --- a/colossalai/utils/checkpoint/module_checkpoint.py +++ b/colossalai/legacy/utils/checkpoint/module_checkpoint.py @@ -5,7 +5,8 @@ from colossalai.interface import OptimizerWrapper from colossalai.tensor import ColoTensor -from colossalai.utils.checkpoint.utils import gather_tensor, scatter_tensor + +from .utils import gather_tensor, scatter_tensor def save_checkpoint(path: str, diff --git a/colossalai/utils/checkpoint/utils.py b/colossalai/legacy/utils/checkpoint/utils.py similarity index 91% rename from colossalai/utils/checkpoint/utils.py rename to colossalai/legacy/utils/checkpoint/utils.py index 682cd0903d5b..c830d4811463 100644 --- a/colossalai/utils/checkpoint/utils.py +++ b/colossalai/legacy/utils/checkpoint/utils.py @@ -1,63 +1,65 @@ -import torch -import torch.distributed as dist -from colossalai.tensor import ColoTensor, ColoTensorSpec -from colossalai.tensor.distspec import _DistSpec, DistPlacementPattern - - -def robust_broadcast(tensor): - with torch.no_grad(): - is_cpu_ten = tensor.device.type == 'cpu' - if is_cpu_ten: - b_data = tensor.cuda() - else: - b_data = tensor - - dist.broadcast(b_data, 0) - - if is_cpu_ten: - tensor.copy_(b_data) - - -def gather_tensor(colo_tensor: ColoTensor) -> None: - """Make colo_tensor replicated when the rank is 0 - """ - if not colo_tensor.is_replicate(): - pg = colo_tensor.get_process_group() - # for the group which contains rank 0 - if pg.dp_local_rank() == 0: - old_dist_spec = colo_tensor.dist_spec - colo_tensor.to_replicate_() - if dist.get_rank() != 0: - colo_tensor.set_dist_spec(old_dist_spec) - - # synchronize all processes for unexpected problems - dist.barrier() - - if dist.get_rank() == 0: - setattr(colo_tensor, 'save_ready', True) # set saving signature - - -def scatter_tensor(colo_tensor: ColoTensor, dist_spec: _DistSpec) -> None: - """Reversal operation of 
`gather_tensor`. - """ - if dist_spec.placement == DistPlacementPattern.REPLICATE: - robust_broadcast(colo_tensor.data) - else: - global_size = colo_tensor.size_global() - - if dist.get_rank() == 0: - entire_data = colo_tensor.data - else: - entire_data = torch.empty(global_size, device=colo_tensor.device) - robust_broadcast(entire_data) - - if dist.get_rank() == 0: - colo_tensor.set_dist_spec(dist_spec) - else: - rep_tensor = ColoTensor( - entire_data, ColoTensorSpec(pg=colo_tensor.get_process_group(), compute_attr=colo_tensor.compute_spec)) - rep_tensor.set_dist_spec(dist_spec) - with torch.no_grad(): - colo_tensor.data.copy_(rep_tensor.data) - # synchronize all processes for unexpected problems - dist.barrier() +import torch +import torch.distributed as dist + +from colossalai.legacy.tensor import ColoTensorSpec +from colossalai.legacy.tensor.distspec import DistPlacementPattern, _DistSpec +from colossalai.tensor import ColoTensor + + +def robust_broadcast(tensor): + with torch.no_grad(): + is_cpu_ten = tensor.device.type == 'cpu' + if is_cpu_ten: + b_data = tensor.cuda() + else: + b_data = tensor + + dist.broadcast(b_data, 0) + + if is_cpu_ten: + tensor.copy_(b_data) + + +def gather_tensor(colo_tensor: ColoTensor) -> None: + """Make colo_tensor replicated when the rank is 0 + """ + if not colo_tensor.is_replicate(): + pg = colo_tensor.get_process_group() + # for the group which contains rank 0 + if pg.dp_local_rank() == 0: + old_dist_spec = colo_tensor.dist_spec + colo_tensor.to_replicate_() + if dist.get_rank() != 0: + colo_tensor.set_dist_spec(old_dist_spec) + + # synchronize all processes for unexpected problems + dist.barrier() + + if dist.get_rank() == 0: + setattr(colo_tensor, 'save_ready', True) # set saving signature + + +def scatter_tensor(colo_tensor: ColoTensor, dist_spec: _DistSpec) -> None: + """Reversal operation of `gather_tensor`. 
+ """ + if dist_spec.placement == DistPlacementPattern.REPLICATE: + robust_broadcast(colo_tensor.data) + else: + global_size = colo_tensor.size_global() + + if dist.get_rank() == 0: + entire_data = colo_tensor.data + else: + entire_data = torch.empty(global_size, device=colo_tensor.device) + robust_broadcast(entire_data) + + if dist.get_rank() == 0: + colo_tensor.set_dist_spec(dist_spec) + else: + rep_tensor = ColoTensor( + entire_data, ColoTensorSpec(pg=colo_tensor.get_process_group(), compute_attr=colo_tensor.compute_spec)) + rep_tensor.set_dist_spec(dist_spec) + with torch.no_grad(): + colo_tensor.data.copy_(rep_tensor.data) + # synchronize all processes for unexpected problems + dist.barrier() diff --git a/colossalai/utils/checkpointing.py b/colossalai/legacy/utils/checkpointing.py similarity index 99% rename from colossalai/utils/checkpointing.py rename to colossalai/legacy/utils/checkpointing.py index d1c6b6370ede..9f56dcaeb28d 100644 --- a/colossalai/utils/checkpointing.py +++ b/colossalai/legacy/utils/checkpointing.py @@ -3,9 +3,11 @@ import torch import torch.distributed as dist + +from colossalai.constants import IS_TENSOR_PARALLEL from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc -from colossalai.constants import IS_TENSOR_PARALLEL + try: from torch.nn.modules.module import _EXTRA_STATE_KEY_SUFFIX except ImportError: diff --git a/colossalai/legacy/utils/common.py b/colossalai/legacy/utils/common.py new file mode 100644 index 000000000000..b124a986eabe --- /dev/null +++ b/colossalai/legacy/utils/common.py @@ -0,0 +1,434 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +from collections import defaultdict +from contextlib import contextmanager +from typing import Dict, List, Optional, Union + +import torch +import torch.distributed as dist +from torch import inf +from torch.nn.parameter import Parameter + +from colossalai.constants import IS_TENSOR_PARALLEL, NUM_PARTITIONS, TENSOR_PARALLEL_ATTRIBUTES +from colossalai.context.parallel_mode import ParallelMode +from colossalai.core import global_context as gpc +from colossalai.global_variables import tensor_parallel_env as env +from colossalai.legacy.tensor import ProcessGroup +from colossalai.tensor import ColoParameter +from colossalai.utils.multi_tensor_apply import multi_tensor_applier + +try: + from colossalai._C import fused_optim +except: + fused_optim = None + + +def print_rank_0(msg: str, logger=None): + """Print messages and save logs(optional). This is executed only if you are the rank-0 gpu. + + Args: + msg (str): A string message to output. + logger (:class:`colossalai.logging.DistributedLogger`, optional): + The logger to record the message, defaults to None. + """ + if gpc.get_global_rank() == 0: + if logger is None: + print(msg, flush=True) + else: + logger.info(msg) + + +def sync_model_param(model, parallel_mode): + r"""Make sure data parameters are consistent during Data Parallel Mode. + + Args: + model (:class:`torch.nn.Module`): A pyTorch model on whose parameters you check the consistency. + parallel_mode (:class:`colossalai.context.ParallelMode`): Parallel mode to be checked. + + Note: + The parallel_mode should be concluded in ``ParallelMode``. 
More details about ``ParallelMode`` could be found + in `parallel_mode `_ + """ + if gpc.is_initialized(parallel_mode) and gpc.get_world_size(parallel_mode) > 1: + for param in model.parameters(): + ranks = gpc.get_ranks_in_group(parallel_mode) + dist.broadcast(param, src=ranks[0], group=gpc.get_group(parallel_mode)) + + +def is_dp_rank_0(): + return not gpc.is_initialized(ParallelMode.DATA) or gpc.is_first_rank(ParallelMode.DATA) + + +def is_tp_rank_0(): + return not gpc.is_initialized(ParallelMode.TENSOR) or gpc.is_first_rank(ParallelMode.TENSOR) + + +def is_no_pp_or_last_stage(): + return not gpc.is_initialized(ParallelMode.PIPELINE) or gpc.is_last_rank(ParallelMode.PIPELINE) + + +def is_using_ddp(): + return gpc.is_initialized(ParallelMode.DATA) and gpc.get_world_size(ParallelMode.DATA) > 1 + + +def is_using_pp(): + return gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1 + + +def is_using_sequence(): + return gpc.is_initialized(ParallelMode.SEQUENCE) and gpc.get_world_size(ParallelMode.SEQUENCE) > 1 + + +class model_branch_context(object): + + def __enter__(self): + self.env_status = env.save() + + def __exit__(self, *exc_info): + env.load(**self.env_status) + + +def is_model_parallel_parameter(p): + return hasattr(p, IS_TENSOR_PARALLEL) and getattr(p, IS_TENSOR_PARALLEL) + + +def _calc_l2_norm(grads): + # we should not + global fused_optim + + if fused_optim is None: + from colossalai.kernel.op_builder import FusedOptimBuilder + fused_optim = FusedOptimBuilder().load() + + norm = 0.0 + if len(grads) > 0: + dummy_overflow_buf = torch.cuda.IntTensor([0]) + norm, _ = multi_tensor_applier( + fused_optim.multi_tensor_l2norm, + dummy_overflow_buf, + [grads], + False # no per-parameter norm + ) + return norm + + +def _calc_lp(grads, norm_type): + norm = 0.0 + for grad in grads: + grad_norm = torch.norm(grad, norm_type) + norm += grad_norm**norm_type + return norm + + +def _move_norm_to_cuda(norm: Union[float, torch.Tensor]) -> Union[float, torch.Tensor]: + if torch.is_tensor(norm) and norm.device.type != 'cuda': + norm = norm.to(torch.cuda.current_device()) + return norm + + +def _get_tensor_norm(norm: Union[float, torch.Tensor], move_to_cuda) -> torch.Tensor: + if isinstance(norm, float): + norm = torch.Tensor([norm]) + if move_to_cuda: + norm = norm.to(torch.cuda.current_device()) + return norm + + +# ======== Gradient Clipping ========= + + +def _compute_local_lp(params: List[ColoParameter], norm_type: float) -> float: + if len(params) == 0: + return 0.0 + grads = [p.grad for p in params] + use_cuda_kernel = grads[0].device.type == 'cuda' + if norm_type == inf: + local_lp = max([g.abs().max() for g in grads]) + elif norm_type == 2.0 and use_cuda_kernel: + local_lp = _calc_l2_norm(grads)**norm_type + else: + local_lp = _calc_lp(grads, norm_type) + if isinstance(local_lp, torch.Tensor): + return local_lp.item() + return local_lp + + +def _compute_buckets_lp(params: List[ColoParameter], norm_type: float) -> float: + if len(params) == 0: + return 0.0 + buckets: Dict[Optional[ProcessGroup], List[ColoParameter]] = defaultdict(list) + for p in params: + if p.is_replicate(): + buckets[None].append(p) + else: + buckets[p.get_process_group().tp_process_group()].append(p) + total_lp = 0.0 + for group, bucket in buckets.items(): + local_lp = _compute_local_lp(bucket, norm_type) + if group is not None: + local_lp_tensor = torch.tensor([local_lp], device=torch.cuda.current_device()) + if norm_type == inf: + dist.all_reduce(local_lp_tensor, 
op=dist.ReduceOp.MAX, group=group) + else: + dist.all_reduce(local_lp_tensor, group=group) + local_lp = local_lp_tensor.item() + if norm_type == inf: + total_lp = max(total_lp, local_lp) + else: + total_lp += local_lp + return total_lp + + +def _compute_pp_grad_lp(total_lp: float, norm_type: float) -> float: + if gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1: + total_lp_tensor = torch.tensor([total_lp], device=torch.cuda.current_device()) + if norm_type == inf: + dist.all_reduce(total_lp_tensor, op=dist.ReduceOp.MAX, group=gpc.get_group(ParallelMode.PIPELINE)) + else: + dist.all_reduce(total_lp_tensor, group=gpc.get_group(ParallelMode.PIPELINE)) + total_lp = total_lp_tensor.item() + return total_lp + + +def _compute_grad_lp(parameters, norm_type: float = 2.0) -> float: + if isinstance(parameters, torch.Tensor): + parameters = [parameters] + grad_dtype = None + cpu_grad_params: List[ColoParameter] = [] + cuda_grad_params: List[ColoParameter] = [] + for p in parameters: + if p.grad is None: + continue + assert isinstance(p, ColoParameter) + if grad_dtype is None: + grad_dtype = p.grad.dtype + assert p.grad.dtype == grad_dtype, f'Expected all grads are {grad_dtype}, got {p.grad.dtype}' + if p.grad.device.type == 'cuda': + cuda_grad_params.append(p) + else: + cpu_grad_params.append(p) + norm_type = float(norm_type) + cpu_lp = _compute_buckets_lp(cpu_grad_params, norm_type) + cuda_lp = _compute_buckets_lp(cuda_grad_params, norm_type) + if norm_type == inf: + total_lp = max(cpu_lp, cuda_lp) + else: + total_lp = cpu_lp + cuda_lp + return _compute_pp_grad_lp(total_lp, norm_type) + + +def compute_grad_norm(parameters, norm_type: float = 2.0) -> float: + norm_type = float(norm_type) + total_norm = _compute_grad_lp(parameters, norm_type) + if norm_type != inf: + total_norm = total_norm**(1 / norm_type) + return total_norm + + +def _clip_grad_norm(parameters, max_norm: float, total_norm: float) -> None: + clip_coef = max_norm / (total_norm + 1e-6) + if clip_coef < 1.0: + cuda_grads: List[torch.Tensor] = [] + cpu_grads: List[torch.Tensor] = [] + if isinstance(parameters, torch.Tensor): + parameters = [parameters] + for p in parameters: + if p.grad is None: + continue + if p.grad.device.type == 'cuda': + cuda_grads.append(p.grad.detach()) + else: + cpu_grads.append(p.grad.detach()) + if len(cuda_grads) > 0: + dummy_overflow_buf = torch.cuda.IntTensor([0]) + multi_tensor_applier(fused_optim.multi_tensor_scale, dummy_overflow_buf, [cuda_grads, cuda_grads], + clip_coef) + for g in cpu_grads: + g.mul_(clip_coef) + + +def clip_grad_norm(parameters, max_norm: float, norm_type: float = 2.0) -> float: + total_norm = compute_grad_norm(parameters, norm_type) + _clip_grad_norm(parameters, max_norm, total_norm) + return total_norm + + +def clip_grad_norm_fp32(parameters, max_norm, norm_type=2): + """Clips gradient norm of an iterable of parameters whose gradients are in fp32. + + This is adapted from :func:`torch.nn.utils.clip_grad.clip_grad_norm_` and + added functionality to handle model parallel parameters. + + Note: + the gradients are modified in place. + + Args: + parameters (Iterable[:class:`torch.tensor`] or :class:`torch.tensor`): + An iterable of Tensors or a single Tensor that will have gradients normalized. + max_norm (Union[float, int]): Max norm of the gradients. + norm_type (Union[float, int, 'inf']): Type of the used p-norm. Can be ``'inf'`` for infinity norm. + + Returns: + float: Total norm of the parameters. 
+ """ + + if isinstance(parameters, torch.Tensor): + parameters = [parameters] + + # Filter parameters based on: + # - grad should not be none + # - parameter should not be shared + # - should not be a replica due to tensor model parallelism + params: List[Parameter] = [] + has_zero_shared_param: bool = False + for param in parameters: + if param.grad is not None: + # Make sure the grads are in fp32 + assert param.grad.dtype == torch.float, \ + f'expected gradient to be dtype torch.float, but got {param.grad.type()}' + if hasattr(param, 'colo_attr') and param.colo_attr.sharded_data_tensor.is_sharded: + has_zero_shared_param = True + params.append(param) + + if len(params) == 0: + enable_cuda_kernels = False + else: + enable_cuda_kernels = params[0].grad.device.type == 'cuda' + # Norm parameters. + max_norm = float(max_norm) + norm_type = float(norm_type) + + # Parameters can be on CPU or CUDA + # If parameters are on CPU, disable CUDA kernels + + # Calculate norm. + if norm_type == inf: + total_norm = max(p.grad.data.abs().max() for p in params) + total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)]) + # Take max across all model-parallel GPUs. + if gpc.is_initialized(ParallelMode.MODEL) and gpc.get_world_size(ParallelMode.MODEL) > 1: + dist.all_reduce(total_norm_cuda, + op=dist.ReduceOp.MAX, + group=gpc.get_group(ParallelMode.MODEL), + async_op=False) + if has_zero_shared_param: + dist.all_reduce(total_norm_cuda, + op=dist.ReduceOp.MAX, + group=gpc.get_group(ParallelMode.DATA), + async_op=False) + total_norm = total_norm_cuda[0].item() + else: + tensor_parallel_grads = [] + no_tensor_parallel_grads = [] + zero_sharded_grads = [] + for p in params: + if is_model_parallel_parameter(p): + reductor = (gpc.get_world_size(ParallelMode.TENSOR) / getattr(p, NUM_PARTITIONS))**(1 / norm_type) + tensor_parallel_grads.append(p.grad.data / reductor) + elif hasattr(p, 'colo_attr') and p.colo_attr.sharded_data_tensor.is_sharded: + zero_sharded_grads.append(p.grad.data) + else: + no_tensor_parallel_grads.append(p.grad.data) + + if norm_type == 2.0 and enable_cuda_kernels: + tensor_parallel_norm = _calc_l2_norm(tensor_parallel_grads)**norm_type + no_tensor_parallel_norm = _calc_l2_norm(no_tensor_parallel_grads)**norm_type + zero_sharded_norm = _calc_l2_norm(zero_sharded_grads)**norm_type + else: + tensor_parallel_norm = _calc_lp(tensor_parallel_grads, norm_type) + no_tensor_parallel_norm = _calc_lp(no_tensor_parallel_grads, norm_type) + zero_sharded_norm = _calc_lp(zero_sharded_grads, norm_type) + # If norm is type of float, then we convert them into torch.Tensor. + tensor_parallel_norm = _get_tensor_norm(tensor_parallel_norm, enable_cuda_kernels) + no_tensor_parallel_norm = _get_tensor_norm(no_tensor_parallel_norm, enable_cuda_kernels) + zero_sharded_norm = _get_tensor_norm(zero_sharded_norm, enable_cuda_kernels) + # If grads are on CPU, the norms is also on CPU. Cast them to CUDA tensors + if not enable_cuda_kernels: + tensor_parallel_norm = _move_norm_to_cuda(tensor_parallel_norm) + no_tensor_parallel_norm = _move_norm_to_cuda(no_tensor_parallel_norm) + zero_sharded_norm = _move_norm_to_cuda(zero_sharded_norm) + + # Sum across all model-parallel GPUs. 
+ if gpc.is_initialized(ParallelMode.TENSOR) and len(tensor_parallel_grads) > 0: + dist.all_reduce(tensor_parallel_norm, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.TENSOR)) + # Sum across all zero sharded GPUs + if len(zero_sharded_grads) > 0: + dist.all_reduce(zero_sharded_norm, group=gpc.get_group(ParallelMode.DATA)) + no_tensor_parallel_norm += zero_sharded_norm + total_norm = tensor_parallel_norm + no_tensor_parallel_norm + if gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1: + dist.all_reduce(total_norm, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.PIPELINE)) + total_norm = total_norm**(1.0 / norm_type) + if torch.is_tensor(total_norm): + total_norm = total_norm.item() + + # Scale. + clip_coeff = max_norm / (total_norm + 1.0e-6) + if clip_coeff < 1.0: + if enable_cuda_kernels: + grads = [p.grad.detach() for p in params] + dummy_overflow_buf = torch.cuda.IntTensor([0]) + multi_tensor_applier(fused_optim.multi_tensor_scale, dummy_overflow_buf, [grads, grads], clip_coeff) + else: + for p in params: + p.grad.detach().mul_(clip_coeff) + return total_norm + + +def count_zeros_fp32(parameters): + if isinstance(parameters, torch.Tensor): + parameters = [parameters] + + # Filter parameters based on: + # - grad should not be none + # - parameter should not be shared + # - should not be a replica due to tensor model parallelism + total_num_zeros = 0.0 + for param in parameters: + grad_not_none = param.grad is not None + is_not_tp_duplicate = param_is_not_tensor_parallel_duplicate(param) + if grad_not_none and is_not_tp_duplicate: + grad = param.grad.detach() + num_zeros = grad.numel() - torch.count_nonzero(grad) + total_num_zeros = num_zeros + total_num_zeros + + total_num_zeros = torch.IntTensor([int(total_num_zeros)]).cuda() + + # Sum across all model-parallel GPUs. 
+ ops = [] + ops.append( + dist.all_reduce(total_num_zeros, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.TENSOR), async_op=True)) + if gpc.is_initialized(ParallelMode.PIPELINE): + ops.append( + dist.all_reduce(total_num_zeros, + op=dist.ReduceOp.SUM, + group=gpc.get_group(ParallelMode.PIPELINE), + async_op=True)) + + for req in ops: + req.wait() + total_num_zeros = total_num_zeros.item() + + return total_num_zeros + + +def copy_tensor_parallel_attributes(src_tensor, dst_tensor): + for attr in TENSOR_PARALLEL_ATTRIBUTES: + if hasattr(src_tensor, attr): + val = getattr(src_tensor, attr) + setattr(dst_tensor, attr, val) + + +def param_is_not_tensor_parallel_duplicate(param): + return (hasattr(param, IS_TENSOR_PARALLEL) and getattr(param, IS_TENSOR_PARALLEL)) or (gpc.get_local_rank( + ParallelMode.TENSOR) == 0) + + +@contextmanager +def switch_virtual_pipeline_parallel_rank(rank): + prev_rank = gpc.virtual_pipeline_parallel_rank + try: + gpc.set_virtual_pipeline_parallel_rank(rank) + yield + finally: + gpc.set_virtual_pipeline_parallel_rank(prev_rank) diff --git a/colossalai/utils/data_sampler/__init__.py b/colossalai/legacy/utils/data_sampler/__init__.py similarity index 100% rename from colossalai/utils/data_sampler/__init__.py rename to colossalai/legacy/utils/data_sampler/__init__.py diff --git a/colossalai/utils/data_sampler/base_sampler.py b/colossalai/legacy/utils/data_sampler/base_sampler.py similarity index 100% rename from colossalai/utils/data_sampler/base_sampler.py rename to colossalai/legacy/utils/data_sampler/base_sampler.py diff --git a/colossalai/utils/data_sampler/data_parallel_sampler.py b/colossalai/legacy/utils/data_sampler/data_parallel_sampler.py similarity index 100% rename from colossalai/utils/data_sampler/data_parallel_sampler.py rename to colossalai/legacy/utils/data_sampler/data_parallel_sampler.py diff --git a/colossalai/utils/memory.py b/colossalai/legacy/utils/memory.py similarity index 96% rename from colossalai/utils/memory.py rename to colossalai/legacy/utils/memory.py index 434e90edd3b9..1779908933c9 100644 --- a/colossalai/utils/memory.py +++ b/colossalai/legacy/utils/memory.py @@ -1,14 +1,14 @@ -import torch import gc -import psutil from collections import namedtuple -from colossalai.context.parallel_mode import ParallelMode -from colossalai.utils import get_current_device +import psutil +import torch +import torch.distributed as dist +from packaging import version + from colossalai.core import global_context as gpc -from colossalai.context.parallel_mode import ParallelMode from colossalai.logging import get_dist_logger -from packaging import version +from colossalai.utils import get_current_device _GLOBAL_CUDA_MEM_FRACTION = 1.0 _GLOBAL_CPU_MEM_CAPACITY = -1 @@ -68,7 +68,7 @@ def report_memory_usage(message, logger=None, report_cpu=False): Raises: EnvironmentError: Raise error if no distributed environment has been initialized. """ - if not gpc.is_initialized(ParallelMode.GLOBAL): + if not dist.is_initialized(): raise EnvironmentError("No distributed environment is initialized") gpu_allocated = _bytes_to_MB(torch.cuda.memory_allocated()) @@ -138,7 +138,7 @@ def colo_device_memory_used(device: torch.device) -> int: def colo_set_process_memory_fraction(ratio: float) -> None: - """colo_set_process_memory_fraction + """colo_set_process_memory_fraction set how much cuda memory used on the gpu belonging to the current process. 
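clip_grad_norm_fp32 above computes a local norm per rank, all-reduces it across the tensor-parallel and pipeline groups, and only then derives a single clip coefficient. A minimal single-group sketch of that reduce-then-clip pattern, assuming torch.distributed is already initialized and all grads live on one device type (note that replicated parameters would be double-counted by this simplification; the real implementation rescales them via NUM_PARTITIONS):

import torch
import torch.distributed as dist


def clip_grad_norm_global(params, max_norm: float, group=None) -> float:
    grads = [p.grad for p in params if p.grad is not None]
    device = grads[0].device if grads else torch.device('cuda')
    total_sq = torch.zeros(1, device=device)
    for g in grads:
        # Accumulate squared L2 norms locally.
        total_sq += g.float().norm(2) ** 2
    # Sum the squared norms across the group, then take the root.
    dist.all_reduce(total_sq, group=group)
    total_norm = total_sq.sqrt().item()
    clip_coef = max_norm / (total_norm + 1e-6)
    if clip_coef < 1.0:
        for g in grads:
            g.detach().mul_(clip_coef)
    return total_norm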
diff --git a/colossalai/utils/profiler/__init__.py b/colossalai/legacy/utils/profiler/__init__.py similarity index 100% rename from colossalai/utils/profiler/__init__.py rename to colossalai/legacy/utils/profiler/__init__.py diff --git a/colossalai/utils/profiler/extention.py b/colossalai/legacy/utils/profiler/extention.py similarity index 100% rename from colossalai/utils/profiler/extention.py rename to colossalai/legacy/utils/profiler/extention.py diff --git a/colossalai/utils/profiler/legacy/__init__.py b/colossalai/legacy/utils/profiler/legacy/__init__.py similarity index 77% rename from colossalai/utils/profiler/legacy/__init__.py rename to colossalai/legacy/utils/profiler/legacy/__init__.py index 849c7fca3053..88beed86d7de 100644 --- a/colossalai/utils/profiler/legacy/__init__.py +++ b/colossalai/legacy/utils/profiler/legacy/__init__.py @@ -1,6 +1,6 @@ -from .comm_profiler import CommProfiler -from .pcie_profiler import PcieProfiler -from .prof_utils import ProfilerContext, BaseProfiler -from .mem_profiler import MemProfiler - -__all__ = ['BaseProfiler', 'CommProfiler', 'PcieProfiler', 'MemProfiler', 'ProfilerContext'] +from .comm_profiler import CommProfiler +from .mem_profiler import MemProfiler +from .pcie_profiler import PcieProfiler +from .prof_utils import BaseProfiler, ProfilerContext + +__all__ = ['BaseProfiler', 'CommProfiler', 'PcieProfiler', 'MemProfiler', 'ProfilerContext'] diff --git a/colossalai/utils/profiler/legacy/comm_profiler.py b/colossalai/legacy/utils/profiler/legacy/comm_profiler.py similarity index 96% rename from colossalai/utils/profiler/legacy/comm_profiler.py rename to colossalai/legacy/utils/profiler/legacy/comm_profiler.py index 334f0113ee90..bb7e2654c740 100644 --- a/colossalai/utils/profiler/legacy/comm_profiler.py +++ b/colossalai/legacy/utils/profiler/legacy/comm_profiler.py @@ -1,308 +1,311 @@ -import inspect -from pathlib import Path -from functools import partial -import torch -from torch.autograd.profiler import profile -import torch.distributed as dist -from torch.distributed import ReduceOp -from colossalai.utils import get_current_device -from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwidth -from typing import List, Optional - - -def _get_code_location(depth: int): - ret = [] - length = min(len(inspect.stack()), depth + 1) - for i in range(3, length): - upper_frame = inspect.stack()[i] - function_name = inspect.stack()[i - 1].function - ret.append(upper_frame.filename) - ret.append('(') - ret.append(str(upper_frame.lineno)) - ret.append('): ') - ret.append(function_name) - if i != length - 1: - ret.append('\n') - - return ''.join(ret) - - -torch_all_reduce = dist.all_reduce -torch_all_gather = dist.all_gather -torch_reduce_scatter = dist.reduce_scatter -torch_broadcast = dist.broadcast -torch_reduce = dist.reduce - - -class CommEvent(object): - """Communication Event. Used for communication time and communication - volume recording. - """ - - def __init__(self, count: int = 0, comm_vol: float = 0., cuda_time: int = 0): - self.self_count = count - self.self_comm_vol = comm_vol - self.self_cuda_time = cuda_time - - def add(self, rhs): - self.self_count += rhs.self_count - self.self_comm_vol += rhs.self_comm_vol - self.self_cuda_time += rhs.self_cuda_time - - -class CommProfiler(BaseProfiler): - """Communication profiler. Records all communication events. 
- """ - - def __init__(self, depth: int = 0, total_count: int = 0, total_comm_vol: float = 0, total_cuda_time: int = 0): - super().__init__(profiler_name="Collective_Communication", priority=0) - self.depth = 3 + depth - self.total_count = total_count - self.total_comm_vol = total_comm_vol - self.total_cuda_time = total_cuda_time - - self.ops_record = dict() - self.profiler = None - self.pending_op = None - self.pending_metadata = None - self.warn_flag = False - - def reset(self): - self.total_count = 0 - self.total_comm_vol = 0 - self.total_cuda_time = 0 - - self.ops_record = dict() - self.profiler = None - self.pending_op = None - self.pending_metadata = None - self.warn_flag = False - - def enable(self): - dist.all_reduce = partial(all_reduce, profiler=self) - dist.all_gather = partial(all_gather, profiler=self) - dist.reduce_scatter = partial(reduce_scatter, profiler=self) - dist.broadcast = partial(broadcast, profiler=self) - dist.reduce = partial(reduce, profiler=self) - - def disable(self): - dist.all_reduce = torch_all_reduce - dist.all_gather = torch_all_gather - dist.reduce_scatter = torch_reduce_scatter - dist.broadcast = torch_broadcast - dist.reduce = torch_reduce - - def to_tensorboard(self, writer): - writer.add_text(tag="Collective Communication", text_string=self.result_str("\n\n")) - - def to_file(self, filename: Path): - with open(filename, "w") as f: - f.write(self.result_str()) - - def show(self): - print(self.result_str()) - - def result_str(self, sep: str = "\n"): - res = [] - - def append(s: str = None): - if s is not None: - res.append(s) - res.append(sep) - - if self.warn_flag: - append("Warning: there exists multiple communication operations in the same time. As a result, " - "the profiling result is not accurate.") - - if self.total_cuda_time == 0: - return "No collective communication has been called yet!" 
- - append("Collective communication profiling result:") - append("total cuda time: {}".format(_format_time(self.total_cuda_time))) - append("average bandwidth: {}".format(_format_bandwidth(self.total_comm_vol, self.total_cuda_time))) - append("total number of calls: {}".format(self.total_count)) - append("All events:") - - separation = '-' * 74 - row_format = '{:^10}' + '{:^12}' * 2 + '{:^16}' + '{:^12}' * 2 - - append(separation) - append(row_format.format('Location', 'GPU time', 'Percentage', 'Comm volume', 'Bandwidth', 'Num of calls')) - append(separation) - - show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].self_cuda_time) - for location, event in show_list: - append(location) - append( - row_format.format('', _format_time(event.self_cuda_time), - '{:.1f}%'.format(event.self_cuda_time / self.total_cuda_time * 100.0), - _format_memory(event.self_comm_vol), - _format_bandwidth(event.self_comm_vol, event.self_cuda_time), event.self_count)) - append() - - return ''.join(res) - - @property - def has_aync_op(self): - return self.pending_op is not None - - def activate_profiler(self, kn: str, vol: float): - self.pending_metadata = (kn, _get_code_location(self.depth), vol) - self.profiler = profile(enabled=True, use_cuda=True, use_cpu=True, use_kineto=True) - self.profiler.__enter__() - - def close_profiler(self, group=None): - assert self.profiler is not None, "There is no running dist op" - kernel_name, code_location, vol = self.pending_metadata - self.profiler.__exit__(None, None, None) - - if self.profiler.enabled and dist.get_world_size(group) > 1: - assert_flag = 0 - current_comm_event = None - events = self.profiler.function_events - for event in events: - if kernel_name in event.name: - assert assert_flag == 0, "Multiple dist ops has been called " - current_comm_event = CommEvent(1, vol, event.self_cuda_time_total) - assert_flag += 1 - - assert current_comm_event is not None, "dist op has not been found" - - buffer = torch.tensor([current_comm_event.self_cuda_time], device=get_current_device()) - torch_all_reduce(buffer, op=ReduceOp.MIN, group=group) - current_comm_event.self_cuda_time = buffer.item() - - self.total_count += current_comm_event.self_count - self.total_comm_vol += current_comm_event.self_comm_vol - self.total_cuda_time += current_comm_event.self_cuda_time - if code_location in self.ops_record: - self.ops_record[code_location].add(current_comm_event) - else: - self.ops_record[code_location] = current_comm_event - - self.profiler = None - self.pending_op = None - self.pending_metadata = None - - def wait_async_op(self): - if self.pending_op is not None: - op = self.pending_op - op.wait() - self.close_profiler() - - -class CommHandler(object): - """Communication handler. A dummy handler to wait aync operations. 
- """ - - def __init__(self, profiler: CommProfiler): - super().__init__() - self.prof = profiler - - def wait(self): - self.prof.wait_async_op() - - -def async_check(profiler: CommProfiler): - if profiler.pending_op is not None: - profiler.warn_flag = True - profiler.wait_async_op() - - -def all_reduce(tensor: torch.Tensor, - op: ReduceOp = ReduceOp.SUM, - group=None, - async_op: bool = False, - profiler: CommProfiler = None) -> Optional[CommHandler]: - async_check(profiler) - - comm_size = dist.get_world_size(group) - correction = 2 * (comm_size - 1) / comm_size - comm_vol = correction * tensor.element_size() * tensor.numel() - profiler.activate_profiler("ncclKernel_AllReduce_", comm_vol) - profiler.pending_op = torch_all_reduce(tensor, op, group, async_op) - - if async_op: - return CommHandler(profiler) - - profiler.close_profiler(group) - - -def reduce_scatter(output: torch.Tensor, - input_list: List[torch.Tensor], - op: ReduceOp = ReduceOp.SUM, - group=None, - async_op: bool = False, - profiler: CommProfiler = None) -> Optional[CommHandler]: - async_check(profiler) - - comm_size = dist.get_world_size(group) - correction = (comm_size - 1) / comm_size - comm_vol = 0 - for tensor in input_list: - comm_vol += tensor.element_size() * tensor.numel() - comm_vol *= correction - profiler.activate_profiler("ncclKernel_ReduceScatter_", comm_vol) - profiler.pending_op = torch_reduce_scatter(output, input_list, op, group, async_op) - - if async_op: - return CommHandler(profiler) - - profiler.close_profiler(group) - - -def all_gather(tensor_list: List[torch.Tensor], - tensor: torch.Tensor, - group=None, - async_op: bool = False, - profiler: CommProfiler = None) -> Optional[CommHandler]: - async_check(profiler) - - comm_size = dist.get_world_size(group) - correction = (comm_size - 1) / comm_size - comm_vol = 0 - for ten in tensor_list: - comm_vol += ten.element_size() * ten.numel() - comm_vol *= correction - profiler.activate_profiler("ncclKernel_AllGather_", comm_vol) - profiler.pending_op = torch_all_gather(tensor_list, tensor, group, async_op) - - if async_op: - return CommHandler(profiler) - - profiler.close_profiler(group) - - -def broadcast(tensor: torch.Tensor, - src: int, - group=None, - async_op: bool = False, - profiler: CommProfiler = None) -> Optional[CommHandler]: - async_check(profiler) - - comm_vol = 1.0 * tensor.element_size() * tensor.numel() - profiler.activate_profiler("ncclKernel_Broadcast_", comm_vol) - profiler.pending_op = torch_broadcast(tensor, src, group, async_op) - - if async_op: - return CommHandler(profiler) - - profiler.close_profiler(group) - - -def reduce(tensor: torch.Tensor, - dst: int, - op: ReduceOp = ReduceOp.SUM, - group=None, - async_op: bool = False, - profiler: CommProfiler = None) -> Optional[CommHandler]: - async_check(profiler) - - comm_vol = 1.0 * tensor.element_size() * tensor.numel() - profiler.activate_profiler("ncclKernel_Reduce_", comm_vol) - profiler.pending_op = torch_reduce(tensor, dst, op, group, async_op) - - if async_op: - return CommHandler(profiler) - - profiler.close_profiler(group) +import inspect +from functools import partial +from pathlib import Path +from typing import List, Optional + +import torch +import torch.distributed as dist +from torch.autograd.profiler import profile +from torch.distributed import ReduceOp + +from colossalai.utils import get_current_device + +from .prof_utils import BaseProfiler, _format_bandwidth, _format_memory, _format_time + + +def _get_code_location(depth: int): + ret = [] + length = 
min(len(inspect.stack()), depth + 1) + for i in range(3, length): + upper_frame = inspect.stack()[i] + function_name = inspect.stack()[i - 1].function + ret.append(upper_frame.filename) + ret.append('(') + ret.append(str(upper_frame.lineno)) + ret.append('): ') + ret.append(function_name) + if i != length - 1: + ret.append('\n') + + return ''.join(ret) + + +torch_all_reduce = dist.all_reduce +torch_all_gather = dist.all_gather +torch_reduce_scatter = dist.reduce_scatter +torch_broadcast = dist.broadcast +torch_reduce = dist.reduce + + +class CommEvent(object): + """Communication Event. Used for communication time and communication + volume recording. + """ + + def __init__(self, count: int = 0, comm_vol: float = 0., cuda_time: int = 0): + self.self_count = count + self.self_comm_vol = comm_vol + self.self_cuda_time = cuda_time + + def add(self, rhs): + self.self_count += rhs.self_count + self.self_comm_vol += rhs.self_comm_vol + self.self_cuda_time += rhs.self_cuda_time + + +class CommProfiler(BaseProfiler): + """Communication profiler. Records all communication events. + """ + + def __init__(self, depth: int = 0, total_count: int = 0, total_comm_vol: float = 0, total_cuda_time: int = 0): + super().__init__(profiler_name="Collective_Communication", priority=0) + self.depth = 3 + depth + self.total_count = total_count + self.total_comm_vol = total_comm_vol + self.total_cuda_time = total_cuda_time + + self.ops_record = dict() + self.profiler = None + self.pending_op = None + self.pending_metadata = None + self.warn_flag = False + + def reset(self): + self.total_count = 0 + self.total_comm_vol = 0 + self.total_cuda_time = 0 + + self.ops_record = dict() + self.profiler = None + self.pending_op = None + self.pending_metadata = None + self.warn_flag = False + + def enable(self): + dist.all_reduce = partial(all_reduce, profiler=self) + dist.all_gather = partial(all_gather, profiler=self) + dist.reduce_scatter = partial(reduce_scatter, profiler=self) + dist.broadcast = partial(broadcast, profiler=self) + dist.reduce = partial(reduce, profiler=self) + + def disable(self): + dist.all_reduce = torch_all_reduce + dist.all_gather = torch_all_gather + dist.reduce_scatter = torch_reduce_scatter + dist.broadcast = torch_broadcast + dist.reduce = torch_reduce + + def to_tensorboard(self, writer): + writer.add_text(tag="Collective Communication", text_string=self.result_str("\n\n")) + + def to_file(self, filename: Path): + with open(filename, "w") as f: + f.write(self.result_str()) + + def show(self): + print(self.result_str()) + + def result_str(self, sep: str = "\n"): + res = [] + + def append(s: str = None): + if s is not None: + res.append(s) + res.append(sep) + + if self.warn_flag: + append("Warning: there exists multiple communication operations in the same time. As a result, " + "the profiling result is not accurate.") + + if self.total_cuda_time == 0: + return "No collective communication has been called yet!" 
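The collective wrappers in this file estimate traffic with the standard ring-collective corrections: all-reduce moves roughly 2*(n-1)/n of the tensor's bytes, while all-gather and reduce-scatter move (n-1)/n of the aggregate payload. The same factors in isolation, for reference (helper names are illustrative):

def allreduce_volume(nbytes: int, world_size: int) -> float:
    # Ring all-reduce: each byte crosses the wire about 2*(n-1)/n times.
    return 2 * (world_size - 1) / world_size * nbytes


def allgather_volume(total_nbytes: int, world_size: int) -> float:
    # All-gather / reduce-scatter: (n-1)/n of the summed payload moves.
    return (world_size - 1) / world_size * total_nbytes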
+ + append("Collective communication profiling result:") + append("total cuda time: {}".format(_format_time(self.total_cuda_time))) + append("average bandwidth: {}".format(_format_bandwidth(self.total_comm_vol, self.total_cuda_time))) + append("total number of calls: {}".format(self.total_count)) + append("All events:") + + separation = '-' * 74 + row_format = '{:^10}' + '{:^12}' * 2 + '{:^16}' + '{:^12}' * 2 + + append(separation) + append(row_format.format('Location', 'GPU time', 'Percentage', 'Comm volume', 'Bandwidth', 'Num of calls')) + append(separation) + + show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].self_cuda_time) + for location, event in show_list: + append(location) + append( + row_format.format('', _format_time(event.self_cuda_time), + '{:.1f}%'.format(event.self_cuda_time / self.total_cuda_time * 100.0), + _format_memory(event.self_comm_vol), + _format_bandwidth(event.self_comm_vol, event.self_cuda_time), event.self_count)) + append() + + return ''.join(res) + + @property + def has_aync_op(self): + return self.pending_op is not None + + def activate_profiler(self, kn: str, vol: float): + self.pending_metadata = (kn, _get_code_location(self.depth), vol) + self.profiler = profile(enabled=True, use_cuda=True, use_cpu=True, use_kineto=True) + self.profiler.__enter__() + + def close_profiler(self, group=None): + assert self.profiler is not None, "There is no running dist op" + kernel_name, code_location, vol = self.pending_metadata + self.profiler.__exit__(None, None, None) + + if self.profiler.enabled and dist.get_world_size(group) > 1: + assert_flag = 0 + current_comm_event = None + events = self.profiler.function_events + for event in events: + if kernel_name in event.name: + assert assert_flag == 0, "Multiple dist ops has been called " + current_comm_event = CommEvent(1, vol, event.self_cuda_time_total) + assert_flag += 1 + + assert current_comm_event is not None, "dist op has not been found" + + buffer = torch.tensor([current_comm_event.self_cuda_time], device=get_current_device()) + torch_all_reduce(buffer, op=ReduceOp.MIN, group=group) + current_comm_event.self_cuda_time = buffer.item() + + self.total_count += current_comm_event.self_count + self.total_comm_vol += current_comm_event.self_comm_vol + self.total_cuda_time += current_comm_event.self_cuda_time + if code_location in self.ops_record: + self.ops_record[code_location].add(current_comm_event) + else: + self.ops_record[code_location] = current_comm_event + + self.profiler = None + self.pending_op = None + self.pending_metadata = None + + def wait_async_op(self): + if self.pending_op is not None: + op = self.pending_op + op.wait() + self.close_profiler() + + +class CommHandler(object): + """Communication handler. A dummy handler to wait aync operations. 
+ """ + + def __init__(self, profiler: CommProfiler): + super().__init__() + self.prof = profiler + + def wait(self): + self.prof.wait_async_op() + + +def async_check(profiler: CommProfiler): + if profiler.pending_op is not None: + profiler.warn_flag = True + profiler.wait_async_op() + + +def all_reduce(tensor: torch.Tensor, + op: ReduceOp = ReduceOp.SUM, + group=None, + async_op: bool = False, + profiler: CommProfiler = None) -> Optional[CommHandler]: + async_check(profiler) + + comm_size = dist.get_world_size(group) + correction = 2 * (comm_size - 1) / comm_size + comm_vol = correction * tensor.element_size() * tensor.numel() + profiler.activate_profiler("ncclKernel_AllReduce_", comm_vol) + profiler.pending_op = torch_all_reduce(tensor, op, group, async_op) + + if async_op: + return CommHandler(profiler) + + profiler.close_profiler(group) + + +def reduce_scatter(output: torch.Tensor, + input_list: List[torch.Tensor], + op: ReduceOp = ReduceOp.SUM, + group=None, + async_op: bool = False, + profiler: CommProfiler = None) -> Optional[CommHandler]: + async_check(profiler) + + comm_size = dist.get_world_size(group) + correction = (comm_size - 1) / comm_size + comm_vol = 0 + for tensor in input_list: + comm_vol += tensor.element_size() * tensor.numel() + comm_vol *= correction + profiler.activate_profiler("ncclKernel_ReduceScatter_", comm_vol) + profiler.pending_op = torch_reduce_scatter(output, input_list, op, group, async_op) + + if async_op: + return CommHandler(profiler) + + profiler.close_profiler(group) + + +def all_gather(tensor_list: List[torch.Tensor], + tensor: torch.Tensor, + group=None, + async_op: bool = False, + profiler: CommProfiler = None) -> Optional[CommHandler]: + async_check(profiler) + + comm_size = dist.get_world_size(group) + correction = (comm_size - 1) / comm_size + comm_vol = 0 + for ten in tensor_list: + comm_vol += ten.element_size() * ten.numel() + comm_vol *= correction + profiler.activate_profiler("ncclKernel_AllGather_", comm_vol) + profiler.pending_op = torch_all_gather(tensor_list, tensor, group, async_op) + + if async_op: + return CommHandler(profiler) + + profiler.close_profiler(group) + + +def broadcast(tensor: torch.Tensor, + src: int, + group=None, + async_op: bool = False, + profiler: CommProfiler = None) -> Optional[CommHandler]: + async_check(profiler) + + comm_vol = 1.0 * tensor.element_size() * tensor.numel() + profiler.activate_profiler("ncclKernel_Broadcast_", comm_vol) + profiler.pending_op = torch_broadcast(tensor, src, group, async_op) + + if async_op: + return CommHandler(profiler) + + profiler.close_profiler(group) + + +def reduce(tensor: torch.Tensor, + dst: int, + op: ReduceOp = ReduceOp.SUM, + group=None, + async_op: bool = False, + profiler: CommProfiler = None) -> Optional[CommHandler]: + async_check(profiler) + + comm_vol = 1.0 * tensor.element_size() * tensor.numel() + profiler.activate_profiler("ncclKernel_Reduce_", comm_vol) + profiler.pending_op = torch_reduce(tensor, dst, op, group, async_op) + + if async_op: + return CommHandler(profiler) + + profiler.close_profiler(group) diff --git a/colossalai/utils/profiler/legacy/pcie_profiler.py b/colossalai/legacy/utils/profiler/legacy/pcie_profiler.py similarity index 95% rename from colossalai/utils/profiler/legacy/pcie_profiler.py rename to colossalai/legacy/utils/profiler/legacy/pcie_profiler.py index 8f812f5cfc7b..514d3c6fabfa 100644 --- a/colossalai/utils/profiler/legacy/pcie_profiler.py +++ b/colossalai/legacy/utils/profiler/legacy/pcie_profiler.py @@ -1,148 +1,150 @@ -from pathlib 
import Path -from torch.autograd.profiler import profile -from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwidth -from typing import List - - -def _get_size(dtype: str): - if dtype == "fp16": - return 2 - elif dtype == "fp32": - return 4 - else: - raise NotImplementedError - - -def _get_numel(my_list: List[int]) -> int: - from functools import reduce - from operator import mul - return reduce(mul, my_list) - - -def _reduce_location(locations: List[str]) -> str: - ret = [] - for lo in locations: - ret.append(lo) - ret.append("\n") - ret = ret[:-1] - return ''.join(ret) - - -class PcieEvent(object): - """Pcie Event. - """ - - def __init__(self, count: int = 0, pcie_vol: int = 0, cuda_time: int = 0): - self.count = count - self.pcie_vol = pcie_vol - self.cuda_time = cuda_time - - def add(self, rhs): - self.count += rhs.count - self.pcie_vol += rhs.pcie_vol - self.cuda_time += rhs.cuda_time - - -class PcieProfiler(BaseProfiler): - """Pcie profiler. Records all data transmission between CPU and GPU. - - TODO: Merge pcie profiler into communication profiler - """ - - def __init__(self, dtype: str = "fp32", depth: int = 1): - super().__init__(profiler_name="Pcie", priority=10) - self.depth = depth - self.data_size = _get_size(dtype) - self.h2d_count = 0 - self.h2d_time = 0 - self.d2h_count = 0 - self.d2h_time = 0 - - self.ops_record = dict() - self.profiler = None - - def reset(self): - self.h2d_count = 0 - self.h2d_time = 0 - self.d2h_count = 0 - self.d2h_time = 0 - - self.ops_record = dict() - self.profiler = None - - def enable(self): - self.profiler = profile(enabled=True, - use_cuda=True, - use_cpu=True, - use_kineto=True, - record_shapes=True, - with_stack=True) - self.profiler.__enter__() - - def disable(self): - self.profiler.__exit__(None, None, None) - - if self.profiler.enabled: - events = self.profiler.function_events - for event in events: - if event.name == "aten::copy_": - t_shape = event.input_shapes[0] - if len(t_shape) == 0 or event.cuda_time_total == 0 or len(event.stack) == 0: - continue - current_comm_event = PcieEvent(1, self.data_size * _get_numel(t_shape), event.cuda_time_total) - code_location = _reduce_location(event.stack[:self.depth]) - if code_location in self.ops_record: - self.ops_record[code_location].add(current_comm_event) - else: - self.ops_record[code_location] = current_comm_event - elif 'Memcpy HtoD' in event.name: - self.h2d_count += 1 - self.h2d_time += event.cuda_time_total - elif 'Memcpy DtoH' in event.name: - self.d2h_count += 1 - self.d2h_time += event.cuda_time_total - - self.profiler = None - - def to_tensorboard(self, writer): - writer.add_text(tag="Data Transmission", text_string=self.result_str("\n\n")) - - def to_file(self, filename: Path): - with open(filename, "w") as f: - f.write(self.result_str()) - - def show(self): - print(self.result_str()) - - def result_str(self, sep: str = "\n"): - res = [] - - def append(s: str = None): - if s is not None: - res.append(s) - res.append(sep) - - append("Pcie profiling result:") - append("time of data transmission (CPU -> GPU): {}".format(_format_time(self.h2d_time))) - append("number of transmission (CPU -> GPU): {}".format(self.h2d_count)) - append("time of data transmission (GPU -> CPU): {}".format(_format_time(self.d2h_time))) - append("number of transmission (GPU -> CPU): {}".format(self.d2h_count)) - - append("Possible data transmission events in PCIE:") - - separation = '-' * 62 - row_format = '{:^10}' + '{:^12}' + '{:^16}' + '{:^12}' * 2 - - append(separation) - 
append(row_format.format('Location', 'GPU time', 'Trans volume', 'Bandwidth', 'Num of calls')) - append(separation) - - show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].cuda_time) - for location, event in show_list: - append(location) - append( - row_format.format('', _format_time(event.cuda_time), _format_memory(event.pcie_vol), - _format_bandwidth(event.pcie_vol, event.cuda_time), event.count)) - append() - - return ''.join(res) +from pathlib import Path +from typing import List + +from torch.autograd.profiler import profile + +from .prof_utils import BaseProfiler, _format_bandwidth, _format_memory, _format_time + + +def _get_size(dtype: str): + if dtype == "fp16": + return 2 + elif dtype == "fp32": + return 4 + else: + raise NotImplementedError + + +def _get_numel(my_list: List[int]) -> int: + from functools import reduce + from operator import mul + return reduce(mul, my_list) + + +def _reduce_location(locations: List[str]) -> str: + ret = [] + for lo in locations: + ret.append(lo) + ret.append("\n") + ret = ret[:-1] + return ''.join(ret) + + +class PcieEvent(object): + """Pcie Event. + """ + + def __init__(self, count: int = 0, pcie_vol: int = 0, cuda_time: int = 0): + self.count = count + self.pcie_vol = pcie_vol + self.cuda_time = cuda_time + + def add(self, rhs): + self.count += rhs.count + self.pcie_vol += rhs.pcie_vol + self.cuda_time += rhs.cuda_time + + +class PcieProfiler(BaseProfiler): + """Pcie profiler. Records all data transmission between CPU and GPU. + + TODO: Merge pcie profiler into communication profiler + """ + + def __init__(self, dtype: str = "fp32", depth: int = 1): + super().__init__(profiler_name="Pcie", priority=10) + self.depth = depth + self.data_size = _get_size(dtype) + self.h2d_count = 0 + self.h2d_time = 0 + self.d2h_count = 0 + self.d2h_time = 0 + + self.ops_record = dict() + self.profiler = None + + def reset(self): + self.h2d_count = 0 + self.h2d_time = 0 + self.d2h_count = 0 + self.d2h_time = 0 + + self.ops_record = dict() + self.profiler = None + + def enable(self): + self.profiler = profile(enabled=True, + use_cuda=True, + use_cpu=True, + use_kineto=True, + record_shapes=True, + with_stack=True) + self.profiler.__enter__() + + def disable(self): + self.profiler.__exit__(None, None, None) + + if self.profiler.enabled: + events = self.profiler.function_events + for event in events: + if event.name == "aten::copy_": + t_shape = event.input_shapes[0] + if len(t_shape) == 0 or event.cuda_time_total == 0 or len(event.stack) == 0: + continue + current_comm_event = PcieEvent(1, self.data_size * _get_numel(t_shape), event.cuda_time_total) + code_location = _reduce_location(event.stack[:self.depth]) + if code_location in self.ops_record: + self.ops_record[code_location].add(current_comm_event) + else: + self.ops_record[code_location] = current_comm_event + elif 'Memcpy HtoD' in event.name: + self.h2d_count += 1 + self.h2d_time += event.cuda_time_total + elif 'Memcpy DtoH' in event.name: + self.d2h_count += 1 + self.d2h_time += event.cuda_time_total + + self.profiler = None + + def to_tensorboard(self, writer): + writer.add_text(tag="Data Transmission", text_string=self.result_str("\n\n")) + + def to_file(self, filename: Path): + with open(filename, "w") as f: + f.write(self.result_str()) + + def show(self): + print(self.result_str()) + + def result_str(self, sep: str = "\n"): + res = [] + + def append(s: str = None): + if s is not None: + res.append(s) + res.append(sep) + + append("Pcie profiling result:") + append("time of data transmission 
(CPU -> GPU): {}".format(_format_time(self.h2d_time))) + append("number of transmission (CPU -> GPU): {}".format(self.h2d_count)) + append("time of data transmission (GPU -> CPU): {}".format(_format_time(self.d2h_time))) + append("number of transmission (GPU -> CPU): {}".format(self.d2h_count)) + + append("Possible data transmission events in PCIE:") + + separation = '-' * 62 + row_format = '{:^10}' + '{:^12}' + '{:^16}' + '{:^12}' * 2 + + append(separation) + append(row_format.format('Location', 'GPU time', 'Trans volume', 'Bandwidth', 'Num of calls')) + append(separation) + + show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].cuda_time) + for location, event in show_list: + append(location) + append( + row_format.format('', _format_time(event.cuda_time), _format_memory(event.pcie_vol), + _format_bandwidth(event.pcie_vol, event.cuda_time), event.count)) + append() + + return ''.join(res) diff --git a/colossalai/utils/profiler/legacy/prof_utils.py b/colossalai/legacy/utils/profiler/legacy/prof_utils.py similarity index 95% rename from colossalai/utils/profiler/legacy/prof_utils.py rename to colossalai/legacy/utils/profiler/legacy/prof_utils.py index 2f7eee827651..e0c4f22a5fad 100644 --- a/colossalai/utils/profiler/legacy/prof_utils.py +++ b/colossalai/legacy/utils/profiler/legacy/prof_utils.py @@ -1,131 +1,132 @@ -from abc import ABC, abstractmethod -from pathlib import Path -from typing import Union, List -from colossalai.core import global_context as gpc - - -# copied from high version pytorch to support low version -def _format_time(time_us): - """Defines how to format time in FunctionEvent""" - US_IN_SECOND = 1000.0 * 1000.0 - US_IN_MS = 1000.0 - if time_us >= US_IN_SECOND: - return '{:.3f}s'.format(time_us / US_IN_SECOND) - if time_us >= US_IN_MS: - return '{:.3f}ms'.format(time_us / US_IN_MS) - return '{:.3f}us'.format(time_us) - - -# copied from high version pytorch to support low version -def _format_memory(nbytes): - """Returns a formatted memory size string""" - KB = 1024 - MB = 1024 * KB - GB = 1024 * MB - if (abs(nbytes) >= GB): - return '{:.2f} GB'.format(nbytes * 1.0 / GB) - elif (abs(nbytes) >= MB): - return '{:.2f} MB'.format(nbytes * 1.0 / MB) - elif (abs(nbytes) >= KB): - return '{:.2f} KB'.format(nbytes * 1.0 / KB) - else: - return str(nbytes) + ' B' - - -def _format_bandwidth(volume: float or int, time_us: int): - sec_div_mb = (1000.0 / 1024.0)**2 - mb_per_sec = volume / time_us * sec_div_mb - - if mb_per_sec >= 1024.0: - return '{:.3f} GB/s'.format(mb_per_sec / 1024.0) - else: - return '{:.3f} MB/s'.format(mb_per_sec) - - -class BaseProfiler(ABC): - - def __init__(self, profiler_name: str, priority: int): - self.name = profiler_name - self.priority = priority - - @abstractmethod - def enable(self): - pass - - @abstractmethod - def disable(self): - pass - - @abstractmethod - def to_tensorboard(self, writer): - pass - - @abstractmethod - def to_file(self, filename: Path): - pass - - @abstractmethod - def show(self): - pass - - -class ProfilerContext(object): - """Profiler context manager - - Usage:: - - world_size = 4 - inputs = torch.randn(10, 10, dtype=torch.float32, device=get_current_device()) - outputs = torch.empty(world_size, 10, 10, dtype=torch.float32, device=get_current_device()) - outputs_list = list(torch.chunk(outputs, chunks=world_size, dim=0)) - - cc_prof = CommProfiler() - - with ProfilerContext([cc_prof]) as prof: - op = dist.all_reduce(inputs, async_op=True) - dist.all_gather(outputs_list, inputs) - op.wait() - dist.reduce_scatter(inputs, 
outputs_list) - dist.broadcast(inputs, 0) - dist.reduce(inputs, 0) - - prof.show() - """ - - def __init__(self, profilers: List[BaseProfiler] = None, enable: bool = True): - self.enable = enable - self.profilers = sorted(profilers, key=lambda prof: prof.priority) - - def __enter__(self): - if self.enable: - for prof in self.profilers: - prof.enable() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.enable: - for prof in self.profilers: - prof.disable() - - def to_tensorboard(self, writer): - from torch.utils.tensorboard import SummaryWriter - - assert isinstance(writer, SummaryWriter), \ - f'torch.utils.tensorboard.SummaryWriter is required, but found {type(writer)}.' - - for prof in self.profilers: - prof.to_tensorboard(writer) - - def to_file(self, log_dir: Union[str, Path]): - if isinstance(log_dir, str): - log_dir = Path(log_dir) - - if not log_dir.exists(): - log_dir.mkdir(parents=True, exist_ok=True) - for prof in self.profilers: - log_file = log_dir.joinpath(f'{prof.name}_rank_{gpc.get_global_rank()}.log') - prof.to_file(log_file) - - def show(self): - for prof in self.profilers: - prof.show() +from abc import ABC, abstractmethod +from pathlib import Path +from typing import List, Union + +from colossalai.core import global_context as gpc + + +# copied from high version pytorch to support low version +def _format_time(time_us): + """Defines how to format time in FunctionEvent""" + US_IN_SECOND = 1000.0 * 1000.0 + US_IN_MS = 1000.0 + if time_us >= US_IN_SECOND: + return '{:.3f}s'.format(time_us / US_IN_SECOND) + if time_us >= US_IN_MS: + return '{:.3f}ms'.format(time_us / US_IN_MS) + return '{:.3f}us'.format(time_us) + + +# copied from high version pytorch to support low version +def _format_memory(nbytes): + """Returns a formatted memory size string""" + KB = 1024 + MB = 1024 * KB + GB = 1024 * MB + if (abs(nbytes) >= GB): + return '{:.2f} GB'.format(nbytes * 1.0 / GB) + elif (abs(nbytes) >= MB): + return '{:.2f} MB'.format(nbytes * 1.0 / MB) + elif (abs(nbytes) >= KB): + return '{:.2f} KB'.format(nbytes * 1.0 / KB) + else: + return str(nbytes) + ' B' + + +def _format_bandwidth(volume: float or int, time_us: int): + sec_div_mb = (1000.0 / 1024.0)**2 + mb_per_sec = volume / time_us * sec_div_mb + + if mb_per_sec >= 1024.0: + return '{:.3f} GB/s'.format(mb_per_sec / 1024.0) + else: + return '{:.3f} MB/s'.format(mb_per_sec) + + +class BaseProfiler(ABC): + + def __init__(self, profiler_name: str, priority: int): + self.name = profiler_name + self.priority = priority + + @abstractmethod + def enable(self): + pass + + @abstractmethod + def disable(self): + pass + + @abstractmethod + def to_tensorboard(self, writer): + pass + + @abstractmethod + def to_file(self, filename: Path): + pass + + @abstractmethod + def show(self): + pass + + +class ProfilerContext(object): + """Profiler context manager + + Usage:: + + world_size = 4 + inputs = torch.randn(10, 10, dtype=torch.float32, device=get_current_device()) + outputs = torch.empty(world_size, 10, 10, dtype=torch.float32, device=get_current_device()) + outputs_list = list(torch.chunk(outputs, chunks=world_size, dim=0)) + + cc_prof = CommProfiler() + + with ProfilerContext([cc_prof]) as prof: + op = dist.all_reduce(inputs, async_op=True) + dist.all_gather(outputs_list, inputs) + op.wait() + dist.reduce_scatter(inputs, outputs_list) + dist.broadcast(inputs, 0) + dist.reduce(inputs, 0) + + prof.show() + """ + + def __init__(self, profilers: List[BaseProfiler] = None, enable: bool = True): + self.enable = enable + 
self.profilers = sorted(profilers, key=lambda prof: prof.priority) + + def __enter__(self): + if self.enable: + for prof in self.profilers: + prof.enable() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.enable: + for prof in self.profilers: + prof.disable() + + def to_tensorboard(self, writer): + from torch.utils.tensorboard import SummaryWriter + + assert isinstance(writer, SummaryWriter), \ + f'torch.utils.tensorboard.SummaryWriter is required, but found {type(writer)}.' + + for prof in self.profilers: + prof.to_tensorboard(writer) + + def to_file(self, log_dir: Union[str, Path]): + if isinstance(log_dir, str): + log_dir = Path(log_dir) + + if not log_dir.exists(): + log_dir.mkdir(parents=True, exist_ok=True) + for prof in self.profilers: + log_file = log_dir.joinpath(f'{prof.name}_rank_{gpc.get_global_rank()}.log') + prof.to_file(log_file) + + def show(self): + for prof in self.profilers: + prof.show() diff --git a/colossalai/utils/profiler/profiler.py b/colossalai/legacy/utils/profiler/profiler.py similarity index 97% rename from colossalai/utils/profiler/profiler.py rename to colossalai/legacy/utils/profiler/profiler.py index 3026d723deb0..0827f06b586c 100644 --- a/colossalai/utils/profiler/profiler.py +++ b/colossalai/legacy/utils/profiler/profiler.py @@ -9,9 +9,9 @@ from torch.profiler.profiler import ProfilerAction from colossalai.legacy.engine import Engine +from colossalai.legacy.utils.profiler.extention import ProfilerExtension +from colossalai.legacy.utils.profiler.stateful_tensor_mem_extention import StatefulTensorMemoryProfilerExtention from colossalai.logging import get_dist_logger -from colossalai.utils.profiler.extention import ProfilerExtension -from colossalai.utils.profiler.stateful_tensor_mem_extention import StatefulTensorMemoryProfilerExtention class profile(torch_profile): diff --git a/colossalai/utils/profiler/stateful_tensor_mem_extention.py b/colossalai/legacy/utils/profiler/stateful_tensor_mem_extention.py similarity index 98% rename from colossalai/utils/profiler/stateful_tensor_mem_extention.py rename to colossalai/legacy/utils/profiler/stateful_tensor_mem_extention.py index 412bd7277eee..f3bb66ced583 100644 --- a/colossalai/utils/profiler/stateful_tensor_mem_extention.py +++ b/colossalai/legacy/utils/profiler/stateful_tensor_mem_extention.py @@ -9,7 +9,7 @@ from colossalai.gemini.ophooks import BaseOpHook from colossalai.gemini.stateful_tensor import StatefulTensor from colossalai.legacy.engine import Engine -from colossalai.utils.profiler.extention import ProfilerExtension +from colossalai.legacy.utils.profiler.extention import ProfilerExtension class DeviceType(Enum): diff --git a/colossalai/legacy/zero/gemini/tensor_placement_policy.py b/colossalai/legacy/zero/gemini/tensor_placement_policy.py index 165ae51fee60..275933ec2cfb 100644 --- a/colossalai/legacy/zero/gemini/tensor_placement_policy.py +++ b/colossalai/legacy/zero/gemini/tensor_placement_policy.py @@ -5,8 +5,8 @@ import torch +from colossalai.legacy.utils.memory import colo_device_memory_capacity from colossalai.utils import get_current_device -from colossalai.utils.memory import colo_device_memory_capacity from colossalai.zero.gemini.memory_tracer import MemStatsCollector from .stateful_tensor import StatefulTensor diff --git a/colossalai/legacy/zero/sharded_model/sharded_model_v2.py b/colossalai/legacy/zero/sharded_model/sharded_model_v2.py index 353f09fbaaaf..8344b014959c 100644 --- a/colossalai/legacy/zero/sharded_model/sharded_model_v2.py +++ 
b/colossalai/legacy/zero/sharded_model/sharded_model_v2.py @@ -13,6 +13,7 @@ from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc +from colossalai.legacy.utils.memory import colo_device_memory_capacity from colossalai.legacy.zero.gemini.ophooks import register_ophooks_recursively from colossalai.legacy.zero.gemini.paramhooks import BaseParamHookMgr from colossalai.legacy.zero.gemini.stateful_tensor import TensorState @@ -23,7 +24,6 @@ from colossalai.legacy.zero.sharded_model.reduce_scatter import ReduceScatterBucketer from colossalai.logging import get_dist_logger from colossalai.utils import disposable, get_current_device -from colossalai.utils.memory import colo_device_memory_capacity from colossalai.zero.gemini.memory_tracer import MemStatsCollector, StaticMemStatsCollector from ._utils import ( diff --git a/colossalai/utils/__init__.py b/colossalai/utils/__init__.py index ba4c0423ee3b..06ac6c84c9cd 100644 --- a/colossalai/utils/__init__.py +++ b/colossalai/utils/__init__.py @@ -1,81 +1,31 @@ -from .activation_checkpoint import checkpoint -from .checkpointing import load_checkpoint, save_checkpoint from .common import ( _cast_float, - clip_grad_norm_fp32, conditional_context, - copy_tensor_parallel_attributes, - count_zeros_fp32, disposable, ensure_path_exists, free_storage, is_ddp_ignored, - is_dp_rank_0, - is_model_parallel_parameter, - is_no_pp_or_last_stage, - is_tp_rank_0, - is_using_ddp, - is_using_pp, - is_using_sequence, - multi_tensor_applier, - param_is_not_tensor_parallel_duplicate, - print_rank_0, set_seed, - switch_virtual_pipeline_parallel_rank, - sync_model_param, ) from .cuda import empty_cache, get_current_device, set_to_cuda, synchronize -from .data_sampler import DataParallelSampler, get_dataloader -from .memory import ( - colo_device_memory_capacity, - colo_device_memory_used, - colo_get_cpu_memory_capacity, - colo_set_cpu_memory_capacity, - colo_set_process_memory_fraction, - report_memory_usage, -) +from .multi_tensor_apply import multi_tensor_applier from .tensor_detector import TensorDetector from .timer import MultiTimer, Timer __all__ = [ - 'checkpoint', - 'print_rank_0', - 'sync_model_param', - 'is_ddp_ignored', - 'is_dp_rank_0', - 'is_tp_rank_0', - 'is_no_pp_or_last_stage', - 'is_using_ddp', - 'is_using_pp', - 'is_using_sequence', 'conditional_context', - 'is_model_parallel_parameter', - 'clip_grad_norm_fp32', - 'count_zeros_fp32', - 'copy_tensor_parallel_attributes', - 'param_is_not_tensor_parallel_duplicate', 'get_current_device', 'synchronize', 'empty_cache', 'set_to_cuda', - 'report_memory_usage', - 'colo_device_memory_capacity', - 'colo_device_memory_used', - 'colo_set_process_memory_fraction', 'Timer', 'MultiTimer', 'multi_tensor_applier', - 'DataParallelSampler', - 'get_dataloader', - 'switch_virtual_pipeline_parallel_rank', 'TensorDetector', - 'load_checkpoint', - 'save_checkpoint', 'ensure_path_exists', 'disposable', - 'colo_set_cpu_memory_capacity', - 'colo_get_cpu_memory_capacity', '_cast_float', 'free_storage', 'set_seed', + 'is_ddp_ignored', ] diff --git a/colossalai/utils/checkpoint/__init__.py b/colossalai/utils/checkpoint/__init__.py deleted file mode 100644 index 1795b4ce36f4..000000000000 --- a/colossalai/utils/checkpoint/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .module_checkpoint import save_checkpoint, load_checkpoint - -__all__ = ['save_checkpoint', 'load_checkpoint'] diff --git a/colossalai/utils/common.py b/colossalai/utils/common.py index 92159b57eb45..8c769c5b13c0 100644 
--- a/colossalai/utils/common.py +++ b/colossalai/utils/common.py @@ -3,46 +3,12 @@ import functools import os import random -import socket -from collections import defaultdict from contextlib import contextmanager from pathlib import Path -from typing import Callable, Dict, List, Optional, Union +from typing import Callable import numpy as np import torch -import torch.distributed as dist -from torch import inf -from torch.nn.parameter import Parameter - -from colossalai.constants import IS_TENSOR_PARALLEL, NUM_PARTITIONS, TENSOR_PARALLEL_ATTRIBUTES -from colossalai.context.parallel_mode import ParallelMode -from colossalai.core import global_context as gpc -from colossalai.global_variables import tensor_parallel_env as env -from colossalai.legacy.tensor import ProcessGroup -from colossalai.tensor import ColoParameter - -from .multi_tensor_apply import multi_tensor_applier - -try: - from colossalai._C import fused_optim -except: - fused_optim = None - - -def print_rank_0(msg: str, logger=None): - """Print messages and save logs(optional). This is executed only if you are the rank-0 gpu. - - Args: - msg (str): A string message to output. - logger (:class:`colossalai.logging.DistributedLogger`, optional): - The logger to record the message, defaults to None. - """ - if gpc.get_global_rank() == 0: - if logger is None: - print(msg, flush=True) - else: - logger.info(msg) def ensure_path_exists(filename: str): @@ -52,47 +18,6 @@ def ensure_path_exists(filename: str): Path(dirpath).mkdir(parents=True, exist_ok=True) -def sync_model_param(model, parallel_mode): - r"""Make sure data parameters are consistent during Data Parallel Mode. - - Args: - model (:class:`torch.nn.Module`): A pyTorch model on whose parameters you check the consistency. - parallel_mode (:class:`colossalai.context.ParallelMode`): Parallel mode to be checked. - - Note: - The parallel_mode should be concluded in ``ParallelMode``. 
More details about ``ParallelMode`` could be found - in `parallel_mode `_ - """ - if gpc.is_initialized(parallel_mode) and gpc.get_world_size(parallel_mode) > 1: - for param in model.parameters(): - ranks = gpc.get_ranks_in_group(parallel_mode) - dist.broadcast(param, src=ranks[0], group=gpc.get_group(parallel_mode)) - - -def is_dp_rank_0(): - return not gpc.is_initialized(ParallelMode.DATA) or gpc.is_first_rank(ParallelMode.DATA) - - -def is_tp_rank_0(): - return not gpc.is_initialized(ParallelMode.TENSOR) or gpc.is_first_rank(ParallelMode.TENSOR) - - -def is_no_pp_or_last_stage(): - return not gpc.is_initialized(ParallelMode.PIPELINE) or gpc.is_last_rank(ParallelMode.PIPELINE) - - -def is_using_ddp(): - return gpc.is_initialized(ParallelMode.DATA) and gpc.get_world_size(ParallelMode.DATA) > 1 - - -def is_using_pp(): - return gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1 - - -def is_using_sequence(): - return gpc.is_initialized(ParallelMode.SEQUENCE) and gpc.get_world_size(ParallelMode.SEQUENCE) > 1 - - @contextmanager def conditional_context(context_manager, enable=True): if enable: @@ -102,365 +27,10 @@ def conditional_context(context_manager, enable=True): yield -class model_branch_context(object): - - def __enter__(self): - self.env_status = env.save() - - def __exit__(self, *exc_info): - env.load(**self.env_status) - - -def is_model_parallel_parameter(p): - return hasattr(p, IS_TENSOR_PARALLEL) and getattr(p, IS_TENSOR_PARALLEL) - - def is_ddp_ignored(p): return getattr(p, '_ddp_to_ignore', False) -def _calc_l2_norm(grads): - # we should not - global fused_optim - - if fused_optim is None: - from colossalai.kernel.op_builder import FusedOptimBuilder - fused_optim = FusedOptimBuilder().load() - - norm = 0.0 - if len(grads) > 0: - dummy_overflow_buf = torch.cuda.IntTensor([0]) - norm, _ = multi_tensor_applier( - fused_optim.multi_tensor_l2norm, - dummy_overflow_buf, - [grads], - False # no per-parameter norm - ) - return norm - - -def _calc_lp(grads, norm_type): - norm = 0.0 - for grad in grads: - grad_norm = torch.norm(grad, norm_type) - norm += grad_norm**norm_type - return norm - - -def _move_norm_to_cuda(norm: Union[float, torch.Tensor]) -> Union[float, torch.Tensor]: - if torch.is_tensor(norm) and norm.device.type != 'cuda': - norm = norm.to(torch.cuda.current_device()) - return norm - - -def _get_tensor_norm(norm: Union[float, torch.Tensor], move_to_cuda) -> torch.Tensor: - if isinstance(norm, float): - norm = torch.Tensor([norm]) - if move_to_cuda: - norm = norm.to(torch.cuda.current_device()) - return norm - - -# ======== Gradient Clipping ========= - - -def _compute_local_lp(params: List[ColoParameter], norm_type: float) -> float: - if len(params) == 0: - return 0.0 - grads = [p.grad for p in params] - use_cuda_kernel = grads[0].device.type == 'cuda' - if norm_type == inf: - local_lp = max([g.abs().max() for g in grads]) - elif norm_type == 2.0 and use_cuda_kernel: - local_lp = _calc_l2_norm(grads)**norm_type - else: - local_lp = _calc_lp(grads, norm_type) - if isinstance(local_lp, torch.Tensor): - return local_lp.item() - return local_lp - - -def _compute_buckets_lp(params: List[ColoParameter], norm_type: float) -> float: - if len(params) == 0: - return 0.0 - buckets: Dict[Optional[ProcessGroup], List[ColoParameter]] = defaultdict(list) - for p in params: - if p.is_replicate(): - buckets[None].append(p) - else: - buckets[p.get_process_group().tp_process_group()].append(p) - total_lp = 0.0 - for group, bucket in buckets.items(): - 
local_lp = _compute_local_lp(bucket, norm_type) - if group is not None: - local_lp_tensor = torch.tensor([local_lp], device=torch.cuda.current_device()) - if norm_type == inf: - dist.all_reduce(local_lp_tensor, op=dist.ReduceOp.MAX, group=group) - else: - dist.all_reduce(local_lp_tensor, group=group) - local_lp = local_lp_tensor.item() - if norm_type == inf: - total_lp = max(total_lp, local_lp) - else: - total_lp += local_lp - return total_lp - - -def _compute_pp_grad_lp(total_lp: float, norm_type: float) -> float: - if gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1: - total_lp_tensor = torch.tensor([total_lp], device=torch.cuda.current_device()) - if norm_type == inf: - dist.all_reduce(total_lp_tensor, op=dist.ReduceOp.MAX, group=gpc.get_group(ParallelMode.PIPELINE)) - else: - dist.all_reduce(total_lp_tensor, group=gpc.get_group(ParallelMode.PIPELINE)) - total_lp = total_lp_tensor.item() - return total_lp - - -def _compute_grad_lp(parameters, norm_type: float = 2.0) -> float: - if isinstance(parameters, torch.Tensor): - parameters = [parameters] - grad_dtype = None - cpu_grad_params: List[ColoParameter] = [] - cuda_grad_params: List[ColoParameter] = [] - for p in parameters: - if p.grad is None: - continue - assert isinstance(p, ColoParameter) - if grad_dtype is None: - grad_dtype = p.grad.dtype - assert p.grad.dtype == grad_dtype, f'Expected all grads are {grad_dtype}, got {p.grad.dtype}' - if p.grad.device.type == 'cuda': - cuda_grad_params.append(p) - else: - cpu_grad_params.append(p) - norm_type = float(norm_type) - cpu_lp = _compute_buckets_lp(cpu_grad_params, norm_type) - cuda_lp = _compute_buckets_lp(cuda_grad_params, norm_type) - if norm_type == inf: - total_lp = max(cpu_lp, cuda_lp) - else: - total_lp = cpu_lp + cuda_lp - return _compute_pp_grad_lp(total_lp, norm_type) - - -def compute_grad_norm(parameters, norm_type: float = 2.0) -> float: - norm_type = float(norm_type) - total_norm = _compute_grad_lp(parameters, norm_type) - if norm_type != inf: - total_norm = total_norm**(1 / norm_type) - return total_norm - - -def _clip_grad_norm(parameters, max_norm: float, total_norm: float) -> None: - clip_coef = max_norm / (total_norm + 1e-6) - if clip_coef < 1.0: - cuda_grads: List[torch.Tensor] = [] - cpu_grads: List[torch.Tensor] = [] - if isinstance(parameters, torch.Tensor): - parameters = [parameters] - for p in parameters: - if p.grad is None: - continue - if p.grad.device.type == 'cuda': - cuda_grads.append(p.grad.detach()) - else: - cpu_grads.append(p.grad.detach()) - if len(cuda_grads) > 0: - dummy_overflow_buf = torch.cuda.IntTensor([0]) - multi_tensor_applier(fused_optim.multi_tensor_scale, dummy_overflow_buf, [cuda_grads, cuda_grads], - clip_coef) - for g in cpu_grads: - g.mul_(clip_coef) - - -def clip_grad_norm(parameters, max_norm: float, norm_type: float = 2.0) -> float: - total_norm = compute_grad_norm(parameters, norm_type) - _clip_grad_norm(parameters, max_norm, total_norm) - return total_norm - - -def clip_grad_norm_fp32(parameters, max_norm, norm_type=2): - """Clips gradient norm of an iterable of parameters whose gradients are in fp32. - - This is adapted from :func:`torch.nn.utils.clip_grad.clip_grad_norm_` and - added functionality to handle model parallel parameters. - - Note: - the gradients are modified in place. - - Args: - parameters (Iterable[:class:`torch.tensor`] or :class:`torch.tensor`): - An iterable of Tensors or a single Tensor that will have gradients normalized. 
- max_norm (Union[float, int]): Max norm of the gradients. - norm_type (Union[float, int, 'inf']): Type of the used p-norm. Can be ``'inf'`` for infinity norm. - - Returns: - float: Total norm of the parameters. - """ - - if isinstance(parameters, torch.Tensor): - parameters = [parameters] - - # Filter parameters based on: - # - grad should not be none - # - parameter should not be shared - # - should not be a replica due to tensor model parallelism - params: List[Parameter] = [] - has_zero_shared_param: bool = False - for param in parameters: - if param.grad is not None: - # Make sure the grads are in fp32 - assert param.grad.dtype == torch.float, \ - f'expected gradient to be dtype torch.float, but got {param.grad.type()}' - if hasattr(param, 'colo_attr') and param.colo_attr.sharded_data_tensor.is_sharded: - has_zero_shared_param = True - params.append(param) - - if len(params) == 0: - enable_cuda_kernels = False - else: - enable_cuda_kernels = params[0].grad.device.type == 'cuda' - # Norm parameters. - max_norm = float(max_norm) - norm_type = float(norm_type) - - # Parameters can be on CPU or CUDA - # If parameters are on CPU, disable CUDA kernels - - # Calculate norm. - if norm_type == inf: - total_norm = max(p.grad.data.abs().max() for p in params) - total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)]) - # Take max across all model-parallel GPUs. - if gpc.is_initialized(ParallelMode.MODEL) and gpc.get_world_size(ParallelMode.MODEL) > 1: - dist.all_reduce(total_norm_cuda, - op=dist.ReduceOp.MAX, - group=gpc.get_group(ParallelMode.MODEL), - async_op=False) - if has_zero_shared_param: - dist.all_reduce(total_norm_cuda, - op=dist.ReduceOp.MAX, - group=gpc.get_group(ParallelMode.DATA), - async_op=False) - total_norm = total_norm_cuda[0].item() - else: - tensor_parallel_grads = [] - no_tensor_parallel_grads = [] - zero_sharded_grads = [] - for p in params: - if is_model_parallel_parameter(p): - reductor = (gpc.get_world_size(ParallelMode.TENSOR) / getattr(p, NUM_PARTITIONS))**(1 / norm_type) - tensor_parallel_grads.append(p.grad.data / reductor) - elif hasattr(p, 'colo_attr') and p.colo_attr.sharded_data_tensor.is_sharded: - zero_sharded_grads.append(p.grad.data) - else: - no_tensor_parallel_grads.append(p.grad.data) - - if norm_type == 2.0 and enable_cuda_kernels: - tensor_parallel_norm = _calc_l2_norm(tensor_parallel_grads)**norm_type - no_tensor_parallel_norm = _calc_l2_norm(no_tensor_parallel_grads)**norm_type - zero_sharded_norm = _calc_l2_norm(zero_sharded_grads)**norm_type - else: - tensor_parallel_norm = _calc_lp(tensor_parallel_grads, norm_type) - no_tensor_parallel_norm = _calc_lp(no_tensor_parallel_grads, norm_type) - zero_sharded_norm = _calc_lp(zero_sharded_grads, norm_type) - # If norm is type of float, then we convert them into torch.Tensor. - tensor_parallel_norm = _get_tensor_norm(tensor_parallel_norm, enable_cuda_kernels) - no_tensor_parallel_norm = _get_tensor_norm(no_tensor_parallel_norm, enable_cuda_kernels) - zero_sharded_norm = _get_tensor_norm(zero_sharded_norm, enable_cuda_kernels) - # If grads are on CPU, the norms is also on CPU. Cast them to CUDA tensors - if not enable_cuda_kernels: - tensor_parallel_norm = _move_norm_to_cuda(tensor_parallel_norm) - no_tensor_parallel_norm = _move_norm_to_cuda(no_tensor_parallel_norm) - zero_sharded_norm = _move_norm_to_cuda(zero_sharded_norm) - - # Sum across all model-parallel GPUs. 
- if gpc.is_initialized(ParallelMode.TENSOR) and len(tensor_parallel_grads) > 0: - dist.all_reduce(tensor_parallel_norm, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.TENSOR)) - # Sum across all zero sharded GPUs - if len(zero_sharded_grads) > 0: - dist.all_reduce(zero_sharded_norm, group=gpc.get_group(ParallelMode.DATA)) - no_tensor_parallel_norm += zero_sharded_norm - total_norm = tensor_parallel_norm + no_tensor_parallel_norm - if gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1: - dist.all_reduce(total_norm, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.PIPELINE)) - total_norm = total_norm**(1.0 / norm_type) - if torch.is_tensor(total_norm): - total_norm = total_norm.item() - - # Scale. - clip_coeff = max_norm / (total_norm + 1.0e-6) - if clip_coeff < 1.0: - if enable_cuda_kernels: - grads = [p.grad.detach() for p in params] - dummy_overflow_buf = torch.cuda.IntTensor([0]) - multi_tensor_applier(fused_optim.multi_tensor_scale, dummy_overflow_buf, [grads, grads], clip_coeff) - else: - for p in params: - p.grad.detach().mul_(clip_coeff) - return total_norm - - -def count_zeros_fp32(parameters): - if isinstance(parameters, torch.Tensor): - parameters = [parameters] - - # Filter parameters based on: - # - grad should not be none - # - parameter should not be shared - # - should not be a replica due to tensor model parallelism - total_num_zeros = 0.0 - for param in parameters: - grad_not_none = param.grad is not None - is_not_tp_duplicate = param_is_not_tensor_parallel_duplicate(param) - if grad_not_none and is_not_tp_duplicate: - grad = param.grad.detach() - num_zeros = grad.numel() - torch.count_nonzero(grad) - total_num_zeros = num_zeros + total_num_zeros - - total_num_zeros = torch.IntTensor([int(total_num_zeros)]).cuda() - - # Sum across all model-parallel GPUs. 
- ops = [] - ops.append( - dist.all_reduce(total_num_zeros, op=dist.ReduceOp.SUM, group=gpc.get_group(ParallelMode.TENSOR), async_op=True)) - if gpc.is_initialized(ParallelMode.PIPELINE): - ops.append( - dist.all_reduce(total_num_zeros, - op=dist.ReduceOp.SUM, - group=gpc.get_group(ParallelMode.PIPELINE), - async_op=True)) - - for req in ops: - req.wait() - total_num_zeros = total_num_zeros.item() - - return total_num_zeros - - -def copy_tensor_parallel_attributes(src_tensor, dst_tensor): - for attr in TENSOR_PARALLEL_ATTRIBUTES: - if hasattr(src_tensor, attr): - val = getattr(src_tensor, attr) - setattr(dst_tensor, attr, val) - - -def param_is_not_tensor_parallel_duplicate(param): - return (hasattr(param, IS_TENSOR_PARALLEL) and getattr(param, IS_TENSOR_PARALLEL)) or (gpc.get_local_rank( - ParallelMode.TENSOR) == 0) - - -@contextmanager -def switch_virtual_pipeline_parallel_rank(rank): - prev_rank = gpc.virtual_pipeline_parallel_rank - try: - gpc.set_virtual_pipeline_parallel_rank(rank) - yield - finally: - gpc.set_virtual_pipeline_parallel_rank(prev_rank) - - def disposable(func: Callable) -> Callable: executed = False diff --git a/colossalai/utils/moe.py b/colossalai/utils/moe.py index 86d04c11958b..35205414f5e9 100644 --- a/colossalai/utils/moe.py +++ b/colossalai/utils/moe.py @@ -1,52 +1,54 @@ -import torch.nn as nn -import torch.distributed as dist -from colossalai.core import global_context as gpc -from colossalai.context.moe_context import MOE_CONTEXT -from colossalai.context import ParallelMode -from .common import is_using_ddp -from typing import Dict, List - - -def get_moe_epsize_param_dict(model: nn.Module) -> Dict[int, List[nn.Parameter]]: - """Returns a parameter dictionary, the key of which is the expert parallel - size of every parameter. Since the parameters in data parallelism is replicated - in each GPU, we set their ep_size to 1. - - Args: - model (:class:`torch.nn.Module`): A pyTorch `nn.Module` from which we get dict. - """ - epsize_param_dict = dict() - for param in model.parameters(): - if not hasattr(param, 'moe_info'): - ep_size = 1 # set ep_size to 1 for dp parameters - else: - ep_size = param.moe_info.ep_size - if ep_size not in epsize_param_dict: - epsize_param_dict[ep_size] = [] - epsize_param_dict[ep_size].append(param) - - return epsize_param_dict - - -def sync_moe_model_param(model: nn.Module): - """Make sure model parameters are consistent in MoE parallel context. - - Args: - model (:class:`torch.nn.Module`): A pyTorch model on whose parameters you check the consistency. 
- """ - if is_using_ddp(): - - param_dict = get_moe_epsize_param_dict(model) - - # synchronize the parameters whose dp_group is the whole world - if 1 in param_dict: - src_rank = gpc.get_ranks_in_group(ParallelMode.DATA)[0] - for param in param_dict[1]: - dist.broadcast(param, src=src_rank, group=gpc.get_group(ParallelMode.DATA)) - - for ep_size in param_dict: - # When ep_size = world_size, communication is not needed - if ep_size != 1 and ep_size != MOE_CONTEXT.world_size: - src_rank = dist.get_rank(MOE_CONTEXT.parallel_info_dict[ep_size].ep_group) - for param in param_dict[ep_size]: - dist.broadcast(param, src=src_rank, group=param.moe_info.dp_group) +from typing import Dict, List + +import torch.distributed as dist +import torch.nn as nn + +from colossalai.context import ParallelMode +from colossalai.context.moe_context import MOE_CONTEXT +from colossalai.core import global_context as gpc +from colossalai.legacy.utils import is_using_ddp + + +def get_moe_epsize_param_dict(model: nn.Module) -> Dict[int, List[nn.Parameter]]: + """Returns a parameter dictionary, the key of which is the expert parallel + size of every parameter. Since the parameters in data parallelism is replicated + in each GPU, we set their ep_size to 1. + + Args: + model (:class:`torch.nn.Module`): A pyTorch `nn.Module` from which we get dict. + """ + epsize_param_dict = dict() + for param in model.parameters(): + if not hasattr(param, 'moe_info'): + ep_size = 1 # set ep_size to 1 for dp parameters + else: + ep_size = param.moe_info.ep_size + if ep_size not in epsize_param_dict: + epsize_param_dict[ep_size] = [] + epsize_param_dict[ep_size].append(param) + + return epsize_param_dict + + +def sync_moe_model_param(model: nn.Module): + """Make sure model parameters are consistent in MoE parallel context. + + Args: + model (:class:`torch.nn.Module`): A pyTorch model on whose parameters you check the consistency. 
+ """ + if is_using_ddp(): + + param_dict = get_moe_epsize_param_dict(model) + + # synchronize the parameters whose dp_group is the whole world + if 1 in param_dict: + src_rank = gpc.get_ranks_in_group(ParallelMode.DATA)[0] + for param in param_dict[1]: + dist.broadcast(param, src=src_rank, group=gpc.get_group(ParallelMode.DATA)) + + for ep_size in param_dict: + # When ep_size = world_size, communication is not needed + if ep_size != 1 and ep_size != MOE_CONTEXT.world_size: + src_rank = dist.get_rank(MOE_CONTEXT.parallel_info_dict[ep_size].ep_group) + for param in param_dict[ep_size]: + dist.broadcast(param, src=src_rank, group=param.moe_info.dp_group) diff --git a/colossalai/zero/gemini/memory_tracer/chunk_memstats_collector.py b/colossalai/zero/gemini/memory_tracer/chunk_memstats_collector.py index 83903bbf4023..d65dc2c79abd 100644 --- a/colossalai/zero/gemini/memory_tracer/chunk_memstats_collector.py +++ b/colossalai/zero/gemini/memory_tracer/chunk_memstats_collector.py @@ -1,7 +1,7 @@ from typing import Optional +from colossalai.legacy.utils.memory import colo_device_memory_capacity from colossalai.utils import get_current_device -from colossalai.utils.memory import colo_device_memory_capacity from colossalai.zero.gemini.chunk import ChunkManager from .memory_stats import MemStats diff --git a/colossalai/zero/gemini/memory_tracer/memory_monitor.py b/colossalai/zero/gemini/memory_tracer/memory_monitor.py index 4bb585677d5b..13c3283a9c7f 100644 --- a/colossalai/zero/gemini/memory_tracer/memory_monitor.py +++ b/colossalai/zero/gemini/memory_tracer/memory_monitor.py @@ -5,7 +5,8 @@ import torch -from colossalai.utils import colo_device_memory_used, get_current_device +from colossalai.legacy.utils import colo_device_memory_used +from colossalai.utils import get_current_device class MemoryMonitor: diff --git a/colossalai/zero/gemini/placement_policy.py b/colossalai/zero/gemini/placement_policy.py index cd775da5e11f..a35529723a68 100644 --- a/colossalai/zero/gemini/placement_policy.py +++ b/colossalai/zero/gemini/placement_policy.py @@ -6,8 +6,8 @@ import torch +from colossalai.legacy.utils.memory import colo_device_memory_capacity from colossalai.utils import get_current_device -from colossalai.utils.memory import colo_device_memory_capacity from colossalai.zero.gemini.chunk import Chunk from .chunk import Chunk, ChunkManager diff --git a/colossalai/zero/low_level/_utils.py b/colossalai/zero/low_level/_utils.py index 4205a9891534..4064fa0312e6 100644 --- a/colossalai/zero/low_level/_utils.py +++ b/colossalai/zero/low_level/_utils.py @@ -7,8 +7,8 @@ from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors from torch.distributed import ProcessGroup +from colossalai.legacy.utils import is_model_parallel_parameter from colossalai.tensor import ColoParameter -from colossalai.utils import is_model_parallel_parameter def flatten(input_): diff --git a/examples/language/gpt/titans/model/gpt1d.py b/examples/language/gpt/titans/model/gpt1d.py index 72297c540da1..18a801ce8697 100644 --- a/examples/language/gpt/titans/model/gpt1d.py +++ b/examples/language/gpt/titans/model/gpt1d.py @@ -14,8 +14,8 @@ from colossalai.legacy.nn.layer import Linear1D_Col, Linear1D_Row from colossalai.legacy.nn.layer.base_layer import ParallelLayer from colossalai.legacy.nn.layer.utils import ACT2FN, divide +from colossalai.legacy.utils.activation_checkpoint import checkpoint from colossalai.utils import checkpoint -from colossalai.utils.activation_checkpoint import checkpoint __all__ = [ 'GPTMLP1D', 
'GPTSelfAttention1D', 'GPTTransformerLayer1D', 'FusedGPTSelfAttention1D', 'FusedGPTTransformerLayer1D' diff --git a/examples/tutorial/opt/opt/run_clm.py b/examples/tutorial/opt/opt/run_clm.py index 91380e243fb8..efcf3fc83962 100755 --- a/examples/tutorial/opt/opt/run_clm.py +++ b/examples/tutorial/opt/opt/run_clm.py @@ -53,10 +53,11 @@ import colossalai from colossalai.context import ParallelMode from colossalai.core import global_context as gpc +from colossalai.legacy.tensor import ProcessGroup +from colossalai.legacy.utils import get_dataloader from colossalai.logging import disable_existing_loggers, get_dist_logger from colossalai.nn.optimizer import HybridAdam -from colossalai.tensor import ProcessGroup -from colossalai.utils import get_current_device, get_dataloader +from colossalai.utils import get_current_device from colossalai.zero import GeminiOptimizer require_version("datasets>=1.8.0", "To fix: pip install -r examples/pytorch/language-modeling/requirements.txt") diff --git a/examples/tutorial/sequence_parallel/train.py b/examples/tutorial/sequence_parallel/train.py index 86c4edeb5550..66ea499364f4 100644 --- a/examples/tutorial/sequence_parallel/train.py +++ b/examples/tutorial/sequence_parallel/train.py @@ -13,9 +13,10 @@ from colossalai.core import global_context as gpc from colossalai.kernel import LayerNorm from colossalai.legacy.engine.schedule import PipelineSchedule +from colossalai.legacy.utils import is_using_pp from colossalai.logging import get_dist_logger from colossalai.nn.optimizer import FusedAdam -from colossalai.utils import MultiTimer, is_using_pp +from colossalai.utils import MultiTimer def process_batch_data(batch_data): diff --git a/tests/components_to_test/resnet.py b/tests/components_to_test/resnet.py index 193832ebc12d..df01e4c4847e 100644 --- a/tests/components_to_test/resnet.py +++ b/tests/components_to_test/resnet.py @@ -1,11 +1,14 @@ -from torchvision.models import resnet18 -from .registry import non_distributed_component_funcs -from pathlib import Path import os +from pathlib import Path + import torch -from torchvision.transforms import transforms from torchvision.datasets import CIFAR10 -from colossalai.utils import get_dataloader +from torchvision.models import resnet18 +from torchvision.transforms import transforms + +from colossalai.legacy.utils import get_dataloader + +from .registry import non_distributed_component_funcs def get_cifar10_dataloader(train): diff --git a/tests/test_data/test_cifar10_dataset.py b/tests/test_legacy/test_data/test_cifar10_dataset.py similarity index 100% rename from tests/test_data/test_cifar10_dataset.py rename to tests/test_legacy/test_data/test_cifar10_dataset.py diff --git a/tests/test_data/test_data_parallel_sampler.py b/tests/test_legacy/test_data/test_data_parallel_sampler.py similarity index 97% rename from tests/test_data/test_data_parallel_sampler.py rename to tests/test_legacy/test_data/test_data_parallel_sampler.py index 7beef707c096..e09dedad72a5 100644 --- a/tests/test_data/test_data_parallel_sampler.py +++ b/tests/test_legacy/test_data/test_data_parallel_sampler.py @@ -12,8 +12,8 @@ import colossalai from colossalai.context import Config, ParallelMode from colossalai.core import global_context as gpc +from colossalai.legacy.utils import get_dataloader from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.utils import get_dataloader CONFIG = Config(dict( parallel=dict( diff --git a/tests/test_data/test_deterministic_dataloader.py 
b/tests/test_legacy/test_data/test_deterministic_dataloader.py similarity index 97% rename from tests/test_data/test_deterministic_dataloader.py rename to tests/test_legacy/test_data/test_deterministic_dataloader.py index 283b5cc35279..28b12048350d 100644 --- a/tests/test_data/test_deterministic_dataloader.py +++ b/tests/test_legacy/test_data/test_deterministic_dataloader.py @@ -12,8 +12,8 @@ import colossalai from colossalai.context import Config, ParallelMode from colossalai.core import global_context as gpc +from colossalai.legacy.utils import get_dataloader from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.utils import get_dataloader CONFIG = Config( dict( diff --git a/tests/test_legacy/test_engine/test_gradient_accumluation.py b/tests/test_legacy/test_engine/test_gradient_accumluation.py index 7783827c7c44..eae1eaa45b9b 100644 --- a/tests/test_legacy/test_engine/test_gradient_accumluation.py +++ b/tests/test_legacy/test_engine/test_gradient_accumluation.py @@ -11,9 +11,9 @@ import colossalai from colossalai.core import global_context as gpc +from colossalai.legacy.utils import get_dataloader from colossalai.logging import get_dist_logger from colossalai.testing import rerun_if_address_is_in_use, spawn -from colossalai.utils import get_dataloader # Config BATCH_SIZE = 2 diff --git a/tests/test_legacy/test_layers/test_1d/checks_1d/check_layer_1d.py b/tests/test_legacy/test_layers/test_1d/checks_1d/check_layer_1d.py index dcb2be62671b..03986be62f74 100644 --- a/tests/test_legacy/test_layers/test_1d/checks_1d/check_layer_1d.py +++ b/tests/test_legacy/test_layers/test_1d/checks_1d/check_layer_1d.py @@ -15,7 +15,8 @@ VocabParallelCrossEntropyLoss1D, VocabParallelEmbedding1D, ) -from colossalai.utils import get_current_device, print_rank_0 +from colossalai.legacy.utils import print_rank_0 +from colossalai.utils import get_current_device from .common import BATCH_SIZE, DEPTH, HIDDEN_SIZE, NUM_CLASSES, SEQ_LENGTH, VOCAB_SIZE, check_equal diff --git a/tests/test_legacy/test_layers/test_2d/checks_2d/check_layer_2d.py b/tests/test_legacy/test_layers/test_2d/checks_2d/check_layer_2d.py index 0ee88c26035f..e026d8a8c58d 100644 --- a/tests/test_legacy/test_layers/test_2d/checks_2d/check_layer_2d.py +++ b/tests/test_legacy/test_layers/test_2d/checks_2d/check_layer_2d.py @@ -15,7 +15,8 @@ VocabParallelCrossEntropyLoss2D, VocabParallelEmbedding2D, ) -from colossalai.utils import get_current_device, print_rank_0 +from colossalai.legacy.utils import print_rank_0 +from colossalai.utils import get_current_device from .common import BATCH_SIZE, DEPTH, HIDDEN_SIZE, IMG_SIZE, NUM_CLASSES, SEQ_LENGTH, VOCAB_SIZE, check_equal diff --git a/tests/test_legacy/test_layers/test_2d/checks_2d/check_operation_2d.py b/tests/test_legacy/test_layers/test_2d/checks_2d/check_operation_2d.py index ae1d1120cfb9..28c4e00e4eef 100644 --- a/tests/test_legacy/test_layers/test_2d/checks_2d/check_operation_2d.py +++ b/tests/test_legacy/test_layers/test_2d/checks_2d/check_operation_2d.py @@ -6,7 +6,8 @@ from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc from colossalai.legacy.nn.layer.parallel_2d._operation import Matmul_AB_2D, Matmul_ABT_2D, Matmul_ATB_2D -from colossalai.utils import get_current_device, print_rank_0 +from colossalai.legacy.utils import print_rank_0 +from colossalai.utils import get_current_device from .common import BATCH_SIZE, DEPTH, HIDDEN_SIZE, SEQ_LENGTH, check_equal diff --git 
a/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py b/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py
index 5a99b05cfe7e..e6eac4b5f222 100644
--- a/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py
+++ b/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_layer_2p5d.py
@@ -16,7 +16,8 @@
     VocabParallelCrossEntropyLoss2p5D,
     VocabParallelEmbedding2p5D,
 )
-from colossalai.utils import get_current_device, print_rank_0
+from colossalai.legacy.utils import print_rank_0
+from colossalai.utils import get_current_device
 
 from .common import *
 
diff --git a/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py b/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py
index db19967676d2..5a88e776a27d 100644
--- a/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py
+++ b/tests/test_legacy/test_layers/test_2p5d/checks_2p5d/check_operation_2p5d.py
@@ -3,7 +3,8 @@
 from colossalai.context import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.legacy.nn.layer.parallel_2p5d._operation import Matmul_AB_2p5D, Matmul_ABT_2p5D, Matmul_ATB_2p5D
-from colossalai.utils import get_current_device, print_rank_0
+from colossalai.legacy.utils import print_rank_0
+from colossalai.utils import get_current_device
 
 from .common import *
 
diff --git a/tests/test_legacy/test_layers/test_3d/checks_3d/check_layer_3d.py b/tests/test_legacy/test_layers/test_3d/checks_3d/check_layer_3d.py
index cee639a9f00a..4a12169a4f54 100644
--- a/tests/test_legacy/test_layers/test_3d/checks_3d/check_layer_3d.py
+++ b/tests/test_legacy/test_layers/test_3d/checks_3d/check_layer_3d.py
@@ -21,8 +21,9 @@
     VocabParallelEmbedding3D,
 )
 from colossalai.legacy.nn.layer.parallel_3d._utils import get_parallel_mode_from_env
+from colossalai.legacy.utils import print_rank_0
 from colossalai.logging import get_dist_logger
-from colossalai.utils import get_current_device, print_rank_0
+from colossalai.utils import get_current_device
 
 from .common import BATCH_SIZE, DEPTH, HIDDEN_SIZE, IMG_SIZE, NUM_CLASSES, SEQ_LENGTH, VOCAB_SIZE, check_equal
 
diff --git a/tests/test_legacy/test_trainer/test_pipeline/test_pipeline_schedule.py b/tests/test_legacy/test_trainer/test_pipeline/test_pipeline_schedule.py
index 6d7bf6b3d89f..4cc887eea686 100644
--- a/tests/test_legacy/test_trainer/test_pipeline/test_pipeline_schedule.py
+++ b/tests/test_legacy/test_trainer/test_pipeline/test_pipeline_schedule.py
@@ -14,8 +14,8 @@
 from colossalai.context import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.initialize import launch
+from colossalai.legacy.utils import get_dataloader, print_rank_0
 from colossalai.testing import rerun_if_address_is_in_use, spawn
-from colossalai.utils import get_dataloader, print_rank_0
 
 BATCH_SIZE = 8
 
diff --git a/tests/test_legacy/test_trainer/test_trainer_with_pipe_schedule.py b/tests/test_legacy/test_trainer/test_trainer_with_pipe_schedule.py
index 7dfbec854ccc..335aad44c01b 100644
--- a/tests/test_legacy/test_trainer/test_trainer_with_pipe_schedule.py
+++ b/tests/test_legacy/test_trainer/test_trainer_with_pipe_schedule.py
@@ -13,9 +13,10 @@
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.legacy.trainer import Trainer
+from colossalai.legacy.utils import get_dataloader
 from colossalai.logging import get_dist_logger
 from colossalai.testing import rerun_if_address_is_in_use, spawn
-from colossalai.utils import MultiTimer, get_dataloader
+from colossalai.utils import MultiTimer
 
 BATCH_SIZE = 4
 IMG_SIZE = 32
diff --git a/tests/test_utils/test_activation_checkpointing.py b/tests/test_legacy/test_utils/test_activation_checkpointing.py
similarity index 98%
rename from tests/test_utils/test_activation_checkpointing.py
rename to tests/test_legacy/test_utils/test_activation_checkpointing.py
index b7764c2f4371..dae49627b13f 100644
--- a/tests/test_utils/test_activation_checkpointing.py
+++ b/tests/test_legacy/test_utils/test_activation_checkpointing.py
@@ -7,8 +7,8 @@
 
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.context.random import add_seed, reset_seeds, seed, set_mode
+from colossalai.legacy.utils.activation_checkpoint import checkpoint
 from colossalai.testing import clear_cache_before_run, parameterize
-from colossalai.utils.activation_checkpoint import checkpoint
 
 
 def forward(x, weight):
diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_1d.py b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_1d.py
similarity index 93%
rename from tests/test_utils/test_checkpoint/test_checkpoint_1d.py
rename to tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_1d.py
index 9e4f5934c92b..f89e5995762c 100644
--- a/tests/test_utils/test_checkpoint/test_checkpoint_1d.py
+++ b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_1d.py
@@ -11,10 +11,10 @@
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.initialize import launch
+from colossalai.legacy.utils import is_using_pp
+from colossalai.legacy.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 from colossalai.logging import disable_existing_loggers
 from colossalai.testing import rerun_if_address_is_in_use, skip_if_not_enough_gpus, spawn
-from colossalai.utils import is_using_pp
-from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 
 
 def build_pipeline(model):
diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_2d.py b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_2d.py
similarity index 93%
rename from tests/test_utils/test_checkpoint/test_checkpoint_2d.py
rename to tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_2d.py
index d3aaa783e073..fa8fdd1003d0 100644
--- a/tests/test_utils/test_checkpoint/test_checkpoint_2d.py
+++ b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_2d.py
@@ -11,10 +11,10 @@
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.initialize import launch
+from colossalai.legacy.utils import is_using_pp
+from colossalai.legacy.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 from colossalai.logging import disable_existing_loggers
 from colossalai.testing import rerun_if_address_is_in_use, skip_if_not_enough_gpus, spawn
-from colossalai.utils import is_using_pp
-from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 
 
 def build_pipeline(model):
diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_2p5d.py
similarity index 93%
rename from tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py
rename to tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_2p5d.py
index 9e52263a2b76..f0b9ef7d9df1 100644
--- a/tests/test_utils/test_checkpoint/test_checkpoint_2p5d.py
+++ b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_2p5d.py
@@ -11,10 +11,10 @@
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.initialize import launch
+from colossalai.legacy.utils import is_using_pp
+from colossalai.legacy.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 from colossalai.logging import disable_existing_loggers
 from colossalai.testing import rerun_if_address_is_in_use, skip_if_not_enough_gpus, spawn
-from colossalai.utils import is_using_pp
-from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 
 
 def build_pipeline(model):
diff --git a/tests/test_utils/test_checkpoint/test_checkpoint_3d.py b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_3d.py
similarity index 93%
rename from tests/test_utils/test_checkpoint/test_checkpoint_3d.py
rename to tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_3d.py
index 9f090efd7362..d7647990b3b3 100644
--- a/tests/test_utils/test_checkpoint/test_checkpoint_3d.py
+++ b/tests/test_legacy/test_utils/test_checkpoint/test_checkpoint_3d.py
@@ -11,10 +11,10 @@
 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.core import global_context as gpc
 from colossalai.initialize import launch
+from colossalai.legacy.utils import is_using_pp
+from colossalai.legacy.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 from colossalai.logging import disable_existing_loggers
 from colossalai.testing import rerun_if_address_is_in_use, skip_if_not_enough_gpus, spawn
-from colossalai.utils import is_using_pp
-from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
 
 
 def build_pipeline(model):
diff --git a/tests/test_utils/test_memory.py b/tests/test_legacy/test_utils/test_memory.py
similarity index 88%
rename from tests/test_utils/test_memory.py
rename to tests/test_legacy/test_utils/test_memory.py
index c88c2f8ec3c5..04dc09d1fd3c 100644
--- a/tests/test_utils/test_memory.py
+++ b/tests/test_legacy/test_utils/test_memory.py
@@ -1,9 +1,9 @@
 import pytest
 
 import colossalai
+from colossalai.legacy.utils.memory import colo_device_memory_capacity, colo_set_process_memory_fraction
 from colossalai.testing import spawn
 from colossalai.utils.cuda import get_current_device
-from colossalai.utils.memory import colo_device_memory_capacity, colo_set_process_memory_fraction
 
 
 def _run_colo_set_process_memory_fraction_and_colo_device_memory_capacity():
diff --git a/tests/test_utils/test_norm_gradient_clipping.py b/tests/test_legacy/test_utils/test_norm_gradient_clipping.py
similarity index 95%
rename from tests/test_utils/test_norm_gradient_clipping.py
rename to tests/test_legacy/test_utils/test_norm_gradient_clipping.py
index 4fd7c3c60a95..3a2d67593c9c 100644
--- a/tests/test_utils/test_norm_gradient_clipping.py
+++ b/tests/test_legacy/test_utils/test_norm_gradient_clipping.py
@@ -4,12 +4,12 @@
 from torch.nn.utils import clip_grad_norm_
 
 import colossalai
+from colossalai.legacy.tensor import ColoTensorSpec, ProcessGroup, distspec
+from colossalai.legacy.utils.common import clip_grad_norm
 from colossalai.logging import disable_existing_loggers
-from colossalai.tensor import ColoTensorSpec, ProcessGroup, distspec
 from colossalai.tensor.colo_parameter import ColoParameter
 from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
 from colossalai.utils import get_current_device
-from colossalai.utils.common import clip_grad_norm
 
 
 def close(num: float, other: float, rtol: float = 1e-5, atol: float = 1e-8):
diff --git a/tests/test_zero/test_gemini/test_gemini_use_rmt.py b/tests/test_zero/test_gemini/test_gemini_use_rmt.py
index a80a2f62de22..614a96ccdbcd 100644
--- a/tests/test_zero/test_gemini/test_gemini_use_rmt.py
+++ b/tests/test_zero/test_gemini/test_gemini_use_rmt.py
@@ -4,12 +4,12 @@
 
 import colossalai
 from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
+from colossalai.utils import set_seed
 from colossalai.zero import GeminiDDP
 from colossalai.zero.gemini.chunk import search_chunk_configuration
 from colossalai.zero.gemini.memory_tracer.runtime_mem_tracer import RuntimeMemTracer
 from tests.components_to_test import run_fwd_bwd
 from tests.components_to_test.registry import non_distributed_component_funcs
-from tests.test_tensor.common_utils import set_seed
 
 # run gemini use the runtime memory tracer
 
diff --git a/tests/test_zero/test_low_level/test_zero_tp.py b/tests/test_zero/test_low_level/test_zero_tp.py
deleted file mode 100644
index 4a2b49f63b7e..000000000000
--- a/tests/test_zero/test_low_level/test_zero_tp.py
+++ /dev/null
@@ -1,96 +0,0 @@
-import pytest
-import torch
-import torch.nn as nn
-from torch.nn.parallel import DistributedDataParallel as DDP
-from torch.testing import assert_close
-
-import colossalai
-from colossalai.tensor import ProcessGroup
-from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn
-from colossalai.utils import get_current_device
-from colossalai.zero import ColoInitContext, LowLevelZeroOptimizer
-from tests.test_tensor.common_utils import set_seed, split_param_col_tp1d, split_param_row_tp1d, tensor_shard_equal
-
-
-def strict_shard_equal(tensor, shard, tp_pg, rtol=1e-3, atol=1e-4):
-    return tensor_shard_equal(tensor, shard, tp_pg.tp_local_rank(), tp_pg.tp_world_size(), rtol, atol)
-
-
-class MlpModel(nn.Module):
-
-    def __init__(self):
-        super(MlpModel, self).__init__()
-        self.linear1 = nn.Linear(32, 128)
-        self.act = nn.GELU()
-        self.linear2 = nn.Linear(128, 32)
-
-    def forward(self, x):
-        y = self.linear1(x)
-        y = self.act(y)
-        y = self.linear2(y)
-        return x + y
-
-
-@parameterize("overlap_flag", [False, True])
-@parameterize("partition_flag", [False, True])
-def exam_zero_with_tp(overlap_flag, partition_flag):
-    set_seed(233010)
-    tp_pg = ProcessGroup(tp_degree=2)
-
-    with ColoInitContext(device=get_current_device(), default_pg=tp_pg):
-        hybrid_model = MlpModel()
-    torch_model = MlpModel().cuda()
-    for pt, ph in zip(torch_model.parameters(), hybrid_model.parameters()):
-        pt.data.copy_(ph.data)
-
-    for name, param in hybrid_model.named_parameters():
-        if 'linear1' in name:
-            split_param_row_tp1d(param, tp_pg)
-            param.compute_spec.set_output_replicate(False)
-        if 'linear2.weight' in name:
-            split_param_col_tp1d(param, tp_pg)
-
-    torch_model = DDP(torch_model, device_ids=[tp_pg.rank()], process_group=tp_pg.dp_process_group())
-    torch_optim = torch.optim.Adam(torch_model.parameters(), lr=1e-2)    # set to 1e-2 for torch-1.11
-    hybrid_optim = torch.optim.Adam(hybrid_model.parameters(), lr=1e-2)
-    hybrid_optim = LowLevelZeroOptimizer(hybrid_optim,
-                                         initial_scale=2,
-                                         clip_grad_norm=1.0,
-                                         overlap_communication=overlap_flag,
-                                         partition_grad=partition_flag,
-                                         dp_process_group=tp_pg.dp_process_group(),
-                                         tp_process_group=tp_pg.tp_process_group())
-
-    dp_local_rank = tp_pg.dp_local_rank()
-    set_seed(255 + dp_local_rank)
-
-    data = torch.randn(8, 32, device=get_current_device())
-    torch_loss = torch_model(data).sum()
-    hybrid_loss = hybrid_model(data).sum()
-    assert_close(torch_loss, hybrid_loss)
-
-    torch_loss.backward()
-    torch.nn.utils.clip_grad_norm_(torch_model.parameters(), 1.0)
-    hybrid_optim.backward(hybrid_loss)
-
-    torch_optim.step()
-    hybrid_optim.step()
-
-    for (name, pt), ph in zip(torch_model.named_parameters(), hybrid_model.parameters()):
-        assert strict_shard_equal(pt.data, ph.data, tp_pg)
-
-
-def run_dist(rank, world_size, port):
-    colossalai.launch(config={}, rank=rank, world_size=world_size, port=port, host='localhost')
-    exam_zero_with_tp()
-
-
-@pytest.mark.skip('this will be rewritten by shardformer')
-@pytest.mark.dist
-@rerun_if_address_is_in_use()
-def test_zero_with_tp():
-    spawn(run_dist, 4)
-
-
-if __name__ == '__main__':
-    test_zero_with_tp()