Skip to content
Merged

as #41

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion applications/Chat/coati/kernels/opt_attn.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def forward(
scale=self.scaling)

# Use the `embed_dim` from the config (stored in the class) rather than `hidden_state` because `attn_output` can be
# partitioned aross GPUs when using tensor-parallelism.
# partitioned across GPUs when using tensor-parallelism.
attn_output = attn_output.reshape(bsz, tgt_len, self.embed_dim)

attn_output = self.out_proj(attn_output)
Expand Down
2 changes: 1 addition & 1 deletion applications/Chat/examples/community/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Community examples consist of both inference and training examples that have bee
| Example | Description | Code Example | Colab | Author |
|:---------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------:|
| Peft | Adding Peft support for SFT and Prompts model training | [Huggingface Peft](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/peft) | - | [YY Lin](https://github.com/yynil) |
| Train prompts on Ray | A Ray based implementation of Train prompts example | [Huggingface Peft](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/ray) | - | [MisterLin1995](https://github.com/MisterLin1995) |
| Train prompts on Ray | A Ray based implementation of Train prompts example | [Training On Ray](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/ray) | - | [MisterLin1995](https://github.com/MisterLin1995) |
|...|...|...|...|...|

### How to get involved
Expand Down
5 changes: 4 additions & 1 deletion colossalai/booster/plugin/gemini_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import warnings
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union
from typing import Callable, Iterator, List, Optional, Tuple, Union

import torch
import torch.nn as nn
Expand Down Expand Up @@ -286,3 +286,6 @@ def control_checkpoint_io(self) -> bool:

def get_checkpoint_io(self) -> CheckpointIO:
return GeminiCheckpointIO()

def no_sync(self, model: nn.Module) -> Iterator[None]:
raise NotImplementedError
5 changes: 4 additions & 1 deletion colossalai/booster/plugin/low_level_zero_plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import warnings
from typing import Callable, List, Optional, Tuple, Union
from typing import Callable, Iterator, List, Optional, Tuple, Union

import torch
import torch.nn as nn
Expand Down Expand Up @@ -197,3 +197,6 @@ def control_checkpoint_io(self) -> bool:

def get_checkpoint_io(self) -> CheckpointIO:
return LowLevelZeroCheckpointIO()

def no_sync(self, model: nn.Module) -> Iterator[None]:
raise NotImplementedError
9 changes: 8 additions & 1 deletion colossalai/booster/plugin/plugin_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Callable, List, Tuple, Union
from typing import Callable, Iterator, List, Tuple, Union

import torch.nn as nn
from torch.optim import Optimizer
Expand Down Expand Up @@ -60,6 +60,13 @@ def get_checkpoint_io(self) -> CheckpointIO:
"""
pass

@abstractmethod
def no_sync(self, model: nn.Module) -> Iterator[None]:
"""
Context manager to disable gradient synchronization.
"""
pass

@abstractmethod
def prepare_dataloader(self,
dataset: Dataset,
Expand Down
6 changes: 5 additions & 1 deletion colossalai/booster/plugin/torch_ddp_plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, List, Tuple, Union
from typing import Callable, Iterator, List, Tuple, Union

import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
Expand Down Expand Up @@ -142,3 +142,7 @@ def control_checkpoint_io(self) -> bool:

def get_checkpoint_io(self) -> CheckpointIO:
return TorchDDPCheckpointIO()

def no_sync(self, model: nn.Module) -> Iterator[None]:
assert isinstance(model, TorchDDPModel), 'Model is not boosted by TorchDDPPlugin.'
return model.module.no_sync()
2 changes: 1 addition & 1 deletion colossalai/communication/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def recv_backward(output_grad_shape,
next_rank (int, optional): The rank of the source of the tensor.

Returns:
Union[:class:`torch.Tensor`, List[:class:`torch.Tensor`]]: The input gradient tensor or gradident tensor list.
Union[:class:`torch.Tensor`, List[:class:`torch.Tensor`]]: The input gradient tensor or gradient tensor list.
"""
if gpc.is_pipeline_last_stage():
output_tensor_grad = None
Expand Down
2 changes: 1 addition & 1 deletion colossalai/communication/p2p_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


def init_process_group():
"""intialise process group by dist.new_group in the adjacent stages
"""initialise process group by dist.new_group in the adjacent stages

Args:
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ def init_dist_group(self):

parallel_setting = []

local_rank, group_world_size, process_group, cpu_grop, ranks_in_group, mode = \
local_rank, group_world_size, process_group, cpu_group, ranks_in_group, mode = \
self._sequence_initializer.init_dist_group()
# change mode to sequence
mode = ParallelMode.SEQUENCE

parallel_setting.append((local_rank, group_world_size, process_group, cpu_grop, ranks_in_group, mode))
parallel_setting.append((local_rank, group_world_size, process_group, cpu_group, ranks_in_group, mode))
parallel_setting.append(self._sequence_dp_initializer.init_dist_group())
return parallel_setting
11 changes: 9 additions & 2 deletions colossalai/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from .comparison import assert_close, assert_close_loose, assert_equal, assert_equal_in_group, assert_not_equal
from .comparison import (
assert_close,
assert_close_loose,
assert_equal,
assert_equal_in_group,
assert_not_equal,
check_state_dict_equal,
)
from .pytest_wrapper import run_on_environment_flag
from .utils import (
clear_cache_before_run,
Expand All @@ -13,5 +20,5 @@
__all__ = [
'assert_equal', 'assert_not_equal', 'assert_close', 'assert_close_loose', 'assert_equal_in_group', 'parameterize',
'rerun_on_exception', 'rerun_if_address_is_in_use', 'skip_if_not_enough_gpus', 'free_port', 'spawn',
'clear_cache_before_run', 'run_on_environment_flag'
'clear_cache_before_run', 'run_on_environment_flag', 'check_state_dict_equal'
]
24 changes: 24 additions & 0 deletions colossalai/testing/comparison.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import OrderedDict

import torch
import torch.distributed as dist
from torch import Tensor
Expand Down Expand Up @@ -28,3 +30,25 @@ def assert_equal_in_group(tensor: Tensor, process_group: ProcessGroup = None):
a = tensor_list[i]
b = tensor_list[i + 1]
assert torch.all(a == b), f'expected tensors on rank {i} and {i + 1} to be equal but they are not, {a} vs {b}'


def check_state_dict_equal(d1: OrderedDict, d2: OrderedDict, ignore_device: bool = True):
for k, v in d1.items():
if isinstance(v, dict):
check_state_dict_equal(v, d2[k])
elif isinstance(v, list):
for i in range(len(v)):
if isinstance(v[i], torch.Tensor):
if not ignore_device:
v[i] = v[i].to("cpu")
d2[k][i] = d2[k][i].to("cpu")
assert torch.equal(v[i], d2[k][i])
else:
assert v[i] == d2[k][i]
elif isinstance(v, torch.Tensor):
if not ignore_device:
v = v.to("cpu")
d2[k] = d2[k].to("cpu")
assert torch.equal(v, d2[k])
else:
assert v == d2[k]
2 changes: 1 addition & 1 deletion examples/tutorial/new_api/cifar_resnet/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


def build_dataloader(batch_size: int, coordinator: DistCoordinator, plugin: DPPluginBase):
# trainsform
# transform
transform_train = transforms.Compose(
[transforms.Pad(4),
transforms.RandomHorizontalFlip(),
Expand Down
6 changes: 3 additions & 3 deletions examples/tutorial/new_api/cifar_vit/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# Prepare Hyperparameters
# ==============================
NUM_EPOCHS = 60
WARMUP_EPOCSH = 5
WARMUP_EPOCHS = 5
LEARNING_RATE = 1e-3


Expand All @@ -37,7 +37,7 @@ def vit_cifar(**kwargs):


def build_dataloader(batch_size: int, coordinator: DistCoordinator, plugin: DPPluginBase):
# trainsform
# transform
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
Expand Down Expand Up @@ -177,7 +177,7 @@ def main():
optimizer = HybridAdam(model.parameters(), lr=LEARNING_RATE)

# lr scheduler
lr_scheduler = LinearWarmupLR(optimizer, NUM_EPOCHS, WARMUP_EPOCSH)
lr_scheduler = LinearWarmupLR(optimizer, NUM_EPOCHS, WARMUP_EPOCHS)

# ==============================
# Boost with ColossalAI
Expand Down
2 changes: 1 addition & 1 deletion op_builder/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def get_cuda_version_in_pytorch() -> List[int]:
torch_cuda_minor = torch.version.cuda.split(".")[1]
except:
raise ValueError(
"[extension] Cannot retrive the CUDA version in the PyTorch binary given by torch.version.cuda")
"[extension] Cannot retrieve the CUDA version in the PyTorch binary given by torch.version.cuda")
return torch_cuda_major, torch_cuda_minor


Expand Down
12 changes: 6 additions & 6 deletions tests/components_to_test/albert.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def bert_model_builder(checkpoint: bool = False):
attention_probs_dropout_prob=0.)
print('building AlbertForSequenceClassification model')

# adapting huggingface BertForSequenceClassification for single unitest calling interface
class ModelAaptor(AlbertForSequenceClassification):
# adapting huggingface BertForSequenceClassification for single unittest calling interface
class ModelAdaptor(AlbertForSequenceClassification):

def forward(self, input_ids, labels):
"""
Expand All @@ -37,23 +37,23 @@ def forward(self, input_ids, labels):
"""
return super().forward(input_ids=input_ids, labels=labels)[0]

model = ModelAaptor(config)
model = ModelAdaptor(config)
# if checkpoint and version.parse(transformers.__version__) >= version.parse("4.11.0"):
# model.gradient_checkpointing_enable()

return model

is_distrbuted = torch.distributed.is_initialized()
is_distributed = torch.distributed.is_initialized()
trainloader = get_bert_data_loader(n_class=vocab_size,
batch_size=2,
total_samples=10000,
sequence_length=sequence_length,
is_distrbuted=is_distrbuted)
is_distributed=is_distributed)
testloader = get_bert_data_loader(n_class=vocab_size,
batch_size=2,
total_samples=10000,
sequence_length=sequence_length,
is_distrbuted=is_distrbuted)
is_distributed=is_distributed)

criterion = None
return bert_model_builder, trainloader, testloader, torch.optim.Adam, criterion
4 changes: 2 additions & 2 deletions tests/components_to_test/beit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def generate(self):
@non_distributed_component_funcs.register(name='beit')
def get_training_components():

def model_buider(checkpoint=False):
def model_builder(checkpoint=False):
model = Beit(img_size=DummyDataLoader.img_size,
num_classes=DummyDataLoader.num_class,
embed_dim=32,
Expand All @@ -39,4 +39,4 @@ def model_buider(checkpoint=False):
testloader = DummyDataLoader()

criterion = torch.nn.CrossEntropyLoss()
return model_buider, trainloader, testloader, torch.optim.Adam, criterion
return model_builder, trainloader, testloader, torch.optim.Adam, criterion
16 changes: 8 additions & 8 deletions tests/components_to_test/bert.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def get_bert_data_loader(
total_samples,
sequence_length,
device=torch.device('cpu:0'),
is_distrbuted=False,
is_distributed=False,
):
train_data = torch.randint(
low=0,
Expand All @@ -24,7 +24,7 @@ def get_bert_data_loader(
)
train_label = torch.randint(low=0, high=2, size=(total_samples,), device=device, dtype=torch.long)
train_dataset = torch.utils.data.TensorDataset(train_data, train_label)
if is_distrbuted:
if is_distributed:
sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
else:
sampler = SequentialSampler(train_dataset)
Expand Down Expand Up @@ -52,8 +52,8 @@ def bert_model_builder(checkpoint: bool = False):
attention_probs_dropout_prob=0.)
print('building BertForSequenceClassification model')

# adapting huggingface BertForSequenceClassification for single unitest calling interface
class ModelAaptor(BertForSequenceClassification):
# adapting huggingface BertForSequenceClassification for single unittest calling interface
class ModelAdaptor(BertForSequenceClassification):

def forward(self, input_ids, labels):
"""
Expand All @@ -62,23 +62,23 @@ def forward(self, input_ids, labels):
"""
return super().forward(input_ids=input_ids, labels=labels)[0]

model = ModelAaptor(config)
model = ModelAdaptor(config)
if checkpoint and version.parse(transformers.__version__) >= version.parse("4.11.0"):
model.gradient_checkpointing_enable()

return model

is_distrbuted = torch.distributed.is_initialized()
is_distributed = torch.distributed.is_initialized()
trainloader = get_bert_data_loader(n_class=vocab_size,
batch_size=2,
total_samples=10000,
sequence_length=sequence_length,
is_distrbuted=is_distrbuted)
is_distributed=is_distributed)
testloader = get_bert_data_loader(n_class=vocab_size,
batch_size=2,
total_samples=10000,
sequence_length=sequence_length,
is_distrbuted=is_distrbuted)
is_distributed=is_distributed)

criterion = None
return bert_model_builder, trainloader, testloader, torch.optim.Adam, criterion
8 changes: 4 additions & 4 deletions tests/components_to_test/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ def __init__(self):
def register(self, name):
assert name not in self._registry

def _regsiter(callable_):
def _register(callable_):
self._registry[name] = callable_

return _regsiter
return _register

def get_callable(self, name: str):
return self._registry[name]
Expand All @@ -34,6 +34,6 @@ def __next__(self):


non_distributed_component_funcs = Registry()
model_paralle_component_funcs = Registry()
model_parallel_component_funcs = Registry()

__all__ = ['non_distributed_component_funcs', 'model_paralle_component_funcs']
__all__ = ['non_distributed_component_funcs', 'model_parallel_component_funcs']
6 changes: 3 additions & 3 deletions tests/test_booster/test_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
@clear_cache_before_run()
@parameterize('device', ['cpu', 'cuda'])
def test_accelerator(device):
acceleartor = Accelerator(device)
accelerator = Accelerator(device)
model = nn.Linear(8, 8)
model = acceleartor.configure_model(model)
model = accelerator.configure_model(model)
assert next(model.parameters()).device.type == device
del model, acceleartor
del model, accelerator
7 changes: 5 additions & 2 deletions tests/test_booster/test_plugin/test_dp_plugin_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, List, Tuple, Union
from typing import Callable, Iterator, List, Tuple, Union

import torch
import torch.distributed as dist
Expand Down Expand Up @@ -49,11 +49,14 @@ def supported_devices(self) -> List[str]:
def supported_precisions(self) -> List[str]:
pass

def no_sync(self, model: nn.Module) -> Iterator[None]:
pass


def check_dataloader_sharding():
plugin = DPPluginWrapper()

# create a custom dasetset with 0 to 10
# create a custom dataset with 0 to 10
dataset = TensorDataset(torch.arange(0, 10))
train_dataloader = plugin.prepare_dataloader(dataset, batch_size=2)

Expand Down
Loading