From 31d4c6b40c6cdbfc41649d8a58f44df54b77a3ac Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Mon, 8 May 2023 17:24:35 +0800 Subject: [PATCH 01/10] [booster] update tests for booster --- .../test_gemini_checkpoint_io.py | 118 ++++++++++++++++++ .../test_low_level_zero_checkpoint_io.py | 0 .../test_torch_ddp_checkpoint_io.py | 0 3 files changed, 118 insertions(+) create mode 100644 tests/test_checkpoint_io/test_gemini_checkpoint_io.py create mode 100644 tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py create mode 100644 tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py new file mode 100644 index 000000000000..c5ed70f9933a --- /dev/null +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -0,0 +1,118 @@ +import tempfile + +import pytest +import torch + +import colossalai +from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO +from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn +from colossalai.utils.cuda import get_current_device +from colossalai.zero import ColoInitContext, ZeroDDP +from colossalai.zero.gemini.chunk import ChunkManager, search_chunk_configuration +from colossalai.zero.gemini.gemini_mgr import GeminiManager +from tests.components_to_test.registry import non_distributed_component_funcs + + +@parameterize('placement_policy', ['cuda', 'cpu']) +@parameterize('model_name', ['bert']) +@parameterize('use_safetensors', [True, False]) +def exam_state_dict_with_origin(placement_policy, model_name, use_safetensors: bool): + from transformers import BertForSequenceClassification + model_ckpt_dir = tempfile.TemporaryDirectory() + get_components_func = non_distributed_component_funcs.get_callable(model_name) + model_builder, *_ = get_components_func() + with ColoInitContext(device=(get_current_device())): + bert_model = model_builder() + 
bert_model.config.save_pretrained(save_directory=(model_ckpt_dir.name)) + + config_dict, *_ = search_chunk_configuration(bert_model, search_range_mb=1, search_interval_byte=100) + chunk_manager = ChunkManager(config_dict) + gemini_manager = GeminiManager(placement_policy, chunk_manager) + bert_model = ZeroDDP(bert_model, gemini_manager) + bert_model.train() + + ckpt_io = GeminiCheckpointIO() + if ckpt_io.coordinator.is_master(): + model_size = sum(p.numel() * p.element_size() for p in bert_model.parameters()) / 1024**2 + ckpt_io.save_model(bert_model, (model_ckpt_dir.name), + True, + True, + '', (model_size / 3), + use_safetensors=use_safetensors) + new_bert_model = BertForSequenceClassification.from_pretrained(model_ckpt_dir.name) + recursive_check(bert_model.state_dict(only_rank_0=True, dtype=(torch.float32)), new_bert_model.state_dict()) + model_ckpt_dir.cleanup() + + +@parameterize('placement_policy', ['cuda', 'cpu']) +@parameterize('model_name', ['gpt2', 'bert']) +@parameterize('use_safetensors', [True, False]) +def exam_state_dict(placement_policy, model_name: str, use_safetensors: bool): + get_components_func = non_distributed_component_funcs.get_callable(model_name) + model_builder, *_ = get_components_func() + with ColoInitContext(device=(get_current_device())): + model = model_builder() + new_model = model_builder() + config_dict, *_ = search_chunk_configuration(model, search_range_mb=1, search_interval_byte=100) + chunk_manager = ChunkManager(config_dict) + gemini_manager = GeminiManager(placement_policy, chunk_manager) + model = ZeroDDP(model, gemini_manager) + + model.train() + #new model + new_config_dict, *_ = search_chunk_configuration(new_model, search_range_mb=1, search_interval_byte=100) + new_chunk_manager = ChunkManager(new_config_dict) + new_gemini_manager = GeminiManager(placement_policy, new_chunk_manager) + new_model = ZeroDDP(new_model, new_gemini_manager) + + model_ckpt_dir = tempfile.TemporaryDirectory() + ckpt_io = GeminiCheckpointIO() 
+ model_size = sum(p.numel() * p.element_size() for p in model.parameters()) / 1024**2 + ckpt_io.save_model(model, (model_ckpt_dir.name), + True, + True, + 'epoch', (model_size / 3), + use_safetensors=use_safetensors) + + if ckpt_io.coordinator.is_master(): + ckpt_io.load_model(new_model, (model_ckpt_dir.name), strict=True) + model_dict = model.state_dict(only_rank_0=True) + new_model_dict = new_model.state_dict(only_rank_0=True) + recursive_check(model_dict, new_model_dict) + model_ckpt_dir.cleanup() + + +def run_dist(rank, world_size, port): + config = {} + colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + exam_state_dict() + exam_state_dict_with_origin() + + +@pytest.mark.dist +@pytest.mark.parametrize('world_size', [4, 4]) +@rerun_if_address_is_in_use() +def test_gemini_ckpIO(world_size): + spawn(run_dist, world_size) + + +def recursive_check(d1, d2): + for k, v in d1.items(): + if isinstance(v, dict): + recursive_check(v, d2[k]) + elif isinstance(v, list): + for i in range(len(v)): + if isinstance(v[i], torch.Tensor): + v[i] = v[i].to('cpu') + d2[k][i] = d2[k][i].to('cpu') + if not torch.equal(v[i], d2[k][i]): + raise AssertionError + elif not v[i] == d2[k][i]: + raise AssertionError + + elif isinstance(v, torch.Tensor): + v = v.to('cpu') + d2[k] = d2[k].to('cpu') + assert torch.equal(v, d2[k]) + elif not v == d2[k]: + raise AssertionError diff --git a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py new file mode 100644 index 000000000000..e69de29bb2d1 From d8bee3ff5b16cb8695cec53e5e71554aff481fe1 Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Tue, 9 May 2023 16:07:23 +0800 Subject: [PATCH 02/10] [booster] update 
tests for booster --- colossalai/booster/plugin/torch_ddp_plugin.py | 4 + docs/source/en/basics/colossalai-booster.md | 124 ++++++++++++++++++ .../zh-Hans/basics/colossalai_booster.md | 0 .../test_gemini_checkpoint_io.py | 1 + .../test_general_checkpoint_io.py | 98 +------------- .../test_low_level_zero_checkpoint_io.py | 74 +++++++++++ .../test_torch_ddp_checkpoint_io.py | 86 ++++++++++++ 7 files changed, 294 insertions(+), 93 deletions(-) create mode 100644 docs/source/en/basics/colossalai-booster.md create mode 100644 docs/source/zh-Hans/basics/colossalai_booster.md diff --git a/colossalai/booster/plugin/torch_ddp_plugin.py b/colossalai/booster/plugin/torch_ddp_plugin.py index 76906d844ef1..dfef384567ae 100644 --- a/colossalai/booster/plugin/torch_ddp_plugin.py +++ b/colossalai/booster/plugin/torch_ddp_plugin.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager from typing import Callable, List, Tuple, Union import torch.nn as nn @@ -115,6 +116,9 @@ def control_device(self) -> bool: def supported_devices(self) -> List[str]: return ['cuda'] + def no_sync(self, model) -> contextmanager: + return model.no_sync() + def configure( self, model: nn.Module, diff --git a/docs/source/en/basics/colossalai-booster.md b/docs/source/en/basics/colossalai-booster.md new file mode 100644 index 000000000000..fc33e8cbe039 --- /dev/null +++ b/docs/source/en/basics/colossalai-booster.md @@ -0,0 +1,124 @@ +# colossal-ai booster + +**Prerequisite:** +- [Distributed Training](../concepts/distributed_training.md) +- [Colossal-AI Overview](../concepts/colossalai_overview.md) + +## Introduction +In our new design, `colossalai.booster` replaces the role of `colossalai.initialize` to inject features into your training components (e.g. model, optimizer, dataloader) seamlessly. With these new APIs, user can integrate their model with our parallelism features more friendly. Also calling `colossalai.booster` is the standard procedure before you run into your training loops. 
In the sections below, I will cover how `colossalai.booster` works and what we should take note of. + +### Plugin +
Plugin is an important component that manages parallel configuration (eg: The gemini plugin encapsulates the gemini acceleration solution). Currently supported plugins are as follows:
+ +***GeminiPlugin:*** This plugin wraps the Gemini acceleration solution, that is, ZeRO with chunk-based memory management.
+ +***TorchDDPPlugin:*** This plugin wraps the DDP acceleration solution; it implements data parallelism at the module level, which can run across multiple machines.
+ +***LowLevelZeroPlugin:*** This plugin wraps stage 1/2 of the Zero Redundancy Optimizer. Stage 1: Shards optimizer states across data parallel workers/GPUs. Stage 2: Shards optimizer states + gradients across data parallel workers/GPUs.
+ +### API of booster +Booster.__init__(...): +* Args: + * device (str or torch.device): The device to run the training. Default: 'cuda'. + * mixed_precision (str or MixedPrecision): The mixed precision to run the training. Default: None.If the argument is a string, it can be 'fp16', 'fp16_apex', 'bf16', or 'fp8'.'fp16' would use PyTorch AMP while 'fp16_apex' would use Nvidia Apex. + * plugin (Plugin): The plugin to run the training. Default: None. +* Return: + * booster (Booster) + + +booster.boost(...): This function is called to boost objects. (e.g. model, optimizer, criterion). +* Args: + * model (nn.Module): The model to be boosted. + * optimizer (Optimizer): The optimizer to be boosted. + * criterion (Callable): The criterion to be boosted. + * dataloader (DataLoader): The dataloader to be boosted. + * lr_scheduler (LRScheduler): The lr_scheduler to be boosted. +* Return: + * model, optimizer, criterion, dataloader, lr_scheduler + +booster.backward(loss, optimizer): This function run the backward operation +* Args: + * loss (torch.Tensor) + * optimizer (Optimizer) + +booster.no_sync(model) :A context manager to disable gradient synchronizations across processes. 
+ +booster.save_model(...): This function is called to save model checkpoints +* Args: + * model: nn.Module, + * checkpoint: str, + * prefix: str = None, + * shard: bool = False, # if saved as shards + * size_per_shard: int = 1024 # the max length of shard + +booster.load_model(...): +* Args: + * model: nn.Module, + * checkpoint: str, + * strict: bool = True + +booster.save_optimizer(...): This function is called to save optimizer checkpoints +* Args: + * optimizer: Optimizer, + * checkpoint: str, + * shard: bool = False, # if saved as shards + * size_per_shard: int = 1024 # the max length of shard + +booster.load_optimizer(...): +* Args: + * optimizer: Optimizer, + * checkpoint: str, + +booster.save_lr_scheduler(...): This function is called to save lr scheduler checkpoints +* Args: + * lr_scheduler: LRScheduler, + * checkpoint: str, + +booster.load_lr_scheduler(...): +* Args: + * lr_scheduler: LRScheduler, + * checkpoint: str, + +## usage +In a typical workflow, you need to launch distributed environment at the beginning of training script and create objects needed (such as models, optimizers, loss function, data loaders etc.) firstly, then call `colossalai.booster` to inject features into these objects, After that, you can use our booster API and these returned objects to continue the rest of your training processes. + +A pseudo-code example is like below:
+ +```python +import torch +from torch.optim import SGD +from torchvision.models import resnet18 + +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import TorchDDPPlugin + +def train(): + colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') + plugin = TorchDDPPlugin() + booster = Booster(plugin=plugin) + model = resnet18() + criterion = lambda x: x.mean() + optimizer = SGD((model.parameters()), lr=0.001) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1) + model, optimizer, criterion, _, scheduler = booster.boost(model, optimizer, criterion, lr_scheduler=scheduler) + + x = torch.randn(4, 3, 224, 224) + x = x.to('cuda') + output = model(x) + loss = criterion(output) + booster.backward(loss, optimizer) + optimizer.clip_grad_by_norm(1.0) + optimizer.step() + scheduler.step() + + save_path = "./model" + booster.save_model(model, save_path, True, True, "", 10, use_safetensors=use_safetensors) + + new_model = resnet18() + booster.load_model(new_model, save_path) +``` + +if you want to run a example, [click here](../../../../examples/tutorial/new_api/cifar_resnet/README.md) + +[more design detailers](https://github.com/hpcaitech/ColossalAI/discussions/3046) diff --git a/docs/source/zh-Hans/basics/colossalai_booster.md b/docs/source/zh-Hans/basics/colossalai_booster.md new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index c5ed70f9933a..837714c232a5 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -18,6 +18,7 @@ @parameterize('use_safetensors', [True, False]) def exam_state_dict_with_origin(placement_policy, model_name, use_safetensors: bool): from transformers import BertForSequenceClassification + model_ckpt_dir = tempfile.TemporaryDirectory() 
get_components_func = non_distributed_component_funcs.get_callable(model_name) model_builder, *_ = get_components_func() diff --git a/tests/test_checkpoint_io/test_general_checkpoint_io.py b/tests/test_checkpoint_io/test_general_checkpoint_io.py index 752ca706bfd4..6fc6048995ab 100644 --- a/tests/test_checkpoint_io/test_general_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_general_checkpoint_io.py @@ -1,21 +1,14 @@ import tempfile + import pytest import torch from torch.optim import Adam from torchvision.models import resnet18 -from colossalai.checkpoint_io import GeneralCheckpointIO from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO +from colossalai.checkpoint_io import GeneralCheckpointIO from colossalai.testing import clear_cache_before_run, parameterize -import colossalai -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn -from colossalai.utils.cuda import get_current_device -from colossalai.zero import ColoInitContext, ZeroDDP -from colossalai.zero.gemini.chunk import ChunkManager, search_chunk_configuration -from colossalai.zero.gemini.gemini_mgr import GeminiManager -from tests.components_to_test.registry import non_distributed_component_funcs - # ======== # Note: # 1. 
due to checkpoint IO can be quite slow if tested with all models, we will only test on resnet for now @@ -61,11 +54,11 @@ def test_unsharded_checkpoint(use_safetensors: bool): ckpt_io.load_model(new_model, model_ckpt_tempfile.name) ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) - # check for model and optimizer state dict recursively recursive_check(model.state_dict(), new_model.state_dict()) recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + @pytest.mark.parametrize('use_safetensors', [True, False]) def test_sharded_checkpoint(use_safetensors: bool): # create a model and optimizer @@ -87,7 +80,7 @@ def test_sharded_checkpoint(use_safetensors: bool): else: suffix = ".bin" WEIGHTS_INDEX_NAME = "model.bin.index.json" - + model_ckpt_dir = tempfile.TemporaryDirectory() optimizer_ckpt_tempfile = tempfile.NamedTemporaryFile() @@ -96,7 +89,7 @@ def test_sharded_checkpoint(use_safetensors: bool): ckpt_io.save_model(model, model_ckpt_dir.name, True, True, "", 10, use_safetensors=use_safetensors) ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name, shard=False) - + # create new model new_model = resnet18() new_optimizer = Adam(new_model.parameters(), lr=0.001) @@ -108,87 +101,6 @@ def test_sharded_checkpoint(use_safetensors: bool): recursive_check(model.state_dict(), new_model.state_dict()) recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) -@parameterize('placement_policy', ['cuda', 'cpu']) -@parameterize('model_name', ['bert']) -@parameterize('use_safetensors', [True, False]) -def hf_load_colossalai_checkpoint(placement_policy, model_name, use_safetensors: bool): - from transformers import BertTokenizer, BertModel, BertForMaskedLM, BertConfig, BertForSequenceClassification - - model_ckpt_dir = tempfile.TemporaryDirectory() - get_components_func = non_distributed_component_funcs.get_callable(model_name) - model_builder, *_ = get_components_func() - - with ColoInitContext(device=get_current_device()): - 
bert_model = model_builder() - bert_model.config.save_pretrained(save_directory=model_ckpt_dir.name) - config_dict, *_ = search_chunk_configuration(bert_model, search_range_mb=1, search_interval_byte=100) - chunk_manager = ChunkManager(config_dict) - gemini_manager = GeminiManager(placement_policy, chunk_manager) - bert_model = ZeroDDP(bert_model, gemini_manager) - bert_model.train() - - ckpt_io = GeminiCheckpointIO() - if ckpt_io.coordinator.is_master(): - model_size = sum(p.numel() * p.element_size() for p in bert_model.parameters()) / 1024**2 - ckpt_io.save_model(bert_model, model_ckpt_dir.name, True, True, "", (model_size / 3), use_safetensors=use_safetensors) - new_bert_model = BertForSequenceClassification.from_pretrained(model_ckpt_dir.name) - recursive_check(bert_model.state_dict(only_rank_0=True, dtype=torch.float32), new_bert_model.state_dict()) - - model_ckpt_dir.cleanup() - - - -@parameterize('placement_policy', ['cuda', 'cpu']) -@parameterize('model_name', ['gpt2', 'bert']) -@parameterize('use_safetensors', [True, False]) -def exam_state_dict(placement_policy, model_name: str, use_safetensors: bool): - get_components_func = non_distributed_component_funcs.get_callable(model_name) - model_builder, *_ = get_components_func() - - with ColoInitContext(device=get_current_device()): - model = model_builder() - new_model = model_builder() - - config_dict, *_ = search_chunk_configuration(model, search_range_mb=1, search_interval_byte=100) - chunk_manager = ChunkManager(config_dict) - gemini_manager = GeminiManager(placement_policy, chunk_manager) - model = ZeroDDP(model, gemini_manager) - model.train() - - new_config_dict, *_ = search_chunk_configuration(new_model, search_range_mb=1, search_interval_byte=100) - new_chunk_manager = ChunkManager(new_config_dict) - new_gemini_manager = GeminiManager(placement_policy, new_chunk_manager) - new_model = ZeroDDP(new_model, new_gemini_manager) - - model_ckpt_dir = tempfile.TemporaryDirectory() - - ckpt_io = 
GeminiCheckpointIO() - model_size = sum(p.numel() * p.element_size() for p in model.parameters()) / 1024**2 - ckpt_io.save_model(model, model_ckpt_dir.name, True, True, "epoch", (model_size / 3), use_safetensors=use_safetensors) - - # load model - if ckpt_io.coordinator.is_master(): - ckpt_io.load_model(new_model, model_ckpt_dir.name, strict=True) - model_dict = model.state_dict(only_rank_0=True) - new_model_dict = new_model.state_dict(only_rank_0=True) - recursive_check(model_dict, new_model_dict) - - model_ckpt_dir.cleanup() - - -def run_dist(rank, world_size, port): - config = {} - colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - exam_state_dict() - hf_load_colossalai_checkpoint() - - -@pytest.mark.dist -@pytest.mark.parametrize('world_size', [4, 4]) -@rerun_if_address_is_in_use() -def test_gemini_ckpIO(world_size): - spawn(run_dist, world_size) - # do recursive check for the optimizer state dict # if the value is a dict, compare its values diff --git a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py index e69de29bb2d1..46074507a52e 100644 --- a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py @@ -0,0 +1,74 @@ +import tempfile + +import pytest +import torch +from torchvision.models import resnet18 + +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import LowLevelZeroPlugin +from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroCheckpointIO +from colossalai.nn.optimizer import HybridAdam +from colossalai.testing import clear_cache_before_run, parameterize, rerun_if_address_is_in_use, spawn + + +@clear_cache_before_run() +@parameterize('stage', [2]) +def check_low_level_zero_checkpointIO(stage: int): + plugin = LowLevelZeroPlugin(stage=stage, max_norm=1.0, initial_scale=32) + booster = 
Booster(plugin=plugin) + model = resnet18() + criterion = lambda x: x.mean() + optimizer = HybridAdam((model.parameters()), lr=0.001) + model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion) + + x = torch.randn(4, 3, 224, 224) + x = x.to('cuda') + output = model(x) + loss = criterion(output) + booster.backward(loss, optimizer) + optimizer.step() + + optimizer_ckpt_tempfile = tempfile.NamedTemporaryFile() + ckpt_io = LowLevelZeroCheckpointIO() + ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name) + + if ckpt_io.coordinator.is_master(): + new_model = resnet18() + new_optimizer = HybridAdam((new_model.parameters()), lr=0.001) + _, new_optimizer, _, _, _ = booster.boost(new_model, new_optimizer) + ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) + recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + + +def run_dist(rank, world_size, port): + colossalai.launch(config=(dict()), rank=rank, world_size=world_size, port=port, host='localhost') + check_low_level_zero_checkpointIO() + + +@rerun_if_address_is_in_use() +def test_low_level_zero_checkpointIO(): + spawn(run_dist, 2) + + +def recursive_check(d1, d2): + for k, v in d1.items(): + if isinstance(v, dict): + recursive_check(v, d2[k]) + elif isinstance(v, list): + for i in range(len(v)): + if isinstance(v[i], torch.Tensor): + v[i] = v[i].to('cpu') + d2[k][i] = d2[k][i].to('cpu') + if not torch.equal(v[i], d2[k][i]): + raise AssertionError + elif v[i] != d2[k][i]: + assert v[i] == d2[k][i] + + elif isinstance(v, torch.Tensor): + v = v.to('cpu') + d2[k] = d2[k].to('cpu') + if not torch.equal(v, d2[k]): + raise AssertionError + elif not v == d2[k]: + raise AssertionError diff --git a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py index e69de29bb2d1..38d0a381ab5c 100644 --- a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py +++ 
b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py @@ -0,0 +1,86 @@ +import tempfile + +import torch +from torch.nn.parallel import DistributedDataParallel as DDP +from torch.optim import SGD +from torchvision.models import resnet18 + +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import TorchDDPPlugin +from colossalai.booster.plugin.torch_ddp_plugin import TorchDDPCheckpointIO +from colossalai.interface import OptimizerWrapper +from colossalai.testing import clear_cache_before_run, rerun_if_address_is_in_use, spawn + + +def check_torch_ddp_checkpointIO(): + plugin = TorchDDPPlugin() + booster = Booster(plugin=plugin) + model = resnet18() + criterion = lambda x: x.mean() + optimizer = SGD((model.parameters()), lr=0.001) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1) + model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion, lr_scheduler=scheduler) + + assert isinstance(model.module, DDP) + assert isinstance(optimizer, OptimizerWrapper) + + x = torch.randn(4, 3, 224, 224) + x = x.to('cuda') + output = model(x) + loss = criterion(output) + booster.backward(loss, optimizer) + optimizer.clip_grad_by_norm(1.0) + optimizer.step() + scheduler.step() + + optimizer_ckpt_tempfile = tempfile.NamedTemporaryFile() + lr_scheduler_ckpt_tempfile = tempfile.NamedTemporaryFile() + ckpt_io = TorchDDPCheckpointIO() + ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name) + ckpt_io.save_lr_scheduler(scheduler, lr_scheduler_ckpt_tempfile.name) + + if ckpt_io.coordinator.is_master(): + new_model = resnet18() + new_optimizer = SGD((new_model.parameters()), lr=0.001) + new_scheduler = torch.optim.lr_scheduler.StepLR(new_optimizer, step_size=1, gamma=0.1) + _, new_optimizer, _, _, new_scheduler = booster.boost(new_model, new_optimizer, lr_scheduler=new_scheduler) + + ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) + recursive_check(optimizer.state_dict(), 
new_optimizer.state_dict()) + + ckpt_io.load_lr_scheduler(new_scheduler, lr_scheduler_ckpt_tempfile.name) + recursive_check(scheduler.state_dict(), new_scheduler.state_dict()) + + +def run_dist(rank, world_size, port): + colossalai.launch(config=(dict()), rank=rank, world_size=world_size, port=port, host='localhost') + check_torch_ddp_checkpointIO() + + +@rerun_if_address_is_in_use() +def test_torch_ddp_checkpointIO(): + spawn(run_dist, 2) + + +def recursive_check(d1, d2): + for k, v in d1.items(): + if isinstance(v, dict): + recursive_check(v, d2[k]) + elif isinstance(v, list): + for i in range(len(v)): + if isinstance(v[i], torch.Tensor): + v[i] = v[i].to('cpu') + d2[k][i] = d2[k][i].to('cpu') + if not torch.equal(v[i], d2[k][i]): + raise AssertionError + elif v[i] != d2[k][i]: + assert v[i] == d2[k][i] + + elif isinstance(v, torch.Tensor): + v = v.to('cpu') + d2[k] = d2[k].to('cpu') + if not torch.equal(v, d2[k]): + raise AssertionError + elif not v == d2[k]: + raise AssertionError From 852d0d6c63aa16b9fd12a0bd1fa5dd938ab22856 Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Tue, 9 May 2023 17:50:09 +0800 Subject: [PATCH 03/10] [booster] update tests for booster --- colossalai/testing/__init__.py | 11 +- colossalai/testing/comparison.py | 20 +++ docs/source/en/basics/colossalai-booster.md | 124 ------------------ .../zh-Hans/basics/colossalai_booster.md | 0 .../test_gemini_checkpoint_io.py | 24 +--- .../test_general_checkpoint_io.py | 27 +--- .../test_low_level_zero_checkpoint_io.py | 25 +--- .../test_torch_ddp_checkpoint_io.py | 25 +--- 8 files changed, 33 insertions(+), 223 deletions(-) delete mode 100644 docs/source/en/basics/colossalai-booster.md delete mode 100644 docs/source/zh-Hans/basics/colossalai_booster.md diff --git a/colossalai/testing/__init__.py b/colossalai/testing/__init__.py index c53e0f44c7e0..7af7dfa56099 100644 --- a/colossalai/testing/__init__.py +++ b/colossalai/testing/__init__.py @@ -1,4 +1,11 @@ -from .comparison 
import assert_close, assert_close_loose, assert_equal, assert_equal_in_group, assert_not_equal +from .comparison import ( + assert_close, + assert_close_loose, + assert_equal, + assert_equal_in_group, + assert_not_equal, + recursive_check, +) from .pytest_wrapper import run_on_environment_flag from .utils import ( clear_cache_before_run, @@ -13,5 +20,5 @@ __all__ = [ 'assert_equal', 'assert_not_equal', 'assert_close', 'assert_close_loose', 'assert_equal_in_group', 'parameterize', 'rerun_on_exception', 'rerun_if_address_is_in_use', 'skip_if_not_enough_gpus', 'free_port', 'spawn', - 'clear_cache_before_run', 'run_on_environment_flag' + 'clear_cache_before_run', 'run_on_environment_flag', 'recursive_check' ] diff --git a/colossalai/testing/comparison.py b/colossalai/testing/comparison.py index e00d0da168c7..94bb15b082a5 100644 --- a/colossalai/testing/comparison.py +++ b/colossalai/testing/comparison.py @@ -28,3 +28,23 @@ def assert_equal_in_group(tensor: Tensor, process_group: ProcessGroup = None): a = tensor_list[i] b = tensor_list[i + 1] assert torch.all(a == b), f'expected tensors on rank {i} and {i + 1} to be equal but they are not, {a} vs {b}' + + +def recursive_check(d1, d2): + for k, v in d1.items(): + if isinstance(v, dict): + recursive_check(v, d2[k]) + elif isinstance(v, list): + for i in range(len(v)): + if isinstance(v[i], torch.Tensor): + v[i] = v[i].to("cpu") + d2[k][i] = d2[k][i].to("cpu") + assert torch.equal(v[i], d2[k][i]) + else: + assert v[i] == d2[k][i] + elif isinstance(v, torch.Tensor): + v = v.to("cpu") + d2[k] = d2[k].to("cpu") + assert torch.equal(v, d2[k]) + else: + assert v == d2[k] diff --git a/docs/source/en/basics/colossalai-booster.md b/docs/source/en/basics/colossalai-booster.md deleted file mode 100644 index fc33e8cbe039..000000000000 --- a/docs/source/en/basics/colossalai-booster.md +++ /dev/null @@ -1,124 +0,0 @@ -# colossal-ai booster - -**Prerequisite:** -- [Distributed Training](../concepts/distributed_training.md) -- 
[Colossal-AI Overview](../concepts/colossalai_overview.md) - -## Introduction -In our new design, `colossalai.booster` replaces the role of `colossalai.initialize` to inject features into your training components (e.g. model, optimizer, dataloader) seamlessly. With these new APIs, user can integrate their model with our parallelism features more friendly. Also calling `colossalai.booster` is the standard procedure before you run into your training loops. In the sections below, I will cover how `colossalai.booster` works and what we should take note of. - -### Plugin -Plugin is an important component that manages parallel configuration (eg: The gemini plugin encapsulates the gemini acceleration solution). Currently supported plugins are as follows:
- -***GeminiPlugin:*** This plugin wraps the Gemini acceleration solution, that is, ZeRO with chunk-based memory management.
- -***TorchDDPPlugin:*** This plugin wraps the DDP acceleration solution; it implements data parallelism at the module level, which can run across multiple machines.
- -***LowLevelZeroPlugin:*** This plugin wraps stage 1/2 of the Zero Redundancy Optimizer. Stage 1: Shards optimizer states across data parallel workers/GPUs. Stage 2: Shards optimizer states + gradients across data parallel workers/GPUs.
- -### API of booster -Booster.__init__(...): -* Args: - * device (str or torch.device): The device to run the training. Default: 'cuda'. - * mixed_precision (str or MixedPrecision): The mixed precision to run the training. Default: None.If the argument is a string, it can be 'fp16', 'fp16_apex', 'bf16', or 'fp8'.'fp16' would use PyTorch AMP while 'fp16_apex' would use Nvidia Apex. - * plugin (Plugin): The plugin to run the training. Default: None. -* Return: - * booster (Booster) - - -booster.boost(...): This function is called to boost objects. (e.g. model, optimizer, criterion). -* Args: - * model (nn.Module): The model to be boosted. - * optimizer (Optimizer): The optimizer to be boosted. - * criterion (Callable): The criterion to be boosted. - * dataloader (DataLoader): The dataloader to be boosted. - * lr_scheduler (LRScheduler): The lr_scheduler to be boosted. -* Return: - * model, optimizer, criterion, dataloader, lr_scheduler - -booster.backward(loss, optimizer): This function run the backward operation -* Args: - * loss (torch.Tensor) - * optimizer (Optimizer) - -booster.no_sync(model) :A context manager to disable gradient synchronizations across processes. 
- -booster.save_model(...): This function is called to save model checkpoints -* Args: - * model: nn.Module, - * checkpoint: str, - * prefix: str = None, - * shard: bool = False, # if saved as shards - * size_per_shard: int = 1024 # the max length of shard - -booster.load_model(...): -* Args: - * model: nn.Module, - * checkpoint: str, - * strict: bool = True - -booster.save_optimizer(...): This function is called to save optimizer checkpoints -* Args: - * optimizer: Optimizer, - * checkpoint: str, - * shard: bool = False, # if saved as shards - * size_per_shard: int = 1024 # the max length of shard - -booster.load_optimizer(...): -* Args: - * optimizer: Optimizer, - * checkpoint: str, - -booster.save_lr_scheduler(...): This function is called to save lr scheduler checkpoints -* Args: - * lr_scheduler: LRScheduler, - * checkpoint: str, - -booster.load_lr_scheduler(...): -* Args: - * lr_scheduler: LRScheduler, - * checkpoint: str, - -## usage -In a typical workflow, you need to launch distributed environment at the beginning of training script and create objects needed (such as models, optimizers, loss function, data loaders etc.) firstly, then call `colossalai.booster` to inject features into these objects, After that, you can use our booster API and these returned objects to continue the rest of your training processes. - -A pseudo-code example is like below:
- -```python -import torch -from torch.optim import SGD -from torchvision.models import resnet18 - -import colossalai -from colossalai.booster import Booster -from colossalai.booster.plugin import TorchDDPPlugin - -def train(): - colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') - plugin = TorchDDPPlugin() - booster = Booster(plugin=plugin) - model = resnet18() - criterion = lambda x: x.mean() - optimizer = SGD((model.parameters()), lr=0.001) - scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1) - model, optimizer, criterion, _, scheduler = booster.boost(model, optimizer, criterion, lr_scheduler=scheduler) - - x = torch.randn(4, 3, 224, 224) - x = x.to('cuda') - output = model(x) - loss = criterion(output) - booster.backward(loss, optimizer) - optimizer.clip_grad_by_norm(1.0) - optimizer.step() - scheduler.step() - - save_path = "./model" - booster.save_model(model, save_path, True, True, "", 10, use_safetensors=use_safetensors) - - new_model = resnet18() - booster.load_model(new_model, save_path) -``` - -if you want to run a example, [click here](../../../../examples/tutorial/new_api/cifar_resnet/README.md) - -[more design detailers](https://github.com/hpcaitech/ColossalAI/discussions/3046) diff --git a/docs/source/zh-Hans/basics/colossalai_booster.md b/docs/source/zh-Hans/basics/colossalai_booster.md deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index 837714c232a5..5dbca63f6c5e 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -5,7 +5,7 @@ import colossalai from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn +from colossalai.testing import parameterize, recursive_check, 
rerun_if_address_is_in_use, spawn from colossalai.utils.cuda import get_current_device from colossalai.zero import ColoInitContext, ZeroDDP from colossalai.zero.gemini.chunk import ChunkManager, search_chunk_configuration @@ -95,25 +95,3 @@ def run_dist(rank, world_size, port): @rerun_if_address_is_in_use() def test_gemini_ckpIO(world_size): spawn(run_dist, world_size) - - -def recursive_check(d1, d2): - for k, v in d1.items(): - if isinstance(v, dict): - recursive_check(v, d2[k]) - elif isinstance(v, list): - for i in range(len(v)): - if isinstance(v[i], torch.Tensor): - v[i] = v[i].to('cpu') - d2[k][i] = d2[k][i].to('cpu') - if not torch.equal(v[i], d2[k][i]): - raise AssertionError - elif not v[i] == d2[k][i]: - raise AssertionError - - elif isinstance(v, torch.Tensor): - v = v.to('cpu') - d2[k] = d2[k].to('cpu') - assert torch.equal(v, d2[k]) - elif not v == d2[k]: - raise AssertionError diff --git a/tests/test_checkpoint_io/test_general_checkpoint_io.py b/tests/test_checkpoint_io/test_general_checkpoint_io.py index 6fc6048995ab..b4063a672e87 100644 --- a/tests/test_checkpoint_io/test_general_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_general_checkpoint_io.py @@ -7,7 +7,7 @@ from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO from colossalai.checkpoint_io import GeneralCheckpointIO -from colossalai.testing import clear_cache_before_run, parameterize +from colossalai.testing import clear_cache_before_run, parameterize, recursive_check # ======== # Note: @@ -100,28 +100,3 @@ def test_sharded_checkpoint(use_safetensors: bool): # check for model and optimizer state dict recursively recursive_check(model.state_dict(), new_model.state_dict()) recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) - - -# do recursive check for the optimizer state dict -# if the value is a dict, compare its values -# if the value is a list, comapre all elements one-by-one -# if the value is a torch.Tensor, use torch.equal -# otherwise use 
assertEqual -def recursive_check(d1, d2): - for k, v in d1.items(): - if isinstance(v, dict): - recursive_check(v, d2[k]) - elif isinstance(v, list): - for i in range(len(v)): - if isinstance(v[i], torch.Tensor): - v[i] = v[i].to("cpu") - d2[k][i] = d2[k][i].to("cpu") - assert torch.equal(v[i], d2[k][i]) - else: - assert v[i] == d2[k][i] - elif isinstance(v, torch.Tensor): - v = v.to("cpu") - d2[k] = d2[k].to("cpu") - assert torch.equal(v, d2[k]) - else: - assert v == d2[k] diff --git a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py index 46074507a52e..163cdea2e5cd 100644 --- a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py @@ -9,7 +9,7 @@ from colossalai.booster.plugin import LowLevelZeroPlugin from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroCheckpointIO from colossalai.nn.optimizer import HybridAdam -from colossalai.testing import clear_cache_before_run, parameterize, rerun_if_address_is_in_use, spawn +from colossalai.testing import clear_cache_before_run, parameterize, recursive_check, rerun_if_address_is_in_use, spawn @clear_cache_before_run() @@ -49,26 +49,3 @@ def run_dist(rank, world_size, port): @rerun_if_address_is_in_use() def test_low_level_zero_checkpointIO(): spawn(run_dist, 2) - - -def recursive_check(d1, d2): - for k, v in d1.items(): - if isinstance(v, dict): - recursive_check(v, d2[k]) - elif isinstance(v, list): - for i in range(len(v)): - if isinstance(v[i], torch.Tensor): - v[i] = v[i].to('cpu') - d2[k][i] = d2[k][i].to('cpu') - if not torch.equal(v[i], d2[k][i]): - raise AssertionError - elif v[i] != d2[k][i]: - assert v[i] == d2[k][i] - - elif isinstance(v, torch.Tensor): - v = v.to('cpu') - d2[k] = d2[k].to('cpu') - if not torch.equal(v, d2[k]): - raise AssertionError - elif not v == d2[k]: - raise AssertionError diff --git 
a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py index 38d0a381ab5c..cf890deb7400 100644 --- a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py @@ -10,7 +10,7 @@ from colossalai.booster.plugin import TorchDDPPlugin from colossalai.booster.plugin.torch_ddp_plugin import TorchDDPCheckpointIO from colossalai.interface import OptimizerWrapper -from colossalai.testing import clear_cache_before_run, rerun_if_address_is_in_use, spawn +from colossalai.testing import recursive_check, rerun_if_address_is_in_use, spawn def check_torch_ddp_checkpointIO(): @@ -61,26 +61,3 @@ def run_dist(rank, world_size, port): @rerun_if_address_is_in_use() def test_torch_ddp_checkpointIO(): spawn(run_dist, 2) - - -def recursive_check(d1, d2): - for k, v in d1.items(): - if isinstance(v, dict): - recursive_check(v, d2[k]) - elif isinstance(v, list): - for i in range(len(v)): - if isinstance(v[i], torch.Tensor): - v[i] = v[i].to('cpu') - d2[k][i] = d2[k][i].to('cpu') - if not torch.equal(v[i], d2[k][i]): - raise AssertionError - elif v[i] != d2[k][i]: - assert v[i] == d2[k][i] - - elif isinstance(v, torch.Tensor): - v = v.to('cpu') - d2[k] = d2[k].to('cpu') - if not torch.equal(v, d2[k]): - raise AssertionError - elif not v == d2[k]: - raise AssertionError From f18e667dc44447c1f207d46e4d91aed4b33b5a82 Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Tue, 9 May 2023 17:52:29 +0800 Subject: [PATCH 04/10] [booster] update tests for booster --- colossalai/booster/plugin/torch_ddp_plugin.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/colossalai/booster/plugin/torch_ddp_plugin.py b/colossalai/booster/plugin/torch_ddp_plugin.py index dfef384567ae..bbe8624f8ed2 100644 --- a/colossalai/booster/plugin/torch_ddp_plugin.py +++ b/colossalai/booster/plugin/torch_ddp_plugin.py @@ -116,9 +116,6 @@ def control_device(self) -> bool: def 
supported_devices(self) -> List[str]: return ['cuda'] - def no_sync(self, model) -> contextmanager: - return model.no_sync() - def configure( self, model: nn.Module, From ac981420859b8608d84560e1437f55bc05e9f9cb Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Tue, 9 May 2023 17:53:12 +0800 Subject: [PATCH 05/10] [booster] update tests for booster --- colossalai/booster/plugin/torch_ddp_plugin.py | 1 - 1 file changed, 1 deletion(-) diff --git a/colossalai/booster/plugin/torch_ddp_plugin.py b/colossalai/booster/plugin/torch_ddp_plugin.py index bbe8624f8ed2..76906d844ef1 100644 --- a/colossalai/booster/plugin/torch_ddp_plugin.py +++ b/colossalai/booster/plugin/torch_ddp_plugin.py @@ -1,4 +1,3 @@ -from contextlib import contextmanager from typing import Callable, List, Tuple, Union import torch.nn as nn From ecfbbf2ca88607c8af46bfc0f858c6b9b41b29ea Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Tue, 9 May 2023 18:27:52 +0800 Subject: [PATCH 06/10] [booster] update booster tutorials#3717, fix recursive check --- colossalai/testing/__init__.py | 4 ++-- colossalai/testing/comparison.py | 16 ++++++++++------ .../test_gemini_checkpoint_io.py | 7 ++++--- .../test_general_checkpoint_io.py | 10 +++++----- .../test_low_level_zero_checkpoint_io.py | 10 ++++++++-- .../test_torch_ddp_checkpoint_io.py | 6 +++--- 6 files changed, 32 insertions(+), 21 deletions(-) diff --git a/colossalai/testing/__init__.py b/colossalai/testing/__init__.py index 7af7dfa56099..9d0475ed064c 100644 --- a/colossalai/testing/__init__.py +++ b/colossalai/testing/__init__.py @@ -4,7 +4,7 @@ assert_equal, assert_equal_in_group, assert_not_equal, - recursive_check, + check_state_dict_equal, ) from .pytest_wrapper import run_on_environment_flag from .utils import ( @@ -20,5 +20,5 @@ __all__ = [ 'assert_equal', 'assert_not_equal', 'assert_close', 'assert_close_loose', 'assert_equal_in_group', 'parameterize', 'rerun_on_exception', 'rerun_if_address_is_in_use', 
'skip_if_not_enough_gpus', 'free_port', 'spawn', - 'clear_cache_before_run', 'run_on_environment_flag', 'recursive_check' + 'clear_cache_before_run', 'run_on_environment_flag', 'check_state_dict_equal' ] diff --git a/colossalai/testing/comparison.py b/colossalai/testing/comparison.py index 94bb15b082a5..faf61638d8bb 100644 --- a/colossalai/testing/comparison.py +++ b/colossalai/testing/comparison.py @@ -1,3 +1,5 @@ +from typing import OrderedDict + import torch import torch.distributed as dist from torch import Tensor @@ -30,21 +32,23 @@ def assert_equal_in_group(tensor: Tensor, process_group: ProcessGroup = None): assert torch.all(a == b), f'expected tensors on rank {i} and {i + 1} to be equal but they are not, {a} vs {b}' -def recursive_check(d1, d2): +def check_state_dict_equal(d1: OrderedDict, d2: OrderedDict, ignore_device: bool = True): for k, v in d1.items(): if isinstance(v, dict): - recursive_check(v, d2[k]) + check_state_dict_equal(v, d2[k]) elif isinstance(v, list): for i in range(len(v)): if isinstance(v[i], torch.Tensor): - v[i] = v[i].to("cpu") - d2[k][i] = d2[k][i].to("cpu") + if not ignore_device: + v[i] = v[i].to("cpu") + d2[k][i] = d2[k][i].to("cpu") assert torch.equal(v[i], d2[k][i]) else: assert v[i] == d2[k][i] elif isinstance(v, torch.Tensor): - v = v.to("cpu") - d2[k] = d2[k].to("cpu") + if not ignore_device: + v = v.to("cpu") + d2[k] = d2[k].to("cpu") assert torch.equal(v, d2[k]) else: assert v == d2[k] diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py index 5dbca63f6c5e..1e5a2e1c4b44 100644 --- a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -5,7 +5,7 @@ import colossalai from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO -from colossalai.testing import parameterize, recursive_check, rerun_if_address_is_in_use, spawn +from colossalai.testing import check_state_dict_equal, 
parameterize, rerun_if_address_is_in_use, spawn from colossalai.utils.cuda import get_current_device from colossalai.zero import ColoInitContext, ZeroDDP from colossalai.zero.gemini.chunk import ChunkManager, search_chunk_configuration @@ -41,7 +41,8 @@ def exam_state_dict_with_origin(placement_policy, model_name, use_safetensors: b '', (model_size / 3), use_safetensors=use_safetensors) new_bert_model = BertForSequenceClassification.from_pretrained(model_ckpt_dir.name) - recursive_check(bert_model.state_dict(only_rank_0=True, dtype=(torch.float32)), new_bert_model.state_dict()) + check_state_dict_equal(bert_model.state_dict(only_rank_0=True, dtype=(torch.float32)), + new_bert_model.state_dict(), False) model_ckpt_dir.cleanup() @@ -79,7 +80,7 @@ def exam_state_dict(placement_policy, model_name: str, use_safetensors: bool): ckpt_io.load_model(new_model, (model_ckpt_dir.name), strict=True) model_dict = model.state_dict(only_rank_0=True) new_model_dict = new_model.state_dict(only_rank_0=True) - recursive_check(model_dict, new_model_dict) + check_state_dict_equal(model_dict, new_model_dict, False) model_ckpt_dir.cleanup() diff --git a/tests/test_checkpoint_io/test_general_checkpoint_io.py b/tests/test_checkpoint_io/test_general_checkpoint_io.py index b4063a672e87..9e973bb23e0b 100644 --- a/tests/test_checkpoint_io/test_general_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_general_checkpoint_io.py @@ -7,7 +7,7 @@ from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO from colossalai.checkpoint_io import GeneralCheckpointIO -from colossalai.testing import clear_cache_before_run, parameterize, recursive_check +from colossalai.testing import check_state_dict_equal, clear_cache_before_run, parameterize # ======== # Note: @@ -55,8 +55,8 @@ def test_unsharded_checkpoint(use_safetensors: bool): ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) # check for model and optimizer state dict recursively - recursive_check(model.state_dict(), 
new_model.state_dict()) - recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + check_state_dict_equal(model.state_dict(), new_model.state_dict()) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict()) @pytest.mark.parametrize('use_safetensors', [True, False]) @@ -98,5 +98,5 @@ def test_sharded_checkpoint(use_safetensors: bool): ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) # check for model and optimizer state dict recursively - recursive_check(model.state_dict(), new_model.state_dict()) - recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + check_state_dict_equal(model.state_dict(), new_model.state_dict()) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict()) diff --git a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py index 163cdea2e5cd..217a950d8155 100644 --- a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py @@ -9,7 +9,13 @@ from colossalai.booster.plugin import LowLevelZeroPlugin from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroCheckpointIO from colossalai.nn.optimizer import HybridAdam -from colossalai.testing import clear_cache_before_run, parameterize, recursive_check, rerun_if_address_is_in_use, spawn +from colossalai.testing import ( + check_state_dict_equal, + clear_cache_before_run, + parameterize, + rerun_if_address_is_in_use, + spawn, +) @clear_cache_before_run() @@ -38,7 +44,7 @@ def check_low_level_zero_checkpointIO(stage: int): new_optimizer = HybridAdam((new_model.parameters()), lr=0.001) _, new_optimizer, _, _, _ = booster.boost(new_model, new_optimizer) ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) - recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict(), 
False) def run_dist(rank, world_size, port): diff --git a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py index cf890deb7400..9128f8c0fe9e 100644 --- a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py @@ -10,7 +10,7 @@ from colossalai.booster.plugin import TorchDDPPlugin from colossalai.booster.plugin.torch_ddp_plugin import TorchDDPCheckpointIO from colossalai.interface import OptimizerWrapper -from colossalai.testing import recursive_check, rerun_if_address_is_in_use, spawn +from colossalai.testing import check_state_dict_equal, rerun_if_address_is_in_use, spawn def check_torch_ddp_checkpointIO(): @@ -47,10 +47,10 @@ def check_torch_ddp_checkpointIO(): _, new_optimizer, _, _, new_scheduler = booster.boost(new_model, new_optimizer, lr_scheduler=new_scheduler) ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) - recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict(), False) ckpt_io.load_lr_scheduler(new_scheduler, lr_scheduler_ckpt_tempfile.name) - recursive_check(scheduler.state_dict(), new_scheduler.state_dict()) + check_state_dict_equal(scheduler.state_dict(), new_scheduler.state_dict(), False) def run_dist(rank, world_size, port): From 351e344da69f6a05ed9861e468c4a4b37b0d51e6 Mon Sep 17 00:00:00 2001 From: Mingyan Jiang <1829166702@qq.com> Date: Thu, 11 May 2023 14:40:20 +0800 Subject: [PATCH 07/10] [booster] update booster tutorials#3717, update setup doc --- colossalai/zero/gemini/chunk/chunk.py | 4 ++++ docs/source/en/get_started/installation.md | 6 +++--- docs/source/zh-Hans/basics/launch_colossalai.md | 16 ++++++++++++---- docs/source/zh-Hans/get_started/installation.md | 6 +++--- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/colossalai/zero/gemini/chunk/chunk.py 
b/colossalai/zero/gemini/chunk/chunk.py index a7682eaf62e9..6b960076f8e8 100644 --- a/colossalai/zero/gemini/chunk/chunk.py +++ b/colossalai/zero/gemini/chunk/chunk.py @@ -77,6 +77,7 @@ def __init__(self, keep_gathered (bool): optional, if True, this chunk is always gathered in CUDA memory pin_memory (bool): optional, if True, this chunk always has a shard copied in pinned CPU memory """ + # chunk的个数 self.count_id = Chunk._total_number Chunk._total_number += 1 @@ -214,6 +215,7 @@ def can_move(self) -> bool: @property def can_release(self) -> bool: + # 如果是gathered的状态,或者chunk所包含的tensor全部都是hold状态则,不可释放 if self.keep_gathered: return False else: @@ -222,10 +224,12 @@ def can_release(self) -> bool: @property def can_reduce(self): + # chunk中所有tensor都是bwd return self.tensor_state_cnter[TensorState.READY_FOR_REDUCE] == self.num_tensors @property def has_inf_or_nan(self) -> bool: + # 判断是否有inf或者nan的值 """Check if the chunk has inf or nan values on CUDA. """ if self.is_gathered: diff --git a/docs/source/en/get_started/installation.md b/docs/source/en/get_started/installation.md index 290879219074..93f9d074ead4 100644 --- a/docs/source/en/get_started/installation.md +++ b/docs/source/en/get_started/installation.md @@ -39,13 +39,13 @@ cd ColossalAI pip install -r requirements/requirements.txt # install colossalai -pip install . +CUDA_EXT=1 pip install . ``` -If you don't want to install and enable CUDA kernel fusion (compulsory installation when using fused optimizer): +If you don't want to install and enable CUDA kernel fusion (compulsory installation when using fused optimizer), just don't specify the `CUDA_EXT`: ```shell -CUDA_EXT=1 pip install . +pip install . 
``` diff --git a/docs/source/zh-Hans/basics/launch_colossalai.md b/docs/source/zh-Hans/basics/launch_colossalai.md index ca927de578d5..e90ec88df68e 100644 --- a/docs/source/zh-Hans/basics/launch_colossalai.md +++ b/docs/source/zh-Hans/basics/launch_colossalai.md @@ -74,17 +74,15 @@ import colossalai args = colossalai.get_default_parser().parse_args() # launch distributed environment -colossalai.launch(config=
+
+