diff --git a/colossalai/communication/p2p.py b/colossalai/communication/p2p.py index 7e761e180f0e..3eb94ac60d84 100644 --- a/colossalai/communication/p2p.py +++ b/colossalai/communication/p2p.py @@ -17,8 +17,6 @@ def _communicate(tensor_send_next=None, recv_next_shape=None, prev_rank=None, next_rank=None, - up_group=None, - down_group=None, dtype=None): """ Adapted from megatron.p2p_communication. @@ -59,60 +57,44 @@ def _communicate(tensor_send_next=None, if prev_rank is None: prev_rank = gpc.get_prev_global_rank( ParallelMode.PIPELINE) - if up_group is None: - up_group = gpc.get_group(ParallelMode.PIPELINE_PREV) if tensor_send_next is not None or recv_next: if next_rank is None: next_rank = gpc.get_next_global_rank( ParallelMode.PIPELINE) - if down_group is None: - down_group = gpc.get_group(ParallelMode.PIPELINE_NEXT) # rank = dist.get_rank() rank = gpc.get_global_rank() ops = [] if tensor_send_prev is not None: - send_prev_op = dist.broadcast(tensor_send_prev, - src=rank, - group=up_group, - async_op=True) + send_prev_op = dist.P2POp(dist.isend, tensor_send_prev, prev_rank) ops.append(send_prev_op) if tensor_recv_prev is not None: - recv_prev_op = dist.broadcast(tensor_recv_prev, - src=prev_rank, - group=up_group, - async_op=True) + recv_prev_op = dist.P2POp(dist.irecv, tensor_recv_prev, prev_rank) ops.append(recv_prev_op) if tensor_recv_next is not None: - recv_next_op = dist.broadcast(tensor_recv_next, - src=next_rank, - group=down_group, - async_op=True) + recv_next_op = dist.P2POp(dist.irecv, tensor_recv_next, next_rank) ops.append(recv_next_op) if tensor_send_next is not None: - send_next_op = dist.broadcast(tensor_send_next, - src=rank, - group=down_group, - async_op=True) + send_next_op = dist.P2POp(dist.isend, tensor_send_next, next_rank) ops.append(send_next_op) - for req in ops: - req.wait() + if len(ops) > 0: + reqs = dist.batch_isend_irecv(ops) + for req in reqs: + req.wait() # To protect against race condition when using batch_isend_irecv(). torch.cuda.synchronize() return tensor_recv_prev, tensor_recv_next -def recv_forward(input_tensor_shape, prev_rank=None, up_group=None): +def recv_forward(input_tensor_shape, prev_rank=None): """Receives the input tensor from the previous member in pipeline. - + :param input_tensor_shape: The shape of the tensor to be recieved :param prev_rank: The rank of the source of the tensor - :param up_group: Communication group including the previous member in pipeline parallel group :type input_tensor_shape: torch.Size :type prev_rank: int, optional - :type up_group: ProcessGroup, optional :return: The input tensor in forward step :rtype: Tensor """ @@ -121,20 +103,17 @@ def recv_forward(input_tensor_shape, prev_rank=None, up_group=None): else: input_tensor, _ = _communicate(recv_prev=True, recv_prev_shape=input_tensor_shape, - prev_rank=prev_rank, - up_group=up_group) + prev_rank=prev_rank) return input_tensor -def recv_backward(output_grad_shape, next_rank=None, down_group=None): +def recv_backward(output_grad_shape, next_rank=None): """Receives the grad tensor from the next member in pipeline. - + :param output_grad_shape: The shape of the tensor to be recieved :param next_rank: The rank of the source of the tensor - :param down_group: Communication group including the next member in pipeline parallel group :type output_grad_shape: torch.Size :type next_rank: int, optional - :type down_group: ProcessGroup, optional :return: The grad of output tensor in forward step :rtype: Tensor """ @@ -143,56 +122,44 @@ def recv_backward(output_grad_shape, next_rank=None, down_group=None): else: _, output_tensor_grad = _communicate(recv_next=True, recv_next_shape=output_grad_shape, - next_rank=next_rank, - down_group=down_group) + next_rank=next_rank) return output_tensor_grad -def send_forward(output_tensor, - next_rank=None, - down_group=None): +def send_forward(output_tensor, next_rank=None): """Sends the input tensor to the next member in pipeline. - + :param output_tensor: Tensor to be sent :param next_rank: The rank of the recipient of the tensor - :param down_group: Communication group including the next member in pipeline parallel group :type output_tensor: Tensor :type next_rank: int, optional - :type down_group: ProcessGroup, optional """ if not gpc.is_last_rank(ParallelMode.PIPELINE): _communicate(tensor_send_next=output_tensor, - next_rank=next_rank, - down_group=down_group) + next_rank=next_rank) -def send_backward(input_tensor_grad, - prev_rank=None, - up_group=None): +def send_backward(input_tensor_grad, prev_rank=None): """Sends the grad tensor to the previous member in pipeline. - + :param input_tensor_grad: Tensor to be sent :param prev_rank: The rank of the recipient of the tensor - :param up_group: Communication group including the previous member in pipeline parallel group :type input_tensor_grad: Tensor :type prev_rank: int, optional - :type up_group: ProcessGroup, optional """ if not gpc.is_first_rank(ParallelMode.PIPELINE): _communicate(tensor_send_prev=input_tensor_grad, - prev_rank=prev_rank, - up_group=up_group) + prev_rank=prev_rank) def send_forward_recv_backward(output_tensor, output_grad_shape, recv_next=True, - next_rank=None, - down_group=None): + next_rank=None): """Batched communication operation. Sends the input tensor to the next member in pipeline, while recieves the grad tensor from the next member in pipeline. - + :param output_tensor: Tensor to be sent :param output_grad_shape: The shape of the tensor to be recieved :type output_tensor: Tensor @@ -206,20 +173,18 @@ def send_forward_recv_backward(output_tensor, _, output_tensor_grad = _communicate(tensor_send_next=output_tensor, recv_next=recv_next, recv_next_shape=output_grad_shape, - next_rank=next_rank, - down_group=down_group) + next_rank=next_rank) return output_tensor_grad def send_backward_recv_forward(input_tensor_grad, input_tensor_shape, recv_prev=True, - prev_rank=None, - up_group=None): + prev_rank=None): """Batched communication operation. Sends the grad tensor to the previous member in pipeline, while recieves the input tensor from the previous member in pipeline. - + :param input_tensor_grad: Tensor to be sent :param input_tensor_shape: The shape of the tensor to be recieved :type input_tensor_grad: Tensor @@ -233,8 +198,7 @@ def send_backward_recv_forward(input_tensor_grad, input_tensor, _ = _communicate(tensor_send_prev=input_tensor_grad, recv_prev=recv_prev, recv_prev_shape=input_tensor_shape, - prev_rank=prev_rank, - up_group=up_group) + prev_rank=prev_rank) return input_tensor @@ -242,13 +206,11 @@ def send_forward_recv_forward(output_tensor, input_tensor_shape, recv_prev=True, prev_rank=None, - next_rank=None, - up_group=None, - down_group=None): + next_rank=None): """Batched communication operation. Sends the input tensor to the next member in pipeline, while recieves the input tensor from the previous member in pipeline. - + :param output_tensor: Tensor to be sent :param input_tensor_shape: The shape of the tensor to be recieved :type output_tensor: Tensor @@ -260,9 +222,7 @@ def send_forward_recv_forward(output_tensor, recv_prev=recv_prev, recv_prev_shape=input_tensor_shape, prev_rank=prev_rank, - next_rank=next_rank, - up_group=up_group, - down_group=down_group) + next_rank=next_rank) return input_tensor @@ -270,13 +230,11 @@ def send_backward_recv_backward(input_tensor_grad, output_grad_shape, recv_next=True, prev_rank=None, - next_rank=None, - up_group=None, - down_group=None): + next_rank=None): """Batched communication operation. Sends the grad tensor to the previous member in pipeline, while recieves the grad tensor from the next member in pipeline. - + :param input_tensor_grad: Tensor to be sent :param output_grad_shape: The shape of the tensor to be recieved :type input_tensor_grad: Tensor @@ -288,9 +246,7 @@ def send_backward_recv_backward(input_tensor_grad, recv_next=recv_next, recv_next_shape=output_grad_shape, prev_rank=prev_rank, - next_rank=next_rank, - up_group=up_group, - down_group=down_group) + next_rank=next_rank) return output_tensor_grad @@ -301,13 +257,11 @@ def send_forward_backward_recv_forward_backward(output_tensor, recv_prev=True, recv_next=True, prev_rank=None, - next_rank=None, - up_group=None, - down_group=None): + next_rank=None): """Batched communication operation. Sends the input tensor to the next and the grad tensor to the previous, while recieves the grad tensor from the next and the input tensor from the previous. - + :param output_tensor: Tensor sent to the next :param input_tensor_grad: Tensor sent to the previous :param input_tensor_shape: The shape of the tensor recieved from the previous @@ -327,7 +281,5 @@ def send_forward_backward_recv_forward_backward(output_tensor, recv_prev_shape=input_tensor_shape, recv_next_shape=output_grad_shape, prev_rank=prev_rank, - next_rank=next_rank, - up_group=up_group, - down_group=down_group) + next_rank=next_rank) return input_tensor, output_tensor_grad diff --git a/colossalai/communication/utils.py b/colossalai/communication/utils.py index d6d7dc091e70..a8dc0da1afcd 100644 --- a/colossalai/communication/utils.py +++ b/colossalai/communication/utils.py @@ -6,7 +6,7 @@ from colossalai.utils import get_current_device -def send_tensor_meta(tensor, need_meta=True, down_group=None): +def send_tensor_meta(tensor, need_meta=True, next_rank=None): """Sends tensor meta information before sending a specific tensor. Since the recipient must know the shape of the tensor in p2p communications, meta information of the tensor should be sent before communications. This function @@ -14,31 +14,34 @@ def send_tensor_meta(tensor, need_meta=True, down_group=None): :param tensor: Tensor to be sent :param need_meta: If False, meta information won't be sent - :param down_group: Communication group including the next member in pipeline parallel group + :param next_rank: The rank of the next member in pipeline parallel group :type tensor: Tensor :type need_meta: bool, optional - :type down_group: ProcessGroup, optional + :type next_rank: int :return: False :rtype: bool """ if need_meta: - rank = gpc.get_global_rank() - - if down_group is None: - down_group = gpc.get_group(ParallelMode.PIPELINE_NEXT) + if next_rank is None: + next_rank = gpc.get_next_global_rank(ParallelMode.PIPELINE) tensor_kwargs = {'dtype': torch.long, 'device': get_current_device()} send_shape = torch.tensor(tensor.size(), **tensor_kwargs) send_ndims = torch.tensor(len(tensor.size()), **tensor_kwargs) - - dist.broadcast(send_ndims, src=rank, group=down_group) - dist.broadcast(send_shape, src=rank, group=down_group) + ops = [ + dist.P2POp(dist.isend, send_ndims, next_rank), + dist.P2POp(dist.isend, send_shape, next_rank) + ] + reqs = dist.batch_isend_irecv(ops) + for req in reqs: + req.wait() + torch.cuda.synchronize() return False -def recv_tensor_meta(tensor_shape, prev_rank=None, up_group=None): +def recv_tensor_meta(tensor_shape, prev_rank=None): """Recieves tensor meta information before recieving a specific tensor. Since the recipient must know the shape of the tensor in p2p communications, meta information of the tensor should be recieved before communications. This function @@ -46,27 +49,21 @@ def recv_tensor_meta(tensor_shape, prev_rank=None, up_group=None): :param tensor_shape: The shape of the tensor to be recieved :param prev_rank: The rank of the source of the tensor - :param up_group: Communication group including the previous member in pipeline parallel group :type tensor_shape: torch.Size :type prev_rank: int, optional - :type up_group: ProcessGroup, optional :return: The shape of the tensor to be recieved :rtype: torch.Size """ if tensor_shape is None: if prev_rank is None: - prev_rank = gpc.get_prev_global_rank( - ParallelMode.PIPELINE) - if up_group is None: - up_group = gpc.get_group(ParallelMode.PIPELINE_PREV) + prev_rank = gpc.get_prev_global_rank(ParallelMode.PIPELINE) tensor_kwargs = {'dtype': torch.long, 'device': get_current_device()} recv_ndims = torch.empty((), **tensor_kwargs) - dist.broadcast(recv_ndims, src=prev_rank, group=up_group) - + dist.recv(recv_ndims, prev_rank) recv_shape = torch.empty(recv_ndims, **tensor_kwargs) - dist.broadcast(recv_shape, src=prev_rank, group=up_group) + dist.recv(recv_shape, prev_rank) tensor_shape = torch.Size(recv_shape) diff --git a/colossalai/engine/schedule/_no_pipeline.py b/colossalai/engine/schedule/_no_pipeline.py index 2d0488125dfc..a495a52d434b 100644 --- a/colossalai/engine/schedule/_no_pipeline.py +++ b/colossalai/engine/schedule/_no_pipeline.py @@ -14,6 +14,7 @@ from typing import Iterable import torch + import torch.nn as nn from torch.optim import Optimizer diff --git a/colossalai/engine/schedule/_pipeline.py b/colossalai/engine/schedule/_pipeline.py index 6defea93d57a..7a1b5fdadef3 100644 --- a/colossalai/engine/schedule/_pipeline.py +++ b/colossalai/engine/schedule/_pipeline.py @@ -33,20 +33,22 @@ class PipelineSchedule(BaseSchedule): :param num_microbatches: The number of microbatches :param amp_type: The type of automatic mixed precision :param amp_config: The configuration of automatic mixed procision + :param sync_data: If set to `True`, will sync data every batch over pipeline stages :type num_microbatches: int :type amp_type: AMP_TYPE :type amp_config: dict + :type sync_data: bool """ def __init__(self, num_microbatches, amp_type: AMP_TYPE = None, - amp_config: dict = None): + amp_config: dict = None, + sync_data: bool = True): super().__init__() self.num_microbatches = num_microbatches - self.data_sync = True # close after making sure data is identical - + self.sync_data = sync_data # amp # LSGL: amp_config is not used, but leave here for future extension self.amp_type = amp_type @@ -67,30 +69,37 @@ def _move_to_device(self, data): return data def _sync_data(self): + reqs = [] if gpc.is_first_rank(ParallelMode.PIPELINE): src_rank = gpc.get_global_rank() - dist.broadcast( + reqs.append(dist.broadcast( tensor=self.batch_data, src=src_rank, - group=gpc.get_group(ParallelMode.PIPELINE_PREV) - ) - dist.broadcast( + group=gpc.get_group(ParallelMode.PIPELINE_PREV), + async_op=True + )) + reqs.append(dist.broadcast( tensor=self.batch_label, src=src_rank, - group=gpc.get_group(ParallelMode.PIPELINE_PREV) - ) + group=gpc.get_group(ParallelMode.PIPELINE_PREV), + async_op=True + )) if gpc.is_last_rank(ParallelMode.PIPELINE): src_rank = gpc.get_next_global_rank(ParallelMode.PIPELINE) - dist.broadcast( + reqs.append(dist.broadcast( tensor=self.batch_data, src=src_rank, - group=gpc.get_group(ParallelMode.PIPELINE_NEXT) - ) - dist.broadcast( + group=gpc.get_group(ParallelMode.PIPELINE_NEXT), + async_op=True + )) + reqs.append(dist.broadcast( tensor=self.batch_label, src=src_rank, - group=gpc.get_group(ParallelMode.PIPELINE_NEXT) - ) + group=gpc.get_group(ParallelMode.PIPELINE_NEXT), + async_op=True + )) + for req in reqs: + req.wait() # Pipeline schedule just puts data in memory def load_batch(self, data_iter): @@ -104,7 +113,7 @@ def load_batch(self, data_iter): assert batch_size % self.num_microbatches == 0, \ "Batch size should divided by the number of microbatches" self.microbatch_size = batch_size // self.num_microbatches - if self.data_sync: + if self.sync_data: self._sync_data() def _get_data_slice(self, tensor): @@ -149,6 +158,7 @@ def forward_step(self, model, criterion, input_tensor, return_tensors, input_tensor, label = self.load_micro_batch() loss_reduced = criterion(output_tensor, *label) \ / (self.num_microbatches * grad_accum_size) + return_tensors.append( tuple((output_tensor, label[0], loss_reduced))) return loss_reduced diff --git a/colossalai/nn/optimizer/_utils.py b/colossalai/nn/optimizer/_utils.py index 6cd92bb38c34..255b48ea9720 100644 --- a/colossalai/nn/optimizer/_utils.py +++ b/colossalai/nn/optimizer/_utils.py @@ -10,7 +10,7 @@ print('Colossalai should be built with cuda extension to use the FP16 optimizer') from ..multi_tensor_apply import multi_tensor_applier - +import torch.distributed as dist from colossalai.constants import IS_TENSOR_PARALLEL, TENSOR_PARALLEL_ATTRIBUTES from colossalai.context.parallel_mode import ParallelMode from colossalai.core import global_context as gpc @@ -83,11 +83,22 @@ def clip_grad_norm_fp32(parameters, max_norm, norm_type=2): if norm_type == inf: total_norm = max(p.grad.data.abs().max() for p in params) total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)]) - if gpc.is_initialized(ParallelMode.TENSOR): - # Take max across all model-parallel GPUs. - torch.distributed.all_reduce(total_norm_cuda, - op=torch.distributed.ReduceOp.MAX, - group=gpc.get_group(ParallelMode.TENSOR)) + ops = [] + # Take max across all model-parallel GPUs. + if gpc.is_initialized(ParallelMode.TENSOR) and gpc.get_world_size(ParallelMode.TENSOR) > 1: + ops.append(dist.all_reduce(total_norm_cuda, + op=dist.ReduceOp.MAX, + group=gpc.get_group( + ParallelMode.TENSOR), + async_op=True)) + if gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1: + ops.append(dist.all_reduce(total_norm_cuda, + op=dist.ReduceOp.MAX, + group=gpc.get_group( + ParallelMode.PIPELINE), + async_op=True)) + for req in ops: + req.wait() total_norm = total_norm_cuda[0].item() else: tensor_parallel_grads = [] @@ -106,13 +117,17 @@ def clip_grad_norm_fp32(parameters, max_norm, norm_type=2): tensor_parallel_norm = _calc_lp(tensor_parallel_grads, norm_type) no_tensor_parallel_grads = _calc_lp( no_tensor_parallel_grads, norm_type) + # Sum across all model-parallel GPUs. if gpc.is_initialized(ParallelMode.TENSOR) and len(tensor_parallel_grads) > 0: - # Sum across all model-parallel GPUs. - torch.distributed.all_reduce(tensor_parallel_norm, - op=torch.distributed.ReduceOp.SUM, - group=gpc.get_group(ParallelMode.TENSOR)) - total_norm = (tensor_parallel_norm + - no_tensor_parallel_norm) ** (1.0 / norm_type) + dist.all_reduce(tensor_parallel_norm, + op=dist.ReduceOp.SUM, + group=gpc.get_group(ParallelMode.TENSOR)) + total_norm = tensor_parallel_norm + no_tensor_parallel_norm + if gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1: + dist.all_reduce(total_norm, + op=dist.ReduceOp.SUM, + group=gpc.get_group(ParallelMode.PIPELINE)) + total_norm = total_norm ** (1.0 / norm_type) if type(total_norm) == 'torch.cuda.FloatTensor': total_norm = total_norm.item() @@ -147,9 +162,17 @@ def count_zeros_fp32(parameters): total_num_zeros = num_zeros + total_num_zeros # Sum across all model-parallel GPUs. - torch.distributed.all_reduce(total_num_zeros, - op=torch.distributed.ReduceOp.SUM, - group=gpc.get_group(ParallelMode.TENSOR)) + ops = [] + ops.append(dist.all_reduce(total_num_zeros, + op=dist.ReduceOp.SUM, + group=gpc.get_group(ParallelMode.TENSOR), + async_op=True)) + ops.append(dist.all_reduce(total_num_zeros, + op=dist.ReduceOp.SUM, + group=gpc.get_group(ParallelMode.PIPELINE), + async_op=True)) + for req in ops: + req.wait() total_num_zeros = total_num_zeros.item() return total_num_zeros diff --git a/colossalai/nn/optimizer/lamb.py b/colossalai/nn/optimizer/lamb.py index 68531e92a249..f7248bd68fe7 100644 --- a/colossalai/nn/optimizer/lamb.py +++ b/colossalai/nn/optimizer/lamb.py @@ -94,7 +94,7 @@ def step(self, closure=None): # * math.sqrt(bias_correction2) / bias_correction1 step_size = group['lr'] - weight_norm = p.data.pow(2).sum().sqrt().clamp(0, 10) + weight_norm = p.data.pow(2).sum().sqrt() adam_step = exp_avg / exp_avg_sq.sqrt().add(group['eps']) if group['weight_decay'] != 0: diff --git a/colossalai/trainer/hooks/_metric_hook.py b/colossalai/trainer/hooks/_metric_hook.py index cf345d0f8117..31004d90ec22 100644 --- a/colossalai/trainer/hooks/_metric_hook.py +++ b/colossalai/trainer/hooks/_metric_hook.py @@ -104,6 +104,7 @@ def after_test_iter(self, logits, label, *args): self.metric.update(logits, label) + @HOOKS.register_module class Accuracy2DHook(MetricHook): """Specialized hook class for :class:`Accuracy2D`. diff --git a/configs/vit/vit_2d.py b/configs/vit/vit_2d.py index b771b583e9d9..23ddc8d6cad8 100644 --- a/configs/vit/vit_2d.py +++ b/configs/vit/vit_2d.py @@ -144,7 +144,7 @@ parallel = dict( pipeline=dict(size=1), - tensor=dict(size=4, mode='2d'), + tensor=dict(size=1, mode='2d'), ) # for fp16 training diff --git a/docs/conf.py b/docs/conf.py index b0a57bdbc08b..695477e35fbe 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -24,6 +24,8 @@ # The full version, including alpha/beta/rc tags release = '0.0.1' +if 'SPHINX_LANG' in os.environ: + root_doc = f'index_{os.environ["SPHINX_LANG"]}' # -- General configuration --------------------------------------------------- # Add any Sphinx extension module names here, as strings. They can be diff --git a/docs/index.rst b/docs/index.rst index f9a6ce444a79..16141b5ead8e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -3,27 +3,27 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. -夸父AI系统(Colossal-AI)开发文档 +Colossal-AI documentation ====================================== .. toctree:: :maxdepth: 1 - :caption: 快速上手指南 + :caption: GETTING STARTED - installation_zh.md - run_demo_zh.md + installation.md + run_demo.md .. toctree:: :maxdepth: 1 - :caption: 个性化您的训练 - - parallelization_zh.md - model_zh.md - trainer_engine_zh.md - amp_zh.md - zero_zh.md - add_your_parallel_zh.md - config_zh.md + :caption: CUSTOMIZE YOUR TRAINING + + parallelization.md + model.md + trainer_engine.md + amp.md + zero.md + add_your_parallel.md + config.md diff --git a/docs/index_en.rst b/docs/index_zh.rst similarity index 62% rename from docs/index_en.rst rename to docs/index_zh.rst index 16141b5ead8e..f9a6ce444a79 100644 --- a/docs/index_en.rst +++ b/docs/index_zh.rst @@ -3,27 +3,27 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. -Colossal-AI documentation +夸父AI系统(Colossal-AI)开发文档 ====================================== .. toctree:: :maxdepth: 1 - :caption: GETTING STARTED + :caption: 快速上手指南 - installation.md - run_demo.md + installation_zh.md + run_demo_zh.md .. toctree:: :maxdepth: 1 - :caption: CUSTOMIZE YOUR TRAINING - - parallelization.md - model.md - trainer_engine.md - amp.md - zero.md - add_your_parallel.md - config.md + :caption: 个性化您的训练 + + parallelization_zh.md + model_zh.md + trainer_engine_zh.md + amp_zh.md + zero_zh.md + add_your_parallel_zh.md + config_zh.md diff --git a/examples/vit-b16/README.md b/examples/vit-b16/README.md new file mode 100644 index 000000000000..c28c7ed4477b --- /dev/null +++ b/examples/vit-b16/README.md @@ -0,0 +1,40 @@ +# Overview + +Here is an example of training ViT-B/16 on Imagenet-1K with batch size 32K. +We use 8x NVIDIA A100 GPU in this example. + +# How to run +Using [Slurm](https://slurm.schedmd.com/documentation.html): +```shell +srun python train_dali.py --local_rank=$SLURM_PROCID --world_size=$SLURM_NPROCS --host=$HOST --port=29500 --config=vit-b16.py +``` + +# Results + +![Loss Curve](./loss.jpeg) +![Accuracy](./acc.jpeg) + +# Details +`vit-b16.py` + +It is a [config file](https://colossalai.org/config.html), which is used by ColossalAI to define all kinds of training arguments, such as the model, dataset, and training method (optimizer, lr_scheduler, epoch, etc.). You can access config content by `gpc.config`. + +In this example, we train the ViT-Base patch 16 model 300 epochs on ImageNet-1K. The batch size is set to 32K through data parallel (4K on each GPU from 16x gradient accumulation with batch size 256). Since the batch size is very large than common usage, leading to convergence difficulties, we use a +large batch optimizer [LAMB](https://arxiv.org/abs/1904.00962), and we can scale the batch size to 32K with a little accuracy loss. The learning rate and weight decay of the optimizer are set to 1.8e-2 and 0.1, respectively. We use a linear warmup learning rate scheduler and warmup 150 epochs. +We introduce FP16 mixed precision to accelerate training and use gradient clipping to help convergence. +For simplicity and speed, we didn't apply `RandAug` and just used [Mixup](https://arxiv.org/abs/1710.09412) in data augmentation. + +If you have enough computing resources, you can expand this example conveniently with data parallel on a very large scale without gradient accumulation, and finish the training process even within one hour. + + +`imagenet_dali_dataloader.py` +To accelerate the training process, we use [DALI](https://github.com/NVIDIA/DALI) as data loader. Note that it requires the dataset in TFRecord format, avoiding read raw images which reduces efficiency of the file system. + +`train_dali.py` +We build the DALI data loader and train process using Colossal-AI here. + +`mixup.py` +Since we used Mixup, we define mixup loss in this file. + +`hooks.py` +We also define useful hooks to log information help debugging. \ No newline at end of file diff --git a/examples/vit-b16/acc.jpeg b/examples/vit-b16/acc.jpeg new file mode 100755 index 000000000000..43f67fd39167 Binary files /dev/null and b/examples/vit-b16/acc.jpeg differ diff --git a/examples/vit-b16/dataloader/__init__.py b/examples/vit-b16/dataloader/__init__.py new file mode 100755 index 000000000000..e69de29bb2d1 diff --git a/examples/vit-b16/dataloader/imagenet_dali_dataloader.py b/examples/vit-b16/dataloader/imagenet_dali_dataloader.py new file mode 100755 index 000000000000..a39d73e26c36 --- /dev/null +++ b/examples/vit-b16/dataloader/imagenet_dali_dataloader.py @@ -0,0 +1,112 @@ +from nvidia.dali.pipeline import Pipeline +from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy +import nvidia.dali.fn as fn +import nvidia.dali.types as types +import nvidia.dali.tfrecord as tfrec +import torch +import numpy as np + + +class DaliDataloader(DALIClassificationIterator): + def __init__(self, + tfrec_filenames, + tfrec_idx_filenames, + shard_id=0, + num_shards=1, + batch_size=128, + num_threads=4, + resize=256, + crop=224, + prefetch=2, + training=True, + gpu_aug=False, + cuda=True, + mixup_alpha=0.0): + self.mixup_alpha = mixup_alpha + self.training = training + pipe = Pipeline(batch_size=batch_size, + num_threads=num_threads, + device_id=torch.cuda.current_device() if cuda else None, + seed=1024) + with pipe: + inputs = fn.readers.tfrecord( + path=tfrec_filenames, + index_path=tfrec_idx_filenames, + random_shuffle=training, + shard_id=shard_id, + num_shards=num_shards, + initial_fill=10000, + read_ahead=True, + prefetch_queue_depth=prefetch, + name='Reader', + features={ + 'image/encoded': tfrec.FixedLenFeature((), tfrec.string, ""), + 'image/class/label': tfrec.FixedLenFeature([1], tfrec.int64, -1), + }) + images = inputs["image/encoded"] + + if training: + images = fn.decoders.image(images, + device='mixed' if gpu_aug else 'cpu', + output_type=types.RGB) + images = fn.random_resized_crop(images, + size=crop, + device='gpu' if gpu_aug else 'cpu') + flip_lr = fn.random.coin_flip(probability=0.5) + else: + # decode jpeg and resize + images = fn.decoders.image(images, + device='mixed' if gpu_aug else 'cpu', + output_type=types.RGB) + images = fn.resize(images, + device='gpu' if gpu_aug else 'cpu', + resize_x=resize, + resize_y=resize, + dtype=types.FLOAT, + interp_type=types.INTERP_TRIANGULAR) + flip_lr = False + + # center crop and normalise + images = fn.crop_mirror_normalize(images, + dtype=types.FLOAT, + crop=(crop, crop), + mean=[127.5], + std=[127.5], + mirror=flip_lr) + label = inputs["image/class/label"] - 1 # 0-999 + # LSG: element_extract will raise exception, let's flatten outside + # label = fn.element_extract(label, element_map=0) # Flatten + if cuda: # transfer data to gpu + pipe.set_outputs(images.gpu(), label.gpu()) + else: + pipe.set_outputs(images, label) + + pipe.build() + last_batch_policy = 'DROP' if training else 'PARTIAL' + super().__init__(pipe, reader_name="Reader", + auto_reset=True, + last_batch_policy=last_batch_policy) + + def __iter__(self): + # if not reset (after an epoch), reset; if just initialize, ignore + if self._counter >= self._size or self._size < 0: + self.reset() + return self + + def __next__(self): + data = super().__next__() + img, label = data[0]['data'], data[0]['label'] + label = label.squeeze() + if self.mixup_alpha > 0.0: + if self.training: + lam = np.random.beta(self.mixup_alpha, self.mixup_alpha) + idx = torch.randperm(img.size(0)).to(img.device) + img = lam * img + (1 - lam) * img[idx, :] + label_a, label_b = label, label[idx] + lam = torch.tensor([lam], device=img.device, dtype=img.dtype) + label = (label_a, label_b, lam) + else: + label = (label, label, torch.ones( + 1, device=img.device, dtype=img.dtype)) + return (img,), label + return (img,), (label,) diff --git a/examples/vit-b16/hooks.py b/examples/vit-b16/hooks.py new file mode 100644 index 000000000000..b6c306ed7184 --- /dev/null +++ b/examples/vit-b16/hooks.py @@ -0,0 +1,15 @@ +from colossalai.registry import HOOKS +from colossalai.trainer import BaseHook +from colossalai.core import global_context as gpc +from colossalai.context import ParallelMode + + +@HOOKS.register_module +class TotalBatchsizeHook(BaseHook): + def __init__(self, trainer, priority: int = 2) -> None: + super().__init__(trainer, priority) + + def before_train(self): + total_batch_size = gpc.config.BATCH_SIZE * \ + gpc.config.engine.gradient_accumulation * gpc.get_world_size(ParallelMode.DATA) + self.logger.info(f'Total batch size = {total_batch_size}', ranks=[0]) diff --git a/examples/vit-b16/loss.jpeg b/examples/vit-b16/loss.jpeg new file mode 100755 index 000000000000..a16c333cc8e9 Binary files /dev/null and b/examples/vit-b16/loss.jpeg differ diff --git a/examples/vit-b16/mixup.py b/examples/vit-b16/mixup.py new file mode 100644 index 000000000000..822bc8659df0 --- /dev/null +++ b/examples/vit-b16/mixup.py @@ -0,0 +1,12 @@ +import torch.nn as nn +from colossalai.registry import LOSSES + +@LOSSES.register_module +class MixupLoss(nn.Module): + def __init__(self, loss_fn_cls): + super().__init__() + self.loss_fn = loss_fn_cls() + + def forward(self, inputs, *args): + targets_a, targets_b, lam = args + return lam * self.loss_fn(inputs, targets_a) + (1 - lam) * self.loss_fn(inputs, targets_b) diff --git a/examples/vit-b16/train_dali.py b/examples/vit-b16/train_dali.py new file mode 100644 index 000000000000..fed39c3cc3f8 --- /dev/null +++ b/examples/vit-b16/train_dali.py @@ -0,0 +1,70 @@ +import glob +import os +import colossalai +from colossalai.context import ParallelMode +from colossalai.core import global_context as gpc +from colossalai.logging import get_global_dist_logger +from colossalai.trainer import Trainer +from colossalai.utils import set_global_multitimer_status +from dataloader.imagenet_dali_dataloader import DaliDataloader + + +def build_dali_train(): + root = gpc.config.dali.root + train_pat = os.path.join(root, 'train/*') + train_idx_pat = os.path.join(root, 'idx_files/train/*') + return DaliDataloader( + sorted(glob.glob(train_pat)), + sorted(glob.glob(train_idx_pat)), + batch_size=gpc.config.BATCH_SIZE, + shard_id=gpc.get_local_rank(ParallelMode.DATA), + num_shards=gpc.get_world_size(ParallelMode.DATA), + training=True, + gpu_aug=gpc.config.dali.gpu_aug, + cuda=True, + mixup_alpha=gpc.config.dali.mixup_alpha + ) + + +def build_dali_test(): + root = gpc.config.dali.root + val_pat = os.path.join(root, 'validation/*') + val_idx_pat = os.path.join(root, 'idx_files/validation/*') + return DaliDataloader( + sorted(glob.glob(val_pat)), + sorted(glob.glob(val_idx_pat)), + batch_size=gpc.config.BATCH_SIZE, + shard_id=gpc.get_local_rank(ParallelMode.DATA), + num_shards=gpc.get_world_size(ParallelMode.DATA), + training=False, + # gpu_aug=gpc.config.dali.gpu_aug, + gpu_aug=False, + cuda=True, + mixup_alpha=gpc.config.dali.mixup_alpha + ) + + +def main(): + engine, train_dataloader, test_dataloader = colossalai.initialize( + train_dataloader=build_dali_train, + test_dataloader=build_dali_test + ) + logger = get_global_dist_logger() + set_global_multitimer_status(True) + timer = colossalai.utils.get_global_multitimer() + trainer = Trainer(engine=engine, + verbose=True, + timer=timer) + + trainer.fit( + train_dataloader=train_dataloader, + test_dataloader=test_dataloader, + epochs=gpc.config.NUM_EPOCHS, + hooks_cfg=gpc.config.hooks, + display_progress=True, + test_interval=1 + ) + + +if __name__ == '__main__': + main() diff --git a/examples/vit-b16/vit-b16.py b/examples/vit-b16/vit-b16.py new file mode 100755 index 000000000000..ac51e226ef81 --- /dev/null +++ b/examples/vit-b16/vit-b16.py @@ -0,0 +1,78 @@ +from colossalai.engine import AMP_TYPE +from torch.nn import CrossEntropyLoss +from mixup import MixupLoss +from hooks import TotalBatchsizeHook +from colossalai.registry import MODELS +from timm.models import vit_base_patch16_224 + +MODELS.register_module(vit_base_patch16_224) + +LOG_NAME = 'vit-b16-1k-32k-mixup-light2' +# ViT Base +BATCH_SIZE = 256 +DROP_RATE = 0.1 +NUM_EPOCHS = 300 + +parallel = dict( + pipeline=dict(size=1), + tensor=dict(size=1, mode=None), +) + +optimizer = dict( + type='Lamb', + lr=1.8e-2, + weight_decay=0.1, +) + + +loss = dict( + type='MixupLoss', + loss_fn_cls=CrossEntropyLoss +) + +model = dict( + type='vit_base_patch16_224', + drop_rate=DROP_RATE, +) + +hooks = [ + dict(type='LogMetricByEpochHook'), + dict(type='AccuracyHook'), + dict(type='LossHook'), + dict(type='TotalBatchsizeHook'), + dict(type='TensorboardHook', log_dir=f'./tb_logs/{LOG_NAME}'), + dict(type='SaveCheckpointHook', interval=1, + checkpoint_dir=f'./ckpt/{LOG_NAME}'), + # dict(type='LoadCheckpointHook', epoch=10, + # checkpoint_dir=f'./ckpt/{LOG_NAME}'), + dict( + type='LRSchedulerHook', + by_epoch=True, + lr_scheduler_cfg=dict( + type='LinearWarmupLR', + warmup_steps=150 + ) + ), +] + +fp16 = dict( + mode=AMP_TYPE.TORCH, +) + + +logging = dict( + root_path=f"./logs/{LOG_NAME}" +) + +dali = dict( + root='./dataset/ILSVRC2012_1k', + gpu_aug=True, + mixup_alpha=0.2 +) + +engine = dict( + schedule=None, + gradient_handlers=None, + gradient_accumulation=16, + gradient_clipping=1.0, +) diff --git a/setup.py b/setup.py index 9949e9eeadc2..8541b0a6ce3a 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ import os import subprocess import sys -import warnings import torch from setuptools import setup, find_packages @@ -23,13 +22,36 @@ def get_cuda_bare_metal_version(cuda_dir): return raw_output, bare_metal_major, bare_metal_minor +def check_cuda_torch_binary_vs_bare_metal(cuda_dir): + raw_output, bare_metal_major, bare_metal_minor = get_cuda_bare_metal_version( + cuda_dir) + torch_binary_major = torch.version.cuda.split(".")[0] + torch_binary_minor = torch.version.cuda.split(".")[1] + + print("\nCompiling cuda extensions with") + print(raw_output + "from " + cuda_dir + "/bin\n") + + if (bare_metal_major != torch_binary_major) or (bare_metal_minor != torch_binary_minor): + raise RuntimeError("Cuda extensions are being compiled with a version of Cuda that does " + + "not match the version used to compile Pytorch binaries. " + + "Pytorch binaries were compiled with Cuda {}.\n".format(torch.version.cuda) + + "In some cases, a minor-version mismatch will not cause later errors: " + + "https://github.com/NVIDIA/apex/pull/323#discussion_r287021798. " + "You can try commenting out this check (at your own risk).") + + +def fetch_requirements(path): + with open(path, 'r') as fd: + return [r.strip() for r in fd.readlines()] + + if not torch.cuda.is_available(): # https://github.com/NVIDIA/apex/issues/486 # Extension builds after https://github.com/pytorch/pytorch/pull/23408 attempt to query torch.cuda.get_device_capability(), # which will fail if you are compiling in an environment without visible GPUs (e.g. during an nvidia-docker build command). print('\nWarning: Torch did not find available GPUs on this system.\n', 'If your intention is to cross-compile, this is not an error.\n' - 'By default, Apex will cross-compile for Pascal (compute capabilities 6.0, 6.1, 6.2),\n' + 'By default, Colossal-AI will cross-compile for Pascal (compute capabilities 6.0, 6.1, 6.2),\n' 'Volta (compute capability 7.0), Turing (compute capability 7.5),\n' 'and, if the CUDA version is >= 11.0, Ampere (compute capability 8.0).\n' 'If you wish to cross-compile for a single specific architecture,\n' @@ -46,66 +68,12 @@ def get_cuda_bare_metal_version(cuda_dir): TORCH_MINOR = int(torch.__version__.split('.')[1]) if TORCH_MAJOR == 0 and TORCH_MINOR < 4: - raise RuntimeError("Apex requires Pytorch 0.4 or newer.\n" + + raise RuntimeError("Colossal-AI requires Pytorch 0.4 or newer.\n" + "The latest stable release can be obtained from https://pytorch.org/") cmdclass = {} ext_modules = [] -extras = {} -if "--pyprof" in sys.argv: - string = "\n\nPyprof has been moved to its own dedicated repository and will " + \ - "soon be removed from Apex. Please visit\n" + \ - "https://github.com/NVIDIA/PyProf\n" + \ - "for the latest version." - warnings.warn(string, DeprecationWarning) - with open('requirements.txt') as f: - required_packages = f.read().splitlines() - extras['pyprof'] = required_packages - try: - sys.argv.remove("--pyprof") - except: - pass -else: - warnings.warn( - "Option --pyprof not specified. Not installing PyProf dependencies!") - -if "--cuda_ext" in sys.argv: - if TORCH_MAJOR == 0: - raise RuntimeError("--cuda_ext requires Pytorch 1.0 or later, " - "found torch.__version__ = {}".format(torch.__version__)) - - -def get_cuda_bare_metal_version(cuda_dir): - raw_output = subprocess.check_output( - [cuda_dir + "/bin/nvcc", "-V"], universal_newlines=True) - output = raw_output.split() - release_idx = output.index("release") + 1 - release = output[release_idx].split(".") - bare_metal_major = release[0] - bare_metal_minor = release[1][0] - - return raw_output, bare_metal_major, bare_metal_minor - - -def check_cuda_torch_binary_vs_bare_metal(cuda_dir): - raw_output, bare_metal_major, bare_metal_minor = get_cuda_bare_metal_version( - cuda_dir) - torch_binary_major = torch.version.cuda.split(".")[0] - torch_binary_minor = torch.version.cuda.split(".")[1] - - print("\nCompiling cuda extensions with") - print(raw_output + "from " + cuda_dir + "/bin\n") - - if (bare_metal_major != torch_binary_major) or (bare_metal_minor != torch_binary_minor): - raise RuntimeError("Cuda extensions are being compiled with a version of Cuda that does " + - "not match the version used to compile Pytorch binaries. " + - "Pytorch binaries were compiled with Cuda {}.\n".format(torch.version.cuda) + - "In some cases, a minor-version mismatch will not cause later errors: " + - "https://github.com/NVIDIA/apex/pull/323#discussion_r287021798. " - "You can try commenting out this check (at your own risk).") - - # Set up macros for forward/backward compatibility hack around # https://github.com/pytorch/pytorch/commit/4404762d7dd955383acee92e6f06b48144a0742e # and @@ -123,6 +91,10 @@ def check_cuda_torch_binary_vs_bare_metal(cuda_dir): version_dependent_macros = version_ge_1_1 + version_ge_1_3 + version_ge_1_5 if "--cuda_ext" in sys.argv: + if TORCH_MAJOR == 0: + raise RuntimeError("--cuda_ext requires Pytorch 1.0 or later, " + "found torch.__version__ = {}".format(torch.__version__)) + sys.argv.remove("--cuda_ext") if CUDA_HOME is None: @@ -145,17 +117,6 @@ def check_cuda_torch_binary_vs_bare_metal(cuda_dir): # '--resource-usage', '--use_fast_math'] + version_dependent_macros})) -# Check, if ATen/CUDAGenerator.h is found, otherwise use the new ATen/CUDAGeneratorImpl.h, due to breaking change in https://github.com/pytorch/pytorch/pull/36026 -generator_flag = [] -torch_dir = torch.__path__[0] -if os.path.exists(os.path.join(torch_dir, 'include', 'ATen', 'CUDAGenerator.h')): - generator_flag = ['-DOLD_GENERATOR'] - - -def fetch_requirements(path): - with open(path, 'r') as fd: - return [r.strip() for r in fd.readlines()] - install_requires = fetch_requirements('requirements/requirements.txt') @@ -170,6 +131,5 @@ def fetch_requirements(path): description='An integrated large-scale model training system with efficient parallelization techniques', ext_modules=ext_modules, cmdclass={'build_ext': BuildExtension} if ext_modules else {}, - extras_require=extras, install_requires=install_requires, )