diff --git a/applications/Chat/coati/kernels/opt_attn.py b/applications/Chat/coati/kernels/opt_attn.py index c10f341e94a3..e99f9c2247d1 100644 --- a/applications/Chat/coati/kernels/opt_attn.py +++ b/applications/Chat/coati/kernels/opt_attn.py @@ -77,7 +77,7 @@ def forward( scale=self.scaling) # Use the `embed_dim` from the config (stored in the class) rather than `hidden_state` because `attn_output` can be - # partitioned aross GPUs when using tensor-parallelism. + # partitioned across GPUs when using tensor-parallelism. attn_output = attn_output.reshape(bsz, tgt_len, self.embed_dim) attn_output = self.out_proj(attn_output) diff --git a/applications/Chat/examples/community/README.md b/applications/Chat/examples/community/README.md index c9c645032288..cd7b9d99bf06 100644 --- a/applications/Chat/examples/community/README.md +++ b/applications/Chat/examples/community/README.md @@ -17,7 +17,7 @@ Community examples consist of both inference and training examples that have bee | Example | Description | Code Example | Colab | Author | |:---------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------:| | Peft | Adding Peft support for SFT and Prompts model training | [Huggingface 
Peft](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/peft) | - | [YY Lin](https://github.com/yynil) | -| Train prompts on Ray | A Ray based implementation of Train prompts example | [Huggingface Peft](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/ray) | - | [MisterLin1995](https://github.com/MisterLin1995) | +| Train prompts on Ray | A Ray based implementation of Train prompts example | [Training On Ray](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/ray) | - | [MisterLin1995](https://github.com/MisterLin1995) | |...|...|...|...|...| ### How to get involved diff --git a/colossalai/booster/plugin/gemini_plugin.py b/colossalai/booster/plugin/gemini_plugin.py index 4850b52defaf..a3789a39d94b 100644 --- a/colossalai/booster/plugin/gemini_plugin.py +++ b/colossalai/booster/plugin/gemini_plugin.py @@ -2,7 +2,7 @@ import os import warnings from pathlib import Path -from typing import Callable, List, Optional, Tuple, Union +from typing import Callable, Iterator, List, Optional, Tuple, Union import torch import torch.nn as nn @@ -286,3 +286,6 @@ def control_checkpoint_io(self) -> bool: def get_checkpoint_io(self) -> CheckpointIO: return GeminiCheckpointIO() + + def no_sync(self, model: nn.Module) -> Iterator[None]: + raise NotImplementedError diff --git a/colossalai/booster/plugin/low_level_zero_plugin.py b/colossalai/booster/plugin/low_level_zero_plugin.py index f0f5768560a7..edc0b7679686 100644 --- a/colossalai/booster/plugin/low_level_zero_plugin.py +++ b/colossalai/booster/plugin/low_level_zero_plugin.py @@ -1,5 +1,5 @@ import warnings -from typing import Callable, List, Optional, Tuple, Union +from typing import Callable, Iterator, List, Optional, Tuple, Union import torch import torch.nn as nn @@ -197,3 +197,6 @@ def control_checkpoint_io(self) -> bool: def get_checkpoint_io(self) -> CheckpointIO: return LowLevelZeroCheckpointIO() + + def 
no_sync(self, model: nn.Module) -> Iterator[None]: + raise NotImplementedError diff --git a/colossalai/booster/plugin/plugin_base.py b/colossalai/booster/plugin/plugin_base.py index eb5478595542..561f58bc5570 100644 --- a/colossalai/booster/plugin/plugin_base.py +++ b/colossalai/booster/plugin/plugin_base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Callable, List, Tuple, Union +from typing import Callable, Iterator, List, Tuple, Union import torch.nn as nn from torch.optim import Optimizer @@ -60,6 +60,13 @@ def get_checkpoint_io(self) -> CheckpointIO: """ pass + @abstractmethod + def no_sync(self, model: nn.Module) -> Iterator[None]: + """ + Context manager to disable gradient synchronization. + """ + pass + @abstractmethod def prepare_dataloader(self, dataset: Dataset, diff --git a/colossalai/booster/plugin/torch_ddp_plugin.py b/colossalai/booster/plugin/torch_ddp_plugin.py index 76906d844ef1..99cd2f7791d3 100644 --- a/colossalai/booster/plugin/torch_ddp_plugin.py +++ b/colossalai/booster/plugin/torch_ddp_plugin.py @@ -1,4 +1,4 @@ -from typing import Callable, List, Tuple, Union +from typing import Callable, Iterator, List, Tuple, Union import torch.nn as nn from torch.nn.parallel import DistributedDataParallel as DDP @@ -142,3 +142,7 @@ def control_checkpoint_io(self) -> bool: def get_checkpoint_io(self) -> CheckpointIO: return TorchDDPCheckpointIO() + + def no_sync(self, model: nn.Module) -> Iterator[None]: + assert isinstance(model, TorchDDPModel), 'Model is not boosted by TorchDDPPlugin.' + return model.module.no_sync() diff --git a/colossalai/communication/p2p.py b/colossalai/communication/p2p.py index 0200cd3c6553..1f20fca4f74d 100644 --- a/colossalai/communication/p2p.py +++ b/colossalai/communication/p2p.py @@ -217,7 +217,7 @@ def recv_backward(output_grad_shape, next_rank (int, optional): The rank of the source of the tensor. 
Returns: - Union[:class:`torch.Tensor`, List[:class:`torch.Tensor`]]: The input gradient tensor or gradident tensor list. + Union[:class:`torch.Tensor`, List[:class:`torch.Tensor`]]: The input gradient tensor or gradient tensor list. """ if gpc.is_pipeline_last_stage(): output_tensor_grad = None diff --git a/colossalai/communication/p2p_v2.py b/colossalai/communication/p2p_v2.py index 0dacd8c3c9b5..090311cb35f2 100644 --- a/colossalai/communication/p2p_v2.py +++ b/colossalai/communication/p2p_v2.py @@ -19,7 +19,7 @@ def init_process_group(): - """intialise process group by dist.new_group in the adjacent stages + """initialise process group by dist.new_group in the adjacent stages Args: None diff --git a/colossalai/context/process_group_initializer/initializer_sequence.py b/colossalai/context/process_group_initializer/initializer_sequence.py index eaacb14d2282..251a2940778a 100644 --- a/colossalai/context/process_group_initializer/initializer_sequence.py +++ b/colossalai/context/process_group_initializer/initializer_sequence.py @@ -91,11 +91,11 @@ def init_dist_group(self): parallel_setting = [] - local_rank, group_world_size, process_group, cpu_grop, ranks_in_group, mode = \ + local_rank, group_world_size, process_group, cpu_group, ranks_in_group, mode = \ self._sequence_initializer.init_dist_group() # change mode to sequence mode = ParallelMode.SEQUENCE - parallel_setting.append((local_rank, group_world_size, process_group, cpu_grop, ranks_in_group, mode)) + parallel_setting.append((local_rank, group_world_size, process_group, cpu_group, ranks_in_group, mode)) parallel_setting.append(self._sequence_dp_initializer.init_dist_group()) return parallel_setting diff --git a/colossalai/testing/__init__.py b/colossalai/testing/__init__.py index c53e0f44c7e0..9d0475ed064c 100644 --- a/colossalai/testing/__init__.py +++ b/colossalai/testing/__init__.py @@ -1,4 +1,11 @@ -from .comparison import assert_close, assert_close_loose, assert_equal, assert_equal_in_group, 
assert_not_equal +from .comparison import ( + assert_close, + assert_close_loose, + assert_equal, + assert_equal_in_group, + assert_not_equal, + check_state_dict_equal, +) from .pytest_wrapper import run_on_environment_flag from .utils import ( clear_cache_before_run, @@ -13,5 +20,5 @@ __all__ = [ 'assert_equal', 'assert_not_equal', 'assert_close', 'assert_close_loose', 'assert_equal_in_group', 'parameterize', 'rerun_on_exception', 'rerun_if_address_is_in_use', 'skip_if_not_enough_gpus', 'free_port', 'spawn', - 'clear_cache_before_run', 'run_on_environment_flag' + 'clear_cache_before_run', 'run_on_environment_flag', 'check_state_dict_equal' ] diff --git a/colossalai/testing/comparison.py b/colossalai/testing/comparison.py index e00d0da168c7..faf61638d8bb 100644 --- a/colossalai/testing/comparison.py +++ b/colossalai/testing/comparison.py @@ -1,3 +1,5 @@ +from typing import OrderedDict + import torch import torch.distributed as dist from torch import Tensor @@ -28,3 +30,25 @@ def assert_equal_in_group(tensor: Tensor, process_group: ProcessGroup = None): a = tensor_list[i] b = tensor_list[i + 1] assert torch.all(a == b), f'expected tensors on rank {i} and {i + 1} to be equal but they are not, {a} vs {b}' + + +def check_state_dict_equal(d1: OrderedDict, d2: OrderedDict, ignore_device: bool = True): + for k, v in d1.items(): + if isinstance(v, dict): + check_state_dict_equal(v, d2[k]) + elif isinstance(v, list): + for i in range(len(v)): + if isinstance(v[i], torch.Tensor): + if not ignore_device: + v[i] = v[i].to("cpu") + d2[k][i] = d2[k][i].to("cpu") + assert torch.equal(v[i], d2[k][i]) + else: + assert v[i] == d2[k][i] + elif isinstance(v, torch.Tensor): + if not ignore_device: + v = v.to("cpu") + d2[k] = d2[k].to("cpu") + assert torch.equal(v, d2[k]) + else: + assert v == d2[k] diff --git a/examples/tutorial/new_api/cifar_resnet/train.py b/examples/tutorial/new_api/cifar_resnet/train.py index a96a4b640a22..fe0dabf08377 100644 --- 
a/examples/tutorial/new_api/cifar_resnet/train.py +++ b/examples/tutorial/new_api/cifar_resnet/train.py @@ -28,7 +28,7 @@ def build_dataloader(batch_size: int, coordinator: DistCoordinator, plugin: DPPluginBase): - # trainsform + # transform transform_train = transforms.Compose( [transforms.Pad(4), transforms.RandomHorizontalFlip(), diff --git a/examples/tutorial/new_api/cifar_vit/train.py b/examples/tutorial/new_api/cifar_vit/train.py index 2405fdfc60d5..82a8f2ed97e4 100644 --- a/examples/tutorial/new_api/cifar_vit/train.py +++ b/examples/tutorial/new_api/cifar_vit/train.py @@ -25,7 +25,7 @@ # Prepare Hyperparameters # ============================== NUM_EPOCHS = 60 -WARMUP_EPOCSH = 5 +WARMUP_EPOCHS = 5 LEARNING_RATE = 1e-3 @@ -37,7 +37,7 @@ def vit_cifar(**kwargs): def build_dataloader(batch_size: int, coordinator: DistCoordinator, plugin: DPPluginBase): - # trainsform + # transform transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), @@ -177,7 +177,7 @@ def main(): optimizer = HybridAdam(model.parameters(), lr=LEARNING_RATE) # lr scheduler - lr_scheduler = LinearWarmupLR(optimizer, NUM_EPOCHS, WARMUP_EPOCSH) + lr_scheduler = LinearWarmupLR(optimizer, NUM_EPOCHS, WARMUP_EPOCHS) # ============================== # Boost with ColossalAI diff --git a/op_builder/utils.py b/op_builder/utils.py index 1b1bd5f49970..2dbd976fbcbb 100644 --- a/op_builder/utils.py +++ b/op_builder/utils.py @@ -36,7 +36,7 @@ def get_cuda_version_in_pytorch() -> List[int]: torch_cuda_minor = torch.version.cuda.split(".")[1] except: raise ValueError( - "[extension] Cannot retrive the CUDA version in the PyTorch binary given by torch.version.cuda") + "[extension] Cannot retrieve the CUDA version in the PyTorch binary given by torch.version.cuda") return torch_cuda_major, torch_cuda_minor diff --git a/tests/components_to_test/albert.py b/tests/components_to_test/albert.py index d5b6bc89a83e..8924eb2fbc92 100644 --- 
a/tests/components_to_test/albert.py +++ b/tests/components_to_test/albert.py @@ -27,8 +27,8 @@ def bert_model_builder(checkpoint: bool = False): attention_probs_dropout_prob=0.) print('building AlbertForSequenceClassification model') - # adapting huggingface BertForSequenceClassification for single unitest calling interface - class ModelAaptor(AlbertForSequenceClassification): + # adapting huggingface BertForSequenceClassification for single unittest calling interface + class ModelAdaptor(AlbertForSequenceClassification): def forward(self, input_ids, labels): """ @@ -37,23 +37,23 @@ def forward(self, input_ids, labels): """ return super().forward(input_ids=input_ids, labels=labels)[0] - model = ModelAaptor(config) + model = ModelAdaptor(config) # if checkpoint and version.parse(transformers.__version__) >= version.parse("4.11.0"): # model.gradient_checkpointing_enable() return model - is_distrbuted = torch.distributed.is_initialized() + is_distributed = torch.distributed.is_initialized() trainloader = get_bert_data_loader(n_class=vocab_size, batch_size=2, total_samples=10000, sequence_length=sequence_length, - is_distrbuted=is_distrbuted) + is_distributed=is_distributed) testloader = get_bert_data_loader(n_class=vocab_size, batch_size=2, total_samples=10000, sequence_length=sequence_length, - is_distrbuted=is_distrbuted) + is_distributed=is_distributed) criterion = None return bert_model_builder, trainloader, testloader, torch.optim.Adam, criterion diff --git a/tests/components_to_test/beit.py b/tests/components_to_test/beit.py index 1252071f4075..2021ae6f6e35 100644 --- a/tests/components_to_test/beit.py +++ b/tests/components_to_test/beit.py @@ -27,7 +27,7 @@ def generate(self): @non_distributed_component_funcs.register(name='beit') def get_training_components(): - def model_buider(checkpoint=False): + def model_builder(checkpoint=False): model = Beit(img_size=DummyDataLoader.img_size, num_classes=DummyDataLoader.num_class, embed_dim=32, @@ -39,4 +39,4 @@ def 
model_buider(checkpoint=False): testloader = DummyDataLoader() criterion = torch.nn.CrossEntropyLoss() - return model_buider, trainloader, testloader, torch.optim.Adam, criterion + return model_builder, trainloader, testloader, torch.optim.Adam, criterion diff --git a/tests/components_to_test/bert.py b/tests/components_to_test/bert.py index c1faa6f9d892..e7d1d50806b8 100644 --- a/tests/components_to_test/bert.py +++ b/tests/components_to_test/bert.py @@ -13,7 +13,7 @@ def get_bert_data_loader( total_samples, sequence_length, device=torch.device('cpu:0'), - is_distrbuted=False, + is_distributed=False, ): train_data = torch.randint( low=0, @@ -24,7 +24,7 @@ def get_bert_data_loader( ) train_label = torch.randint(low=0, high=2, size=(total_samples,), device=device, dtype=torch.long) train_dataset = torch.utils.data.TensorDataset(train_data, train_label) - if is_distrbuted: + if is_distributed: sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) else: sampler = SequentialSampler(train_dataset) @@ -52,8 +52,8 @@ def bert_model_builder(checkpoint: bool = False): attention_probs_dropout_prob=0.) 
print('building BertForSequenceClassification model') - # adapting huggingface BertForSequenceClassification for single unitest calling interface - class ModelAaptor(BertForSequenceClassification): + # adapting huggingface BertForSequenceClassification for single unittest calling interface + class ModelAdaptor(BertForSequenceClassification): def forward(self, input_ids, labels): """ @@ -62,23 +62,23 @@ def forward(self, input_ids, labels): """ return super().forward(input_ids=input_ids, labels=labels)[0] - model = ModelAaptor(config) + model = ModelAdaptor(config) if checkpoint and version.parse(transformers.__version__) >= version.parse("4.11.0"): model.gradient_checkpointing_enable() return model - is_distrbuted = torch.distributed.is_initialized() + is_distributed = torch.distributed.is_initialized() trainloader = get_bert_data_loader(n_class=vocab_size, batch_size=2, total_samples=10000, sequence_length=sequence_length, - is_distrbuted=is_distrbuted) + is_distributed=is_distributed) testloader = get_bert_data_loader(n_class=vocab_size, batch_size=2, total_samples=10000, sequence_length=sequence_length, - is_distrbuted=is_distrbuted) + is_distributed=is_distributed) criterion = None return bert_model_builder, trainloader, testloader, torch.optim.Adam, criterion diff --git a/tests/components_to_test/registry.py b/tests/components_to_test/registry.py index 728ed9eba6ea..edfcaaa7275b 100644 --- a/tests/components_to_test/registry.py +++ b/tests/components_to_test/registry.py @@ -9,10 +9,10 @@ def __init__(self): def register(self, name): assert name not in self._registry - def _regsiter(callable_): + def _register(callable_): self._registry[name] = callable_ - return _regsiter + return _register def get_callable(self, name: str): return self._registry[name] @@ -34,6 +34,6 @@ def __next__(self): non_distributed_component_funcs = Registry() -model_paralle_component_funcs = Registry() +model_parallel_component_funcs = Registry() -__all__ = 
['non_distributed_component_funcs', 'model_paralle_component_funcs'] +__all__ = ['non_distributed_component_funcs', 'model_parallel_component_funcs'] diff --git a/tests/test_booster/test_accelerator.py b/tests/test_booster/test_accelerator.py index 895c494d0c17..6f3f66ed41b8 100644 --- a/tests/test_booster/test_accelerator.py +++ b/tests/test_booster/test_accelerator.py @@ -7,8 +7,8 @@ @clear_cache_before_run() @parameterize('device', ['cpu', 'cuda']) def test_accelerator(device): - acceleartor = Accelerator(device) + accelerator = Accelerator(device) model = nn.Linear(8, 8) - model = acceleartor.configure_model(model) + model = accelerator.configure_model(model) assert next(model.parameters()).device.type == device - del model, acceleartor + del model, accelerator diff --git a/tests/test_booster/test_plugin/test_dp_plugin_base.py b/tests/test_booster/test_plugin/test_dp_plugin_base.py index eab949828db9..689b334cae50 100644 --- a/tests/test_booster/test_plugin/test_dp_plugin_base.py +++ b/tests/test_booster/test_plugin/test_dp_plugin_base.py @@ -1,4 +1,4 @@ -from typing import Callable, List, Tuple, Union +from typing import Callable, Iterator, List, Tuple, Union import torch import torch.distributed as dist @@ -49,11 +49,14 @@ def supported_devices(self) -> List[str]: def supported_precisions(self) -> List[str]: pass + def no_sync(self, model: nn.Module) -> Iterator[None]: + pass + def check_dataloader_sharding(): plugin = DPPluginWrapper() - # create a custom dasetset with 0 to 10 + # create a custom dataset with 0 to 10 dataset = TensorDataset(torch.arange(0, 10)) train_dataloader = plugin.prepare_dataloader(dataset, batch_size=2) diff --git a/tests/test_booster/test_plugin/test_torch_ddp_plugin.py b/tests/test_booster/test_plugin/test_torch_ddp_plugin.py index 30c4db12309f..fbe44e5ce6fb 100644 --- a/tests/test_booster/test_plugin/test_torch_ddp_plugin.py +++ b/tests/test_booster/test_plugin/test_torch_ddp_plugin.py @@ -1,5 +1,8 @@ +from contextlib import 
nullcontext + import torch import torch.distributed as dist +import torch.nn as nn from torch.nn.parallel import DistributedDataParallel as DDP from torch.optim import SGD @@ -44,10 +47,67 @@ def check_torch_ddp_plugin(): torch.cuda.empty_cache() +class DummyModel(nn.Module): + + def __init__(self): + super().__init__() + self.weight = nn.Parameter(torch.rand(1)) + + def forward(self, x): + return self.weight * x + + +def check_torch_ddp_no_sync(): + plugin = TorchDDPPlugin() + booster = Booster(plugin=plugin) + + model = DummyModel() + criterion = lambda x: x.mean() + optimizer = SGD(model.parameters(), lr=1e-3) + # create a custom dataset with 0 to 10 + dataset = torch.arange(0, 10) + train_dataloader = plugin.prepare_dataloader(dataset, batch_size=2) + model, optimizer, criterion, train_dataloader, _ = booster.boost(model, + optimizer, + criterion, + dataloader=train_dataloader) + + def fwd_bwd(): + output = model(batch.cuda()) + loss = criterion(output) + booster.backward(loss, optimizer) + + def get_grad_set_over_all_ranks(): + for p in model.parameters(): + # grad shape is (1, ) + assert p.grad.shape == (1,) + grad_list = [torch.empty_like(p.grad) for _ in range(dist.get_world_size())] + dist.all_gather(grad_list, p.grad) + # get grad set of all ranks + grad_set = set([grad.item() for grad in grad_list]) + # as the model only has one parameter, we can return here + return grad_set + + for i, batch in enumerate(train_dataloader): + if i > 1: + # only check the first two batches + break + # no_sync for the first batch, sync for the second batch + ctx = booster.no_sync(model) if i == 0 else nullcontext() + with ctx: + fwd_bwd() + grad_set = get_grad_set_over_all_ranks() + # for the first batch, all ranks should have different grads + # for the second batch, as grad is synchronized, all ranks should have the same grads + target_num_different_grad = dist.get_world_size() if i == 0 else 1 + assert len(grad_set) == target_num_different_grad + + def run_dist(rank, 
world_size, port): # init dist env colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host='localhost') check_torch_ddp_plugin() + check_torch_ddp_no_sync() @rerun_if_address_is_in_use() diff --git a/tests/test_checkpoint_io/test_gemini_checkpoint_io.py b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py new file mode 100644 index 000000000000..1e5a2e1c4b44 --- /dev/null +++ b/tests/test_checkpoint_io/test_gemini_checkpoint_io.py @@ -0,0 +1,98 @@ +import tempfile + +import pytest +import torch + +import colossalai +from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO +from colossalai.testing import check_state_dict_equal, parameterize, rerun_if_address_is_in_use, spawn +from colossalai.utils.cuda import get_current_device +from colossalai.zero import ColoInitContext, ZeroDDP +from colossalai.zero.gemini.chunk import ChunkManager, search_chunk_configuration +from colossalai.zero.gemini.gemini_mgr import GeminiManager +from tests.components_to_test.registry import non_distributed_component_funcs + + +@parameterize('placement_policy', ['cuda', 'cpu']) +@parameterize('model_name', ['bert']) +@parameterize('use_safetensors', [True, False]) +def exam_state_dict_with_origin(placement_policy, model_name, use_safetensors: bool): + from transformers import BertForSequenceClassification + + model_ckpt_dir = tempfile.TemporaryDirectory() + get_components_func = non_distributed_component_funcs.get_callable(model_name) + model_builder, *_ = get_components_func() + with ColoInitContext(device=(get_current_device())): + bert_model = model_builder() + bert_model.config.save_pretrained(save_directory=(model_ckpt_dir.name)) + + config_dict, *_ = search_chunk_configuration(bert_model, search_range_mb=1, search_interval_byte=100) + chunk_manager = ChunkManager(config_dict) + gemini_manager = GeminiManager(placement_policy, chunk_manager) + bert_model = ZeroDDP(bert_model, gemini_manager) + bert_model.train() + + ckpt_io = 
GeminiCheckpointIO() + if ckpt_io.coordinator.is_master(): + model_size = sum(p.numel() * p.element_size() for p in bert_model.parameters()) / 1024**2 + ckpt_io.save_model(bert_model, (model_ckpt_dir.name), + True, + True, + '', (model_size / 3), + use_safetensors=use_safetensors) + new_bert_model = BertForSequenceClassification.from_pretrained(model_ckpt_dir.name) + check_state_dict_equal(bert_model.state_dict(only_rank_0=True, dtype=(torch.float32)), + new_bert_model.state_dict(), False) + model_ckpt_dir.cleanup() + + +@parameterize('placement_policy', ['cuda', 'cpu']) +@parameterize('model_name', ['gpt2', 'bert']) +@parameterize('use_safetensors', [True, False]) +def exam_state_dict(placement_policy, model_name: str, use_safetensors: bool): + get_components_func = non_distributed_component_funcs.get_callable(model_name) + model_builder, *_ = get_components_func() + with ColoInitContext(device=(get_current_device())): + model = model_builder() + new_model = model_builder() + config_dict, *_ = search_chunk_configuration(model, search_range_mb=1, search_interval_byte=100) + chunk_manager = ChunkManager(config_dict) + gemini_manager = GeminiManager(placement_policy, chunk_manager) + model = ZeroDDP(model, gemini_manager) + + model.train() + #new model + new_config_dict, *_ = search_chunk_configuration(new_model, search_range_mb=1, search_interval_byte=100) + new_chunk_manager = ChunkManager(new_config_dict) + new_gemini_manager = GeminiManager(placement_policy, new_chunk_manager) + new_model = ZeroDDP(new_model, new_gemini_manager) + + model_ckpt_dir = tempfile.TemporaryDirectory() + ckpt_io = GeminiCheckpointIO() + model_size = sum(p.numel() * p.element_size() for p in model.parameters()) / 1024**2 + ckpt_io.save_model(model, (model_ckpt_dir.name), + True, + True, + 'epoch', (model_size / 3), + use_safetensors=use_safetensors) + + if ckpt_io.coordinator.is_master(): + ckpt_io.load_model(new_model, (model_ckpt_dir.name), strict=True) + model_dict = 
model.state_dict(only_rank_0=True) + new_model_dict = new_model.state_dict(only_rank_0=True) + check_state_dict_equal(model_dict, new_model_dict, False) + model_ckpt_dir.cleanup() + + +def run_dist(rank, world_size, port): + config = {} + colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') + exam_state_dict() + exam_state_dict_with_origin() + + +@pytest.mark.dist +@pytest.mark.parametrize('world_size', [4, 4]) +@rerun_if_address_is_in_use() +def test_gemini_ckpIO(world_size): + spawn(run_dist, world_size) diff --git a/tests/test_checkpoint_io/test_general_checkpoint_io.py b/tests/test_checkpoint_io/test_general_checkpoint_io.py index 752ca706bfd4..9e973bb23e0b 100644 --- a/tests/test_checkpoint_io/test_general_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_general_checkpoint_io.py @@ -1,20 +1,13 @@ import tempfile + import pytest import torch from torch.optim import Adam from torchvision.models import resnet18 -from colossalai.checkpoint_io import GeneralCheckpointIO from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO -from colossalai.testing import clear_cache_before_run, parameterize - -import colossalai -from colossalai.testing import parameterize, rerun_if_address_is_in_use, spawn -from colossalai.utils.cuda import get_current_device -from colossalai.zero import ColoInitContext, ZeroDDP -from colossalai.zero.gemini.chunk import ChunkManager, search_chunk_configuration -from colossalai.zero.gemini.gemini_mgr import GeminiManager -from tests.components_to_test.registry import non_distributed_component_funcs +from colossalai.checkpoint_io import GeneralCheckpointIO +from colossalai.testing import check_state_dict_equal, clear_cache_before_run, parameterize # ======== # Note: @@ -61,10 +54,10 @@ def test_unsharded_checkpoint(use_safetensors: bool): ckpt_io.load_model(new_model, model_ckpt_tempfile.name) ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) - # check 
for model and optimizer state dict recursively - recursive_check(model.state_dict(), new_model.state_dict()) - recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) + check_state_dict_equal(model.state_dict(), new_model.state_dict()) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict()) + @pytest.mark.parametrize('use_safetensors', [True, False]) def test_sharded_checkpoint(use_safetensors: bool): @@ -87,7 +80,7 @@ def test_sharded_checkpoint(use_safetensors: bool): else: suffix = ".bin" WEIGHTS_INDEX_NAME = "model.bin.index.json" - + model_ckpt_dir = tempfile.TemporaryDirectory() optimizer_ckpt_tempfile = tempfile.NamedTemporaryFile() @@ -96,7 +89,7 @@ def test_sharded_checkpoint(use_safetensors: bool): ckpt_io.save_model(model, model_ckpt_dir.name, True, True, "", 10, use_safetensors=use_safetensors) ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name, shard=False) - + # create new model new_model = resnet18() new_optimizer = Adam(new_model.parameters(), lr=0.001) @@ -105,111 +98,5 @@ def test_sharded_checkpoint(use_safetensors: bool): ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) # check for model and optimizer state dict recursively - recursive_check(model.state_dict(), new_model.state_dict()) - recursive_check(optimizer.state_dict(), new_optimizer.state_dict()) - -@parameterize('placement_policy', ['cuda', 'cpu']) -@parameterize('model_name', ['bert']) -@parameterize('use_safetensors', [True, False]) -def hf_load_colossalai_checkpoint(placement_policy, model_name, use_safetensors: bool): - from transformers import BertTokenizer, BertModel, BertForMaskedLM, BertConfig, BertForSequenceClassification - - model_ckpt_dir = tempfile.TemporaryDirectory() - get_components_func = non_distributed_component_funcs.get_callable(model_name) - model_builder, *_ = get_components_func() - - with ColoInitContext(device=get_current_device()): - bert_model = model_builder() - 
bert_model.config.save_pretrained(save_directory=model_ckpt_dir.name) - config_dict, *_ = search_chunk_configuration(bert_model, search_range_mb=1, search_interval_byte=100) - chunk_manager = ChunkManager(config_dict) - gemini_manager = GeminiManager(placement_policy, chunk_manager) - bert_model = ZeroDDP(bert_model, gemini_manager) - bert_model.train() - - ckpt_io = GeminiCheckpointIO() - if ckpt_io.coordinator.is_master(): - model_size = sum(p.numel() * p.element_size() for p in bert_model.parameters()) / 1024**2 - ckpt_io.save_model(bert_model, model_ckpt_dir.name, True, True, "", (model_size / 3), use_safetensors=use_safetensors) - new_bert_model = BertForSequenceClassification.from_pretrained(model_ckpt_dir.name) - recursive_check(bert_model.state_dict(only_rank_0=True, dtype=torch.float32), new_bert_model.state_dict()) - - model_ckpt_dir.cleanup() - - - -@parameterize('placement_policy', ['cuda', 'cpu']) -@parameterize('model_name', ['gpt2', 'bert']) -@parameterize('use_safetensors', [True, False]) -def exam_state_dict(placement_policy, model_name: str, use_safetensors: bool): - get_components_func = non_distributed_component_funcs.get_callable(model_name) - model_builder, *_ = get_components_func() - - with ColoInitContext(device=get_current_device()): - model = model_builder() - new_model = model_builder() - - config_dict, *_ = search_chunk_configuration(model, search_range_mb=1, search_interval_byte=100) - chunk_manager = ChunkManager(config_dict) - gemini_manager = GeminiManager(placement_policy, chunk_manager) - model = ZeroDDP(model, gemini_manager) - model.train() - - new_config_dict, *_ = search_chunk_configuration(new_model, search_range_mb=1, search_interval_byte=100) - new_chunk_manager = ChunkManager(new_config_dict) - new_gemini_manager = GeminiManager(placement_policy, new_chunk_manager) - new_model = ZeroDDP(new_model, new_gemini_manager) - - model_ckpt_dir = tempfile.TemporaryDirectory() - - ckpt_io = GeminiCheckpointIO() - model_size = 
sum(p.numel() * p.element_size() for p in model.parameters()) / 1024**2 - ckpt_io.save_model(model, model_ckpt_dir.name, True, True, "epoch", (model_size / 3), use_safetensors=use_safetensors) - - # load model - if ckpt_io.coordinator.is_master(): - ckpt_io.load_model(new_model, model_ckpt_dir.name, strict=True) - model_dict = model.state_dict(only_rank_0=True) - new_model_dict = new_model.state_dict(only_rank_0=True) - recursive_check(model_dict, new_model_dict) - - model_ckpt_dir.cleanup() - - -def run_dist(rank, world_size, port): - config = {} - colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') - exam_state_dict() - hf_load_colossalai_checkpoint() - - -@pytest.mark.dist -@pytest.mark.parametrize('world_size', [4, 4]) -@rerun_if_address_is_in_use() -def test_gemini_ckpIO(world_size): - spawn(run_dist, world_size) - - -# do recursive check for the optimizer state dict -# if the value is a dict, compare its values -# if the value is a list, comapre all elements one-by-one -# if the value is a torch.Tensor, use torch.equal -# otherwise use assertEqual -def recursive_check(d1, d2): - for k, v in d1.items(): - if isinstance(v, dict): - recursive_check(v, d2[k]) - elif isinstance(v, list): - for i in range(len(v)): - if isinstance(v[i], torch.Tensor): - v[i] = v[i].to("cpu") - d2[k][i] = d2[k][i].to("cpu") - assert torch.equal(v[i], d2[k][i]) - else: - assert v[i] == d2[k][i] - elif isinstance(v, torch.Tensor): - v = v.to("cpu") - d2[k] = d2[k].to("cpu") - assert torch.equal(v, d2[k]) - else: - assert v == d2[k] + check_state_dict_equal(model.state_dict(), new_model.state_dict()) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict()) diff --git a/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py new file mode 100644 index 000000000000..217a950d8155 --- /dev/null +++ 
b/tests/test_checkpoint_io/test_low_level_zero_checkpoint_io.py @@ -0,0 +1,57 @@ +import tempfile + +import pytest +import torch +from torchvision.models import resnet18 + +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import LowLevelZeroPlugin +from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroCheckpointIO +from colossalai.nn.optimizer import HybridAdam +from colossalai.testing import ( + check_state_dict_equal, + clear_cache_before_run, + parameterize, + rerun_if_address_is_in_use, + spawn, +) + + +@clear_cache_before_run() +@parameterize('stage', [2]) +def check_low_level_zero_checkpointIO(stage: int): + plugin = LowLevelZeroPlugin(stage=stage, max_norm=1.0, initial_scale=32) + booster = Booster(plugin=plugin) + model = resnet18() + criterion = lambda x: x.mean() + optimizer = HybridAdam((model.parameters()), lr=0.001) + model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion) + + x = torch.randn(4, 3, 224, 224) + x = x.to('cuda') + output = model(x) + loss = criterion(output) + booster.backward(loss, optimizer) + optimizer.step() + + optimizer_ckpt_tempfile = tempfile.NamedTemporaryFile() + ckpt_io = LowLevelZeroCheckpointIO() + ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name) + + if ckpt_io.coordinator.is_master(): + new_model = resnet18() + new_optimizer = HybridAdam((new_model.parameters()), lr=0.001) + _, new_optimizer, _, _, _ = booster.boost(new_model, new_optimizer) + ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict(), False) + + +def run_dist(rank, world_size, port): + colossalai.launch(config=(dict()), rank=rank, world_size=world_size, port=port, host='localhost') + check_low_level_zero_checkpointIO() + + +@rerun_if_address_is_in_use() +def test_low_level_zero_checkpointIO(): + spawn(run_dist, 2) diff --git 
a/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py new file mode 100644 index 000000000000..9128f8c0fe9e --- /dev/null +++ b/tests/test_checkpoint_io/test_torch_ddp_checkpoint_io.py @@ -0,0 +1,63 @@ +import tempfile + +import torch +from torch.nn.parallel import DistributedDataParallel as DDP +from torch.optim import SGD +from torchvision.models import resnet18 + +import colossalai +from colossalai.booster import Booster +from colossalai.booster.plugin import TorchDDPPlugin +from colossalai.booster.plugin.torch_ddp_plugin import TorchDDPCheckpointIO +from colossalai.interface import OptimizerWrapper +from colossalai.testing import check_state_dict_equal, rerun_if_address_is_in_use, spawn + + +def check_torch_ddp_checkpointIO(): + plugin = TorchDDPPlugin() + booster = Booster(plugin=plugin) + model = resnet18() + criterion = lambda x: x.mean() + optimizer = SGD((model.parameters()), lr=0.001) + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1) + model, optimizer, criterion, _, _ = booster.boost(model, optimizer, criterion, lr_scheduler=scheduler) + + assert isinstance(model.module, DDP) + assert isinstance(optimizer, OptimizerWrapper) + + x = torch.randn(4, 3, 224, 224) + x = x.to('cuda') + output = model(x) + loss = criterion(output) + booster.backward(loss, optimizer) + optimizer.clip_grad_by_norm(1.0) + optimizer.step() + scheduler.step() + + optimizer_ckpt_tempfile = tempfile.NamedTemporaryFile() + lr_scheduler_ckpt_tempfile = tempfile.NamedTemporaryFile() + ckpt_io = TorchDDPCheckpointIO() + ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name) + ckpt_io.save_lr_scheduler(scheduler, lr_scheduler_ckpt_tempfile.name) + + if ckpt_io.coordinator.is_master(): + new_model = resnet18() + new_optimizer = SGD((new_model.parameters()), lr=0.001) + new_scheduler = torch.optim.lr_scheduler.StepLR(new_optimizer, step_size=1, gamma=0.1) + _, new_optimizer, _, _, 
new_scheduler = booster.boost(new_model, new_optimizer, lr_scheduler=new_scheduler) + + ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name) + check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict(), False) + + ckpt_io.load_lr_scheduler(new_scheduler, lr_scheduler_ckpt_tempfile.name) + check_state_dict_equal(scheduler.state_dict(), new_scheduler.state_dict(), False) + + +def run_dist(rank, world_size, port): + colossalai.launch(config=(dict()), rank=rank, world_size=world_size, port=port, host='localhost') + check_torch_ddp_checkpointIO() + + +@rerun_if_address_is_in_use() +def test_torch_ddp_checkpointIO(): + spawn(run_dist, 2) diff --git a/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py b/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py index 4d63592f12b0..4992acbd7cc2 100644 --- a/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py +++ b/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor.py @@ -48,7 +48,7 @@ def run_trainer(rank, world_size, port): pipelinable.policy = "uniform" model = pipelinable.partition(1, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE)) - # craete dataloaders + # create dataloaders root = Path(os.environ['DATA']) transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4, pad_if_needed=True), @@ -68,7 +68,7 @@ def run_trainer(rank, world_size, port): # create lr scheduler lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer, total_steps=NUM_EPOCHS, warmup_steps=WARMUP_EPOCHS) - # intiailize + # initialize engine, train_dataloader, *_ = colossalai.initialize(model=model, optimizer=optimizer, criterion=criterion, diff --git a/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor_v2.py b/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor_v2.py index 67d2ba5f5d98..62bbb8f50391 100644 --- 
a/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor_v2.py +++ b/tests/test_data_pipeline_tensor_parallel/test_cifar_with_data_pipeline_tensor_v2.py @@ -50,7 +50,7 @@ def run_trainer(rank, world_size, port): pipelinable.policy = "uniform" model = pipelinable.partition(1, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE)) - # craete dataloaders + # create dataloaders root = Path(os.environ['DATA']) transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4, pad_if_needed=True), @@ -70,7 +70,7 @@ def run_trainer(rank, world_size, port): # create lr scheduler lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer, total_steps=NUM_EPOCHS, warmup_steps=WARMUP_EPOCHS) - # intiailize + # initialize engine, train_dataloader, *_ = colossalai.initialize(model=model, optimizer=optimizer, criterion=criterion, diff --git a/tests/test_fx/test_codegen/test_activation_checkpoint_codegen.py b/tests/test_fx/test_codegen/test_activation_checkpoint_codegen.py index ab483f7e47a3..bcac2ec426d9 100644 --- a/tests/test_fx/test_codegen/test_activation_checkpoint_codegen.py +++ b/tests/test_fx/test_codegen/test_activation_checkpoint_codegen.py @@ -64,7 +64,7 @@ def forward(self, x, y): def _run_act_ckpt_codegen(rank, world_size, port): - # launch colossalai to make sure we could execute colossalai.utils.checkpoint currectly + # launch colossalai to make sure we could execute colossalai.utils.checkpoint correctly colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') # build model and run forward @@ -122,7 +122,7 @@ def test_act_ckpt_codegen(): def _run_act_ckpt_python_code_torch11(rank, world_size, port): - # launch colossalai to make sure we could execute colossalai.utils.checkpoint currectly + # launch colossalai to make sure we could execute colossalai.utils.checkpoint correctly colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port,
backend='nccl') # build model and run forward diff --git a/tests/test_fx/test_codegen/test_nested_activation_checkpoint_codegen.py b/tests/test_fx/test_codegen/test_nested_activation_checkpoint_codegen.py index 9064023d4f68..5b327807a57b 100644 --- a/tests/test_fx/test_codegen/test_nested_activation_checkpoint_codegen.py +++ b/tests/test_fx/test_codegen/test_nested_activation_checkpoint_codegen.py @@ -32,7 +32,7 @@ def forward(self, x): def _run_act_ckpt_codegen(rank, world_size, port): - # launch colossalai to make sure we could execute colossalai.utils.checkpoint currectly + # launch colossalai to make sure we could execute colossalai.utils.checkpoint correctly colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') # build model and run forward @@ -89,7 +89,7 @@ def test_act_ckpt_codegen(): def _run_act_ckpt_python_code_torch11(rank, world_size, port): - # launch colossalai to make sure we could execute colossalai.utils.checkpoint currectly + # launch colossalai to make sure we could execute colossalai.utils.checkpoint correctly colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') # build model and run forward diff --git a/tests/test_fx/test_codegen/test_offload_codegen.py b/tests/test_fx/test_codegen/test_offload_codegen.py index 96e88eb92b33..c217b96586fe 100644 --- a/tests/test_fx/test_codegen/test_offload_codegen.py +++ b/tests/test_fx/test_codegen/test_offload_codegen.py @@ -56,7 +56,7 @@ def _test_fwd_and_bwd(model: torch.nn.Module, gm: ColoGraphModule, data: torch.T fx_out = gm(data) assert torch.equal(non_fx_out, fx_out), "fx_out doesn't comply with original output" - # test barckward + # test backward loss0 = non_fx_out.sum() loss0.backward() loss1 = fx_out.sum() @@ -65,7 +65,7 @@ def _test_fwd_and_bwd(model: torch.nn.Module, gm: ColoGraphModule, data: torch.T def _run_offload_codegen(rank, world_size, port): - # launch colossalai to make sure we
could execute colossalai.utils.checkpoint currectly + # launch colossalai to make sure we could execute colossalai.utils.checkpoint correctly colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') # build model and input @@ -120,7 +120,7 @@ def test_act_ckpt_codegen(): def _run_offload_codegen_torch11(rank, world_size, port): - # launch colossalai to make sure we could execute colossalai.utils.checkpoint currectly + # launch colossalai to make sure we could execute colossalai.utils.checkpoint correctly colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl') # build model and input diff --git a/tests/test_layers/test_sequence/test_sequence.py b/tests/test_layers/test_sequence/test_sequence.py index aac192d7eff0..60f2d55f43af 100644 --- a/tests/test_layers/test_sequence/test_sequence.py +++ b/tests/test_layers/test_sequence/test_sequence.py @@ -45,7 +45,7 @@ def check_ring_qk(rank, world_size): ring_qk = colossalai.nn.layer.parallel_sequence.RingQK.apply sub_a = ring_qk(sub_q, sub_k, batch_size, num_heads, sub_seq_length) - # check master and distributed attetion scores + # check master and distributed attention scores sub_master_a = a[:, rank * sub_seq_length:(rank + 1) * sub_seq_length] assert torch.allclose(sub_a, sub_master_a, rtol=1e-5, atol=1e-2) diff --git a/tests/test_moe/test_kernel.py b/tests/test_moe/test_kernel.py index ad9a172b72aa..39603c158731 100644 --- a/tests/test_moe/test_kernel.py +++ b/tests/test_moe/test_kernel.py @@ -41,7 +41,7 @@ def run_routing(rank, world_size, port, rs=2, hidden_size=128, data_type=torch.f if data_type == torch.float16: layer = layer.half() - # use matrix multiplication instead of COL_MOE_KERNL in MOE dispatch and combine + # use matrix multiplication instead of COL_MOE_KERNEL in MOE dispatch and combine layer.use_kernel = False old_out, _ = layer(tokens) ech = old_out.shape @@ -57,7 +57,7 @@ def run_routing(rank,
world_size, port, rs=2, hidden_size=128, data_type=torch.f layer.gate_weight.grad.zero_() layer.use_kernel = True - new_out, _ = layer(tokens) # get ouputs through colossal kernel + new_out, _ = layer(tokens) # get outputs through colossal kernel if data_type == torch.float32: check_equal(old_out, new_out) diff --git a/tests/test_tensor/model/test_model.py b/tests/test_tensor/model/test_model.py index 79d70e53c5cb..288bd20e3844 100644 --- a/tests/test_tensor/model/test_model.py +++ b/tests/test_tensor/model/test_model.py @@ -329,6 +329,6 @@ def test_pretrain_load(world_size): if __name__ == '__main__': # test_model_parameters() - # test_colo_optgimizer() + # test_colo_optimizer() test_model(4) # test_pretrain_load(4) diff --git a/tests/test_trainer/test_pipeline/test_p2p.py b/tests/test_trainer/test_pipeline/test_p2p.py index cb7a193d2bfa..8ad366133d18 100644 --- a/tests/test_trainer/test_pipeline/test_p2p.py +++ b/tests/test_trainer/test_pipeline/test_p2p.py @@ -90,7 +90,7 @@ def run_check(rank, world_size, port): prev_rank = gpc.get_prev_global_rank(ParallelMode.PIPELINE) next_rank = gpc.get_next_global_rank(ParallelMode.PIPELINE) logger.info('Rank {0}: prev rank {1}, next rank {2}'.format(rank, prev_rank, next_rank)) - logger.info('Distributed environment is initialzied.') + logger.info('Distributed environment is initialized.') check_comm(world_size, rank, prev_rank, next_rank, logger) gpc.destroy() diff --git a/tests/test_utils/test_activation_checkpointing.py b/tests/test_utils/test_activation_checkpointing.py index 59a8acd4b210..2930552cc4e7 100644 --- a/tests/test_utils/test_activation_checkpointing.py +++ b/tests/test_utils/test_activation_checkpointing.py @@ -51,7 +51,7 @@ def test_activation_checkpointing(cpu_offload, use_reentrant): # other tests might affect this test reset_seeds() - # We put initilization here to avoid change cuda rng state below + # We put initialization here to avoid change cuda rng state below inputs = torch.rand(2, 2, 
requires_grad=True, device='cuda') weight = torch.rand(2, 4, requires_grad=True, device='cuda') diff --git a/tests/test_utils/test_checkpoint_io/test_load.py b/tests/test_utils/test_checkpoint_io/test_load.py index b1a741515728..2949c9f0752d 100644 --- a/tests/test_utils/test_checkpoint_io/test_load.py +++ b/tests/test_utils/test_checkpoint_io/test_load.py @@ -23,7 +23,7 @@ def check_model_state_dict(a: Dict[str, Tensor], b: Dict[str, Tensor]) -> None: assert torch.equal(v, b[k]) -def check_optim_state_dict(a: dict, b: dict, ignore_param_gruops: bool = False) -> None: +def check_optim_state_dict(a: dict, b: dict, ignore_param_groups: bool = False) -> None: assert set(a['state'].keys()) == set(b['state'].keys()) for k, state in a['state'].items(): b_state = b['state'][k] @@ -32,7 +32,7 @@ def check_optim_state_dict(a: dict, b: dict, ignore_param_gruops: bool = False) assert torch.equal(v1, v2) else: assert v1 == v2 - if not ignore_param_gruops: + if not ignore_param_groups: assert a['param_groups'] == b['param_groups'] @@ -129,23 +129,23 @@ def launch_dist(fn, world_size: int): def save_dist(dir_name: str, zero: bool): - model, optmizer = prepare_model_optim(shard=True, zero=zero) - reset_model_optim(model, optmizer) + model, optimizer = prepare_model_optim(shard=True, zero=zero) + reset_model_optim(model, optimizer) world_size = dist.get_world_size() rank = dist.get_rank() - save(dir_name, model, optmizer, dist_meta=get_dist_metas(world_size, zero)[rank]) + save(dir_name, model, optimizer, dist_meta=get_dist_metas(world_size, zero)[rank]) def load_and_check_dist(dir_name: str): world_size = dist.get_world_size() - model, optmizer = prepare_model_optim(shard=True) - reset_model_optim(model, optmizer) + model, optimizer = prepare_model_optim(shard=True) + reset_model_optim(model, optimizer) model_state_dict = deepcopy(model.state_dict()) - optimizer_state_dict = deepcopy(optmizer.state_dict()) - reset_model_optim(model, optmizer, 1) - load(dir_name, model, optmizer, 
get_redist_meta(world_size), get_dist_metas(world_size)) + optimizer_state_dict = deepcopy(optimizer.state_dict()) + reset_model_optim(model, optimizer, 1) + load(dir_name, model, optimizer, get_redist_meta(world_size), get_dist_metas(world_size)) check_model_state_dict(model_state_dict, model.state_dict()) - check_optim_state_dict(optimizer_state_dict, optmizer.state_dict()) + check_optim_state_dict(optimizer_state_dict, optimizer.state_dict()) @pytest.mark.dist diff --git a/tests/test_utils/test_checkpoint_io/test_merge.py b/tests/test_utils/test_checkpoint_io/test_merge.py index 255c74adf0a2..07d4597f8391 100644 --- a/tests/test_utils/test_checkpoint_io/test_merge.py +++ b/tests/test_utils/test_checkpoint_io/test_merge.py @@ -68,7 +68,7 @@ def run_dist(rank, world_size, port, test_fn): def run_save_dist(dir_name: str, zero: bool): - model, optmizer = prepare_model_optim(shard=True, zero=zero) + model, optimizer = prepare_model_optim(shard=True, zero=zero) rank = dist.get_rank() dp_world_size = dist.get_world_size() // 2 if not zero: @@ -90,7 +90,7 @@ def run_save_dist(dir_name: str, zero: bool): 'fc.bias': ParamDistMeta(rank // 2, dp_world_size, 0, 1, zero_numel=1, zero_orig_shape=[1]) } - save(dir_name, model, optmizer, dist_meta=dist_metas) + save(dir_name, model, optimizer, dist_meta=dist_metas) @pytest.mark.dist diff --git a/tests/test_utils/test_checkpoint_io/test_redist.py b/tests/test_utils/test_checkpoint_io/test_redist.py index 144715bdfcca..fdc849a5ecc0 100644 --- a/tests/test_utils/test_checkpoint_io/test_redist.py +++ b/tests/test_utils/test_checkpoint_io/test_redist.py @@ -125,9 +125,9 @@ def run_dist(rank, world_size, port, test_fn): def run_save_dist(dir_name: str, zero: bool): - model, optmizer = prepare_model_optim(shard=True, zero=zero) + model, optimizer = prepare_model_optim(shard=True, zero=zero) rank = dist.get_rank() - save(dir_name, model, optmizer, dist_meta=get_dist_metas(4, zero)[rank]) + save(dir_name, model, optimizer, 
dist_meta=get_dist_metas(4, zero)[rank]) @pytest.mark.dist diff --git a/tests/test_utils/test_checkpoint_io/test_save.py b/tests/test_utils/test_checkpoint_io/test_save.py index e35e566f6ff8..2abdd95a6481 100644 --- a/tests/test_utils/test_checkpoint_io/test_save.py +++ b/tests/test_utils/test_checkpoint_io/test_save.py @@ -28,7 +28,7 @@ def check_model_state_dict(a: Dict[str, Tensor], b: Dict[str, Tensor]) -> None: assert torch.equal(v, b[k]) -def check_optim_state_dict(a: dict, b: dict, ignore_param_gruops: bool = False) -> None: +def check_optim_state_dict(a: dict, b: dict, ignore_param_groups: bool = False) -> None: assert set(a['state'].keys()) == set(b['state'].keys()) for k, state in a['state'].items(): b_state = b['state'][k] @@ -37,7 +37,7 @@ def check_optim_state_dict(a: dict, b: dict, ignore_param_gruops: bool = False) assert torch.equal(v1, v2) else: assert v1 == v2 - if not ignore_param_gruops: + if not ignore_param_groups: assert a['param_groups'] == b['param_groups'] @@ -113,12 +113,12 @@ def run_dist(rank, world_size, port, test_fn): def run_save_dist(dir_name): - model, optmizer = prepare_model_optim() + model, optimizer = prepare_model_optim() dist_metas = { 'fc.weight': ParamDistMeta(dist.get_rank(), dist.get_world_size(), 0, 1), 'fc.bias': ParamDistMeta(dist.get_rank(), dist.get_world_size(), 0, 1) } - save(dir_name, model, optmizer, dist_meta=dist_metas) + save(dir_name, model, optimizer, dist_meta=dist_metas) @pytest.mark.dist diff --git a/tests/test_utils/test_lazy_init/utils.py b/tests/test_utils/test_lazy_init/utils.py index a8aeb4c8930c..0b5f15ca5445 100644 --- a/tests/test_utils/test_lazy_init/utils.py +++ b/tests/test_utils/test_lazy_init/utils.py @@ -18,7 +18,7 @@ def set_seed(seed: int) -> None: torch.manual_seed(seed) -def assert_model_eqaual(m1: torch.nn.Module, m2: torch.nn.Module) -> None: +def assert_model_equal(m1: torch.nn.Module, m2: torch.nn.Module) -> None: s1 = m1.state_dict() s2 = m2.state_dict() @@ -63,7 +63,7 @@ def 
check_lazy_init(entry: TestingEntry, seed: int = 42, verbose: bool = False, with ctx: deferred_model = model_fn() deferred_model = ctx.materialize(deferred_model, verbose=verbose) - assert_model_eqaual(model, deferred_model) + assert_model_equal(model, deferred_model) if check_forward: assert_forward_equal(model, deferred_model, data_gen_fn, output_transform_fn) if verbose: diff --git a/tests/test_zero/test_gemini/test_chunkv2.py b/tests/test_zero/test_gemini/test_chunkv2.py index 16764aa6b0b1..1cb31b260a99 100644 --- a/tests/test_zero/test_gemini/test_chunkv2.py +++ b/tests/test_zero/test_gemini/test_chunkv2.py @@ -23,7 +23,7 @@ def add_param(param_list, param_cp_list, *args, **kwargs): param_cp_list.append(param.clone()) -def check_euqal(param, param_cp): +def check_equal(param, param_cp): if param.device != param_cp.device: temp = param.data.to(param_cp.device) else: @@ -57,7 +57,7 @@ def exam_chunk_basic(init_device, keep_gathered, pin_memory): my_chunk.append_tensor(param) assert my_chunk.utilized_size == 597 for param, param_cp in zip(param_list, param_cp_list): - check_euqal(param, param_cp) + check_equal(param, param_cp) my_chunk.close_chunk() if keep_gathered is False: @@ -77,7 +77,7 @@ def exam_chunk_basic(init_device, keep_gathered, pin_memory): my_chunk.access_chunk() assert my_chunk.device_type == 'cuda' for param, param_cp in zip(param_list, param_cp_list): - check_euqal(param, param_cp) + check_equal(param, param_cp) assert my_chunk.tensor_state_cnter[TensorState.HOLD] == 4 my_chunk.tensor_trans_state(param_list[0], TensorState.COMPUTE)