12 changes: 10 additions & 2 deletions applications/Chat/README.md
@@ -243,6 +243,7 @@ from coati.trainer import SFTTrainer
model = LlamaLM(pretrained=args.pretrain)
tokenizer = AutoTokenizer.from_pretrained(args.pretrain)

(model, optim) = strategy.prepare((model, optim))
trainer = SFTTrainer(model=model,
strategy=strategy,
optim=optim,
@@ -254,7 +255,11 @@ trainer = SFTTrainer(model=model,
)

trainer.fit()
trainer.save_model(path=args.save_path, only_rank0=True, tokenizer=tokenizer)
# this saves the model in PyTorch format
strategy.save_model(model, args.save_path, only_rank0=True)

# this saves in HF format; the ColossalAI strategy with stage 3 doesn't support this method
strategy.save_pretrained(model, args.save_path, only_rank0=True, tokenizer=tokenizer)
```

</details>
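The two save calls above produce different artifacts, so they are reloaded differently. A minimal sketch, where the paths are placeholders standing in for whatever was passed as `args.save_path` (both load calls are standard PyTorch/transformers APIs):

```python
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# strategy.save_model() writes a plain PyTorch state dict; it is loaded
# back into an already-constructed model of the matching architecture.
state_dict = torch.load('sft_checkpoint.pt', map_location='cpu')  # placeholder path
model.load_state_dict(state_dict)  # `model`: same architecture as at save time

# strategy.save_pretrained() writes an HF-format directory (config +
# weights, plus the tokenizer when one is passed), reloadable directly:
hf_model = AutoModelForCausalLM.from_pretrained('sft_output_dir')  # placeholder path
tokenizer = AutoTokenizer.from_pretrained('sft_output_dir')
```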
@@ -263,7 +268,7 @@ trainer.save_model(path=args.save_path, only_rank0=True, tokenizer=tokenizer)

Here are some examples that allow you to train a 7B model on one or more consumer-grade GPUs.

If you only have a single 24G GPU, you can use the following script. `batch_size` and `lora_rank` are the most important parameters to successfully train the model.
If you only have a single 24G GPU, you can use the following script. `batch_size`, `lora_rank`, and `grad_checkpoint` are the most important parameters for training the model successfully (see the note after the script).
```
torchrun --standalone --nproc_per_node=1 train_sft.py \
--pretrain "/path/to/LLaMa-7B/" \
@@ -278,6 +283,7 @@ torchrun --standalone --nproc_per_node=1 train_sft.py \
--max_datasets_size 512 \
--max_epochs 1 \
--lora_rank 16 \
--grad_checkpoint
```
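For reference, gradient checkpointing trades extra forward compute for a large cut in activation memory, which is why the flag matters on a 24G card. Assuming `--grad_checkpoint` ultimately toggles the standard transformers mechanism, the effect is equivalent to this sketch:

```python
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained('/path/to/LLaMa-7B/')
# Recompute activations during the backward pass instead of storing
# them all, trading compute for a much smaller memory footprint.
model.gradient_checkpointing_enable()
```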

The `colossalai_gemini` strategy enables a single 24G GPU to train the whole model without LoRA if you have sufficient CPU memory. You can use the following script.
@@ -294,6 +300,7 @@ torchrun --standalone --nproc_per_node=1 train_sft.py \
--lr 2e-5 \
--max_datasets_size 512 \
--max_epochs 1 \
--grad_checkpoint
```

If you have 4x32 GB GPUs, you can even train the whole 7B model using our `colossalai_zero2_cpu` strategy! The script is given as follows.
@@ -310,6 +317,7 @@ torchrun --standalone --nproc_per_node=4 train_sft.py \
--lr 2e-5 \
--max_datasets_size 512 \
--max_epochs 1 \
--grad_checkpoint
```
</details>
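The strategy names in these scripts are command-line values; under the hood they presumably map onto `ColossalAIStrategy` configurations from `coati/trainer/strategies/colossalai.py`. A hedged sketch of that mapping (the constructor parameter names are assumptions):

```python
from coati.trainer.strategies import ColossalAIStrategy

# 'colossalai_zero2_cpu': ZeRO stage 2 with optimizer states offloaded to CPU (assumed mapping)
zero2_cpu = ColossalAIStrategy(stage=2, placement_policy='cpu')

# 'colossalai_gemini': ZeRO stage 3 with Gemini-managed parameters (assumed mapping)
gemini = ColossalAIStrategy(stage=3, placement_policy='cuda')
```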

21 changes: 20 additions & 1 deletion applications/Chat/coati/models/base/__init__.py
@@ -1,5 +1,24 @@
import torch.nn as nn

from .actor import Actor
from .critic import Critic
from .reward_model import RewardModel

__all__ = ['Actor', 'Critic', 'RewardModel']

def get_base_model(model: nn.Module) -> nn.Module:
"""Get the base model of our wrapper classes.
For Actor, its base model is ``actor.model``, which is usually a ``transformers.PreTrainedModel``.
For Critic and RewardModel, the base model is the module itself.

Args:
model (nn.Module): model to get base model from

Returns:
nn.Module: the base model
"""
if isinstance(model, Actor):
return model.get_base_model()
return model


__all__ = ['Actor', 'Critic', 'RewardModel', 'get_base_model']
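A short sketch of how the new helper is meant to be used; `save_base_state_dict` is a hypothetical function, not part of the PR:

```python
import torch
import torch.nn as nn

from coati.models.base import get_base_model

def save_base_state_dict(model: nn.Module, path: str) -> None:
    # Works uniformly across the wrapper classes: an Actor is unwrapped
    # to its inner transformers model, while Critic and RewardModel
    # (whose base model is the module itself) pass through unchanged.
    base = get_base_model(model)
    torch.save(base.state_dict(), path)
```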
8 changes: 1 addition & 7 deletions applications/Chat/coati/trainer/ppo.py
@@ -199,15 +199,9 @@ def training_step(self, experience: Experience) -> Dict[str, float]:

return {'reward': experience.reward.mean().item()}

def save_model(self,
path: str,
only_rank0: bool = False,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer)


def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None:
origin_model = strategy._unwrap_actor(actor)
origin_model = strategy.unwrap_model(actor)
new_kwargs = {**generate_kwargs}
# use huggingface models method directly
if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'):
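For context on the rename: the helper needs the inner huggingface model because that is where `prepare_inputs_for_generation` lives. A short sketch of the behavior at a hypothetical call site (`actor` and `strategy` construction omitted):

```python
generate_kwargs = {}  # caller passed nothing
origin_model = strategy.unwrap_model(actor)  # inner transformers model
if hasattr(origin_model, 'prepare_inputs_for_generation'):
    # fall back to the huggingface method when no prepare_inputs_fn is given
    generate_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation
```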
16 changes: 4 additions & 12 deletions applications/Chat/coati/trainer/rm.py
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Optional, List
from typing import List, Optional

import pandas as pd
import torch
@@ -9,8 +9,8 @@
from tqdm import tqdm
from transformers.tokenization_utils_base import PreTrainedTokenizerBase

from .callbacks import Callback
from .base import Trainer
from .callbacks import Callback
from .strategies import Strategy
from .utils import is_rank_0

@@ -41,20 +41,18 @@ def __init__(
train_dataloader: DataLoader,
valid_dataloader: DataLoader,
eval_dataloader: DataLoader,
batch_size: int = 1,
max_epochs: int = 1,
callbacks: List[Callback] = [],
) -> None:
super().__init__(strategy, max_epochs, callbacks=callbacks)
train_sampler = None

self.train_dataloader = train_dataloader
self.valid_dataloader = valid_dataloader
self.eval_dataloader = eval_dataloader

self.model = strategy.setup_model(model)
self.model = model
self.loss_fn = loss_fn
self.optimizer = strategy.setup_optimizer(optim, self.model)
self.optimizer = optim
self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100)

def eval_acc(self, dataloader):
@@ -123,9 +121,3 @@ def fit(self):
epoch_bar.update()
step_bar.set_postfix({'dist': dist, 'acc': acc})
step_bar.close()

def save_model(self,
path: str,
only_rank0: bool = False,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer)
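Since `setup_model`/`setup_optimizer` are no longer called inside the constructor, preparing the model and optimizer is now the caller's job. A hedged sketch of the resulting flow, assuming the class in `rm.py` is `RewardModelTrainer` and treating `strategy`, `model`, `optim`, `loss_fn`, and the dataloaders as already constructed:

```python
# model and optimizer must now be prepared by the strategy *before*
# the trainer is constructed, mirroring the SFT example in the README
(model, optim) = strategy.prepare((model, optim))

trainer = RewardModelTrainer(model=model,
                             strategy=strategy,
                             optim=optim,
                             loss_fn=loss_fn,
                             train_dataloader=train_dataloader,
                             valid_dataloader=valid_dataloader,
                             eval_dataloader=eval_dataloader,
                             max_epochs=1)
trainer.fit()

# saving also moved out of the trainer and into the strategy
strategy.save_model(model, 'rm_checkpoint.pt', only_rank0=True)
```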
10 changes: 2 additions & 8 deletions applications/Chat/coati/trainer/sft.py
@@ -49,8 +49,8 @@ def __init__(
super().__init__(strategy, max_epochs, callbacks=callbacks)
self.train_dataloader = train_dataloader
self.eval_dataloader = eval_dataloader

(self.model, self.optimizer) = strategy.prepare((model, optim))
self.model = model
self.optimizer = optim

self.accimulation_steps = accimulation_steps
num_update_steps_per_epoch = len(train_dataloader) // self.accimulation_steps
@@ -133,9 +133,3 @@ def fit(self, logger, use_wandb: bool = False):
logger.info(f'Eval Epoch {epoch}/{self.max_epochs} loss {loss_mean}')

# epoch_bar.update()

def save_model(self,
path: str,
only_rank0: bool = False,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer)
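As a worked example of the accumulation-steps arithmetic in the constructor above (the numbers are illustrative, not from the PR):

```python
dataset_size = 512        # e.g. --max_datasets_size from the README scripts
batch_size = 4            # per-device batch size (illustrative)
accumulation_steps = 8    # gradients are accumulated over this many batches

batches_per_epoch = dataset_size // batch_size                         # 128 forward/backward passes
num_update_steps_per_epoch = batches_per_epoch // accumulation_steps   # 16 optimizer steps
```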
44 changes: 20 additions & 24 deletions applications/Chat/coati/trainer/strategies/base.py
@@ -2,10 +2,9 @@
from contextlib import nullcontext
from typing import Any, List, Optional, Tuple, Union

import numpy as np
import torch
import torch.nn as nn
from coati.models.base import Actor, Critic, RewardModel
from coati.models.base import Actor, get_base_model
from coati.replay_buffer import ReplayBuffer
from torch.optim import Optimizer
from torch.utils.data import DataLoader
@@ -72,16 +71,16 @@ def prepare(

def prepare_model(model: nn.Module):
if isinstance(model, Actor):
return Actor(self.setup_model(self._unwrap_model(model)))
return self.setup_model(self._unwrap_model(model))
return Actor(self.setup_model(model.get_base_model()))
return self.setup_model(model)

rets = []
for arg in models_or_model_optim_pairs:
if isinstance(arg, tuple):
assert len(arg) == 2, f'Expect (model, optimizer) pair, got a tuple with size "{len(arg)}"'
model, optimizer = arg
model = prepare_model(model)
optimizer = self.setup_optimizer(optimizer, self._unwrap_model(model))
optimizer = self.setup_optimizer(optimizer, get_base_model(model))
rets.append((model, optimizer))
elif isinstance(arg, nn.Module):
rets.append(prepare_model(arg))
@@ -93,31 +92,20 @@ def prepare_model(model: nn.Module):
return rets

@staticmethod
def _unwrap_model(model: nn.Module) -> nn.Module:
"""Useful for saving state dict. As actor is wrapped by Actor class again in `prepare()`, we should unwrap it before saving.
def unwrap_model(model: nn.Module) -> nn.Module:
"""Get the unwrapped model from a wrapped model. Useful for getting original huggingface model.
For Actor, it will unwrap `actor.model`.

Args:
model (nn.Module): an actor or a critic
"""
if isinstance(model, Actor):
return model.model
return model

@staticmethod
def _unwrap_actor(actor: Actor) -> nn.Module:
"""Get `actor.model` from a wrapped (by `prepare()`) actor. Useful for getting original huggingface model.
model (nn.Module): the model to unwrap

Args:
actor (Actor): a wrapped actor
Returns:
nn.Module: the original model (usually a huggingface model)
"""
return Strategy._unwrap_model(actor)
return get_base_model(model)

@abstractmethod
def save_model(self,
model: nn.Module,
path: str,
only_rank0: bool = False,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None:
pass

@abstractmethod
@@ -134,3 +122,11 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = None

def setup_sampler(self, dataset) -> DistributedSampler:
return DistributedSampler(dataset, 1, 0)

@abstractmethod
def save_pretrained(self,
model: nn.Module,
path: str,
only_rank0: bool = True,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
pass
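Taken together, the refactor makes the abstract `Strategy` the single owner of wrapping, unwrapping, and persistence. A hedged sketch of the contract from a caller's perspective (`MyStrategy`, `actor`, `actor_optim`, and `tokenizer` are placeholders, not names from the PR):

```python
strategy = MyStrategy()  # hypothetical concrete Strategy subclass

# prepare() re-wraps an Actor around the strategy-managed base model
# and sets up the optimizer against that base model
(actor, actor_optim) = strategy.prepare((actor, actor_optim))

# unwrap_model() now delegates to get_base_model(), replacing the old
# _unwrap_model/_unwrap_actor pair
hf_model = strategy.unwrap_model(actor)

# two distinct persistence paths: raw state dict vs. HF directory
strategy.save_model(actor, 'actor.pt', only_rank0=True)
strategy.save_pretrained(actor, 'actor_hf', only_rank0=True, tokenizer=tokenizer)
```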
74 changes: 31 additions & 43 deletions applications/Chat/coati/trainer/strategies/colossalai.py
@@ -5,10 +5,8 @@
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from coati.models.base import Actor, RewardModel
from coati.models.lora import LoraLinear
from coati.models.base import get_base_model
from torch.optim import Optimizer
from transformers.modeling_utils import PreTrainedModel
from transformers.tokenization_utils_base import PreTrainedTokenizerBase

import colossalai
@@ -17,9 +15,7 @@
from colossalai.tensor import ProcessGroup, ShardSpec
from colossalai.utils import get_current_device
from colossalai.zero import ColoInitContext, ZeroDDP, zero_model_wrapper, zero_optim_wrapper
from colossalai.zero.gemini.utils import get_static_torch_model

from .base import Strategy
from .ddp import DDPStrategy

logger = get_dist_logger(__name__)
@@ -141,7 +137,7 @@ def setup_model(self, model: nn.Module) -> nn.Module:
model = zero_model_wrapper(model, zero_stage=self.stage, gemini_config=self.gemini_config)

if self.stage != 3 and self.precision == 'fp16':
model = model.half()
model = model.half().cuda()
return model

def setup_optimizer(self, optimizer: optim.Optimizer, model: nn.Module) -> optim.Optimizer:
@@ -154,47 +150,39 @@ def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: optim.Optimizer
def optimizer_step(self, optimizer: optim.Optimizer, **kwargs) -> None:
optimizer.step()

@staticmethod
def _unwrap_actor(actor: Actor) -> nn.Module:
model: Union[nn.Module, ZeroDDP] = Strategy._unwrap_actor(actor)
if isinstance(model, ZeroDDP):
return model.module
return model

def save_model(self,
model: nn.Module,
path: str,
only_rank0: bool = True,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:

if only_rank0 and dist.get_rank() != 0:
return None
unwrapped_model = self._unwrap_model(model)
# TODO : better way to get torch model from gemini model
# to get torch model from gemini model

if isinstance(unwrapped_model, RewardModel):
state_dict = unwrapped_model.state_dict()
if only_rank0 and dist.get_rank() != 0:
return
torch.save(state_dict, path)
def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None:
if only_rank0 and dist.get_rank() != 0 and self.stage != 3:
return
base_model = get_base_model(model)
if self.stage == 3:
assert isinstance(base_model, ZeroDDP)
# for stage 3, state_dict() method should be called on every rank
state_dict = base_model.state_dict(only_rank_0=only_rank0)
else:
try:
logger.info(f'Saving model to {path}', ranks=[0])
unwrapped_model.save_pretrained(path)
logger.info(f'Model saved to {path} Successfully', ranks=[0])
if tokenizer is not None:
logger.info(f'Saving tokenizer to {path}', ranks=[0])
tokenizer.save_pretrained(path)
logger.info(f'Tokenizer saved to {path} Successfully', ranks=[0])
except AttributeError:
state_dict = unwrapped_model.state_dict()
if only_rank0 and dist.get_rank() != 0:
return
torch.save(state_dict, path)
# only_rank0 is false or rank == 0
state_dict = base_model.state_dict()
if only_rank0 and dist.get_rank() != 0:
return
torch.save(state_dict, path)

def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None:
if only_rank0:
raise RuntimeError(
f'Optimizer states are sharded when using ColossalAIStrategy. Only rank0 is not supported.')
torch.save(optimizer.state_dict(), path)

def unwrap_model(self, model: nn.Module) -> nn.Module:
base_model: Union[nn.Module, ZeroDDP] = get_base_model(model)
if self.stage == 3:
assert isinstance(base_model, ZeroDDP)
return base_model.module
return base_model

def save_pretrained(self,
model: nn.Module,
path: str,
only_rank0: bool = True,
tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
if self.stage == 3:
raise RuntimeError('ColossalAI strategy with stage-3 does not support save_pretrained() yet')
super().save_pretrained(model, path, only_rank0, tokenizer)
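One behavioral subtlety worth noting: under stage 3, the gathered `state_dict(only_rank_0=only_rank0)` is a collective operation, so `save_model` must now be entered on every rank even when only rank 0 writes the file. A short sketch of the implied usage (construction of `strategy` and `model` omitted; the path is a placeholder):

```python
# ZeRO stage 3: every rank must call save_model, because gathering the
# sharded state dict is collective; gating the call on rank 0 would hang.
strategy.save_model(model, 'actor.pt', only_rank0=True)  # rank 0 writes, all ranks participate

# HF-format export is explicitly unsupported at stage 3 and raises:
# strategy.save_pretrained(model, 'actor_hf', tokenizer=tokenizer)  # RuntimeError
```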