From decf2e8ccbc222f4d87553213ce9f4f15e6d6177 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 19 Jul 2024 15:08:23 +0800 Subject: [PATCH 01/18] support fp8_communication in the Torch DDP grad comm, FSDP grad comm, and FSDP params comm --- colossalai/booster/plugin/torch_ddp_plugin.py | 6 + .../booster/plugin/torch_fsdp_plugin.py | 7 + colossalai/quantization/fp8.py | 251 +++++++++++++++++- 3 files changed, 252 insertions(+), 12 deletions(-) diff --git a/colossalai/booster/plugin/torch_ddp_plugin.py b/colossalai/booster/plugin/torch_ddp_plugin.py index 5116446a4295..7986fcc3724e 100644 --- a/colossalai/booster/plugin/torch_ddp_plugin.py +++ b/colossalai/booster/plugin/torch_ddp_plugin.py @@ -177,6 +177,7 @@ def __init__( check_reduction: bool = False, gradient_as_bucket_view: bool = False, static_graph: bool = False, + fp8_communication: bool = False, ) -> None: super().__init__() self.ddp_kwargs = dict( @@ -187,6 +188,7 @@ def __init__( gradient_as_bucket_view=gradient_as_bucket_view, static_graph=static_graph, ) + self.fp8_communication = fp8_communication def support_no_sync(self) -> bool: return True @@ -226,6 +228,10 @@ def configure( if optimizer is not None and not isinstance(optimizer, OptimizerWrapper): optimizer = OptimizerWrapper(optimizer) + if self.fp8_communication: + from colossalai.quantization.fp8 import fp8_compress_ddp_grad_comm_hook_async + model.module.register_comm_hook(None, fp8_compress_ddp_grad_comm_hook_async) + return model, optimizer, criterion, dataloader, lr_scheduler def control_checkpoint_io(self) -> bool: diff --git a/colossalai/booster/plugin/torch_fsdp_plugin.py b/colossalai/booster/plugin/torch_fsdp_plugin.py index cd2f9e84018a..0ec8435e111c 100644 --- a/colossalai/booster/plugin/torch_fsdp_plugin.py +++ b/colossalai/booster/plugin/torch_fsdp_plugin.py @@ -298,6 +298,7 @@ def __init__( ignored_modules: Optional[Iterable[torch.nn.Module]] = None, param_init_fn: Optional[Callable[[nn.Module], None]] = None, sync_module_states: bool = False, + fp8_communication: bool = False, ): super().__init__() self.fsdp_kwargs = dict( @@ -311,6 +312,7 @@ def __init__( param_init_fn=param_init_fn, sync_module_states=sync_module_states, ) + self.fp8_communication = fp8_communication else: raise RuntimeError("FSDP is not supported while torch version under 1.12.0.") @@ -347,6 +349,11 @@ def configure( # wrap the model with PyTorch FSDP fsdp_model = TorchFSDPModel(model, device_id=torch.cuda.current_device(), **self.fsdp_kwargs) + if self.fp8_communication: + from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook + fsdp_model.module.register_comm_hook(None, fp8_compress_fsdp_grad_comm_hook) + + if optimizer is not None: if len(optimizer.param_groups) > 1: warnings.warn( diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index c02223331163..e60c99354e53 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Tuple, Union, cast import torch import torch.distributed as dist @@ -58,7 +58,7 @@ def cast_from_fp8(inp: torch.Tensor, scale_inv: torch.Tensor, ret_type: torch.dt return ret.to(ret_type) -def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3") -> None: +def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", group=None) -> None: r""" This is an in-place operation for compressed all_reduce using fp8. 
It works like dist.all_reduce but during communication the data is cast to fp8 format. @@ -71,16 +71,16 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3") -> None: None """ - world_size = dist.get_world_size() + world_size = dist.get_world_size(group=group) input_type = tensor.dtype input_shape = tensor.shape input_device = tensor.device input_size = tensor.numel() - tensor = tensor.flatten() + tensor_1d = tensor.flatten() fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 - ret, scale = cast_to_fp8(tensor, fp8_format=fp8_format) + ret, scale = cast_to_fp8(tensor_1d, fp8_format=fp8_format) inp = ret.view(torch.uint8) input_chunks = list(torch.chunk(inp, world_size, dim=0)) @@ -88,23 +88,24 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3") -> None: output_chunks = [torch.empty_like(input_chunks[-1]) for _ in range(world_size)] else: output_chunks = [torch.empty_like(input_chunks[0]) for _ in range(world_size)] - dist.all_to_all(output_chunks, input_chunks) + dist.all_to_all(output_chunks, input_chunks, group=group) scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] - dist.all_gather(scale_list, scale) + dist.all_gather(scale_list, scale, group=group) summed_out = torch.zeros_like(output_chunks[0]).to(input_type) for scale, out in zip(scale_list, output_chunks): out = out.view(fp8_type) summed_out += cast_from_fp8(out, scale, input_type) + summed_out.div_(world_size) summed_out_fp8, scale = cast_to_fp8(summed_out, fp8_format=fp8_format) - dist.all_gather(scale_list, scale) + dist.all_gather(scale_list, scale, group=group) tensor_list = list(torch.chunk(torch.empty(input_size, device=input_device, dtype=torch.uint8), world_size, dim=0)) - dist.all_gather(tensor_list, summed_out_fp8.view(torch.uint8)) + dist.all_gather(tensor_list, summed_out_fp8.view(torch.uint8), group=group) for i in range(world_size): tensor_list[i] = tensor_list[i].view(fp8_type).to(input_type) * scale_list[i] - tensor_out = torch.cat(tensor_list, dim=0) - tensor.data = tensor_out.view(input_shape).to(input_type) + out = torch.cat(tensor_list, dim=0) + tensor.copy_(out.view(input_shape).to(input_type)) @@ -171,4 +172,230 @@ def cast_from_fp8_pipeline(inp: Any, del_metadata=True) -> None: inp_tensor.data = inp_tensor.data.view(fp8_type).to(torch.float16) * scale if del_metadata: - del inp["fp8_scale"] \ No newline at end of file + del inp["fp8_scale"] + + +def reduce_scatter_fp8(output: torch.Tensor, input_list, group, fp8_format="e5m2") -> None: + r""" + This is an in-place operation for compressed reduce_scatter using fp8. + It works like dist.reduce_scatter but during communication the data is cast to fp8 format. + + Args: + tensor: torch.Tensor in fp32, fp16, bf16 datatype. 
+        fp8_format: e4m3 or e5m2
+
+    Returns:
+        None
+    """
+
+    input_type = output.dtype
+
+    fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2
+    scale_list = []
+    cast_input_list = []
+    output_chunks = []
+    output_scale_list = []
+    for input in input_list:
+        ret, scale = cast_to_fp8(input, fp8_format=fp8_format)
+        scale_list.append(scale)
+        ret = ret.view(torch.uint8)
+        cast_input_list.append(ret)
+        output_chunks.append(torch.empty_like(ret))
+        output_scale_list.append(torch.empty_like(scale))
+    dist.all_to_all(output_chunks, cast_input_list, group=group)
+    dist.all_to_all(output_scale_list, scale_list, group=group)
+
+    summed_out = torch.zeros_like(output_chunks[0]).to(input_type)
+    for scale, out in zip(output_scale_list, output_chunks):
+        out = out.view(fp8_type)
+        summed_out += cast_from_fp8(out, scale, input_type)
+    output.data = summed_out
+
+
+def fp8_compress_ddp_grad_comm_hook_async(
+    process_group: dist.ProcessGroup,
+    bucket: dist.GradBucket,
+) -> torch.futures.Future[torch.Tensor]:
+    """
+    Compress by casting ``GradBucket`` to FP8 floating-point format divided by process group size.
+
+    This DDP communication hook implements a simple gradient compression approach that casts the ``GradBucket`` tensor
+    to FP8 floating-point format (``torch.float8_e5m2`` or ``torch.float8_e4m3fn``), and then divides it
+    by the process group size.
+    Once the compressed gradient tensors are allreduced, the chained callback ``decompress`` casts them back
+    to the input data type (such as ``float32``).
+
+    Example::
+        >>> ddp_model.register_comm_hook(process_group, fp8_compress_ddp_grad_comm_hook_async)
+    """
+    group_to_use = process_group if process_group is not None else dist.group.WORLD
+
+    input_tensor = bucket.buffer()
+    fp8_format = "e4m3"
+
+    world_size = dist.get_world_size()
+    input_type = input_tensor.dtype
+    input_device = input_tensor.device
+    input_size = input_tensor.numel()
+    tensor_1d = input_tensor.flatten()
+
+    if input_size % world_size != 0:
+        padding_tensor = torch.zeros(world_size - input_size % world_size, dtype=input_tensor.dtype, device=input_device)
+        tensor_1d = torch.cat([padding_tensor, tensor_1d], dim=0)
+        input_size = tensor_1d.numel()
+
+    fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2
+    ret, scale = cast_to_fp8(tensor_1d, fp8_format=fp8_format)
+
+    inp = ret.view(torch.uint8)
+    input_chunks = list(torch.chunk(inp, world_size, dim=0))
+    if dist.get_rank() == world_size - 1:
+        output_chunks = [torch.empty_like(input_chunks[-1]) for _ in range(world_size)]
+    else:
+        output_chunks = [torch.empty_like(input_chunks[0]) for _ in range(world_size)]
+
+    scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)]
+    output_chunks_single = torch.cat(output_chunks, dim=0)
+    inp_split_sizes = [t.numel() for t in input_chunks]
+    out_split_sizes = [t.numel() for t in output_chunks]
+    fut0 = dist.all_to_all_single(output_chunks_single, inp,
+                                  output_split_sizes=out_split_sizes,
+                                  input_split_sizes=inp_split_sizes,
+                                  group=group_to_use,
+                                  async_op=True).get_future()
+
+    fut1 = dist.all_gather_into_tensor(torch.cat(scale_list, dim=0), scale,
+                                       group=group_to_use,
+                                       async_op=True).get_future()
+    all_to_all_fut = torch.futures.collect_all([fut0, fut1])
+
+    def sum_and_allgather(fut):
+        output_chunks_single = fut.value()[0].wait()[0]
+        scale_list_single = fut.value()[1].wait()[0]
+
+        output_chunks = list(torch.chunk(output_chunks_single, world_size, dim=0))
+        scale_list = scale_list_single.chunk(world_size, dim=0)
+
+        summed_out = torch.zeros_like(output_chunks[0]).to(input_type)
+        for scale, out in zip(scale_list, output_chunks):
+            out = out.view(fp8_type)
+            summed_out += cast_from_fp8(out, scale, input_type)
+        summed_out.div_(world_size)
+
+        summed_out_fp8, scale = cast_to_fp8(summed_out, fp8_format=fp8_format)
+
+
+        tensor_list_single = torch.empty(input_size, device=input_device, dtype=torch.uint8)
+        fut2 = dist.all_gather_into_tensor(tensor_list_single, summed_out_fp8.view(torch.uint8), group=group_to_use,
+                                           async_op=True).get_future()
+
+        scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)]
+        fut3 = dist.all_gather_into_tensor(torch.cat(scale_list, dim=0), scale, group=group_to_use,
+                                           async_op=True).get_future()
+        fut_combined2 = torch.futures.collect_all([fut2, fut3])
+        return fut_combined2
+    def decompress(fut):
+        tensor_list_single = fut.value().wait()[0].value()[0]
+        scale_list_single = fut.value().wait()[1].value()[0]
+
+        tensor_list = list(torch.chunk(tensor_list_single, world_size, dim=0))
+        scale_list = scale_list_single.chunk(world_size, dim=0)
+
+        for i in range(world_size):
+            tensor_list[i] = tensor_list[i].view(fp8_type).to(input_type) * scale_list[i]
+        out = torch.cat(tensor_list, dim=0)
+
+        input_tensor_size = input_tensor.numel()
+        input_shape = input_tensor.shape
+        out = out[:input_tensor_size]
+
+        input_tensor.copy_(out.view(input_shape).to(input_type))
+        return input_tensor
+
+    return all_to_all_fut.then(sum_and_allgather).then(decompress)
+
+
+
+
+def fp8_compress_ddp_grad_comm_hook_sync(
+    process_group: dist.ProcessGroup,
+    bucket: dist.GradBucket,
+) -> torch.futures.Future[torch.Tensor]:
+    """
+    Return a future that wraps the input, after the input is allreduced. However, the allreduce communication is synchronized.
+    This breaks the overlapping between allreduce communication and backward computation.
+
+    This hook should **only** be used for debugging purposes, instead of the normal gradient synchronization.
+    For an asynchronous implementation, use fp8_compress_ddp_grad_comm_hook_async instead.
+
+    Example::
+        >>> # xdoctest: +SKIP
+        >>> ddp_model.register_comm_hook(None, fp8_compress_ddp_grad_comm_hook_sync)
+    """
+
+    buffer = bucket.buffer()
+    all_reduce_fp8(buffer, fp8_format="e4m3")
+
+    fut: torch.futures.Future[torch.Tensor] = torch.futures.Future()
+    fut.set_result(bucket.buffer())
+
+    return fut
+
+
+def fp8_compress_fsdp_grad_comm_hook(state: object, unsharded_gradient_flattened: torch.Tensor,
+                                     sharded_gradient: torch.Tensor, group=None, fp8_format="e5m2") -> None:
+    """
+    This communication hook implements a simple gradient compression approach that casts the unsharded_gradient_flattened tensor
+    to FP8 floating-point format (``torch.float8_e5m2`` or ``torch.float8_e4m3fn``), and then performs the scatter_allreduce logic
+    by using all_to_all and all_gather among the process group.
+ + Example:: + >>> fsdp_model.register_comm_hook(None, fp8_compress_fsdp_grad_comm_hook) + """ + grad = unsharded_gradient_flattened + fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 + input_type = grad.dtype + input_device = grad.device + world_size = dist.get_world_size(group=group) + + grad_fp8, scale = cast_to_fp8(grad, fp8_format=fp8_format) + uint8_buffer = torch.empty_like(grad_fp8).view(torch.uint8) + dist.all_to_all_single(uint8_buffer, grad_fp8.view(torch.uint8), group=group) + + scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] + dist.all_gather(scale_list, scale, group=group) + + buffer_list = list(torch.chunk(uint8_buffer.view(fp8_type), world_size, dim=0)) + sharded_gradient.zero_() + for tensor, scale in zip(buffer_list, scale_list): + sharded_gradient += cast_from_fp8(tensor, scale, input_type) + + +def fp8_compress_fsdp_params_comm_hook(state: object, padded_unsharded_flat_param: torch.Tensor, + sharded_flat_param: torch.Tensor, group=None, fp8_format="e5m2") -> None: + """ + This hook is pending the official support for parameters communication hook in FSDP, e.g. register_params_comm_hook. + + Example:: + >>> fsdp_model.register_params_comm_hook(None, fp8_compress_fsdp_params_comm_hook) + """ + + fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 + fp8_max = torch.finfo(fp8_type).max + inp = sharded_flat_param + out = padded_unsharded_flat_param + + per_tensor_max = inp.abs().max().float() + per_tensor_max = torch.where(per_tensor_max > 0, per_tensor_max, 1.0) + dist.all_reduce(per_tensor_max, op=torch.distributed.ReduceOp.MAX, group=group) + + scale = fp8_max / per_tensor_max + fp8_sharded_flat_param = (scale * inp.float()).to(fp8_type).view(torch.uint8) + + fp8_out = torch.empty(out.shape, dtype=torch.uint8, device=out.device) + dist.all_gather_into_tensor( + fp8_out, + fp8_sharded_flat_param, + group=group, + ) + padded_unsharded_flat_param.copy_((fp8_out.view(fp8_type).float() / scale).to(out.dtype)) \ No newline at end of file From 70e3f8bdcf339d23f16db96626459ca226e4179f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 26 Jul 2024 06:41:59 +0000 Subject: [PATCH 02/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- colossalai/booster/plugin/torch_ddp_plugin.py | 1 + colossalai/booster/plugin/torch_fsdp_plugin.py | 2 +- colossalai/quantization/fp8.py | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/colossalai/booster/plugin/torch_ddp_plugin.py b/colossalai/booster/plugin/torch_ddp_plugin.py index 7986fcc3724e..34caa2f684ba 100644 --- a/colossalai/booster/plugin/torch_ddp_plugin.py +++ b/colossalai/booster/plugin/torch_ddp_plugin.py @@ -230,6 +230,7 @@ def configure( if self.fp8_communication: from colossalai.quantization.fp8 import fp8_compress_ddp_grad_comm_hook_async + model.module.register_comm_hook(None, fp8_compress_ddp_grad_comm_hook_async) return model, optimizer, criterion, dataloader, lr_scheduler diff --git a/colossalai/booster/plugin/torch_fsdp_plugin.py b/colossalai/booster/plugin/torch_fsdp_plugin.py index 0ec8435e111c..47adf4d057a0 100644 --- a/colossalai/booster/plugin/torch_fsdp_plugin.py +++ b/colossalai/booster/plugin/torch_fsdp_plugin.py @@ -351,8 +351,8 @@ def configure( if self.fp8_communication: from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook - 
fsdp_model.module.register_comm_hook(None, fp8_compress_fsdp_grad_comm_hook) + fsdp_model.module.register_comm_hook(None, fp8_compress_fsdp_grad_comm_hook) if optimizer is not None: if len(optimizer.param_groups) > 1: diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index 65c72a05f462..5eb9648955b5 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -470,4 +470,3 @@ def all_gather_into_tensor_flat_fp8( valid_buffer = buffer[:numel].reshape(output_shape) valid_buffer = cast_from_fp8(valid_buffer, scale_inv, input_type) output_tensor[:numel].copy_(valid_buffer.view(-1)) - From 5a5f1b00d8ba0819b32abd2915eed76f3fcaac33 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 26 Jul 2024 14:54:53 +0800 Subject: [PATCH 03/18] implement communication hook for FSDP params all-gather --- .../booster/plugin/torch_fsdp_plugin.py | 6 + colossalai/quantization/utils.py | 125 ++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 colossalai/quantization/utils.py diff --git a/colossalai/booster/plugin/torch_fsdp_plugin.py b/colossalai/booster/plugin/torch_fsdp_plugin.py index 0ec8435e111c..734a5a6f4356 100644 --- a/colossalai/booster/plugin/torch_fsdp_plugin.py +++ b/colossalai/booster/plugin/torch_fsdp_plugin.py @@ -350,6 +350,12 @@ def configure( fsdp_model = TorchFSDPModel(model, device_id=torch.cuda.current_device(), **self.fsdp_kwargs) if self.fp8_communication: + from colossalai.quantization.utils import patch_fsdp_params_comm_hook + patch_fsdp_params_comm_hook() + + from colossalai.quantization.fp8 import fp8_compress_fsdp_params_comm_hook + fsdp_model.module.register_params_comm_hook(None, fp8_compress_fsdp_params_comm_hook) + from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook fsdp_model.module.register_comm_hook(None, fp8_compress_fsdp_grad_comm_hook) diff --git a/colossalai/quantization/utils.py b/colossalai/quantization/utils.py new file mode 100644 index 000000000000..841ee2d48e54 --- /dev/null +++ b/colossalai/quantization/utils.py @@ -0,0 +1,125 @@ +import torch +import torch.distributed as dist +from torch import Tensor +from torch.distributed.fsdp._common_utils import _no_dispatch_record_stream +from torch.distributed.utils import _p_assert + + +def _all_gather_flat_param( + self, + padded_unsharded_flat_param: Tensor, +) -> Tensor: + """ + All-gather the handle's flat parameter to the destination ``padded_unsharded_flat_param``. + + Then switch to use the all-gathered tensor. 
+    """
+    _p_assert(
+        hasattr(self, "process_group") and hasattr(self, "world_size"),
+        "Expects a process group and world size to have been set via `shard()`",
+    )
+    sharded_flat_param = self.flat_param.data
+    expected_numel = sharded_flat_param.numel() * self.world_size
+    _p_assert(
+        padded_unsharded_flat_param.numel() == expected_numel,
+        f"Expects {expected_numel} numel but got {padded_unsharded_flat_param.numel()}",
+    )
+
+    pg = (
+        self._fake_process_group
+        if self._use_fake_all_gather
+        else self.process_group
+    )
+
+    # HACK this should be handled by C10D
+    if sharded_flat_param.is_cpu:  # type: ignore[attr-defined]
+        tensor_list = list(
+            torch.chunk(padded_unsharded_flat_param, dist.get_world_size(pg))
+        )
+        work = dist.all_gather(tensor_list, sharded_flat_param, group=pg)
+    else:
+        if self._comm_hook is None:
+            dist.all_gather_into_tensor(
+                padded_unsharded_flat_param,
+                sharded_flat_param,
+                pg,
+            )
+        else:
+            self._comm_hook(
+                None, padded_unsharded_flat_param, sharded_flat_param, pg
+            )
+
+    if self._offload_params:
+        # In case of offloading, `flat_param.data` (i.e. sharded param) is
+        # created on the pre-unshard stream. We need to hand it over to the
+        # unshard stream for all-gather
+        _no_dispatch_record_stream(
+            sharded_flat_param,
+            self._device_handle.current_stream(),  # unshard_stream
+        )
+    return padded_unsharded_flat_param
+
+
+def register_params_comm_hook(self, state: object, hook: callable):
+    """Register a communication hook for FlatParamHandle.
+
+    This is an enhancement that provides a flexible hook to users where they can specify how FSDP unshards
+    parameters across multiple workers.
+
+    .. warning ::
+        FSDP communication hook should be registered before running an initial forward pass
+        and only once.
+
+    Args:
+        state (object): Passed to the hook to maintain any state information during the training process.
+        hook (Callable): Callable, which has one of the following signatures:
+                        1) ``hook: Callable[torch.Tensor] -> None``:
+                           This function takes in a Python tensor, which represents
+                           the full, flattened, padded unsharded flat parameter of the model this FSDP unit is wrapping
+                           (that is not wrapped by other FSDP sub-units).
+                           It then performs all necessary processing and returns ``None``;
+                        2) ``hook: Callable[torch.Tensor, torch.Tensor] -> None``:
+                           This function takes in two Python tensors. The first one represents
+                           the full, flattened, padded unsharded flat parameter to be filled for the model this FSDP unit
+                           is wrapping (that is not wrapped by other FSDP sub-units). The latter
+                           represents the local sharded flat parameter that serves as the input of the all-gather.
+                           In both cases, the callable performs all necessary processing and returns ``None``.
+                        Callables with signature 1 are expected to handle parameter communication for a `NO_SHARD` case.
+                        Callables with signature 2 are expected to handle parameter communication for sharded cases.
+
+    """
+    if not self.check_is_root():
+        raise AssertionError(
+            "register_comm_hook can only be called on a root instance."
+ ) + + # if fsdp_state.sharding_strategy in HYBRID_SHARDING_STRATEGIES: + # raise AssertionError( + # f"Communication hook is not supported for hybrid strategies: {fsdp_state.sharding_strategy}" + # ) + if self._handle._comm_hook is not None: + raise AssertionError("A communication hook is already registered") + if not callable(hook): + raise ValueError( + f"The communication hook must be callable but got {hook}" + ) + self._handle._comm_hook = hook + self._handle._comm_hook_state = state + + +from packaging import version +from torch.distributed.fsdp._flat_param import FlatParamHandle +from torch.distributed.fsdp import FullyShardedDataParallel as FSDP + +def patch_fsdp_params_comm_hook(): + if version.parse(torch.__version__) >= version.parse("2.2.0"): + FlatParamHandle._comm_hook = None + FlatParamHandle._comm_hook_state = None + FlatParamHandle._all_gather_flat_param = _all_gather_flat_param + FSDP.register_params_comm_hook = register_params_comm_hook + else: + raise RuntimeError("This fsdp_params_comm_hook patch is not supported while torch version under 2.2.0.") + From 6264da8b9c5e4e547d084ff81f0d9bd16997bf9b Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 26 Jul 2024 15:03:33 +0800 Subject: [PATCH 04/18] added unit test for fp8 operators --- tests/test_fp8/test_fp8_allgather.py | 40 +++++++++ tests/test_fp8/test_fp8_allreduce.py | 38 ++++++++ tests/test_fp8/test_fp8_cast.py | 26 ++++++ tests/test_fp8/test_fp8_ddp_comm_hook.py | 89 +++++++++++++++++++ tests/test_fp8/test_fp8_fsdp_comm_hook.py | 101 ++++++++++++++++++++++ tests/test_fp8/test_fp8_reduce_scatter.py | 44 ++++++++++ 6 files changed, 338 insertions(+) create mode 100644 tests/test_fp8/test_fp8_allgather.py create mode 100644 tests/test_fp8/test_fp8_allreduce.py create mode 100644 tests/test_fp8/test_fp8_cast.py create mode 100644 tests/test_fp8/test_fp8_ddp_comm_hook.py create mode 100644 tests/test_fp8/test_fp8_fsdp_comm_hook.py create mode 100644 tests/test_fp8/test_fp8_reduce_scatter.py diff --git a/tests/test_fp8/test_fp8_allgather.py b/tests/test_fp8/test_fp8_allgather.py new file mode 100644 index 000000000000..787b3f0be78b --- /dev/null +++ b/tests/test_fp8/test_fp8_allgather.py @@ -0,0 +1,40 @@ +import torch +import torch.distributed as dist +import torch.nn.functional as F +from torch.distributed.distributed_c10d import _get_default_group +from torch.testing import assert_close + +from colossalai import launch +from colossalai.accelerator import get_accelerator +from colossalai.quantization.fp8 import all_gather_into_tensor_flat_fp8 +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn + + +@parameterize("shape", [(3, 7), (2, 1), (1, 2), (2, 2), (4, 2), (5,), (4,), (2,)]) +@parameterize("dtype", [torch.bfloat16, torch.float16]) +def check_4gpu(shape, dtype): + world_size = dist.get_world_size() + rank = dist.get_rank() + x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device()) + flat_padded_x = x.view(-1) + if flat_padded_x.size(0) % world_size != 0: + pad_size = world_size - flat_padded_x.size(0) % world_size + flat_padded_x = F.pad(flat_padded_x, (0, pad_size)) + output = torch.empty_like(flat_padded_x) + chunk = flat_padded_x.chunk(world_size)[rank].clone() + all_gather_into_tensor_flat_fp8(output, chunk, x.shape, group=_get_default_group()) + assert_close(output[: x.numel()], x.view(-1), rtol=0.1, atol=0.1) + + +def run_dist(rank, world_size, port): + launch(rank=rank, world_size=world_size, port=port, host="localhost") + check_4gpu() + + 
+@rerun_if_address_is_in_use() +def test_all_gather(): + spawn(run_dist, 4) + + +if __name__ == "__main__": + test_all_gather() \ No newline at end of file diff --git a/tests/test_fp8/test_fp8_allreduce.py b/tests/test_fp8/test_fp8_allreduce.py new file mode 100644 index 000000000000..8074946f3d14 --- /dev/null +++ b/tests/test_fp8/test_fp8_allreduce.py @@ -0,0 +1,38 @@ +import torch +import torch.distributed as dist +import torch.nn.functional as F +from torch.distributed.distributed_c10d import _get_default_group +from torch.testing import assert_close + +from colossalai import launch +from colossalai.accelerator import get_accelerator +from colossalai.quantization.fp8 import all_reduce_fp8 +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn + + +@parameterize("shape", [(3, 7), (2, 1), (1, 2), (2, 2), (4, 2), (5,), (4,), (2,)]) +@parameterize("dtype", [torch.bfloat16, torch.float16]) +def check_4gpu(shape, dtype): + world_size = dist.get_world_size() + rank = dist.get_rank() + x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device()) + flat_padded_x = x.view(-1) + + ground_truth = flat_padded_x.clone() + all_reduce_fp8(flat_padded_x, group=_get_default_group()) + + assert_close(ground_truth, flat_padded_x, rtol=0.1, atol=0.1) + + +def run_dist(rank, world_size, port): + launch(rank=rank, world_size=world_size, port=port, host="localhost") + check_4gpu() + + +@rerun_if_address_is_in_use() +def test_all_reduce(): + spawn(run_dist, 4) + + +if __name__ == "__main__": + test_all_reduce() \ No newline at end of file diff --git a/tests/test_fp8/test_fp8_cast.py b/tests/test_fp8/test_fp8_cast.py new file mode 100644 index 000000000000..b880ba793892 --- /dev/null +++ b/tests/test_fp8/test_fp8_cast.py @@ -0,0 +1,26 @@ +import torch +from torch.testing import assert_close + +from colossalai.accelerator import get_accelerator +from colossalai.quantization.fp8 import cast_to_fp8, cast_from_fp8, cast_to_fp8_pipeline, cast_from_fp8_pipeline +from colossalai.testing import parameterize + + +@parameterize("shape", [(100, 10), (10, 100), (3, 7), (2, 1), (1, 2), (2, 2), (4, 2), (5,), (4,), (2,)]) +@parameterize("dtype", [torch.bfloat16, torch.float16, torch.float32]) +@parameterize("fp8_format", ["e4m3", "e5m2"]) +def test_fp8_cast(shape, dtype, fp8_format): + x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device()) + ret, scale_inv = cast_to_fp8(x, fp8_format=fp8_format) + out = cast_from_fp8(ret, scale_inv, x.dtype) + assert_close(out, x, rtol=0.1, atol=0.1) + + if x.size(-1) % 2 == 0: + inp_dict = {"hidden_states": x.clone()} + cast_to_fp8_pipeline(inp_dict) + cast_from_fp8_pipeline(inp_dict) + assert_close(inp_dict["hidden_states"], x, rtol=0.1, atol=0.1) + + +if __name__ == "__main__": + test_fp8_cast() \ No newline at end of file diff --git a/tests/test_fp8/test_fp8_ddp_comm_hook.py b/tests/test_fp8/test_fp8_ddp_comm_hook.py new file mode 100644 index 000000000000..3fbe2573efcc --- /dev/null +++ b/tests/test_fp8/test_fp8_ddp_comm_hook.py @@ -0,0 +1,89 @@ +import os +import sys +import tempfile +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.optim as optim +import torch.multiprocessing as mp +from torch.testing import assert_close + +from torch.nn.parallel import DistributedDataParallel as DDP + +# example modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html + +def setup(rank, world_size): + os.environ['MASTER_ADDR'] = 'localhost' + os.environ['MASTER_PORT'] = '12355' 
+ + # initialize the process group + dist.init_process_group("nccl", rank=rank, world_size=world_size) + +def cleanup(): + dist.destroy_process_group() + +class ToyModel(nn.Module): + def __init__(self): + super(ToyModel, self).__init__() + self.net1 = nn.Linear(10, 10) + self.relu = nn.ReLU() + self.net2 = nn.Linear(10, 5) + + def forward(self, x): + return self.net2(self.relu(self.net1(x))) + + +def demo_basic(rank, world_size): + print(f"Running basic DDP example on rank {rank}.") + setup(rank, world_size) + + def get_grads_after_one_iteration(hook=None): + torch.manual_seed(0) + # create model and move it to GPU with id rank + model = ToyModel().to(rank) + + ddp_model = DDP(model, device_ids=[rank]) + + if hook is not None: + ddp_model.register_comm_hook(None, hook) + + loss_fn = nn.MSELoss() + optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) + + optimizer.zero_grad() + outputs = ddp_model(torch.randn(20, 10)) + labels = torch.randn(20, 5).to(rank) + loss_fn(outputs, labels).backward() + optimizer.step() + + torch.distributed.barrier() + + grad_dict = {} + for name, params in ddp_model.named_parameters(): + grad_dict[name] = params.grad + return grad_dict + + + from colossalai.quantization.fp8 import fp8_compress_ddp_grad_comm_hook_sync, fp8_compress_ddp_grad_comm_hook_async + + grad_dict = get_grads_after_one_iteration() + for hook in [fp8_compress_ddp_grad_comm_hook_sync, fp8_compress_ddp_grad_comm_hook_async]: + grad_dict_w_hook = get_grads_after_one_iteration(hook) + if dist.get_rank() == 0: + for name in grad_dict: + assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) + + cleanup() + +def run_demo(demo_fn, world_size): + mp.spawn(demo_fn, + args=(world_size,), + nprocs=world_size, + join=True) + + +if __name__ == "__main__": + n_gpus = torch.cuda.device_count() + assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" + world_size = n_gpus + run_demo(demo_basic, world_size) \ No newline at end of file diff --git a/tests/test_fp8/test_fp8_fsdp_comm_hook.py b/tests/test_fp8/test_fp8_fsdp_comm_hook.py new file mode 100644 index 000000000000..785bc36327ca --- /dev/null +++ b/tests/test_fp8/test_fp8_fsdp_comm_hook.py @@ -0,0 +1,101 @@ +import os +import sys +import tempfile +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.optim as optim +import torch.multiprocessing as mp +from torch.testing import assert_close + +from torch.distributed.fsdp import FullyShardedDataParallel as FSDP + +from colossalai.quantization.utils import patch_fsdp_params_comm_hook +patch_fsdp_params_comm_hook() + +# example modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html + +def setup(rank, world_size): + os.environ['MASTER_ADDR'] = 'localhost' + os.environ['MASTER_PORT'] = '12355' + + # initialize the process group + dist.init_process_group("nccl", rank=rank, world_size=world_size) + +def cleanup(): + dist.destroy_process_group() + +class ToyModel(nn.Module): + def __init__(self): + super(ToyModel, self).__init__() + self.net1 = nn.Linear(100, 100) + self.relu = nn.ReLU() + self.net2 = nn.Linear(100, 50) + + def forward(self, x): + return self.net2(self.relu(self.net1(x))) + + + +def demo_basic(rank, world_size): + print(f"Running basic FSDP example on rank {rank}.") + setup(rank, world_size) + + def get_grads_after_one_iteration(grad_hook=None, params_hook=None): + torch.manual_seed(0) + # create model and move it to GPU with id rank + model = ToyModel().to(rank) + fsdp_model = FSDP(model) + + if grad_hook 
is not None: + fsdp_model.register_comm_hook(None, grad_hook) + + if params_hook is not None: + fsdp_model.register_params_comm_hook(None, params_hook) + + loss_fn = nn.MSELoss() + optimizer = optim.SGD(fsdp_model.parameters(), lr=0.001) + + optimizer.zero_grad() + outputs = fsdp_model(torch.randn(20, 100)) + labels = torch.randn(20, 50).to(rank) + loss_fn(outputs, labels).backward() + optimizer.step() + + torch.distributed.barrier() + + grad_dict = {} + for name, params in fsdp_model.named_parameters(): + grad_dict[name] = params.grad + return grad_dict + + + from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook, fp8_compress_fsdp_params_comm_hook + + grad_dict = get_grads_after_one_iteration() + for hook in [fp8_compress_fsdp_grad_comm_hook, ]: + grad_dict_w_hook = get_grads_after_one_iteration(grad_hook=hook) + if dist.get_rank() == 0: + for name in grad_dict: + assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) + + for hook in [fp8_compress_fsdp_params_comm_hook, ]: + grad_dict_w_hook = get_grads_after_one_iteration(params_hook=hook) + if dist.get_rank() == 0: + for name in grad_dict: + assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) + + cleanup() + +def run_demo(demo_fn, world_size): + mp.spawn(demo_fn, + args=(world_size,), + nprocs=world_size, + join=True) + + +if __name__ == "__main__": + n_gpus = torch.cuda.device_count() + assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" + world_size = n_gpus + run_demo(demo_basic, world_size) \ No newline at end of file diff --git a/tests/test_fp8/test_fp8_reduce_scatter.py b/tests/test_fp8/test_fp8_reduce_scatter.py new file mode 100644 index 000000000000..1cb47e72fa0f --- /dev/null +++ b/tests/test_fp8/test_fp8_reduce_scatter.py @@ -0,0 +1,44 @@ +import torch +import torch.distributed as dist +import torch.nn.functional as F +from torch.distributed.distributed_c10d import _get_default_group +from torch.testing import assert_close + +from colossalai import launch +from colossalai.accelerator import get_accelerator +from colossalai.quantization.fp8 import reduce_scatter_fp8 +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn + + +@parameterize("shape", [(3, 7), (2, 1), (1, 2), (2, 2), (4, 2), (5,), (4,), (2,)]) +@parameterize("dtype", [torch.bfloat16, torch.float16]) +def check_4gpu(shape, dtype): + world_size = dist.get_world_size() + rank = dist.get_rank() + x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device()) + flat_padded_x = x.view(-1) + + if flat_padded_x.size(0) % world_size != 0: + pad_size = world_size - flat_padded_x.size(0) % world_size + flat_padded_x = F.pad(flat_padded_x, (0, pad_size)) + + input_list = flat_padded_x.chunk(world_size, dim=0) + output = torch.empty_like(input_list[0]) + reduce_scatter_fp8(output, input_list, group=_get_default_group()) + output.div_(world_size) + + assert_close(input_list[rank], output, rtol=0.1, atol=0.1) + + +def run_dist(rank, world_size, port): + launch(rank=rank, world_size=world_size, port=port, host="localhost") + check_4gpu() + + +@rerun_if_address_is_in_use() +def test_reduce_scatter(): + spawn(run_dist, 4) + + +if __name__ == "__main__": + test_reduce_scatter() \ No newline at end of file From 1d5e5beff48204da458fc7fd94a28e9aa0522025 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 26 Jul 2024 15:06:10 +0800 Subject: [PATCH 05/18] support fp8 communication in GeminiPlugin --- colossalai/booster/plugin/gemini_plugin.py | 2 ++ 
colossalai/zero/gemini/chunk/chunk.py | 14 +++++++++++--- colossalai/zero/gemini/chunk/manager.py | 4 ++++ colossalai/zero/gemini/gemini_ddp.py | 3 +++ 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index ad131fbe739a..5ab8f05adc85 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -364,6 +364,7 @@ def __init__( enable_sequence_overlap: bool = False, enable_async_reduce: bool = True, verbose: bool = False, + fp8_communication: bool = False, ) -> None: super().__init__() assert precision in SUPPORTED_PRECISION, f"precision {precision} is not supported" @@ -395,6 +396,7 @@ def __init__( master_weights=master_weights, max_prefetch=max_prefetch, enable_async_reduce=enable_async_reduce, + fp8_communication=fp8_communication, ) self.zero_optim_config = dict( gpu_margin_mem_ratio=gpu_margin_mem_ratio, diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index 969df96214de..e65ac774b648 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -166,6 +166,7 @@ def __init__( self.grad_chunk = None # the async all-reduce/reduce-scatter work of this grad chunk (None means sync) self.grad_reduce_work = None + self.fp8_communication = False @property def memory_usage(self) -> Dict[str, int]: @@ -521,9 +522,16 @@ def __gather(self, async_op: bool = False) -> Optional[dist.Work]: alloc_storage(self.cuda_global_chunk) assert self.cuda_global_chunk.is_contiguous() - work = dist.all_gather_into_tensor( - self.cuda_global_chunk, self.cuda_shard, self.torch_pg, async_op=async_op - ) + if self.fp8_communication: + assert async_op == False, "fp8 all-gather does not support async_op!" 
+ from colossalai.quantization.fp8 import all_gather_into_tensor_flat_fp8 + work = all_gather_into_tensor_flat_fp8( + self.cuda_global_chunk, self.cuda_shard, self.cuda_global_chunk.shape, self.torch_pg + ) + else: + work = dist.all_gather_into_tensor( + self.cuda_global_chunk, self.cuda_shard, self.torch_pg, async_op=async_op + ) self.cuda_shard = None self.is_gathered = True diff --git a/colossalai/zero/gemini/chunk/manager.py b/colossalai/zero/gemini/chunk/manager.py index d0e1755f40cb..06f9b6d18a6d 100644 --- a/colossalai/zero/gemini/chunk/manager.py +++ b/colossalai/zero/gemini/chunk/manager.py @@ -26,6 +26,7 @@ def __init__( init_device: Optional[torch.device] = None, reuse_fp16_chunk: bool = True, max_prefetch: int = 0, + fp8_communication: bool = False, ) -> None: self.device = init_device or get_accelerator().get_current_device() self.dp_degree_chunk_size_dict: Dict[int, int] = dict() @@ -44,6 +45,7 @@ def __init__( self.accumulating_grads = False self.overflow_counter = torch.tensor([0], dtype=torch.int, device=get_accelerator().get_current_device()) self._prefetch_stream = get_accelerator().Stream() if max_prefetch else None + self.fp8_communication = fp8_communication def register_tensor( self, @@ -101,6 +103,8 @@ def register_tensor( extra_dp_group=extra_dp_group, **chunk_kwargs, ) + if self.fp8_communication: + chunk.fp8_communication = True chunk_group.append(chunk) chunk.append_tensor(tensor) diff --git a/colossalai/zero/gemini/gemini_ddp.py b/colossalai/zero/gemini/gemini_ddp.py index 80b2c7961e29..0b2039a4de0c 100644 --- a/colossalai/zero/gemini/gemini_ddp.py +++ b/colossalai/zero/gemini/gemini_ddp.py @@ -98,6 +98,7 @@ def __init__( extra_dp_group: Optional[ProcessGroup] = None, verbose: bool = False, enable_async_reduce: bool = True, + fp8_communication: bool = False, ) -> None: assert mixed_precision in (torch.float16, torch.bfloat16) reuse_fp16_chunk = master_weights if not enable_gradient_accumulation else False @@ -122,6 +123,8 @@ def __init__( verbose=verbose, max_prefetch=max_prefetch, ) + if fp8_communication: + self.chunk_manager.fp8_communication = True self.gemini_manager = GeminiManager( placement_policy, self.chunk_manager, From ef68dbf4d49214188aa84078e940cfb4db560275 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 26 Jul 2024 15:07:36 +0800 Subject: [PATCH 06/18] update training scripts to support fsdp and fp8 communication --- examples/language/bert/finetune.py | 15 ++++++++++++--- .../language/gpt/hybridparallelism/finetune.py | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/examples/language/bert/finetune.py b/examples/language/bert/finetune.py index 8a59ab6838a6..454c867b677d 100644 --- a/examples/language/bert/finetune.py +++ b/examples/language/bert/finetune.py @@ -179,7 +179,7 @@ def main(): "--plugin", type=str, default="torch_ddp", - choices=["torch_ddp", "torch_ddp_fp16", "gemini", "low_level_zero", "hybrid_parallel"], + choices=["torch_ddp", "torch_ddp_fp16", "gemini", "low_level_zero", "hybrid_parallel", "torch_fsdp"], help="plugin to use", ) parser.add_argument( @@ -215,9 +215,9 @@ def main(): if args.plugin == "torch_ddp_fp16": booster_kwargs["mixed_precision"] = "fp16" if args.plugin.startswith("torch_ddp"): - plugin = TorchDDPPlugin() + plugin = TorchDDPPlugin(fp8_communication=args.use_fp8_comm) elif args.plugin == "gemini": - plugin = GeminiPlugin(initial_scale=2**5) + plugin = GeminiPlugin(initial_scale=2**5, fp8_communication=args.use_fp8_comm) elif args.plugin == "low_level_zero": plugin = 
LowLevelZeroPlugin(initial_scale=2**5) elif args.plugin == "hybrid_parallel": @@ -235,6 +235,15 @@ def main(): initial_scale=1, fp8_communication=args.use_fp8_comm, ) + elif args.plugin == "torch_fsdp": + from colossalai.booster.plugin import TorchFSDPPlugin + from torch.distributed.fsdp.fully_sharded_data_parallel import CPUOffload, MixedPrecision + plugin = TorchFSDPPlugin( + mixed_precision=MixedPrecision( + param_dtype=torch.float16, reduce_dtype=torch.float16, buffer_dtype=torch.float16 + ), + fp8_communication=args.use_fp8_comm, + ) booster = Booster(plugin=plugin, **booster_kwargs) diff --git a/examples/language/gpt/hybridparallelism/finetune.py b/examples/language/gpt/hybridparallelism/finetune.py index 9b3a101609dc..3fd845b460fb 100644 --- a/examples/language/gpt/hybridparallelism/finetune.py +++ b/examples/language/gpt/hybridparallelism/finetune.py @@ -210,7 +210,7 @@ def main(): if args.plugin == "torch_ddp_fp16": booster_kwargs["mixed_precision"] = "fp16" if args.plugin.startswith("torch_ddp"): - plugin = TorchDDPPlugin() + plugin = TorchDDPPlugin(fp8_communication=args.use_fp8_comm) elif args.plugin == "gemini": plugin = GeminiPlugin(initial_scale=2**5) elif args.plugin == "low_level_zero": From 694d125538f659068e4b3ce736fb0c473ff44aad Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 26 Jul 2024 15:08:11 +0800 Subject: [PATCH 07/18] fixed some minor bugs observed in unit test --- colossalai/quantization/fp8.py | 66 ++++++++++++++++------------------ 1 file changed, 31 insertions(+), 35 deletions(-) diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index e60c99354e53..cb010d20e556 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -1,6 +1,7 @@ from typing import Any, Callable, List, Optional, Tuple, Union, cast import torch +import torch.nn.functional as F import torch.distributed as dist @@ -27,12 +28,13 @@ def cast_to_fp8(inp: torch.Tensor, fp8_format="e4m3") -> (torch.Tensor, torch.Te per_channel_max = inp.abs().max(dim=-1).values.float() per_channel_max = torch.where(per_channel_max > 0, per_channel_max, 1.0) scale = fp8_max / per_channel_max[:, None] + scale_inv = per_channel_max / fp8_max else: per_tensor_max = inp.abs().max().float() per_tensor_max = torch.where(per_tensor_max > 0, per_tensor_max, 1.0) scale = fp8_max / per_tensor_max + scale_inv = 1.0 / scale - scale_inv = 1.0 / scale ret = (scale * inp.float()).to(fp8_type) return ret, scale_inv @@ -76,18 +78,18 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", group=None) -> None: input_shape = tensor.shape input_device = tensor.device input_size = tensor.numel() - tensor_1d = tensor.flatten() + flat_padded_x = tensor.flatten() - fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 + if flat_padded_x.size(0) % world_size != 0: + pad_size = world_size - flat_padded_x.size(0) % world_size + flat_padded_x = F.pad(flat_padded_x, (0, pad_size)) - ret, scale = cast_to_fp8(tensor_1d, fp8_format=fp8_format) + fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 + ret, scale = cast_to_fp8(flat_padded_x, fp8_format=fp8_format) inp = ret.view(torch.uint8) input_chunks = list(torch.chunk(inp, world_size, dim=0)) - if dist.get_rank() == world_size - 1: - output_chunks = [torch.empty_like(input_chunks[-1]) for _ in range(world_size)] - else: - output_chunks = [torch.empty_like(input_chunks[0]) for _ in range(world_size)] + output_chunks = list(torch.chunk(torch.empty_like(inp), world_size, dim=0)) 
dist.all_to_all(output_chunks, input_chunks, group=group) scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] dist.all_gather(scale_list, scale, group=group) @@ -100,12 +102,12 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", group=None) -> None: summed_out_fp8, scale = cast_to_fp8(summed_out, fp8_format=fp8_format) dist.all_gather(scale_list, scale, group=group) - tensor_list = list(torch.chunk(torch.empty(input_size, device=input_device, dtype=torch.uint8), world_size, dim=0)) + tensor_list = [torch.empty_like(summed_out_fp8.view(torch.uint8)) for _ in range(world_size)] dist.all_gather(tensor_list, summed_out_fp8.view(torch.uint8), group=group) for i in range(world_size): tensor_list[i] = tensor_list[i].view(fp8_type).to(input_type) * scale_list[i] out = torch.cat(tensor_list, dim=0) - tensor.copy_(out.view(input_shape).to(input_type)) + tensor.copy_(out[:input_size].view(input_shape).to(input_type)) @@ -123,7 +125,9 @@ def cast_to_fp8_pipeline(inp: Any) -> None: return assert 'hidden_states' in inp, 'required by pipeline parallelism.' + assert inp["hidden_states"].size(-1) % 2 == 0, 'tensor size(-1) must be divisible by 2 to view Float8_e4m3fn as BFloat16 or Float16' inp_tensor = inp["hidden_states"] + inp_dtype = inp_tensor.dtype min_val, max_val = inp_tensor.aminmax() amax = torch.maximum(min_val.abs(), max_val.abs()) @@ -144,6 +148,7 @@ def cast_to_fp8_pipeline(inp: Any) -> None: inp_tensor.data = q_tensor.to(fp8_type).view(fp8_view_type) inp["fp8_scale"] = scale.float().reciprocal() + inp["dtype"] = torch.zeros_like(scale).to(inp_dtype) @@ -169,10 +174,11 @@ def cast_from_fp8_pipeline(inp: Any, del_metadata=True) -> None: else: raise TypeError("Only float16, bfloat16 are implemented.") - inp_tensor.data = inp_tensor.data.view(fp8_type).to(torch.float16) * scale + inp_tensor.data = inp_tensor.data.view(fp8_type).to(inp["dtype"]) * scale if del_metadata: del inp["fp8_scale"] + del inp["dtype"] def reduce_scatter_fp8(output: torch.Tensor, input_list, group, fp8_format="e5m2") -> None: @@ -215,6 +221,7 @@ def reduce_scatter_fp8(output: torch.Tensor, input_list, group, fp8_format="e5m2 def fp8_compress_ddp_grad_comm_hook_async( process_group: dist.ProcessGroup, bucket: dist.GradBucket, + fp8_format: str = "e5m2", ) -> torch.futures.Future[torch.Tensor]: """ Compress by casting ``GradBucket`` to FP8 floating-point format divided by process group size. 
@@ -231,39 +238,28 @@ def fp8_compress_ddp_grad_comm_hook_async( group_to_use = process_group if process_group is not None else dist.group.WORLD input_tensor = bucket.buffer() - fp8_format = "e4m3" - world_size = dist.get_world_size() input_type = input_tensor.dtype input_device = input_tensor.device - input_size = input_tensor.numel() - tensor_1d = input_tensor.flatten() + flat_padded_x = input_tensor.flatten() - if input_size % world_size != 0: - padding_tensor = torch.zeros(world_size - input_size % world_size, dtype=input_tensor.dtype, device=input_device) - tensor_1d = torch.cat([padding_tensor, tensor_1d], dim=0) - input_size = tensor_1d.numel() + if flat_padded_x.size(0) % world_size != 0: + pad_size = world_size - flat_padded_x.size(0) % world_size + flat_padded_x = F.pad(flat_padded_x, (0, pad_size)) fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 - ret, scale = cast_to_fp8(tensor_1d, fp8_format=fp8_format) + ret, scale = cast_to_fp8(flat_padded_x, fp8_format=fp8_format) inp = ret.view(torch.uint8) - input_chunks = list(torch.chunk(inp, world_size, dim=0)) - if dist.get_rank() == world_size - 1: - output_chunks = [torch.empty_like(input_chunks[-1]) for _ in range(world_size)] - else: - output_chunks = [torch.empty_like(input_chunks[0]) for _ in range(world_size)] - - scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] - output_chunks_single = torch.cat(output_chunks, dim=0) - inp_split_sizes = [t.numel() for t in input_chunks] - out_split_sizes = [t.numel() for t in output_chunks] + output_chunks_single = torch.empty_like(inp) + split_sizes = [inp.numel() // world_size for _ in range(world_size)] fut0 = dist.all_to_all_single(output_chunks_single, inp, - output_split_sizes=out_split_sizes, - input_split_sizes=inp_split_sizes, + output_split_sizes=split_sizes, + input_split_sizes=split_sizes, group=group_to_use, async_op=True).get_future() + scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] fut1 = dist.all_gather_into_tensor(torch.cat(scale_list, dim=0), scale, group=group_to_use, async_op=True).get_future() @@ -284,8 +280,7 @@ def sum_and_allgather(fut): summed_out_fp8, scale = cast_to_fp8(summed_out, fp8_format=fp8_format) - - tensor_list_single = torch.empty(input_size, device=input_device, dtype=torch.uint8) + tensor_list_single = torch.empty(summed_out_fp8.size(0) * world_size, device=input_device, dtype=torch.uint8) fut2 = dist.all_gather_into_tensor(tensor_list_single, summed_out_fp8.view(torch.uint8), group=group_to_use, async_op=True).get_future() @@ -320,6 +315,7 @@ def decompress(fut): def fp8_compress_ddp_grad_comm_hook_sync( process_group: dist.ProcessGroup, bucket: dist.GradBucket, + fp8_format="e5m2", ) -> torch.futures.Future[torch.Tensor]: """ Return a future that wraps the input, after the input is allreduced. However, the allreduce commnunication is synchronized. 
@@ -334,7 +330,7 @@ def fp8_compress_ddp_grad_comm_hook_sync( """ buffer = bucket.buffer() - all_reduce_fp8(buffer, fp8_format="e4m3") + all_reduce_fp8(buffer, fp8_format=fp8_format) fut: torch.futures.Future[torch.Tensor] = torch.futures.Future() fut.set_result(bucket.buffer()) From a92da21d675a7b2c3a15a6e124bc530174f6c96f Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Fri, 26 Jul 2024 15:09:09 +0800 Subject: [PATCH 08/18] add all_gather_into_tensor_flat_fp8 --- colossalai/quantization/fp8.py | 77 +++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index cb010d20e556..d7c8a7e010cd 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -394,4 +394,79 @@ def fp8_compress_fsdp_params_comm_hook(state: object, padded_unsharded_flat_para fp8_sharded_flat_param, group=group, ) - padded_unsharded_flat_param.copy_((fp8_out.view(fp8_type).float() / scale).to(out.dtype)) \ No newline at end of file + padded_unsharded_flat_param.copy_((fp8_out.view(fp8_type).float() / scale).to(out.dtype)) + + +def split_chunk_by_channel( + chunk: torch.Tensor, channel_size: int, num_channels: int, rank: int = 0, world_size: int = 1 +): + offset = chunk.numel() * rank + end = offset + chunk.numel() + break_points = [x for x in range(0, channel_size * num_channels + 1, channel_size) if offset <= x <= end] + if len(break_points) == 0 or break_points[0] > offset: + break_points.insert(0, offset) + if break_points[-1] < end: + break_points.append(end) + sizes = [b - a for a, b in zip(break_points[:-1], break_points[1:])] + return chunk.split(sizes) + + +def all_gather_into_tensor_flat_fp8( + output_tensor: torch.Tensor, + input_tensor: torch.Tensor, + output_shape: torch.Size, + group: dist.ProcessGroup, + fp8_format: str = "e4m3", +): + """all gather into tensor in fp8 format + + Args: + output_tensor (torch.Tensor): output tensor, which is flattened + input_tensor (torch.Tensor): input tensor, which is flattened + group (dist.ProcessGroup): process group + fp8_format (str, optional): fp8 format, e4m3 or e5m2. Defaults to "e4m3". 
+ """ + assert input_tensor.dim() == 1 and output_tensor.dim() == 1, "input/output tensor should be flattened" + world_size = dist.get_world_size(group) + assert ( + output_tensor.numel() == input_tensor.numel() * world_size + ), "output tensor size should be world_size times of input tensor size" + + input_type = output_tensor.dtype + + fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 + fp8_max = torch.finfo(fp8_type).max + + if len(output_shape) == 2: + per_channel_max = torch.zeros(output_shape[0], device=output_tensor.device, dtype=torch.float) + num_channels, channel_size = output_shape + rank = dist.get_rank(group) + channel_start_idx = (input_tensor.numel() * rank) // channel_size + per_channel_splits = split_chunk_by_channel(input_tensor, channel_size, num_channels, rank, world_size) + for i, per_channel_split in enumerate(per_channel_splits): + idx = i + channel_start_idx + if idx < num_channels: + per_channel_max[idx] = per_channel_split.abs().max().float() + dist.all_reduce(per_channel_max, op=dist.ReduceOp.MAX, group=group) + per_channel_max = torch.where(per_channel_max > 0, per_channel_max, 1.0) + scale = fp8_max / per_channel_max + fp8_input = input_tensor.float() + fp8_per_channel_splits = split_chunk_by_channel(fp8_input, channel_size, num_channels, rank, world_size) + for i, per_channel_split in enumerate(fp8_per_channel_splits): + idx = i + channel_start_idx + if idx < num_channels: + per_channel_split.mul_(scale[idx]) + fp8_input = fp8_input.to(fp8_type) + else: + per_tensor_max = input_tensor.abs().max().float() + dist.all_reduce(per_tensor_max, op=dist.ReduceOp.MAX, group=group) + per_tensor_max = torch.where(per_tensor_max > 0, per_tensor_max, 1.0) + scale = fp8_max / per_tensor_max + fp8_input = (scale * input_tensor.float()).to(fp8_type) + scale_inv = 1.0 / scale + buffer = torch.empty_like(output_tensor, dtype=fp8_type) + dist.all_gather_into_tensor(buffer.view(torch.uint8), fp8_input.view(torch.uint8), group=group) + numel = output_shape.numel() + valid_buffer = buffer[:numel].reshape(output_shape) + valid_buffer = cast_from_fp8(valid_buffer, scale_inv, input_type) + output_tensor[:numel].copy_(valid_buffer.view(-1)) \ No newline at end of file From c0c1a301899bab237a9ec630bfbac68bb7614ac4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 26 Jul 2024 07:14:54 +0000 Subject: [PATCH 09/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../booster/plugin/torch_fsdp_plugin.py | 2 + colossalai/quantization/fp8.py | 68 +++++++++++-------- colossalai/quantization/utils.py | 30 +++----- colossalai/zero/gemini/chunk/chunk.py | 1 + examples/language/bert/finetune.py | 4 +- tests/test_fp8/test_fp8_allreduce.py | 7 +- tests/test_fp8/test_fp8_cast.py | 4 +- tests/test_fp8/test_fp8_ddp_comm_hook.py | 26 ++++--- tests/test_fp8/test_fp8_fsdp_comm_hook.py | 34 +++++----- tests/test_fp8/test_fp8_reduce_scatter.py | 2 +- 10 files changed, 92 insertions(+), 86 deletions(-) diff --git a/colossalai/booster/plugin/torch_fsdp_plugin.py b/colossalai/booster/plugin/torch_fsdp_plugin.py index 910ede3a67e3..e3f81928de5e 100644 --- a/colossalai/booster/plugin/torch_fsdp_plugin.py +++ b/colossalai/booster/plugin/torch_fsdp_plugin.py @@ -351,9 +351,11 @@ def configure( if self.fp8_communication: from colossalai.quantization.utils import patch_fsdp_params_comm_hook + patch_fsdp_params_comm_hook() from colossalai.quantization.fp8 import 
fp8_compress_fsdp_params_comm_hook + fsdp_model.module.register_params_comm_hook(None, fp8_compress_fsdp_params_comm_hook) from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index d7c8a7e010cd..21cf191555ae 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -1,8 +1,8 @@ -from typing import Any, Callable, List, Optional, Tuple, Union, cast +from typing import Any import torch -import torch.nn.functional as F import torch.distributed as dist +import torch.nn.functional as F def cast_to_fp8(inp: torch.Tensor, fp8_format="e4m3") -> (torch.Tensor, torch.Tensor): @@ -110,7 +110,6 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", group=None) -> None: tensor.copy_(out[:input_size].view(input_shape).to(input_type)) - def cast_to_fp8_pipeline(inp: Any) -> None: """ Cast the hidden_states tensor of inp object to fp8 format before p2p communication in pipeline. @@ -124,8 +123,10 @@ def cast_to_fp8_pipeline(inp: Any) -> None: if type(inp) == torch.Tensor: return - assert 'hidden_states' in inp, 'required by pipeline parallelism.' - assert inp["hidden_states"].size(-1) % 2 == 0, 'tensor size(-1) must be divisible by 2 to view Float8_e4m3fn as BFloat16 or Float16' + assert "hidden_states" in inp, "required by pipeline parallelism." + assert ( + inp["hidden_states"].size(-1) % 2 == 0 + ), "tensor size(-1) must be divisible by 2 to view Float8_e4m3fn as BFloat16 or Float16" inp_tensor = inp["hidden_states"] inp_dtype = inp_tensor.dtype @@ -142,7 +143,7 @@ def cast_to_fp8_pipeline(inp: Any) -> None: finfo = torch.finfo(fp8_type) scale = torch.tensor(1.0).to(inp_tensor.device) if amax == 0.0 else finfo.max / amax.float() - q_tensor = (inp_tensor.data.float() * scale) + q_tensor = inp_tensor.data.float() * scale # Todo: Currently we use fp8_view_type to indicate which fp8 format is used. This is a temporary workaround due to 'Only support tensor for fast send'. # inp_tensor needs to be a float datatype to avoid error during gradient placement. inp_tensor.data = q_tensor.to(fp8_type).view(fp8_view_type) @@ -151,7 +152,6 @@ def cast_to_fp8_pipeline(inp: Any) -> None: inp["dtype"] = torch.zeros_like(scale).to(inp_dtype) - def cast_from_fp8_pipeline(inp: Any, del_metadata=True) -> None: """ Cast the FP8 encoded hidden_states tensor back to original dtype after p2p communication in pipeline. @@ -162,7 +162,7 @@ def cast_from_fp8_pipeline(inp: Any, del_metadata=True) -> None: if type(inp) == torch.Tensor: return - assert 'hidden_states' in inp, 'required by pipeline parallelism.' + assert "hidden_states" in inp, "required by pipeline parallelism." 
inp_tensor = inp["hidden_states"] scale = inp["fp8_scale"] @@ -253,16 +253,19 @@ def fp8_compress_ddp_grad_comm_hook_async( inp = ret.view(torch.uint8) output_chunks_single = torch.empty_like(inp) split_sizes = [inp.numel() // world_size for _ in range(world_size)] - fut0 = dist.all_to_all_single(output_chunks_single, inp, - output_split_sizes=split_sizes, - input_split_sizes=split_sizes, - group=group_to_use, - async_op=True).get_future() + fut0 = dist.all_to_all_single( + output_chunks_single, + inp, + output_split_sizes=split_sizes, + input_split_sizes=split_sizes, + group=group_to_use, + async_op=True, + ).get_future() scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] - fut1 = dist.all_gather_into_tensor(torch.cat(scale_list, dim=0), scale, - group=group_to_use, - async_op=True).get_future() + fut1 = dist.all_gather_into_tensor( + torch.cat(scale_list, dim=0), scale, group=group_to_use, async_op=True + ).get_future() all_to_all_fut = torch.futures.collect_all([fut0, fut1]) def sum_and_allgather(fut): @@ -281,14 +284,17 @@ def sum_and_allgather(fut): summed_out_fp8, scale = cast_to_fp8(summed_out, fp8_format=fp8_format) tensor_list_single = torch.empty(summed_out_fp8.size(0) * world_size, device=input_device, dtype=torch.uint8) - fut2 = dist.all_gather_into_tensor(tensor_list_single, summed_out_fp8.view(torch.uint8), group=group_to_use, - async_op=True).get_future() + fut2 = dist.all_gather_into_tensor( + tensor_list_single, summed_out_fp8.view(torch.uint8), group=group_to_use, async_op=True + ).get_future() scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] - fut3 = dist.all_gather_into_tensor(torch.cat(scale_list, dim=0), scale, group=group_to_use, - async_op=True).get_future() + fut3 = dist.all_gather_into_tensor( + torch.cat(scale_list, dim=0), scale, group=group_to_use, async_op=True + ).get_future() fut_combined2 = torch.futures.collect_all([fut2, fut3]) return fut_combined2 + def decompress(fut): tensor_list_single = fut.value().wait()[0].value()[0] scale_list_single = fut.value().wait()[1].value()[0] @@ -310,8 +316,6 @@ def decompress(fut): return all_to_all_fut.then(sum_and_allgather).then(decompress) - - def fp8_compress_ddp_grad_comm_hook_sync( process_group: dist.ProcessGroup, bucket: dist.GradBucket, @@ -338,8 +342,13 @@ def fp8_compress_ddp_grad_comm_hook_sync( return fut -def fp8_compress_fsdp_grad_comm_hook(state: object, unsharded_gradient_flattened: torch.Tensor, - sharded_gradient: torch.Tensor, group=None, fp8_format="e5m2") -> None: +def fp8_compress_fsdp_grad_comm_hook( + state: object, + unsharded_gradient_flattened: torch.Tensor, + sharded_gradient: torch.Tensor, + group=None, + fp8_format="e5m2", +) -> None: """ This communication hook implements a simple gradient compression approach that casts unsharded_gradient_flattened tensor to FP8 floating-point format (``torch.float8_e5m2`` or ``torch.bfloat16_e4m3``), and then perform scatter_allreduce logic @@ -367,8 +376,13 @@ def fp8_compress_fsdp_grad_comm_hook(state: object, unsharded_gradient_flattened sharded_gradient += cast_from_fp8(tensor, scale, input_type) -def fp8_compress_fsdp_params_comm_hook(state: object, padded_unsharded_flat_param: torch.Tensor, - sharded_flat_param: torch.Tensor, group=None, fp8_format="e5m2") -> None: +def fp8_compress_fsdp_params_comm_hook( + state: object, + padded_unsharded_flat_param: torch.Tensor, + sharded_flat_param: torch.Tensor, + group=None, + fp8_format="e5m2", +) -> None: """ This 
hook is pending the official support for parameters communication hook in FSDP, e.g. register_params_comm_hook. @@ -469,4 +483,4 @@ def all_gather_into_tensor_flat_fp8( numel = output_shape.numel() valid_buffer = buffer[:numel].reshape(output_shape) valid_buffer = cast_from_fp8(valid_buffer, scale_inv, input_type) - output_tensor[:numel].copy_(valid_buffer.view(-1)) \ No newline at end of file + output_tensor[:numel].copy_(valid_buffer.view(-1)) diff --git a/colossalai/quantization/utils.py b/colossalai/quantization/utils.py index 841ee2d48e54..8f2b2be077be 100644 --- a/colossalai/quantization/utils.py +++ b/colossalai/quantization/utils.py @@ -6,8 +6,8 @@ def _all_gather_flat_param( - self, - padded_unsharded_flat_param: Tensor, + self, + padded_unsharded_flat_param: Tensor, ) -> Tensor: """ All-gather the handle's flat parameter to the destination ``padded_unsharded_flat_param``. @@ -25,17 +25,11 @@ def _all_gather_flat_param( f"Expects {expected_numel} numel but got {padded_unsharded_flat_param.numel()}", ) - pg = ( - self._fake_process_group - if self._use_fake_all_gather - else self.process_group - ) + pg = self._fake_process_group if self._use_fake_all_gather else self.process_group # HACK this should be handled by C10D if sharded_flat_param.is_cpu: # type: ignore[attr-defined] - tensor_list = list( - torch.chunk(padded_unsharded_flat_param, dist.get_world_size(pg)) - ) + tensor_list = list(torch.chunk(padded_unsharded_flat_param, dist.get_world_size(pg))) work = dist.all_gather(tensor_list, sharded_flat_param, group=pg) else: if self._comm_hook is None: @@ -45,9 +39,7 @@ def _all_gather_flat_param( pg, ) else: - self._comm_hook( - None, padded_unsharded_flat_param, sharded_flat_param, pg - ) + self._comm_hook(None, padded_unsharded_flat_param, sharded_flat_param, pg) if self._offload_params: # In case of offloading, `flat_param.data` (i.e. sharded param) is @@ -92,9 +84,7 @@ def register_params_comm_hook(self, state: object, hook: callable): """ if not self.check_is_root(): - raise AssertionError( - "register_comm_hook can only be called on a root instance." 
- ) + raise AssertionError("register_comm_hook can only be called on a root instance.") # if fsdp_state.sharding_strategy in HYBRID_SHARDING_STRATEGIES: # raise AssertionError( @@ -103,16 +93,15 @@ def register_params_comm_hook(self, state: object, hook: callable): if self._handle._comm_hook is not None: raise AssertionError("A communication hook is already registered") if not callable(hook): - raise ValueError( - f"The communication hook must be callable but got {hook}" - ) + raise ValueError(f"The communication hook must be callable but got {hook}") self._handle._comm_hook = hook self._handle._comm_hook_state = state from packaging import version -from torch.distributed.fsdp._flat_param import FlatParamHandle from torch.distributed.fsdp import FullyShardedDataParallel as FSDP +from torch.distributed.fsdp._flat_param import FlatParamHandle + def patch_fsdp_params_comm_hook(): if version.parse(torch.__version__) >= version.parse("2.2.0"): @@ -122,4 +111,3 @@ def patch_fsdp_params_comm_hook(): FSDP.register_params_comm_hook = register_params_comm_hook else: raise RuntimeError("This fsdp_params_comm_hook patch is not supported while torch version under 2.2.0.") - diff --git a/colossalai/zero/gemini/chunk/chunk.py b/colossalai/zero/gemini/chunk/chunk.py index e65ac774b648..e2b7a8f56432 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -525,6 +525,7 @@ def __gather(self, async_op: bool = False) -> Optional[dist.Work]: if self.fp8_communication: assert async_op == False, "fp8 all-gather does not support async_op!" from colossalai.quantization.fp8 import all_gather_into_tensor_flat_fp8 + work = all_gather_into_tensor_flat_fp8( self.cuda_global_chunk, self.cuda_shard, self.cuda_global_chunk.shape, self.torch_pg ) diff --git a/examples/language/bert/finetune.py b/examples/language/bert/finetune.py index 454c867b677d..f048abdd253a 100644 --- a/examples/language/bert/finetune.py +++ b/examples/language/bert/finetune.py @@ -236,8 +236,10 @@ def main(): fp8_communication=args.use_fp8_comm, ) elif args.plugin == "torch_fsdp": + from torch.distributed.fsdp.fully_sharded_data_parallel import MixedPrecision + from colossalai.booster.plugin import TorchFSDPPlugin - from torch.distributed.fsdp.fully_sharded_data_parallel import CPUOffload, MixedPrecision + plugin = TorchFSDPPlugin( mixed_precision=MixedPrecision( param_dtype=torch.float16, reduce_dtype=torch.float16, buffer_dtype=torch.float16 diff --git a/tests/test_fp8/test_fp8_allreduce.py b/tests/test_fp8/test_fp8_allreduce.py index 8074946f3d14..8caa8e1c5508 100644 --- a/tests/test_fp8/test_fp8_allreduce.py +++ b/tests/test_fp8/test_fp8_allreduce.py @@ -1,6 +1,5 @@ import torch import torch.distributed as dist -import torch.nn.functional as F from torch.distributed.distributed_c10d import _get_default_group from torch.testing import assert_close @@ -13,8 +12,8 @@ @parameterize("shape", [(3, 7), (2, 1), (1, 2), (2, 2), (4, 2), (5,), (4,), (2,)]) @parameterize("dtype", [torch.bfloat16, torch.float16]) def check_4gpu(shape, dtype): - world_size = dist.get_world_size() - rank = dist.get_rank() + dist.get_world_size() + dist.get_rank() x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device()) flat_padded_x = x.view(-1) @@ -35,4 +34,4 @@ def test_all_reduce(): if __name__ == "__main__": - test_all_reduce() \ No newline at end of file + test_all_reduce() diff --git a/tests/test_fp8/test_fp8_cast.py b/tests/test_fp8/test_fp8_cast.py index b880ba793892..db9a909e60a7 100644 --- 
a/tests/test_fp8/test_fp8_cast.py +++ b/tests/test_fp8/test_fp8_cast.py @@ -2,7 +2,7 @@ from torch.testing import assert_close from colossalai.accelerator import get_accelerator -from colossalai.quantization.fp8 import cast_to_fp8, cast_from_fp8, cast_to_fp8_pipeline, cast_from_fp8_pipeline +from colossalai.quantization.fp8 import cast_from_fp8, cast_from_fp8_pipeline, cast_to_fp8, cast_to_fp8_pipeline from colossalai.testing import parameterize @@ -23,4 +23,4 @@ def test_fp8_cast(shape, dtype, fp8_format): if __name__ == "__main__": - test_fp8_cast() \ No newline at end of file + test_fp8_cast() diff --git a/tests/test_fp8/test_fp8_ddp_comm_hook.py b/tests/test_fp8/test_fp8_ddp_comm_hook.py index 3fbe2573efcc..9bdfe17a1465 100644 --- a/tests/test_fp8/test_fp8_ddp_comm_hook.py +++ b/tests/test_fp8/test_fp8_ddp_comm_hook.py @@ -1,27 +1,28 @@ import os -import sys -import tempfile + import torch import torch.distributed as dist +import torch.multiprocessing as mp import torch.nn as nn import torch.optim as optim -import torch.multiprocessing as mp -from torch.testing import assert_close - from torch.nn.parallel import DistributedDataParallel as DDP +from torch.testing import assert_close # example modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html + def setup(rank, world_size): - os.environ['MASTER_ADDR'] = 'localhost' - os.environ['MASTER_PORT'] = '12355' + os.environ["MASTER_ADDR"] = "localhost" + os.environ["MASTER_PORT"] = "12355" # initialize the process group dist.init_process_group("nccl", rank=rank, world_size=world_size) + def cleanup(): dist.destroy_process_group() + class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() @@ -63,8 +64,7 @@ def get_grads_after_one_iteration(hook=None): grad_dict[name] = params.grad return grad_dict - - from colossalai.quantization.fp8 import fp8_compress_ddp_grad_comm_hook_sync, fp8_compress_ddp_grad_comm_hook_async + from colossalai.quantization.fp8 import fp8_compress_ddp_grad_comm_hook_async, fp8_compress_ddp_grad_comm_hook_sync grad_dict = get_grads_after_one_iteration() for hook in [fp8_compress_ddp_grad_comm_hook_sync, fp8_compress_ddp_grad_comm_hook_async]: @@ -75,15 +75,13 @@ def get_grads_after_one_iteration(hook=None): cleanup() + def run_demo(demo_fn, world_size): - mp.spawn(demo_fn, - args=(world_size,), - nprocs=world_size, - join=True) + mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True) if __name__ == "__main__": n_gpus = torch.cuda.device_count() assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" world_size = n_gpus - run_demo(demo_basic, world_size) \ No newline at end of file + run_demo(demo_basic, world_size) diff --git a/tests/test_fp8/test_fp8_fsdp_comm_hook.py b/tests/test_fp8/test_fp8_fsdp_comm_hook.py index 785bc36327ca..d611ea41753a 100644 --- a/tests/test_fp8/test_fp8_fsdp_comm_hook.py +++ b/tests/test_fp8/test_fp8_fsdp_comm_hook.py @@ -1,30 +1,32 @@ import os -import sys -import tempfile + import torch import torch.distributed as dist +import torch.multiprocessing as mp import torch.nn as nn import torch.optim as optim -import torch.multiprocessing as mp -from torch.testing import assert_close - from torch.distributed.fsdp import FullyShardedDataParallel as FSDP +from torch.testing import assert_close from colossalai.quantization.utils import patch_fsdp_params_comm_hook + patch_fsdp_params_comm_hook() # example modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html + def setup(rank, world_size): - 
os.environ['MASTER_ADDR'] = 'localhost' - os.environ['MASTER_PORT'] = '12355' + os.environ["MASTER_ADDR"] = "localhost" + os.environ["MASTER_PORT"] = "12355" # initialize the process group dist.init_process_group("nccl", rank=rank, world_size=world_size) + def cleanup(): dist.destroy_process_group() + class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() @@ -36,7 +38,6 @@ def forward(self, x): return self.net2(self.relu(self.net1(x))) - def demo_basic(rank, world_size): print(f"Running basic FSDP example on rank {rank}.") setup(rank, world_size) @@ -69,17 +70,20 @@ def get_grads_after_one_iteration(grad_hook=None, params_hook=None): grad_dict[name] = params.grad return grad_dict - from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook, fp8_compress_fsdp_params_comm_hook grad_dict = get_grads_after_one_iteration() - for hook in [fp8_compress_fsdp_grad_comm_hook, ]: + for hook in [ + fp8_compress_fsdp_grad_comm_hook, + ]: grad_dict_w_hook = get_grads_after_one_iteration(grad_hook=hook) if dist.get_rank() == 0: for name in grad_dict: assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) - for hook in [fp8_compress_fsdp_params_comm_hook, ]: + for hook in [ + fp8_compress_fsdp_params_comm_hook, + ]: grad_dict_w_hook = get_grads_after_one_iteration(params_hook=hook) if dist.get_rank() == 0: for name in grad_dict: @@ -87,15 +91,13 @@ def get_grads_after_one_iteration(grad_hook=None, params_hook=None): cleanup() + def run_demo(demo_fn, world_size): - mp.spawn(demo_fn, - args=(world_size,), - nprocs=world_size, - join=True) + mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True) if __name__ == "__main__": n_gpus = torch.cuda.device_count() assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" world_size = n_gpus - run_demo(demo_basic, world_size) \ No newline at end of file + run_demo(demo_basic, world_size) diff --git a/tests/test_fp8/test_fp8_reduce_scatter.py b/tests/test_fp8/test_fp8_reduce_scatter.py index 1cb47e72fa0f..807e636a4d03 100644 --- a/tests/test_fp8/test_fp8_reduce_scatter.py +++ b/tests/test_fp8/test_fp8_reduce_scatter.py @@ -41,4 +41,4 @@ def test_reduce_scatter(): if __name__ == "__main__": - test_reduce_scatter() \ No newline at end of file + test_reduce_scatter() From e3d545501e19b4129181f11c7624566597eb8d3d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 5 Aug 2024 02:16:25 +0000 Subject: [PATCH 10/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- colossalai/quantization/fp8.py | 2 -- tests/test_fp8/test_fp8_allreduce.py | 3 --- tests/test_fp8/test_fp8_reduce_scatter.py | 5 ----- 3 files changed, 10 deletions(-) diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index 1752c11729c4..dbf37f791556 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -3,7 +3,6 @@ import torch import torch.distributed as dist import torch.nn.functional as F - from torch.distributed import ReduceOp @@ -64,7 +63,6 @@ def cast_from_fp8( return ret.to(ret_type) - def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", op=ReduceOp.SUM, group=None) -> None: r""" This is an in-place operation for compressed all_reduce using fp8. 
diff --git a/tests/test_fp8/test_fp8_allreduce.py b/tests/test_fp8/test_fp8_allreduce.py index e1bbda7d17e9..c23959b5d0da 100644 --- a/tests/test_fp8/test_fp8_allreduce.py +++ b/tests/test_fp8/test_fp8_allreduce.py @@ -1,7 +1,5 @@ import torch import torch.distributed as dist - -from torch.distributed.distributed_c10d import _get_default_group from torch.testing import assert_close from colossalai import launch @@ -36,7 +34,6 @@ def check_4gpu(shape, dtype, fp8_format): assert_close(x, x_fp8, rtol=0.1, atol=0.1) - def run_dist(rank, world_size, port): launch(rank=rank, world_size=world_size, port=port, host="localhost") check_4gpu() diff --git a/tests/test_fp8/test_fp8_reduce_scatter.py b/tests/test_fp8/test_fp8_reduce_scatter.py index 95a270814f10..c18446e39ea0 100644 --- a/tests/test_fp8/test_fp8_reduce_scatter.py +++ b/tests/test_fp8/test_fp8_reduce_scatter.py @@ -1,9 +1,5 @@ import torch - -import torch.distributed as dist -import torch.nn.functional as F from torch.distributed import reduce_scatter - from torch.distributed.distributed_c10d import _get_default_group from torch.testing import assert_close @@ -13,7 +9,6 @@ from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn - @parameterize("shape", [(16, 8, 4)]) @parameterize("scatter_dim", [0, 1, 2]) @parameterize("dtype", [torch.bfloat16, torch.float16]) From 1ed135578169ac74d5fac7a123f98bfa986b58dc Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Wed, 7 Aug 2024 14:16:10 +0800 Subject: [PATCH 11/18] add skip the test if torch < 2.2.0 --- colossalai/quantization/utils.py | 8 +-- tests/test_fp8/test_fp8_fsdp_comm_hook.py | 80 +++++++++++------------ 2 files changed, 42 insertions(+), 46 deletions(-) diff --git a/colossalai/quantization/utils.py b/colossalai/quantization/utils.py index 841ee2d48e54..bbf0c60f3876 100644 --- a/colossalai/quantization/utils.py +++ b/colossalai/quantization/utils.py @@ -1,3 +1,5 @@ +from packaging import version + import torch import torch.distributed as dist from torch import Tensor @@ -110,12 +112,10 @@ def register_params_comm_hook(self, state: object, hook: callable): self._handle._comm_hook_state = state -from packaging import version -from torch.distributed.fsdp._flat_param import FlatParamHandle -from torch.distributed.fsdp import FullyShardedDataParallel as FSDP - def patch_fsdp_params_comm_hook(): if version.parse(torch.__version__) >= version.parse("2.2.0"): + from torch.distributed.fsdp._flat_param import FlatParamHandle + from torch.distributed.fsdp import FullyShardedDataParallel as FSDP FlatParamHandle._comm_hook = None FlatParamHandle._comm_hook_state = None FlatParamHandle._all_gather_flat_param = _all_gather_flat_param diff --git a/tests/test_fp8/test_fp8_fsdp_comm_hook.py b/tests/test_fp8/test_fp8_fsdp_comm_hook.py index 785bc36327ca..342e0707949c 100644 --- a/tests/test_fp8/test_fp8_fsdp_comm_hook.py +++ b/tests/test_fp8/test_fp8_fsdp_comm_hook.py @@ -1,27 +1,18 @@ -import os -import sys -import tempfile +import pytest +from packaging import version + import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim -import torch.multiprocessing as mp from torch.testing import assert_close - from torch.distributed.fsdp import FullyShardedDataParallel as FSDP -from colossalai.quantization.utils import patch_fsdp_params_comm_hook -patch_fsdp_params_comm_hook() +from colossalai import launch +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn # example modified from 
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html -def setup(rank, world_size): - os.environ['MASTER_ADDR'] = 'localhost' - os.environ['MASTER_PORT'] = '12355' - - # initialize the process group - dist.init_process_group("nccl", rank=rank, world_size=world_size) - def cleanup(): dist.destroy_process_group() @@ -35,12 +26,12 @@ def __init__(self): def forward(self, x): return self.net2(self.relu(self.net1(x))) +@parameterize("mode", ["grad", "params"]) +def run_model(mode): + rank = dist.get_rank() - -def demo_basic(rank, world_size): - print(f"Running basic FSDP example on rank {rank}.") - setup(rank, world_size) - + from colossalai.quantization.utils import patch_fsdp_params_comm_hook + patch_fsdp_params_comm_hook() def get_grads_after_one_iteration(grad_hook=None, params_hook=None): torch.manual_seed(0) # create model and move it to GPU with id rank @@ -72,30 +63,35 @@ def get_grads_after_one_iteration(grad_hook=None, params_hook=None): from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook, fp8_compress_fsdp_params_comm_hook - grad_dict = get_grads_after_one_iteration() - for hook in [fp8_compress_fsdp_grad_comm_hook, ]: - grad_dict_w_hook = get_grads_after_one_iteration(grad_hook=hook) - if dist.get_rank() == 0: - for name in grad_dict: - assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) - - for hook in [fp8_compress_fsdp_params_comm_hook, ]: - grad_dict_w_hook = get_grads_after_one_iteration(params_hook=hook) - if dist.get_rank() == 0: - for name in grad_dict: - assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) - + if mode == "grad": + grad_dict = get_grads_after_one_iteration() + for hook in [fp8_compress_fsdp_grad_comm_hook, ]: + grad_dict_w_hook = get_grads_after_one_iteration(grad_hook=hook) + if dist.get_rank() == 0: + for name in grad_dict: + assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) + elif mode == "params": + grad_dict = get_grads_after_one_iteration() + for hook in [fp8_compress_fsdp_params_comm_hook, ]: + grad_dict_w_hook = get_grads_after_one_iteration(params_hook=hook) + if dist.get_rank() == 0: + for name in grad_dict: + assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) + else: + raise NotImplementedError + +def demo_basic(rank, world_size, port): + print(f"Running basic FSDP example on rank {rank}.") + launch(rank=rank, world_size=world_size, port=port, host="localhost") + run_model() cleanup() -def run_demo(demo_fn, world_size): - mp.spawn(demo_fn, - args=(world_size,), - nprocs=world_size, - join=True) - - -if __name__ == "__main__": +@rerun_if_address_is_in_use() +@pytest.mark.skipif(version.parse(torch.__version__) < version.parse("2.2.0"), reason="torch version < 2.2.0.") +def test_fsdp(): n_gpus = torch.cuda.device_count() assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" - world_size = n_gpus - run_demo(demo_basic, world_size) \ No newline at end of file + spawn(demo_basic, n_gpus) + +if __name__ == "__main__": + test_fsdp() \ No newline at end of file From edd0d64b2b93eb8ba4ee642c8c3b7b1b70ea9ccb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 7 Aug 2024 06:20:00 +0000 Subject: [PATCH 12/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- colossalai/quantization/utils.py | 33 +++++++---------------- tests/test_fp8/test_fp8_fsdp_comm_hook.py | 24 ++++++++++++----- 2 files changed, 27 
insertions(+), 30 deletions(-) diff --git a/colossalai/quantization/utils.py b/colossalai/quantization/utils.py index bbf0c60f3876..5b1e11c9f345 100644 --- a/colossalai/quantization/utils.py +++ b/colossalai/quantization/utils.py @@ -1,15 +1,14 @@ -from packaging import version - import torch import torch.distributed as dist +from packaging import version from torch import Tensor from torch.distributed.fsdp._common_utils import _no_dispatch_record_stream from torch.distributed.utils import _p_assert def _all_gather_flat_param( - self, - padded_unsharded_flat_param: Tensor, + self, + padded_unsharded_flat_param: Tensor, ) -> Tensor: """ All-gather the handle's flat parameter to the destination ``padded_unsharded_flat_param``. @@ -27,17 +26,11 @@ def _all_gather_flat_param( f"Expects {expected_numel} numel but got {padded_unsharded_flat_param.numel()}", ) - pg = ( - self._fake_process_group - if self._use_fake_all_gather - else self.process_group - ) + pg = self._fake_process_group if self._use_fake_all_gather else self.process_group # HACK this should be handled by C10D if sharded_flat_param.is_cpu: # type: ignore[attr-defined] - tensor_list = list( - torch.chunk(padded_unsharded_flat_param, dist.get_world_size(pg)) - ) + tensor_list = list(torch.chunk(padded_unsharded_flat_param, dist.get_world_size(pg))) work = dist.all_gather(tensor_list, sharded_flat_param, group=pg) else: if self._comm_hook is None: @@ -47,9 +40,7 @@ def _all_gather_flat_param( pg, ) else: - self._comm_hook( - None, padded_unsharded_flat_param, sharded_flat_param, pg - ) + self._comm_hook(None, padded_unsharded_flat_param, sharded_flat_param, pg) if self._offload_params: # In case of offloading, `flat_param.data` (i.e. sharded param) is @@ -94,9 +85,7 @@ def register_params_comm_hook(self, state: object, hook: callable): """ if not self.check_is_root(): - raise AssertionError( - "register_comm_hook can only be called on a root instance." 
- ) + raise AssertionError("register_comm_hook can only be called on a root instance.") # if fsdp_state.sharding_strategy in HYBRID_SHARDING_STRATEGIES: # raise AssertionError( @@ -105,21 +94,19 @@ def register_params_comm_hook(self, state: object, hook: callable): if self._handle._comm_hook is not None: raise AssertionError("A communication hook is already registered") if not callable(hook): - raise ValueError( - f"The communication hook must be callable but got {hook}" - ) + raise ValueError(f"The communication hook must be callable but got {hook}") self._handle._comm_hook = hook self._handle._comm_hook_state = state def patch_fsdp_params_comm_hook(): if version.parse(torch.__version__) >= version.parse("2.2.0"): - from torch.distributed.fsdp._flat_param import FlatParamHandle from torch.distributed.fsdp import FullyShardedDataParallel as FSDP + from torch.distributed.fsdp._flat_param import FlatParamHandle + FlatParamHandle._comm_hook = None FlatParamHandle._comm_hook_state = None FlatParamHandle._all_gather_flat_param = _all_gather_flat_param FSDP.register_params_comm_hook = register_params_comm_hook else: raise RuntimeError("This fsdp_params_comm_hook patch is not supported while torch version under 2.2.0.") - diff --git a/tests/test_fp8/test_fp8_fsdp_comm_hook.py b/tests/test_fp8/test_fp8_fsdp_comm_hook.py index 342e0707949c..d802f51b3db0 100644 --- a/tests/test_fp8/test_fp8_fsdp_comm_hook.py +++ b/tests/test_fp8/test_fp8_fsdp_comm_hook.py @@ -1,21 +1,22 @@ import pytest -from packaging import version - import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim -from torch.testing import assert_close +from packaging import version from torch.distributed.fsdp import FullyShardedDataParallel as FSDP +from torch.testing import assert_close from colossalai import launch from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn # example modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html + def cleanup(): dist.destroy_process_group() + class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() @@ -26,12 +27,15 @@ def __init__(self): def forward(self, x): return self.net2(self.relu(self.net1(x))) + @parameterize("mode", ["grad", "params"]) def run_model(mode): rank = dist.get_rank() from colossalai.quantization.utils import patch_fsdp_params_comm_hook + patch_fsdp_params_comm_hook() + def get_grads_after_one_iteration(grad_hook=None, params_hook=None): torch.manual_seed(0) # create model and move it to GPU with id rank @@ -60,19 +64,22 @@ def get_grads_after_one_iteration(grad_hook=None, params_hook=None): grad_dict[name] = params.grad return grad_dict - from colossalai.quantization.fp8 import fp8_compress_fsdp_grad_comm_hook, fp8_compress_fsdp_params_comm_hook if mode == "grad": grad_dict = get_grads_after_one_iteration() - for hook in [fp8_compress_fsdp_grad_comm_hook, ]: + for hook in [ + fp8_compress_fsdp_grad_comm_hook, + ]: grad_dict_w_hook = get_grads_after_one_iteration(grad_hook=hook) if dist.get_rank() == 0: for name in grad_dict: assert_close(grad_dict[name], grad_dict_w_hook[name], rtol=0.1, atol=0.1) elif mode == "params": grad_dict = get_grads_after_one_iteration() - for hook in [fp8_compress_fsdp_params_comm_hook, ]: + for hook in [ + fp8_compress_fsdp_params_comm_hook, + ]: grad_dict_w_hook = get_grads_after_one_iteration(params_hook=hook) if dist.get_rank() == 0: for name in grad_dict: @@ -80,12 +87,14 @@ def get_grads_after_one_iteration(grad_hook=None, params_hook=None): 
else: raise NotImplementedError + def demo_basic(rank, world_size, port): print(f"Running basic FSDP example on rank {rank}.") launch(rank=rank, world_size=world_size, port=port, host="localhost") run_model() cleanup() + @rerun_if_address_is_in_use() @pytest.mark.skipif(version.parse(torch.__version__) < version.parse("2.2.0"), reason="torch version < 2.2.0.") def test_fsdp(): @@ -93,5 +102,6 @@ def test_fsdp(): assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" spawn(demo_basic, n_gpus) + if __name__ == "__main__": - test_fsdp() \ No newline at end of file + test_fsdp() From 53ea42b79ee367b1f58d8784511a5b792c72041b Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Wed, 7 Aug 2024 15:56:40 +0800 Subject: [PATCH 13/18] add skip the test if torch < 2.2.0 --- tests/test_fp8/test_fp8_fsdp_comm_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_fp8/test_fp8_fsdp_comm_hook.py b/tests/test_fp8/test_fp8_fsdp_comm_hook.py index d802f51b3db0..737c8d84996c 100644 --- a/tests/test_fp8/test_fp8_fsdp_comm_hook.py +++ b/tests/test_fp8/test_fp8_fsdp_comm_hook.py @@ -95,8 +95,8 @@ def demo_basic(rank, world_size, port): cleanup() +@pytest.mark.skipif(version.parse(torch.__version__) < version.parse("2.4.0"), reason="torch version < 2.2.0.") @rerun_if_address_is_in_use() -@pytest.mark.skipif(version.parse(torch.__version__) < version.parse("2.2.0"), reason="torch version < 2.2.0.") def test_fsdp(): n_gpus = torch.cuda.device_count() assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" From 0314698a3e823550dcf6dbc017fc4ded3e8068c2 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Wed, 7 Aug 2024 16:00:17 +0800 Subject: [PATCH 14/18] add skip the test if torch < 2.2.0 --- tests/test_fp8/test_fp8_fsdp_comm_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_fp8/test_fp8_fsdp_comm_hook.py b/tests/test_fp8/test_fp8_fsdp_comm_hook.py index 737c8d84996c..3d0660961f17 100644 --- a/tests/test_fp8/test_fp8_fsdp_comm_hook.py +++ b/tests/test_fp8/test_fp8_fsdp_comm_hook.py @@ -95,7 +95,7 @@ def demo_basic(rank, world_size, port): cleanup() -@pytest.mark.skipif(version.parse(torch.__version__) < version.parse("2.4.0"), reason="torch version < 2.2.0.") +@pytest.mark.skipif(version.parse(torch.__version__) < version.parse("2.2.0"), reason="torch version < 2.2.0.") @rerun_if_address_is_in_use() def test_fsdp(): n_gpus = torch.cuda.device_count() From b4aa1e445e16b4ca26dac275d8c645f98dcbb60c Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Wed, 7 Aug 2024 17:16:39 +0800 Subject: [PATCH 15/18] add fp8_comm flag --- examples/language/llama/benchmark.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/language/llama/benchmark.py b/examples/language/llama/benchmark.py index e530e2d6a153..2bd9671d81b7 100644 --- a/examples/language/llama/benchmark.py +++ b/examples/language/llama/benchmark.py @@ -98,7 +98,7 @@ def main(): parser.add_argument("--disable-async-reduce", action="store_true", help="Disable the asynchronous reduce operation") parser.add_argument("--prefetch_num", type=int, default=0, help="chunk prefetch max number") parser.add_argument("--no_cache", action="store_true") - parser.add_argument("--overlap_allgather", action="store_true") + parser.add_argument("--use_fp8_comm", action="store_true", default=False, help="for using fp8 during communication") args = parser.parse_args() colossalai.launch_from_torch() @@ -158,6 +158,7 @@ def empty_init(): buffer_dtype=torch.float16, ), 
param_init_fn=empty_init(), + fp8_communication=args.use_fp8_comm, ) else: plugin = TorchFSDPPlugin( @@ -165,7 +166,8 @@ def empty_init(): param_dtype=torch.float16, reduce_dtype=torch.float16, buffer_dtype=torch.float16, - ) + ), + fp8_communication=args.use_fp8_comm, ) elif args.plugin == "fsdp_cpu": if use_empty_init: @@ -177,6 +179,7 @@ def empty_init(): ), cpu_offload=CPUOffload(offload_params=True), param_init_fn=empty_init(), + fp8_communication=args.use_fp8_comm, ) else: plugin = TorchFSDPPlugin( @@ -186,6 +189,7 @@ def empty_init(): buffer_dtype=torch.float16, ), cpu_offload=CPUOffload(offload_params=True), + fp8_communication=args.use_fp8_comm, ) elif args.plugin == "3d": plugin = HybridParallelPlugin( @@ -200,9 +204,9 @@ def empty_init(): enable_flash_attention=args.xformers, microbatch_size=args.mbs, precision="bf16", + dp_outside=False, overlap_p2p=args.overlap, enable_metadata_cache=not args.no_cache, - overlap_allgather=args.overlap_allgather, **hybrid_kwargs, ) elif args.plugin == "3d_cpu": @@ -293,7 +297,7 @@ def empty_init(): with get_profile_context( args.profile, args.ignore_steps, - 1, # avoid creating massive log files + len(dataloader) - 1, save_dir=f"profile/{time.strftime('%H:%M', time.localtime())}-{args.plugin}-llama-{args.config}", ) as prof: if isinstance(plugin, HybridParallelPlugin) and args.pp > 1: From 90c42801bd24429d54c6358e9c326be346bcc303 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Wed, 7 Aug 2024 17:25:53 +0800 Subject: [PATCH 16/18] rebase latest fp8 operators --- colossalai/quantization/fp8.py | 117 +-------------------------------- 1 file changed, 1 insertion(+), 116 deletions(-) diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index 359003b915ac..64e2fe438391 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -1,4 +1,4 @@ -from typing import Any, Optional +from typing import Any, Callable, List, Optional, Tuple, Union, cast import torch import torch.distributed as dist @@ -119,62 +119,6 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", op=ReduceOp.SUM, gro tensor.copy_(out[:input_size].view(input_shape).to(input_type)) -def all_to_all_single_fp8( - output, input, output_split_sizes=None, input_split_sizes=None, fp8_format="e5m2", group=None, async_op=False -) -> None: - r""" - This is an in-place operation for compressed all_reduce using fp8. - It works like dist.all_to_all_single but during communication the data is cast to fp8 format. - Args: - tensor: torch.Tensor in fp32, fp16, bf16 datatype. 
- fp8_format: e4m3 or e5m2 - Returns: - None - """ - world_size = dist.get_world_size(group=group) - input_type = input.dtype - input_shape = input.shape - input_device = input.device - input = input.flatten() - - fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 - - ret, scale = cast_to_fp8(input, fp8_format=fp8_format) - - inp = ret.view(torch.uint8) - if input_split_sizes is not None: - input_split_sizes = [input_split_sizes[i] * np.prod(input_shape[1:]) for i in range(world_size)] - input_chunks = list(torch.split(inp, input_split_sizes)) - else: - input_chunks = list(torch.chunk(inp, world_size, dim=0)) - - if output_split_sizes is not None: - output_chunks = [ - torch.empty((output_split_sizes[i] * np.prod(input_shape[1:]),), device=input_device, dtype=inp.dtype) - for i in range(world_size) - ] - else: - if dist.get_rank() == world_size - 1: - output_chunks = [torch.empty_like(input_chunks[-1]) for _ in range(world_size)] - else: - output_chunks = [torch.empty_like(input_chunks[0]) for _ in range(world_size)] - - dist.all_to_all(output_chunks, input_chunks, group=group) - scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] - dist.all_gather(scale_list, scale, group=group) - cast_output_chunk = [ - cast_from_fp8(out.view(fp8_type), scale, input_type) for scale, out in zip(scale_list, output_chunks) - ] - - tensor_out = torch.cat(cast_output_chunk, dim=0) - outputs_shape = list(input_shape) - if output_split_sizes is not None: - outputs_shape[0] = sum(output_split_sizes) - else: - outputs_shape = input_shape - output.data = tensor_out.view(outputs_shape).to(input_type) - - def cast_to_fp8_pipeline(inp: Any) -> None: """ Cast the hidden_states tensor of inp object to fp8 format before p2p communication in pipeline. @@ -618,62 +562,3 @@ def gather_fp8(output_list, input_, group=None, fp8_format="e5m2"): output = tensor_list[i].view(fp8_type) scale = scale_list[i] output_list[i].copy_(cast_from_fp8(output, scale, input_type)) - - -class _LinearFp8(torch.autograd.Function): - @staticmethod - def forward( - ctx: Any, - x: torch.Tensor, - w: torch.Tensor, - bias: Optional[torch.Tensor], - ) -> Any: - assert ( - x.dtype in (torch.bfloat16, torch.float16) and x.dtype == w.dtype - ), "Only float16 and bfloat16 are allowed." - if bias is not None: - assert bias.dtype == x.dtype, "Bias should have the same dtype as input." - # ensure x and w are row-major - assert x.is_contiguous() and w.is_contiguous(), "Input and weight should be contiguous." 
- ctx.x_shape = x.shape - ctx.has_bias = bias is not None - ctx.out_dtype = x.dtype - x = x.reshape(-1, x.shape[-1]) - - x_fp8, inv_scale_x = cast_to_fp8(x, fp8_format="e4m3") - w_fp8, inv_scale_w = cast_to_fp8(w, fp8_format="e4m3") - ctx.x_fp8 = x_fp8 - ctx.w_fp8_t = w_fp8.t() - ctx.inv_scale_x = inv_scale_x - ctx.inv_scale_w = inv_scale_w - out = torch._scaled_mm( - x_fp8, ctx.w_fp8_t, bias=bias, out_dtype=ctx.out_dtype, scale_a=inv_scale_x, scale_b=inv_scale_w - )[0] - return out.reshape(*ctx.x_shape[:-1], w.shape[0]) - - @staticmethod - def backward(ctx: Any, out_grad) -> Any: - out_grad = out_grad.reshape(-1, out_grad.shape[-1]) - out_grad_fp8, out_grad_scale = cast_to_fp8(out_grad, fp8_format="e5m2") - x_grad = torch._scaled_mm( - out_grad_fp8, - ctx.w_fp8_t.contiguous().t(), - out_dtype=ctx.out_dtype, - scale_a=out_grad_scale, - scale_b=ctx.inv_scale_w, - )[0] - w_grad = torch._scaled_mm( - out_grad_fp8.t().contiguous(), - ctx.x_fp8.t().contiguous().t(), - out_dtype=ctx.out_dtype, - scale_a=out_grad_scale, - scale_b=ctx.inv_scale_x, - )[0] - bias_grad = None - if ctx.has_bias: - bias_grad = out_grad.sum(0) - return x_grad.reshape(ctx.x_shape), w_grad, bias_grad - - -def linear_fp8(x: torch.Tensor, w: torch.Tensor, bias: Optional[torch.Tensor] = None) -> torch.Tensor: - return _LinearFp8.apply(x, w, bias) From 6079b138773412049319bfe575cfed0c6096d4f5 Mon Sep 17 00:00:00 2001 From: BurkeHulk Date: Wed, 7 Aug 2024 18:04:57 +0800 Subject: [PATCH 17/18] rebase latest fp8 operators --- colossalai/quantization/fp8.py | 143 +++++++++++++++++++++++++++------ 1 file changed, 118 insertions(+), 25 deletions(-) diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index 64e2fe438391..c7f84e47f8bb 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -1,10 +1,12 @@ -from typing import Any, Callable, List, Optional, Tuple, Union, cast +from typing import Any, Optional import torch import torch.distributed as dist import torch.nn.functional as F from torch.distributed import ReduceOp +import numpy as np + def cast_to_fp8(inp: torch.Tensor, fp8_format="e4m3", per_channel_scale=False) -> (torch.Tensor, torch.Tensor): r""" @@ -44,12 +46,10 @@ def cast_from_fp8( inp: torch.Tensor, scale_inv: torch.Tensor, ret_type: torch.dtype, per_channel_scale=False ) -> torch.Tensor: r""" - Args: inp: should be a fp8 torch tensor in one of the types: [torch.float8_e4m3fn, torch.float8_e5m2]. scale: scaling factor returned by cast_to_fp8 function. ret_type: the datatype of the returned tensor. - Returns: torch.Tensor """ @@ -103,7 +103,6 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", op=ReduceOp.SUM, gro for scale, out in zip(scale_list, output_chunks): out = out.view(fp8_type) summed_out += cast_from_fp8(out, scale, input_type) - summed_out.div_(world_size) if op == ReduceOp.AVG: summed_out.div_(world_size) @@ -119,6 +118,62 @@ def all_reduce_fp8(tensor: torch.Tensor, fp8_format="e4m3", op=ReduceOp.SUM, gro tensor.copy_(out[:input_size].view(input_shape).to(input_type)) +def all_to_all_single_fp8( + output, input, output_split_sizes=None, input_split_sizes=None, fp8_format="e5m2", group=None, async_op=False +) -> None: + r""" + This is an in-place operation for compressed all_reduce using fp8. + It works like dist.all_to_all_single but during communication the data is cast to fp8 format. + Args: + tensor: torch.Tensor in fp32, fp16, bf16 datatype. 
+ fp8_format: e4m3 or e5m2 + Returns: + None + """ + world_size = dist.get_world_size(group=group) + input_type = input.dtype + input_shape = input.shape + input_device = input.device + input = input.flatten() + + fp8_type = torch.float8_e4m3fn if fp8_format == "e4m3" else torch.float8_e5m2 + + ret, scale = cast_to_fp8(input, fp8_format=fp8_format) + + inp = ret.view(torch.uint8) + if input_split_sizes is not None: + input_split_sizes = [input_split_sizes[i] * np.prod(input_shape[1:]) for i in range(world_size)] + input_chunks = list(torch.split(inp, input_split_sizes)) + else: + input_chunks = list(torch.chunk(inp, world_size, dim=0)) + + if output_split_sizes is not None: + output_chunks = [ + torch.empty((output_split_sizes[i] * np.prod(input_shape[1:]),), device=input_device, dtype=inp.dtype) + for i in range(world_size) + ] + else: + if dist.get_rank() == world_size - 1: + output_chunks = [torch.empty_like(input_chunks[-1]) for _ in range(world_size)] + else: + output_chunks = [torch.empty_like(input_chunks[0]) for _ in range(world_size)] + + dist.all_to_all(output_chunks, input_chunks, group=group) + scale_list = [torch.ones(1, dtype=scale.dtype, device=input_device) for _ in range(world_size)] + dist.all_gather(scale_list, scale, group=group) + cast_output_chunk = [ + cast_from_fp8(out.view(fp8_type), scale, input_type) for scale, out in zip(scale_list, output_chunks) + ] + + tensor_out = torch.cat(cast_output_chunk, dim=0) + outputs_shape = list(input_shape) + if output_split_sizes is not None: + outputs_shape[0] = sum(output_split_sizes) + else: + outputs_shape = input_shape + output.data = tensor_out.view(outputs_shape).to(input_type) + + def cast_to_fp8_pipeline(inp: Any) -> None: """ Cast the hidden_states tensor of inp object to fp8 format before p2p communication in pipeline. @@ -523,27 +578,6 @@ def all_to_all_fp8(output_list, input_list, group=None, fp8_format="e5m2"): output_list[i].copy_(cast_from_fp8(tensor, scale, input_type)) -def all_to_all_single_fp8(output_tensor, input_tensor, group=None, fp8_format="e5m2"): - - world_size = dist.get_world_size(group) - - per_slice_len = input_tensor.size(0) // world_size - input_type = input_tensor.dtype - ret, scale = cast_to_fp8(input_tensor, fp8_format=fp8_format) - fp8_type = ret.dtype - input_tensor = ret.view(torch.uint8) - tensor = torch.empty_like(input_tensor) - scale_list = [torch.empty_like(scale) for _ in range(world_size)] - dist.all_to_all_single(tensor, input_tensor, group=group) - dist.all_gather(scale_list, scale, group=group) - cast_tensor_list = [] - - for i in range(world_size): - output_part = tensor[per_slice_len * i : per_slice_len * (i + 1)].view(fp8_type) - output_part = cast_from_fp8(output_part, scale_list[i], input_type) - cast_tensor_list.append(output_part) - output_tensor.copy_(torch.concatenate(cast_tensor_list, dim=0)) - def gather_fp8(output_list, input_, group=None, fp8_format="e5m2"): @@ -562,3 +596,62 @@ def gather_fp8(output_list, input_, group=None, fp8_format="e5m2"): output = tensor_list[i].view(fp8_type) scale = scale_list[i] output_list[i].copy_(cast_from_fp8(output, scale, input_type)) + + +class _LinearFp8(torch.autograd.Function): + @staticmethod + def forward( + ctx: Any, + x: torch.Tensor, + w: torch.Tensor, + bias: Optional[torch.Tensor], + ) -> Any: + assert ( + x.dtype in (torch.bfloat16, torch.float16) and x.dtype == w.dtype + ), "Only float16 and bfloat16 are allowed." + if bias is not None: + assert bias.dtype == x.dtype, "Bias should have the same dtype as input." 
+ # ensure x and w are row-major + assert x.is_contiguous() and w.is_contiguous(), "Input and weight should be contiguous." + ctx.x_shape = x.shape + ctx.has_bias = bias is not None + ctx.out_dtype = x.dtype + x = x.reshape(-1, x.shape[-1]) + + x_fp8, inv_scale_x = cast_to_fp8(x, fp8_format="e4m3") + w_fp8, inv_scale_w = cast_to_fp8(w, fp8_format="e4m3") + ctx.x_fp8 = x_fp8 + ctx.w_fp8_t = w_fp8.t() + ctx.inv_scale_x = inv_scale_x + ctx.inv_scale_w = inv_scale_w + out = torch._scaled_mm( + x_fp8, ctx.w_fp8_t, bias=bias, out_dtype=ctx.out_dtype, scale_a=inv_scale_x, scale_b=inv_scale_w + )[0] + return out.reshape(*ctx.x_shape[:-1], w.shape[0]) + + @staticmethod + def backward(ctx: Any, out_grad) -> Any: + out_grad = out_grad.reshape(-1, out_grad.shape[-1]) + out_grad_fp8, out_grad_scale = cast_to_fp8(out_grad, fp8_format="e5m2") + x_grad = torch._scaled_mm( + out_grad_fp8, + ctx.w_fp8_t.contiguous().t(), + out_dtype=ctx.out_dtype, + scale_a=out_grad_scale, + scale_b=ctx.inv_scale_w, + )[0] + w_grad = torch._scaled_mm( + out_grad_fp8.t().contiguous(), + ctx.x_fp8.t().contiguous().t(), + out_dtype=ctx.out_dtype, + scale_a=out_grad_scale, + scale_b=ctx.inv_scale_x, + )[0] + bias_grad = None + if ctx.has_bias: + bias_grad = out_grad.sum(0) + return x_grad.reshape(ctx.x_shape), w_grad, bias_grad + + +def linear_fp8(x: torch.Tensor, w: torch.Tensor, bias: Optional[torch.Tensor] = None) -> torch.Tensor: + return _LinearFp8.apply(x, w, bias) \ No newline at end of file From 6a93e238730799ce3447f243c56605b5fc89ee6f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 7 Aug 2024 10:06:07 +0000 Subject: [PATCH 18/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- colossalai/quantization/fp8.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/colossalai/quantization/fp8.py b/colossalai/quantization/fp8.py index c7f84e47f8bb..a6700fcaf123 100644 --- a/colossalai/quantization/fp8.py +++ b/colossalai/quantization/fp8.py @@ -1,12 +1,11 @@ from typing import Any, Optional +import numpy as np import torch import torch.distributed as dist import torch.nn.functional as F from torch.distributed import ReduceOp -import numpy as np - def cast_to_fp8(inp: torch.Tensor, fp8_format="e4m3", per_channel_scale=False) -> (torch.Tensor, torch.Tensor): r""" @@ -578,7 +577,6 @@ def all_to_all_fp8(output_list, input_list, group=None, fp8_format="e5m2"): output_list[i].copy_(cast_from_fp8(tensor, scale, input_type)) - def gather_fp8(output_list, input_, group=None, fp8_format="e5m2"): world_size = dist.get_world_size(group) @@ -654,4 +652,4 @@ def backward(ctx: Any, out_grad) -> Any: def linear_fp8(x: torch.Tensor, w: torch.Tensor, bias: Optional[torch.Tensor] = None) -> torch.Tensor: - return _LinearFp8.apply(x, w, bias) \ No newline at end of file + return _LinearFp8.apply(x, w, bias)
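
The hooks and the FSDP patch introduced above can also be wired up manually, outside the booster plugins, in the same way the unit tests do. The snippet below is a minimal sketch and is not part of the patch series: it assumes torch >= 2.2.0, an already-initialized NCCL process group, and a model living on the current CUDA device; the nn.Linear here is only a stand-in for a real module.

import torch
import torch.nn as nn
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

from colossalai.quantization.fp8 import (
    fp8_compress_fsdp_grad_comm_hook,
    fp8_compress_fsdp_params_comm_hook,
)
from colossalai.quantization.utils import patch_fsdp_params_comm_hook

# Monkey-patch FSDP so that register_params_comm_hook becomes available
# (the patch raises RuntimeError on torch < 2.2.0).
patch_fsdp_params_comm_hook()

model = nn.Linear(16, 16).cuda()  # stand-in for a real model
fsdp_model = FSDP(model, device_id=torch.cuda.current_device())

# Gradients are cast to FP8 (e5m2 by default) before the reduce-scatter.
fsdp_model.register_comm_hook(None, fp8_compress_fsdp_grad_comm_hook)
# Parameters are all-gathered in FP8 when unsharding; this must be called
# on the root FSDP instance.
fsdp_model.register_params_comm_hook(None, fp8_compress_fsdp_params_comm_hook)

When the booster plugins are used instead, the same behaviour is enabled by constructing the plugin with fp8_communication=True, which the llama benchmark exposes through the --use_fp8_comm flag.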