From c9c159f6ebbbb239db59cd5fcb4135b441efcaf8 Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 17:52:14 +0800 Subject: [PATCH 01/17] to: add SLTrainer --- applications/Chat/coati/trainer/__init__.py | 9 +++-- applications/Chat/coati/trainer/base.py | 40 ++++++++++++++------- applications/Chat/coati/trainer/rm.py | 20 ++++------- applications/Chat/coati/trainer/sft.py | 24 ++++++------- 4 files changed, 52 insertions(+), 41 deletions(-) diff --git a/applications/Chat/coati/trainer/__init__.py b/applications/Chat/coati/trainer/__init__.py index 525b57bf21d3..cb8fd5dca605 100644 --- a/applications/Chat/coati/trainer/__init__.py +++ b/applications/Chat/coati/trainer/__init__.py @@ -1,6 +1,9 @@ -from .base import Trainer -from .ppo import PPOTrainer +from .base import SLTrainer +# from .ppo import PPOTrainer from .rm import RewardModelTrainer from .sft import SFTTrainer -__all__ = ['Trainer', 'PPOTrainer', 'RewardModelTrainer', 'SFTTrainer'] +__all__ = [ + 'SLTrainer', + 'RewardModelTrainer', 'SFTTrainer' +] diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index ac3a878be884..bdf91dbd5eed 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -2,36 +2,52 @@ from typing import Any, Callable, Dict, List, Optional, Union import torch +import torch.nn as nn from coati.experience_maker import Experience +from torch.optim import Optimizer +from torch.utils.data import DataLoader from .callbacks import Callback from .strategies import Strategy -class Trainer(ABC): +class SLTrainer(ABC): """ - Base class for rlhf trainers. + Base class for supervised learning trainers. Args: strategy (Strategy):the strategy to use for training max_epochs (int, defaults to 1): the number of epochs of training process - dataloader_pin_memory (bool, defaults to True): whether to pin memory for data loader - callbacks (List[Callback], defaults to []): the callbacks to call during training process - generate_kwargs (dict, optional): the kwargs to use while model generating + model (nn.Module): the model to train + optim (Optimizer): the optimizer to use for training + train_dataloader (DataLoader): the dataloader to use for training """ def __init__(self, strategy: Strategy, - max_epochs: int = 1, - dataloader_pin_memory: bool = True, - callbacks: List[Callback] = [], - **generate_kwargs) -> None: + max_epochs: int, + model: nn.Module, + optimizer: Optimizer, + train_dataloader: DataLoader, + ) -> None: super().__init__() self.strategy = strategy self.max_epochs = max_epochs - self.generate_kwargs = generate_kwargs - self.dataloader_pin_memory = dataloader_pin_memory - self.callbacks = callbacks + self.model = model + self.optimizer = optimizer + self.train_dataloader = train_dataloader + + # @abstractmethod + # def _train(self): + # raise NotImplementedError() + + # @abstractmethod + # def _eval(self): + # raise NotImplementedError() + + @abstractmethod + def fit(self): + raise NotImplementedError() # TODO(ver217): maybe simplify these code using context def _on_fit_start(self) -> None: diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index 316eded7ea5d..562dbf919989 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ -8,13 +8,12 @@ from torch.utils.data import DataLoader from tqdm import tqdm -from .base import Trainer -from .callbacks import Callback +from .base import SLTrainer from .strategies import Strategy from .utils import is_rank_0 -class RewardModelTrainer(Trainer): +class RewardModelTrainer(SLTrainer): """ Trainer to use while training reward model. @@ -27,9 +26,7 @@ class RewardModelTrainer(Trainer): train_dataloader (DataLoader): the dataloader to use for training valid_dataloader (DataLoader): the dataloader to use for validation eval_dataloader (DataLoader): the dataloader to use for evaluation - batch_size (int, defaults to 1): the batch size while training max_epochs (int, defaults to 2): the number of epochs to train - callbacks (List[Callback], defaults to []): the callbacks to call during training process """ def __init__( @@ -43,17 +40,16 @@ def __init__( valid_dataloader: DataLoader, eval_dataloader: DataLoader, max_epochs: int = 1, - callbacks: List[Callback] = [], ) -> None: - super().__init__(strategy, max_epochs, callbacks=callbacks) + super().__init__( + strategy, max_epochs, + model, optim, train_dataloader + ) - self.train_dataloader = train_dataloader self.valid_dataloader = valid_dataloader self.eval_dataloader = eval_dataloader - self.model = model self.loss_fn = loss_fn - self.optimizer = optim self.scheduler = lr_scheduler def eval_acc(self, dataloader): @@ -88,9 +84,7 @@ def fit(self): disable=not is_rank_0()) # train self.model.train() - cnt = 0 - acc = 0 - dist = 0 + cnt, acc, dist = 0, 0, 0 for chosen_ids, c_mask, reject_ids, r_mask in self.train_dataloader: chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device()) c_mask = c_mask.squeeze(1).to(torch.cuda.current_device()) diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index da223f1f33ff..6a385b32eac1 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -1,5 +1,5 @@ import time -from typing import List +from typing import List, Optional import torch import torch.distributed as dist @@ -9,13 +9,13 @@ from torch.utils.data import DataLoader from tqdm import tqdm -from .base import Trainer +from .base import SLTrainer from .callbacks import Callback from .strategies import ColossalAIStrategy, Strategy from .utils import is_rank_0, to_device -class SFTTrainer(Trainer): +class SFTTrainer(SLTrainer): """ Trainer to use while training reward model. @@ -23,12 +23,11 @@ class SFTTrainer(Trainer): model (torch.nn.Module): the model to train strategy (Strategy): the strategy to use for training optim(Optimizer): the optimizer to use for training + lr_scheduler(_LRScheduler): the lr scheduler to use for training train_dataloader: the dataloader to use for training eval_dataloader: the dataloader to use for evaluation - batch_size (int, defaults to 1): the batch size while training max_epochs (int, defaults to 2): the number of epochs to train - callbacks (List[Callback], defaults to []): the callbacks to call during training process - optim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer + accumulation_steps (int, defaults to 8): the number of steps to accumulate gradients """ def __init__( @@ -38,23 +37,22 @@ def __init__( optim: Optimizer, lr_scheduler: _LRScheduler, train_dataloader: DataLoader, - eval_dataloader: DataLoader = None, + eval_dataloader: Optional[DataLoader] = None, max_epochs: int = 2, accumulation_steps: int = 8, - callbacks: List[Callback] = [], ) -> None: if accumulation_steps > 1 and isinstance(strategy, ColossalAIStrategy): from colossalai.booster.plugin import GeminiPlugin assert not isinstance(strategy.plugin, GeminiPlugin), \ "Accumulation steps are not supported in stage 3 of ColossalAI" - super().__init__(strategy, max_epochs, callbacks=callbacks) - self.train_dataloader = train_dataloader + super().__init__( + strategy, max_epochs, + model, optim, train_dataloader + ) + self.eval_dataloader = eval_dataloader - self.model = model - self.optimizer = optim self.accumulation_steps = accumulation_steps - self.scheduler = lr_scheduler def fit(self, logger, use_wandb: bool = False): From 36c52eff2c92c57cef1781e5e86b0be44a784a3f Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 17:52:36 +0800 Subject: [PATCH 02/17] refactor: refactor RMTrainer and SFTTrainer --- applications/Chat/coati/trainer/base.py | 29 +++-- applications/Chat/coati/trainer/rm.py | 119 +++++++++----------- applications/Chat/coati/trainer/sft.py | 143 ++++++++++++------------ 3 files changed, 143 insertions(+), 148 deletions(-) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index bdf91dbd5eed..a39f5e740137 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -3,12 +3,14 @@ import torch import torch.nn as nn +import tqdm from coati.experience_maker import Experience from torch.optim import Optimizer from torch.utils.data import DataLoader from .callbacks import Callback from .strategies import Strategy +from .utils import is_rank_0 class SLTrainer(ABC): @@ -37,22 +39,25 @@ def __init__(self, self.optimizer = optimizer self.train_dataloader = train_dataloader - # @abstractmethod - # def _train(self): - # raise NotImplementedError() - - # @abstractmethod - # def _eval(self): - # raise NotImplementedError() + @abstractmethod + def _train(self, epoch): + raise NotImplementedError() @abstractmethod - def fit(self): + def _eval(self, epoch): raise NotImplementedError() - # TODO(ver217): maybe simplify these code using context - def _on_fit_start(self) -> None: - for callback in self.callbacks: - callback.on_fit_start() + def _before_fit(self): + self.no_epoch_bar = False + + def fit(self, *args, **kwargs): + self._before_fit(*args, **kwargs) + for epoch in tqdm.trange(self.max_epochs, + desc="Epochs", + disable=not is_rank_0() or self.no_epoch_bar + ): + self._train(epoch) + self._eval(epoch) def _on_fit_end(self) -> None: for callback in self.callbacks: diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index 562dbf919989..4beb10468473 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ -1,12 +1,12 @@ from datetime import datetime -from typing import Callable, List +from typing import Callable import pandas as pd import torch +import tqdm from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler from torch.utils.data import DataLoader -from tqdm import tqdm from .base import SLTrainer from .strategies import Strategy @@ -52,68 +52,59 @@ def __init__( self.loss_fn = loss_fn self.scheduler = lr_scheduler - def eval_acc(self, dataloader): - dist = 0 - on = 0 - cnt = 0 - self.model.eval() - with torch.no_grad(): - for chosen_ids, c_mask, reject_ids, r_mask in dataloader: - chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device()) - c_mask = c_mask.squeeze(1).to(torch.cuda.current_device()) - reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device()) - r_mask = r_mask.squeeze(1).to(torch.cuda.current_device()) - chosen_reward = self.model(chosen_ids, attention_mask=c_mask) - reject_reward = self.model(reject_ids, attention_mask=r_mask) - for i in range(len(chosen_reward)): - cnt += 1 - if chosen_reward[i] > reject_reward[i]: - on += 1 - dist += (chosen_reward - reject_reward).mean().item() - dist_mean = dist / len(dataloader) - acc = on / cnt - self.model.train() - return dist_mean, acc + def _eval(self, epoch): + if self.eval_dataloader is not None: + self.model.eval() + dist, on, cnt = 0, 0 , 0 + with torch.no_grad(): + for chosen_ids, c_mask, reject_ids, r_mask in self.eval_dataloader: + chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device()) + c_mask = c_mask.squeeze(1).to(torch.cuda.current_device()) + reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device()) + r_mask = r_mask.squeeze(1).to(torch.cuda.current_device()) + chosen_reward = self.model(chosen_ids, attention_mask=c_mask) + reject_reward = self.model(reject_ids, attention_mask=r_mask) + for i in range(len(chosen_reward)): + cnt += 1 + if chosen_reward[i] > reject_reward[i]: + on += 1 + dist += (chosen_reward - reject_reward).mean().item() + self.dist = dist / len(self.eval_dataloader) + self.acc = on / cnt - def fit(self): - time = datetime.now() - epoch_bar = tqdm(range(self.max_epochs), desc='Train epoch', disable=not is_rank_0()) - for epoch in range(self.max_epochs): - step_bar = tqdm(range(self.train_dataloader.__len__()), - desc='Train step of epoch %d' % epoch, - disable=not is_rank_0()) - # train - self.model.train() - cnt, acc, dist = 0, 0, 0 - for chosen_ids, c_mask, reject_ids, r_mask in self.train_dataloader: - chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device()) - c_mask = c_mask.squeeze(1).to(torch.cuda.current_device()) - reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device()) - r_mask = r_mask.squeeze(1).to(torch.cuda.current_device()) - chosen_reward = self.model(chosen_ids, attention_mask=c_mask) - reject_reward = self.model(reject_ids, attention_mask=r_mask) - loss = self.loss_fn(chosen_reward, reject_reward) - self.strategy.backward(loss, self.model, self.optimizer) - self.strategy.optimizer_step(self.optimizer) - self.optimizer.zero_grad() - cnt += 1 - if cnt == 100: - self.scheduler.step() - dist, acc = self.eval_acc(self.valid_dataloader) - cnt = 0 - if is_rank_0(): - log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]], - columns=['step', 'loss', 'dist', 'acc']) - log.to_csv('log_%s.csv' % time, mode='a', header=False, index=False) - step_bar.update() - step_bar.set_postfix({'dist': dist, 'acc': acc}) - - # eval - dist, acc = self.eval_acc(self.eval_dataloader) if is_rank_0(): - log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]], - columns=['step', 'loss', 'dist', 'acc']) + log = pd.DataFrame( + [[(epoch + 1) * len(self.train_dataloader) , + self.loss.item(), self.dist, self.acc]], + columns=['step', 'loss', 'dist', 'acc'] + ) log.to_csv('log.csv', mode='a', header=False, index=False) - epoch_bar.update() - step_bar.set_postfix({'dist': dist, 'acc': acc}) - step_bar.close() + + def _train(self, epoch): + self.model.train() + step_bar = tqdm.trange( + len(self.train_dataloader), + desc='Train step of epoch %d' % epoch, + disable=not is_rank_0() + ) + cnt = 0 + for chosen_ids, c_mask, reject_ids, r_mask in self.train_dataloader: + chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device()) + c_mask = c_mask.squeeze(1).to(torch.cuda.current_device()) + reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device()) + r_mask = r_mask.squeeze(1).to(torch.cuda.current_device()) + chosen_reward = self.model(chosen_ids, attention_mask=c_mask) + reject_reward = self.model(reject_ids, attention_mask=r_mask) + self.loss = self.loss_fn(chosen_reward, reject_reward) + self.strategy.backward(self.loss, self.model, self.optimizer) + self.strategy.optimizer_step(self.optimizer) + self.optimizer.zero_grad() + cnt += 1 + if cnt % 100 == 0: + self.scheduler.step() + step_bar.update() + step_bar.close() + + def _before_fit(self): + super()._before_fit() + self.datetime = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index 6a385b32eac1..034f11c0b616 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -1,16 +1,15 @@ import time -from typing import List, Optional +from typing import Optional import torch import torch.distributed as dist +import tqdm import wandb from torch.optim import Optimizer from torch.optim.lr_scheduler import _LRScheduler from torch.utils.data import DataLoader -from tqdm import tqdm from .base import SLTrainer -from .callbacks import Callback from .strategies import ColossalAIStrategy, Strategy from .utils import is_rank_0, to_device @@ -45,6 +44,7 @@ def __init__( from colossalai.booster.plugin import GeminiPlugin assert not isinstance(strategy.plugin, GeminiPlugin), \ "Accumulation steps are not supported in stage 3 of ColossalAI" + super().__init__( strategy, max_epochs, model, optim, train_dataloader @@ -55,75 +55,74 @@ def __init__( self.accumulation_steps = accumulation_steps self.scheduler = lr_scheduler - def fit(self, logger, use_wandb: bool = False): + def _train(self, epoch: int): + self.model.train() + for batch_id, batch in enumerate(self.train_dataloader): + + batch = to_device(batch, torch.cuda.current_device()) + outputs = self.model(batch["input_ids"], + attention_mask=batch["attention_mask"], + labels=batch["labels"]) + + loss = outputs.loss + + if loss >= 2.5 and is_rank_0(): + self.logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") + + loss = loss / self.accumulation_steps + + self.strategy.backward(loss, self.model, self.optimizer) + + self.total_loss += loss.item() + + # gradient accumulation + if (batch_id + 1) % self.accumulation_steps == 0: + self.strategy.optimizer_step(self.optimizer) + self.optimizer.zero_grad() + self.scheduler.step() + if is_rank_0() and self.use_wandb: + wandb.log({ + "loss": self.total_loss / self.accumulation_steps, + "lr": self.scheduler.get_last_lr()[0], + "epoch": epoch, + "batch_id": batch_id + }) + self.total_loss = 0 + self.step_bar.update() + + def _eval(self, epoch: int): + if self.eval_dataloader is not None: + self.model.eval() + with torch.no_grad(): + loss_sum, num_seen = 0, 0 + for batch in self.eval_dataloader: + batch = to_device(batch, torch.cuda.current_device()) + outputs = self.model(batch["input_ids"], + attention_mask=batch["attention_mask"], + labels=batch["labels"]) + loss = outputs.loss + + loss_sum += loss.item() + num_seen += batch["input_ids"].size(0) + + loss_mean = loss_sum / num_seen + if dist.get_rank() == 0: + self.logger.info(f'Eval Epoch {epoch}/{self.max_epochs} loss {loss_mean}') + + def _before_fit(self, + logger, + use_wandb: bool = False + ): + self.logger = logger + self.use_wandb = use_wandb if use_wandb: wandb.init(project="Coati", name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) wandb.watch(self.model) - total_loss = 0 - # epoch_bar = tqdm(range(self.epochs), desc='Epochs', disable=not is_rank_0()) - step_bar = tqdm(range(len(self.train_dataloader) // self.accumulation_steps * self.max_epochs), - desc=f'steps', - disable=not is_rank_0()) - for epoch in range(self.max_epochs): - - # process_bar = tqdm(range(len(self.train_dataloader)), desc=f'Train process for{epoch}', disable=not is_rank_0()) - # train - self.model.train() - for batch_id, batch in enumerate(self.train_dataloader): - - batch = to_device(batch, torch.cuda.current_device()) - outputs = self.model(batch["input_ids"], attention_mask=batch["attention_mask"], labels=batch["labels"]) - - loss = outputs.loss - - if loss >= 2.5 and is_rank_0(): - logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") - - loss = loss / self.accumulation_steps - - self.strategy.backward(loss, self.model, self.optimizer) - - total_loss += loss.item() - - # gradient accumulation - if (batch_id + 1) % self.accumulation_steps == 0: - self.strategy.optimizer_step(self.optimizer) - self.optimizer.zero_grad() - self.scheduler.step() - if is_rank_0() and use_wandb: - wandb.log({ - "loss": total_loss / self.accumulation_steps, - "lr": self.scheduler.get_last_lr()[0], - "epoch": epoch, - "batch_id": batch_id - }) - total_loss = 0 - step_bar.update() - - # if batch_id % log_interval == 0: - # logger.info(f'Train Epoch {epoch}/{self.epochs} Batch {batch_id} Rank {dist.get_rank()} loss {loss.item()}') - # wandb.log({"loss": loss.item()}) - - # process_bar.update() - - # eval - if self.eval_dataloader is not None: - self.model.eval() - with torch.no_grad(): - loss_sum = 0 - num_seen = 0 - for batch in self.eval_dataloader: - batch = to_device(batch, torch.cuda.current_device()) - outputs = self.model(batch["input_ids"], - attention_mask=batch["attention_mask"], - labels=batch["labels"]) - loss = outputs.loss - - loss_sum += loss.item() - num_seen += batch["input_ids"].size(0) - - loss_mean = loss_sum / num_seen - if dist.get_rank() == 0: - logger.info(f'Eval Epoch {epoch}/{self.max_epochs} loss {loss_mean}') - - # epoch_bar.update() + + self.total_loss = 0 + self.no_epoch_bar = True + self.step_bar = tqdm.trange( + len(self.train_dataloader) // self.accumulation_steps * self.max_epochs, + desc=f'steps', + disable=not is_rank_0() + ) From e6a56f0acb102e7bcac9bf1ca3b1a38f4c0502e2 Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 18:09:12 +0800 Subject: [PATCH 03/17] fix: fix init file --- applications/Chat/coati/trainer/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/applications/Chat/coati/trainer/__init__.py b/applications/Chat/coati/trainer/__init__.py index cb8fd5dca605..86142361f3ff 100644 --- a/applications/Chat/coati/trainer/__init__.py +++ b/applications/Chat/coati/trainer/__init__.py @@ -1,9 +1,10 @@ -from .base import SLTrainer -# from .ppo import PPOTrainer +from .base import OnPolicyTrainer, SLTrainer +from .ppo import PPOTrainer from .rm import RewardModelTrainer from .sft import SFTTrainer __all__ = [ - 'SLTrainer', - 'RewardModelTrainer', 'SFTTrainer' + 'SLTrainer', 'OnPolicyTrainer', + 'RewardModelTrainer', 'SFTTrainer', + 'PPOTrainer' ] From 89f77e43a174be917f3f4c9ee9ae86b19199d23a Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 18:09:32 +0800 Subject: [PATCH 04/17] feat: remove on_learn_epoch fn as not used --- applications/Chat/coati/trainer/callbacks/base.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/applications/Chat/coati/trainer/callbacks/base.py b/applications/Chat/coati/trainer/callbacks/base.py index f5616048855b..4351c0079b8d 100644 --- a/applications/Chat/coati/trainer/callbacks/base.py +++ b/applications/Chat/coati/trainer/callbacks/base.py @@ -26,12 +26,6 @@ def on_make_experience_start(self) -> None: def on_make_experience_end(self, experience: Experience) -> None: pass - def on_learn_epoch_start(self, epoch: int) -> None: - pass - - def on_learn_epoch_end(self, epoch: int) -> None: - pass - def on_learn_batch_start(self) -> None: pass From 03d00799cd18860eb24e4d9750fffad3fa5d3fc1 Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 18:29:04 +0800 Subject: [PATCH 05/17] fix: align with modified gemini arguments --- .../Chat/coati/trainer/strategies/colossalai.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index f31551f22318..e5a69f3351cb 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -1,4 +1,3 @@ -import functools import warnings from typing import Optional @@ -103,7 +102,7 @@ def __init__( # NOTE: dist should be initialized before calling get_current_device() if stage == 3: plugin_initializer = lambda: GeminiPlugin( - # gemini_config + # gemini_config device=get_current_device(), placement_policy=placement_policy, precision=precision, @@ -113,20 +112,20 @@ def __init__( search_range_m=search_range_m, hidden_dim=hidden_dim, min_chunk_size_m=min_chunk_size_m, - # zero_optim_config + # zero_optim_config gpu_margin_mem_ratio=gpu_margin_mem_ratio, - # optim_config + # optim_config **optim_kwargs) else: plugin_initializer = lambda: LowLevelZeroPlugin( - # zero_config + # zero_config stage=stage, precision=precision, - # zero_optim_config + # zero_optim_config reduce_bucket_size_in_m=reduce_bucket_size, overlap_communication=overlap_communication, cpu_offload=(placement_policy == 'cpu'), - # optim_config + # optim_config **optim_kwargs) super().__init__(seed, plugin_initializer) From c1fb823498b71f5fb8caefcdf8638ac2b17419c0 Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 18:29:34 +0800 Subject: [PATCH 06/17] to: add OnPolicyTrainer --- applications/Chat/coati/trainer/base.py | 90 ++++++++++++++++++++----- applications/Chat/coati/trainer/ppo.py | 73 ++++++++++---------- 2 files changed, 106 insertions(+), 57 deletions(-) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index a39f5e740137..ecc9d27cfa4b 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -1,10 +1,11 @@ from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, List, Optional, Union +from contextlib import contextmanager +from typing import List -import torch import torch.nn as nn import tqdm from coati.experience_maker import Experience +from coati.replay_buffer import NaiveReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader @@ -59,17 +60,46 @@ def fit(self, *args, **kwargs): self._train(epoch) self._eval(epoch) - def _on_fit_end(self) -> None: - for callback in self.callbacks: - callback.on_fit_end() - def _on_episode_start(self, episode: int) -> None: - for callback in self.callbacks: - callback.on_episode_start(episode) +class OnPolicyTrainer(ABC): + """ + Base class for on-policy rl trainers, e.g. PPO. - def _on_episode_end(self, episode: int) -> None: + Args: + strategy (Strategy):the strategy to use for training + buffer (NaiveReplayBuffer): the buffer to collect experiences + callbacks (List[Callback], defaults to []): the callbacks to call during training process + """ + + def __init__(self, + strategy: Strategy, + buffer: NaiveReplayBuffer, + callbacks: List[Callback] = [] + ) -> None: + super().__init__() + self.strategy = strategy + self.buffer = buffer + self.callbacks = callbacks + + @contextmanager + def _fit_ctx(self) -> None: for callback in self.callbacks: - callback.on_episode_end(episode) + callback.on_fit_start() + try: + yield + finally: + for callback in self.callbacks: + callback.on_fit_end() + + @contextmanager + def _episode_ctx(self, episode: int) -> None: + for callback in self.callbacks: + callback.on_episode_start(episode) + try: + yield + finally: + for callback in self.callbacks: + callback.on_episode_end(episode) def _on_make_experience_start(self) -> None: for callback in self.callbacks: @@ -79,14 +109,6 @@ def _on_make_experience_end(self, experience: Experience) -> None: for callback in self.callbacks: callback.on_make_experience_end(experience) - def _on_learn_epoch_start(self, epoch: int) -> None: - for callback in self.callbacks: - callback.on_learn_epoch_start(epoch) - - def _on_learn_epoch_end(self, epoch: int) -> None: - for callback in self.callbacks: - callback.on_learn_epoch_end(epoch) - def _on_learn_batch_start(self) -> None: for callback in self.callbacks: callback.on_learn_batch_start() @@ -94,3 +116,35 @@ def _on_learn_batch_start(self) -> None: def _on_learn_batch_end(self, metrics: dict, experience: Experience) -> None: for callback in self.callbacks: callback.on_learn_batch_end(metrics, experience) + + # TODO(cwher): + # @abstractmethod + # def _make_experience(self): + # raise NotImplementedError() + + # @abstractmethod + # def _learn(self): + # raise NotImplementedError() + + # def _collect_phase(self): + # self._on_make_experience_start() + # experience = self._make_experience() + # self._on_make_experience_end(experience) + + # def _update_phase(self): + # pass + + # def fit(self, + # num_episodes: int, + # num_collect_steps: int, + # num_update_steps: int, + # ): + # with self._fit_ctx(): + # for episode in range(num_episodes): + # with self._episode_ctx(episode): + # for collect_step in range(num_collect_steps): + # self._collect_phase() + # for update_step in range(num_update_steps): + # self._update_phase() + # # NOTE: this is for on-policy algorithms + # self.buffer.clear() diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index cfb18e2ae483..5e6dc5cf7ef9 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -1,6 +1,5 @@ -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Dict, List, Union -import torch import torch.nn as nn from coati.experience_maker import Experience, NaiveExperienceMaker from coati.models.base import Actor, Critic, get_base_model @@ -11,17 +10,16 @@ from torch.optim import Optimizer from torch.utils.data import DistributedSampler from tqdm import tqdm -from transformers.tokenization_utils_base import PreTrainedTokenizerBase from colossalai.utils import get_current_device -from .base import Trainer +from .base import OnPolicyTrainer from .callbacks import Callback from .strategies import ColossalAIStrategy, Strategy from .utils import is_rank_0, to_device -class PPOTrainer(Trainer): +class PPOTrainer(OnPolicyTrainer): """ Trainer for PPO algorithm. @@ -77,12 +75,14 @@ def __init__(self, "GeminiPlugin is not compatible with manual model.to('cpu')" experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef) - replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) + buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor) - super().__init__(strategy, max_epochs, dataloader_pin_memory, callbacks, **generate_kwargs) + super().__init__(strategy, buffer, callbacks) + self.max_epochs = max_epochs + self.dataloader_pin_memory = dataloader_pin_memory + self.generate_kwargs = generate_kwargs self.experience_maker = experience_maker - self.replay_buffer = replay_buffer self.sample_replay_buffer = sample_replay_buffer self.offload_inference_models = offload_inference_models @@ -109,20 +109,18 @@ def _make_experience(self, inputs: Union[Tensor, Dict[str, Tensor]]) -> Experien def _learn(self): # replay buffer may be empty at first, we should rebuild at each training - if not self.sample_replay_buffer: - # HACK(cwher): according to the design of boost API, dataloader should also be boosted, - # but it is impractical to adapt this pattern in RL training. Thus, I left dataloader unboosted. - dataloader = self.strategy.setup_dataloader(self.replay_buffer, self.dataloader_pin_memory) if self.sample_replay_buffer: pbar = tqdm(range(self.max_epochs), desc='Train epoch', disable=not is_rank_0()) for _ in pbar: - experience = self.replay_buffer.sample() + experience = self.buffer.sample() experience.to_device(self.device) metrics = self.training_step(experience) pbar.set_postfix(metrics) else: + # HACK(cwher): according to the design of boost API, dataloader should also be boosted, + # but it is impractical to adapt this pattern in RL training. Thus, I left dataloader unboosted. + dataloader = self.strategy.setup_dataloader(self.buffer, self.dataloader_pin_memory) for epoch in range(self.max_epochs): - self._on_learn_epoch_start(epoch) if isinstance(dataloader.sampler, DistributedSampler): dataloader.sampler.set_epoch(epoch) pbar = tqdm(dataloader, desc=f'Train epoch [{epoch+1}/{self.max_epochs}]', disable=not is_rank_0()) @@ -132,7 +130,6 @@ def _learn(self): metrics = self.training_step(experience) self._on_learn_batch_end(metrics, experience) pbar.set_postfix(metrics) - self._on_learn_epoch_end(epoch) def fit(self, prompt_dataloader, @@ -143,30 +140,28 @@ def fit(self, time = 0 self.pretrain_dataloader = pretrain_dataloader self.prompt_dataloader = prompt_dataloader - self._on_fit_start() - for episode in range(num_episodes): - self._on_episode_start(episode) - for timestep in tqdm(range(max_timesteps), - desc=f'Episode [{episode+1}/{num_episodes}]', - disable=not is_rank_0()): - time += 1 - prompts = next(iter(self.prompt_dataloader)) - self._on_make_experience_start() - if self.offload_inference_models: - # TODO(ver217): this may be controlled by strategy if they are prepared by strategy - self.experience_maker.initial_model.to(self.device) - self.experience_maker.reward_model.to(self.device) - experience = self._make_experience(prompts) - self._on_make_experience_end(experience) - self.replay_buffer.append(experience) - if time % update_timesteps == 0: - if self.offload_inference_models: - self.experience_maker.initial_model.to('cpu') - self.experience_maker.reward_model.to('cpu') - self._learn() - self.replay_buffer.clear() - self._on_episode_end(episode) - self._on_fit_end() + with self._fit_ctx(): + for episode in range(num_episodes): + with self._episode_ctx(episode): + for timestep in tqdm(range(max_timesteps), + desc=f'Episode [{episode+1}/{num_episodes}]', + disable=not is_rank_0()): + time += 1 + prompts = next(iter(self.prompt_dataloader)) + self._on_make_experience_start() + if self.offload_inference_models: + # TODO(ver217): this may be controlled by strategy if they are prepared by strategy + self.experience_maker.initial_model.to(self.device) + self.experience_maker.reward_model.to(self.device) + experience = self._make_experience(prompts) + self._on_make_experience_end(experience) + self.buffer.append(experience) + if time % update_timesteps == 0: + if self.offload_inference_models: + self.experience_maker.initial_model.to('cpu') + self.experience_maker.reward_model.to('cpu') + self._learn() + self.buffer.clear() def training_step(self, experience: Experience) -> Dict[str, float]: self.actor.train() From b6f7687bb0492cc567b5ffc9a3213e67fa24b5c0 Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 11:50:09 +0800 Subject: [PATCH 07/17] revert: add _on_learn_epoch fn --- applications/Chat/coati/trainer/base.py | 8 ++++++++ applications/Chat/coati/trainer/callbacks/base.py | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index ecc9d27cfa4b..c88c64f949e2 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -109,6 +109,14 @@ def _on_make_experience_end(self, experience: Experience) -> None: for callback in self.callbacks: callback.on_make_experience_end(experience) + def _on_learn_epoch_start(self, epoch: int) -> None: + for callback in self.callbacks: + callback.on_learn_epoch_start(epoch) + + def _on_learn_epoch_end(self, epoch: int) -> None: + for callback in self.callbacks: + callback.on_learn_epoch_end(epoch) + def _on_learn_batch_start(self) -> None: for callback in self.callbacks: callback.on_learn_batch_start() diff --git a/applications/Chat/coati/trainer/callbacks/base.py b/applications/Chat/coati/trainer/callbacks/base.py index 4351c0079b8d..f5616048855b 100644 --- a/applications/Chat/coati/trainer/callbacks/base.py +++ b/applications/Chat/coati/trainer/callbacks/base.py @@ -26,6 +26,12 @@ def on_make_experience_start(self) -> None: def on_make_experience_end(self, experience: Experience) -> None: pass + def on_learn_epoch_start(self, epoch: int) -> None: + pass + + def on_learn_epoch_end(self, epoch: int) -> None: + pass + def on_learn_batch_start(self) -> None: pass From 523b57c973ce29d1588d7c51e1626fcc142d08fc Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 12:27:13 +0800 Subject: [PATCH 08/17] refactor: refactor PPOTrainer --- applications/Chat/coati/trainer/base.py | 96 +++++++++----- applications/Chat/coati/trainer/ppo.py | 163 +++++++++++------------- 2 files changed, 137 insertions(+), 122 deletions(-) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index c88c64f949e2..92397ff9a3c9 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -74,11 +74,15 @@ class OnPolicyTrainer(ABC): def __init__(self, strategy: Strategy, buffer: NaiveReplayBuffer, + sample_buffer: bool, + dataloader_pin_memory: bool, callbacks: List[Callback] = [] ) -> None: super().__init__() self.strategy = strategy self.buffer = buffer + self.sample_buffer = sample_buffer + self.dataloader_pin_memory = dataloader_pin_memory self.callbacks = callbacks @contextmanager @@ -125,34 +129,64 @@ def _on_learn_batch_end(self, metrics: dict, experience: Experience) -> None: for callback in self.callbacks: callback.on_learn_batch_end(metrics, experience) - # TODO(cwher): - # @abstractmethod - # def _make_experience(self): - # raise NotImplementedError() - - # @abstractmethod - # def _learn(self): - # raise NotImplementedError() - - # def _collect_phase(self): - # self._on_make_experience_start() - # experience = self._make_experience() - # self._on_make_experience_end(experience) - - # def _update_phase(self): - # pass - - # def fit(self, - # num_episodes: int, - # num_collect_steps: int, - # num_update_steps: int, - # ): - # with self._fit_ctx(): - # for episode in range(num_episodes): - # with self._episode_ctx(episode): - # for collect_step in range(num_collect_steps): - # self._collect_phase() - # for update_step in range(num_update_steps): - # self._update_phase() - # # NOTE: this is for on-policy algorithms - # self.buffer.clear() + @abstractmethod + def _make_experience(self, collect_step: int): + """ + Implement this method to make experience. + """ + raise NotImplementedError() + + @abstractmethod + def _learn(self, update_step: int): + """ + Implement this method to learn from experience, either + sample from buffer or transform buffer into dataloader. + """ + raise NotImplementedError() + + def _collect_phase(self, collect_step: int): + self._on_make_experience_start() + experience = self._make_experience(collect_step) + self._on_make_experience_end(experience) + self.buffer.append(experience) + + def _update_phase(self, update_step: int): + self._on_learn_epoch_start(update_step) + self._learn(update_step) + self._on_learn_epoch_end(update_step) + + def fit(self, + num_episodes: int, + num_collect_steps: int, + num_update_steps: int, + ): + """ + The main training loop of on-policy rl trainers. + + Args: + num_episodes (int): the number of episodes to train + num_collect_steps (int): the number of collect steps per episode + num_update_steps (int): the number of update steps per episode + """ + + with self._fit_ctx(): + for episode in tqdm.trange(num_episodes, + desc="Episodes", + disable=not is_rank_0()): + with self._episode_ctx(episode): + for collect_step in tqdm.trange(num_collect_steps, + desc="Collect steps", + disable=not is_rank_0()): + self._collect_phase(collect_step) + if not self.sample_buffer: + # HACK(cwher): according to the design of boost API, dataloader should also be boosted, + # but it is impractical to adapt this pattern in RL training. Thus, I left dataloader unboosted. + # I only call strategy.setup_dataloader() to setup dataloader. + self.dataloader = self.strategy.setup_dataloader(self.buffer, + self.dataloader_pin_memory) + for update_step in tqdm.trange(num_update_steps, + desc="Update steps", + disable=not is_rank_0()): + self._update_phase(update_step) + # NOTE: this is for on-policy algorithms + self.buffer.clear() diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 5e6dc5cf7ef9..c39b40825a4e 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Union +from typing import Dict, List import torch.nn as nn from coati.experience_maker import Experience, NaiveExperienceMaker @@ -8,7 +8,7 @@ from coati.replay_buffer import NaiveReplayBuffer from torch import Tensor from torch.optim import Optimizer -from torch.utils.data import DistributedSampler +from torch.utils.data import DataLoader, DistributedSampler from tqdm import tqdm from colossalai.utils import get_current_device @@ -19,6 +19,20 @@ from .utils import is_rank_0, to_device +def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> Dict: + unwrapper_model = strategy.unwrap_model(actor) + hf_model = get_base_model(unwrapper_model) + new_kwargs = {**generate_kwargs} + # use huggingface models method directly + if 'prepare_inputs_fn' not in generate_kwargs and hasattr(hf_model, 'prepare_inputs_for_generation'): + new_kwargs['prepare_inputs_fn'] = hf_model.prepare_inputs_for_generation + + if 'update_model_kwargs_fn' not in generate_kwargs and hasattr(hf_model, '_update_model_kwargs_for_generation'): + new_kwargs['update_model_kwargs_fn'] = hf_model._update_model_kwargs_for_generation + + return new_kwargs + + class PPOTrainer(OnPolicyTrainer): """ Trainer for PPO algorithm. @@ -31,6 +45,8 @@ class PPOTrainer(OnPolicyTrainer): initial_model (Actor): the initial model in rlhf algorithm to generate reference logics to limit the update of actor actor_optim (Optimizer): the optimizer to use for actor model critic_optim (Optimizer): the optimizer to use for critic model + prompt_dataloader (DataLoader): the dataloader to use for prompt data + pretrain_dataloader (DataLoader): the dataloader to use for pretrain data kl_coef (float, defaults to 0.1): the coefficient of kl divergence loss train_batch_size (int, defaults to 8): the batch size to use for training buffer_limit (int, defaults to 0): the max_size limitation of replay buffer @@ -39,7 +55,6 @@ class PPOTrainer(OnPolicyTrainer): vf_coef (float, defaults to 1.0): the coefficient of value loss ptx_coef (float, defaults to 0.9): the coefficient of ptx loss value_clip (float, defaults to 0.4): the clip coefficient of value loss - max_epochs (int, defaults to 1): the number of epochs of training process sample_replay_buffer (bool, defaults to False): whether to sample from replay buffer dataloader_pin_memory (bool, defaults to True): whether to pin memory for data loader offload_inference_models (bool, defaults to True): whether to offload inference models to cpu during training process @@ -55,6 +70,8 @@ def __init__(self, initial_model: Actor, actor_optim: Optimizer, critic_optim: Optimizer, + prompt_dataloader: DataLoader, + pretrain_dataloader: DataLoader, kl_coef: float = 0.1, ptx_coef: float = 0.9, train_batch_size: int = 8, @@ -63,29 +80,31 @@ def __init__(self, eps_clip: float = 0.2, vf_coef: float = 1.0, value_clip: float = 0.4, - max_epochs: int = 1, sample_replay_buffer: bool = False, dataloader_pin_memory: bool = True, offload_inference_models: bool = True, callbacks: List[Callback] = [], - **generate_kwargs) -> None: + **generate_kwargs + ) -> None: if isinstance(strategy, ColossalAIStrategy): from colossalai.booster.plugin import GeminiPlugin assert not (isinstance(strategy.plugin, GeminiPlugin) and offload_inference_models), \ "GeminiPlugin is not compatible with manual model.to('cpu')" - experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef) buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) - generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor) - super().__init__(strategy, buffer, callbacks) - - self.max_epochs = max_epochs - self.dataloader_pin_memory = dataloader_pin_memory - self.generate_kwargs = generate_kwargs - self.experience_maker = experience_maker - self.sample_replay_buffer = sample_replay_buffer + super().__init__( + strategy, buffer, + sample_replay_buffer, dataloader_pin_memory, + callbacks + ) + + self.generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor) + self.experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef) self.offload_inference_models = offload_inference_models + self.prompt_dataloader = prompt_dataloader + self.pretrain_dataloader = pretrain_dataloader + self.actor = actor self.critic = critic @@ -99,71 +118,20 @@ def __init__(self, self.device = get_current_device() - def _make_experience(self, inputs: Union[Tensor, Dict[str, Tensor]]) -> Experience: - if isinstance(inputs, Tensor): - return self.experience_maker.make_experience(inputs, **self.generate_kwargs) - elif isinstance(inputs, dict): - return self.experience_maker.make_experience(**inputs, **self.generate_kwargs) - else: - raise ValueError(f'Unsupported input type "{type(inputs)}"') - - def _learn(self): - # replay buffer may be empty at first, we should rebuild at each training - if self.sample_replay_buffer: - pbar = tqdm(range(self.max_epochs), desc='Train epoch', disable=not is_rank_0()) - for _ in pbar: - experience = self.buffer.sample() - experience.to_device(self.device) - metrics = self.training_step(experience) - pbar.set_postfix(metrics) + def _make_experience(self, collect_step: int) -> Experience: + prompts = next(iter(self.prompt_dataloader)) # sample a prompt + if self.offload_inference_models: + # TODO(ver217): this may be controlled by strategy if they are prepared by strategy + self.experience_maker.initial_model.to(self.device) + self.experience_maker.reward_model.to(self.device) + if isinstance(prompts, Tensor): + return self.experience_maker.make_experience(prompts, **self.generate_kwargs) + elif isinstance(prompts, dict): + return self.experience_maker.make_experience(**prompts, **self.generate_kwargs) else: - # HACK(cwher): according to the design of boost API, dataloader should also be boosted, - # but it is impractical to adapt this pattern in RL training. Thus, I left dataloader unboosted. - dataloader = self.strategy.setup_dataloader(self.buffer, self.dataloader_pin_memory) - for epoch in range(self.max_epochs): - if isinstance(dataloader.sampler, DistributedSampler): - dataloader.sampler.set_epoch(epoch) - pbar = tqdm(dataloader, desc=f'Train epoch [{epoch+1}/{self.max_epochs}]', disable=not is_rank_0()) - for experience in pbar: - self._on_learn_batch_start() - experience.to_device(self.device) - metrics = self.training_step(experience) - self._on_learn_batch_end(metrics, experience) - pbar.set_postfix(metrics) - - def fit(self, - prompt_dataloader, - pretrain_dataloader, - num_episodes: int = 50000, - max_timesteps: int = 500, - update_timesteps: int = 5000) -> None: - time = 0 - self.pretrain_dataloader = pretrain_dataloader - self.prompt_dataloader = prompt_dataloader - with self._fit_ctx(): - for episode in range(num_episodes): - with self._episode_ctx(episode): - for timestep in tqdm(range(max_timesteps), - desc=f'Episode [{episode+1}/{num_episodes}]', - disable=not is_rank_0()): - time += 1 - prompts = next(iter(self.prompt_dataloader)) - self._on_make_experience_start() - if self.offload_inference_models: - # TODO(ver217): this may be controlled by strategy if they are prepared by strategy - self.experience_maker.initial_model.to(self.device) - self.experience_maker.reward_model.to(self.device) - experience = self._make_experience(prompts) - self._on_make_experience_end(experience) - self.buffer.append(experience) - if time % update_timesteps == 0: - if self.offload_inference_models: - self.experience_maker.initial_model.to('cpu') - self.experience_maker.reward_model.to('cpu') - self._learn() - self.buffer.clear() - - def training_step(self, experience: Experience) -> Dict[str, float]: + raise ValueError(f'Unsupported input type "{type(prompts)}"') + + def _training_step(self, experience: Experience) -> Dict[str, float]: self.actor.train() self.critic.train() # policy loss @@ -203,16 +171,29 @@ def training_step(self, experience: Experience) -> Dict[str, float]: return {'reward': experience.reward.mean().item()} - -def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> Dict: - unwrapper_model = strategy.unwrap_model(actor) - hf_model = get_base_model(unwrapper_model) - new_kwargs = {**generate_kwargs} - # use huggingface models method directly - if 'prepare_inputs_fn' not in generate_kwargs and hasattr(hf_model, 'prepare_inputs_for_generation'): - new_kwargs['prepare_inputs_fn'] = hf_model.prepare_inputs_for_generation - - if 'update_model_kwargs_fn' not in generate_kwargs and hasattr(hf_model, '_update_model_kwargs_for_generation'): - new_kwargs['update_model_kwargs_fn'] = hf_model._update_model_kwargs_for_generation - - return new_kwargs + def _learn(self, update_step: int): + if self.offload_inference_models: + self.experience_maker.initial_model.to('cpu') + self.experience_maker.reward_model.to('cpu') + + # buffer may be empty at first, we should rebuild at each training + if self.sample_buffer: + experience = self.buffer.sample() + self._on_learn_batch_start() + experience.to_device(self.device) + metrics = self._training_step(experience) + self._on_learn_batch_end(metrics, experience) + else: + if isinstance(self.dataloader.sampler, DistributedSampler): + self.dataloader.sampler.set_epoch(update_step) + pbar = tqdm( + self.dataloader, + desc=f'Train epoch [{update_step + 1}]', + disable=not is_rank_0() + ) + for experience in pbar: + self._on_learn_batch_start() + experience.to_device(self.device) + metrics = self._training_step(experience) + self._on_learn_batch_end(metrics, experience) + pbar.set_postfix(metrics) From 4a69297c2ad43633827b3edfaccf6418666fbabd Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 12:29:57 +0800 Subject: [PATCH 09/17] style: rename PPOTrainer argument --- applications/Chat/coati/trainer/ppo.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index c39b40825a4e..ae23b5194717 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -49,13 +49,13 @@ class PPOTrainer(OnPolicyTrainer): pretrain_dataloader (DataLoader): the dataloader to use for pretrain data kl_coef (float, defaults to 0.1): the coefficient of kl divergence loss train_batch_size (int, defaults to 8): the batch size to use for training - buffer_limit (int, defaults to 0): the max_size limitation of replay buffer - buffer_cpu_offload (bool, defaults to True): whether to offload replay buffer to cpu + buffer_limit (int, defaults to 0): the max_size limitation of buffer + buffer_cpu_offload (bool, defaults to True): whether to offload buffer to cpu eps_clip (float, defaults to 0.2): the clip coefficient of policy loss vf_coef (float, defaults to 1.0): the coefficient of value loss ptx_coef (float, defaults to 0.9): the coefficient of ptx loss value_clip (float, defaults to 0.4): the clip coefficient of value loss - sample_replay_buffer (bool, defaults to False): whether to sample from replay buffer + sample_buffer (bool, defaults to False): whether to sample from buffer dataloader_pin_memory (bool, defaults to True): whether to pin memory for data loader offload_inference_models (bool, defaults to True): whether to offload inference models to cpu during training process callbacks (List[Callback], defaults to []): the callbacks to call during training process @@ -80,7 +80,7 @@ def __init__(self, eps_clip: float = 0.2, vf_coef: float = 1.0, value_clip: float = 0.4, - sample_replay_buffer: bool = False, + sample_buffer: bool = False, dataloader_pin_memory: bool = True, offload_inference_models: bool = True, callbacks: List[Callback] = [], @@ -94,7 +94,7 @@ def __init__(self, buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) super().__init__( strategy, buffer, - sample_replay_buffer, dataloader_pin_memory, + sample_buffer, dataloader_pin_memory, callbacks ) From 8bd57a95dd5d28a2a973d57fc5beb0f0540e282c Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 13:42:00 +0800 Subject: [PATCH 10/17] fix: align with modified PPO arguments --- .../benchmarks/benchmark_opt_lora_dummy.py | 28 +++++++++---------- applications/Chat/examples/README.md | 5 ++-- .../community/peft/train_peft_prompts.py | 16 +++++------ applications/Chat/examples/train_prompts.py | 16 +++++------ 4 files changed, 29 insertions(+), 36 deletions(-) diff --git a/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py b/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py index dea7ebc60a8b..eeb9fcd08873 100644 --- a/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py +++ b/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py @@ -137,6 +137,12 @@ def main(args): (actor, actor_optim), (critic, critic_optim) = strategy.prepare((actor, actor_optim), (critic, critic_optim)) + random_prompts = torch.randint(tokenizer.vocab_size, (1000, 256), device=torch.cuda.current_device()) + dataloader = DataLoader(random_prompts, + batch_size=args.experience_batch_size, + shuffle=True, + collate_fn=preprocess_batch) + trainer = PPOTrainer(strategy, actor, critic, @@ -144,8 +150,9 @@ def main(args): initial_model, actor_optim, critic_optim, + prompt_dataloader=dataloader, + pretrain_dataloader=None, ptx_coef=0, - max_epochs=args.max_epochs, train_batch_size=args.train_batch_size, offload_inference_models=args.offload_inference_models, max_length=512, @@ -157,17 +164,9 @@ def main(args): eos_token_id=tokenizer.eos_token_id, callbacks=[performance_evaluator]) - random_prompts = torch.randint(tokenizer.vocab_size, (1000, 256), device=torch.cuda.current_device()) - dataloader = DataLoader(random_prompts, - batch_size=args.experience_batch_size, - shuffle=True, - collate_fn=preprocess_batch) - - trainer.fit(dataloader, - None, - num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) + trainer.fit(num_episodes=args.num_episodes, + num_update_steps=args.num_update_steps, + num_collect_steps=args.num_collect_steps) print_rank_0(f'Peak CUDA mem: {torch.cuda.max_memory_allocated()/1024**3:.2f} GB') @@ -183,9 +182,8 @@ def main(args): ], default='ddp') parser.add_argument('--num_episodes', type=int, default=3) - parser.add_argument('--max_timesteps', type=int, default=8) - parser.add_argument('--update_timesteps', type=int, default=8) - parser.add_argument('--max_epochs', type=int, default=1) + parser.add_argument('--num_collect_steps', type=int, default=8) + parser.add_argument('--num_update_steps', type=int, default=1) parser.add_argument('--train_batch_size', type=int, default=8) parser.add_argument('--experience_batch_size', type=int, default=8) parser.add_argument('--lora_rank', type=int, default=0) diff --git a/applications/Chat/examples/README.md b/applications/Chat/examples/README.md index 72810738d017..3e9d9c4325d8 100644 --- a/applications/Chat/examples/README.md +++ b/applications/Chat/examples/README.md @@ -171,9 +171,8 @@ Pretrain dataset: the pretrain dataset including the instruction and correspondi - --pretrain_dataset: path of the ptx dataset, type=str, default=None - --need_optim_ckpt: whether to save optim ckpt, type=bool, default=False - --num_episodes: num of episodes for training, type=int, default=10 -- --max_epochs: max epochs for training in one episode, type=int, default=5 -- --max_timesteps: max episodes in one batch, type=int, default=10 -- --update_timesteps: timesteps to update, type=int, default=10 +- --num_update_steps: number of steps to update policy per episode, type=int +- --num_collect_steps: number of steps to collect experience per episode, type=int - --train_batch_size: batch size while training, type=int, default=8 - --ptx_batch_size: batch size to compute ptx loss, type=int, default=1 - --experience_batch_size: batch size to make experience, type=int, default=8 diff --git a/applications/Chat/examples/community/peft/train_peft_prompts.py b/applications/Chat/examples/community/peft/train_peft_prompts.py index ba8470f38fad..2ab22ff289b2 100644 --- a/applications/Chat/examples/community/peft/train_peft_prompts.py +++ b/applications/Chat/examples/community/peft/train_peft_prompts.py @@ -169,9 +169,10 @@ def tokenize_fn(texts): initial_model, actor_optim, critic_optim, + prompt_dataloader=prompt_dataloader, + pretrain_dataloader=pretrain_dataloader, kl_coef=args.kl_coef, ptx_coef=args.ptx_coef, - max_epochs=args.max_epochs, train_batch_size=args.train_batch_size, experience_batch_size=args.experience_batch_size, tokenizer=tokenize_fn, @@ -183,11 +184,9 @@ def tokenize_fn(texts): eos_token_id=tokenizer.eos_token_id, ) - trainer.fit(prompt_dataloader=prompt_dataloader, - pretrain_dataloader=pretrain_dataloader, - num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) + trainer.fit(num_episodes=args.num_episodes, + num_update_steps=args.num_update_steps, + num_collect_steps=args.num_collect_steps) # save model checkpoint after fitting trainer.save_model(args.save_path, only_rank0=True, tokenizer=tokenizer) @@ -215,9 +214,8 @@ def tokenize_fn(texts): parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts') parser.add_argument('--need_optim_ckpt', type=bool, default=False) parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) + parser.add_argument('--num_collect_steps', type=int, default=10) + parser.add_argument('--num_update_steps', type=int, default=5) parser.add_argument('--train_batch_size', type=int, default=2) parser.add_argument('--ptx_batch_size', type=int, default=1) parser.add_argument('--experience_batch_size', type=int, default=8) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index 2a47dda637bb..874b2fe630e1 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -175,9 +175,10 @@ def main(args): initial_model, actor_optim, critic_optim, + prompt_dataloader=prompt_dataloader, + pretrain_dataloader=pretrain_dataloader, kl_coef=args.kl_coef, ptx_coef=args.ptx_coef, - max_epochs=args.max_epochs, train_batch_size=args.train_batch_size, max_length=args.max_seq_len, use_cache=True, @@ -189,11 +190,9 @@ def main(args): offload_inference_models=args.strategy != 'colossalai_gemini' ) - trainer.fit(prompt_dataloader=prompt_dataloader, - pretrain_dataloader=pretrain_dataloader, - num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) + trainer.fit(num_episodes=args.num_episodes, + num_collect_steps=args.num_collect_steps, + num_update_steps=args.num_update_steps) # save model checkpoint after fitting strategy.save_model(actor, args.save_path, only_rank0=True) @@ -220,9 +219,8 @@ def main(args): parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts') parser.add_argument('--need_optim_ckpt', type=bool, default=False) parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) + parser.add_argument('--num_collect_steps', type=int, default=10) + parser.add_argument('--num_update_steps', type=int, default=5) parser.add_argument('--train_batch_size', type=int, default=8) parser.add_argument('--ptx_batch_size', type=int, default=1) parser.add_argument('--experience_batch_size', type=int, default=8) From 8a277dc694bc01c04c14fd8a758a84b4f21cbace Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 13:45:38 +0800 Subject: [PATCH 11/17] test: align with modified train_prompts arguments --- applications/Chat/examples/test_ci.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index 85728e95820c..4bf5524afb01 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -63,8 +63,8 @@ for model in 'gpt2' 'bloom' 'opt' 'llama' 'roberta'; do torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ --strategy $strategy --model $model \ - --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 + --num_episodes 1 --num_collect_steps 2 --num_update_steps 1 \ + --train_batch_size 2 done done @@ -149,8 +149,8 @@ rm -rf ${BASE}/rm_ckpt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_zero2 --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --strategy colossalai_zero2 --num_episodes 1 \ + --num_collect_steps 2 --num_update_steps 1 --train_batch_size 2 \ --pretrain 'facebook/opt-350m' --model opt \ --rm_pretrain 'facebook/opt-350m' \ --rm_path ${BASE}/rm_ckpt_opt.pt \ @@ -159,8 +159,8 @@ rm -rf ${BASE}/rm_ckpt_opt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_zero2 --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --strategy colossalai_zero2 --num_episodes 1 \ + --num_collect_steps 2 --num_update_steps 1 --train_batch_size 2 \ --pretrain 'gpt2' --model gpt2 \ --rm_pretrain 'gpt2' \ --rm_path ${BASE}/rm_ckpt_gpt.pt \ @@ -168,8 +168,8 @@ torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --strategy colossalai_gemini --num_episodes 1 \ + --num_collect_steps 2 --num_update_steps 1 --train_batch_size 2 \ --pretrain 'gpt2' --model gpt2 \ --rm_pretrain 'gpt2' \ --rm_path ${BASE}/rm_ckpt_gpt.pt \ From a2443e2d55a0c949701e20c96ef095ae49267f9b Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 13:54:11 +0800 Subject: [PATCH 12/17] chore: modify train_prompts --- applications/Chat/examples/train_prompts.sh | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/applications/Chat/examples/train_prompts.sh b/applications/Chat/examples/train_prompts.sh index 7f3b2636ca32..d04c416015b1 100755 --- a/applications/Chat/examples/train_prompts.sh +++ b/applications/Chat/examples/train_prompts.sh @@ -1,13 +1,13 @@ set_n_least_used_CUDA_VISIBLE_DEVICES() { local n=${1:-"9999"} echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) + local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv | + tail -n +2 | + nl -v 0 | + tee /dev/tty | + sort -g -k 2 | + awk '{print $1}' | + head -n $n) export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') echo "Now CUDA_VISIBLE_DEVICES is set to:" echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" @@ -17,4 +17,9 @@ set_n_least_used_CUDA_VISIBLE_DEVICES 2 # torchrun --standalone --nproc_per_node=2 train_prompts.py prompts.csv --strategy colossalai_zero2 -torchrun --standalone --nproc_per_node=2 train_prompts.py --prompt_dataset /path/to/data.json --strategy colossalai_zero2 +torchrun --standalone --nproc_per_node=2 train_prompts.py \ + --pretrain_dataset /path/to/data.json \ + --prompt_dataset /path/to/data.json \ + --strategy colossalai_zero2 \ + --num_episodes 1 --num_collect_steps 2 --num_update_steps 1 \ + --train_batch_size 2 From dc82ade6e4bdcc37d26e6d0458f2ba20af90a4ff Mon Sep 17 00:00:00 2001 From: CWHer Date: Mon, 26 Jun 2023 13:58:16 +0800 Subject: [PATCH 13/17] docs: align with modified arguments --- applications/Chat/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/applications/Chat/README.md b/applications/Chat/README.md index 29cd581d7cc9..082cbb22b587 100644 --- a/applications/Chat/README.md +++ b/applications/Chat/README.md @@ -83,7 +83,7 @@ More details can be found in the latest news.

-> DeepSpeedChat performance comes from its blog on 2023 April 12, ColossalChat performance can be reproduced on an AWS p4d.24xlarge node with 8 A100-40G GPUs with the following command: torchrun --standalone --nproc_per_node 8 benchmark_opt_lora_dummy.py --max_timesteps 1 --update_timesteps 1 --use_kernels --strategy colossalai_zero2 --experience_batch_size 64 --train_batch_size 32 +> DeepSpeedChat performance comes from its blog on 2023 April 12, ColossalChat performance can be reproduced on an AWS p4d.24xlarge node with 8 A100-40G GPUs with the following command: torchrun --standalone --nproc_per_node 8 benchmark_opt_lora_dummy.py --num_collect_steps 1 --use_kernels --strategy colossalai_zero2 --experience_batch_size 64 --train_batch_size 32 ## Install From 95681853c8ef2bfcaa17c584364af6fcdc27fdf2 Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 27 Jun 2023 14:01:56 +0800 Subject: [PATCH 14/17] fix: remove unnecessary output --- applications/Chat/coati/trainer/sft.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index 034f11c0b616..f437bf2688ad 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -65,10 +65,6 @@ def _train(self, epoch: int): labels=batch["labels"]) loss = outputs.loss - - if loss >= 2.5 and is_rank_0(): - self.logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") - loss = loss / self.accumulation_steps self.strategy.backward(loss, self.model, self.optimizer) From 211dfb559fdff6366e263b1874aeebba1f964efb Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 27 Jun 2023 14:21:45 +0800 Subject: [PATCH 15/17] fix: move dataloader to fit fn of SLTrainer --- applications/Chat/coati/trainer/base.py | 3 -- applications/Chat/coati/trainer/rm.py | 33 ++++++++++--------- applications/Chat/coati/trainer/sft.py | 28 ++++++++-------- .../Chat/examples/train_reward_model.py | 7 ++-- applications/Chat/examples/train_sft.py | 7 ++-- 5 files changed, 39 insertions(+), 39 deletions(-) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index 92397ff9a3c9..570a839bd2e7 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -23,7 +23,6 @@ class SLTrainer(ABC): max_epochs (int, defaults to 1): the number of epochs of training process model (nn.Module): the model to train optim (Optimizer): the optimizer to use for training - train_dataloader (DataLoader): the dataloader to use for training """ def __init__(self, @@ -31,14 +30,12 @@ def __init__(self, max_epochs: int, model: nn.Module, optimizer: Optimizer, - train_dataloader: DataLoader, ) -> None: super().__init__() self.strategy = strategy self.max_epochs = max_epochs self.model = model self.optimizer = optimizer - self.train_dataloader = train_dataloader @abstractmethod def _train(self, epoch): diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index 4beb10468473..54a5d0f40dea 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ -23,9 +23,6 @@ class RewardModelTrainer(SLTrainer): optim (Optimizer): the optimizer to use for training lr_scheduler (_LRScheduler): the lr scheduler to use for training loss_fn (callable): the loss function to use for training - train_dataloader (DataLoader): the dataloader to use for training - valid_dataloader (DataLoader): the dataloader to use for validation - eval_dataloader (DataLoader): the dataloader to use for evaluation max_epochs (int, defaults to 2): the number of epochs to train """ @@ -36,18 +33,9 @@ def __init__( optim: Optimizer, lr_scheduler: _LRScheduler, loss_fn: Callable, - train_dataloader: DataLoader, - valid_dataloader: DataLoader, - eval_dataloader: DataLoader, max_epochs: int = 1, ) -> None: - super().__init__( - strategy, max_epochs, - model, optim, train_dataloader - ) - - self.valid_dataloader = valid_dataloader - self.eval_dataloader = eval_dataloader + super().__init__(strategy, max_epochs, model, optim) self.loss_fn = loss_fn self.scheduler = lr_scheduler @@ -55,7 +43,7 @@ def __init__( def _eval(self, epoch): if self.eval_dataloader is not None: self.model.eval() - dist, on, cnt = 0, 0 , 0 + dist, on, cnt = 0, 0, 0 with torch.no_grad(): for chosen_ids, c_mask, reject_ids, r_mask in self.eval_dataloader: chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device()) @@ -74,7 +62,7 @@ def _eval(self, epoch): if is_rank_0(): log = pd.DataFrame( - [[(epoch + 1) * len(self.train_dataloader) , + [[(epoch + 1) * len(self.train_dataloader), self.loss.item(), self.dist, self.acc]], columns=['step', 'loss', 'dist', 'acc'] ) @@ -105,6 +93,19 @@ def _train(self, epoch): step_bar.update() step_bar.close() - def _before_fit(self): + def _before_fit(self, + train_dataloader: DataLoader, + valid_dataloader: DataLoader, + eval_dataloader: DataLoader): + """ + Args: + train_dataloader (DataLoader): the dataloader to use for training + valid_dataloader (DataLoader): the dataloader to use for validation + eval_dataloader (DataLoader): the dataloader to use for evaluation + """ super()._before_fit() self.datetime = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + + self.train_dataloader = train_dataloader + self.valid_dataloader = valid_dataloader + self.eval_dataloader = eval_dataloader diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index f437bf2688ad..12c51d7a80c3 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -9,6 +9,8 @@ from torch.optim.lr_scheduler import _LRScheduler from torch.utils.data import DataLoader +from colossalai.logging import DistributedLogger + from .base import SLTrainer from .strategies import ColossalAIStrategy, Strategy from .utils import is_rank_0, to_device @@ -23,8 +25,6 @@ class SFTTrainer(SLTrainer): strategy (Strategy): the strategy to use for training optim(Optimizer): the optimizer to use for training lr_scheduler(_LRScheduler): the lr scheduler to use for training - train_dataloader: the dataloader to use for training - eval_dataloader: the dataloader to use for evaluation max_epochs (int, defaults to 2): the number of epochs to train accumulation_steps (int, defaults to 8): the number of steps to accumulate gradients """ @@ -35,8 +35,6 @@ def __init__( strategy: Strategy, optim: Optimizer, lr_scheduler: _LRScheduler, - train_dataloader: DataLoader, - eval_dataloader: Optional[DataLoader] = None, max_epochs: int = 2, accumulation_steps: int = 8, ) -> None: @@ -45,12 +43,7 @@ def __init__( assert not isinstance(strategy.plugin, GeminiPlugin), \ "Accumulation steps are not supported in stage 3 of ColossalAI" - super().__init__( - strategy, max_epochs, - model, optim, train_dataloader - ) - - self.eval_dataloader = eval_dataloader + super().__init__(strategy, max_epochs, model, optim) self.accumulation_steps = accumulation_steps self.scheduler = lr_scheduler @@ -106,9 +99,18 @@ def _eval(self, epoch: int): self.logger.info(f'Eval Epoch {epoch}/{self.max_epochs} loss {loss_mean}') def _before_fit(self, - logger, - use_wandb: bool = False - ): + train_dataloader: DataLoader, + eval_dataloader: Optional[DataLoader] = None, + logger: Optional[DistributedLogger] = None, + use_wandb: bool = False): + """ + Args: + train_dataloader: the dataloader to use for training + eval_dataloader: the dataloader to use for evaluation + """ + self.train_dataloader = train_dataloader + self.eval_dataloader = eval_dataloader + self.logger = logger self.use_wandb = use_wandb if use_wandb: diff --git a/applications/Chat/examples/train_reward_model.py b/applications/Chat/examples/train_reward_model.py index 2df3bc391b9b..4a6851ab5b24 100644 --- a/applications/Chat/examples/train_reward_model.py +++ b/applications/Chat/examples/train_reward_model.py @@ -178,12 +178,11 @@ def train(args): optim=optim, lr_scheduler=lr_scheduler, loss_fn=loss_fn, - train_dataloader=train_dataloader, - valid_dataloader=valid_dataloader, - eval_dataloader=eval_dataloader, max_epochs=args.max_epochs) - trainer.fit() + trainer.fit(train_dataloader=train_dataloader, + valid_dataloader=valid_dataloader, + eval_dataloader=eval_dataloader) # save model checkpoint after fitting on only rank0 strategy.save_model(model, args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks diff --git a/applications/Chat/examples/train_sft.py b/applications/Chat/examples/train_sft.py index 717eb95311fb..967b7c277c6a 100644 --- a/applications/Chat/examples/train_sft.py +++ b/applications/Chat/examples/train_sft.py @@ -170,12 +170,13 @@ def train(args): strategy=strategy, optim=optim, lr_scheduler=lr_scheduler, - train_dataloader=train_dataloader, - eval_dataloader=eval_dataloader, max_epochs=args.max_epochs, accumulation_steps=args.accumulation_steps) - trainer.fit(logger=logger, use_wandb=args.use_wandb) + trainer.fit(train_dataloader=train_dataloader, + eval_dataloader=eval_dataloader, + logger=logger, + use_wandb=args.use_wandb) # save model checkpoint after fitting on only rank0 strategy.save_pretrained(model, path=args.save_path, only_rank0=True, tokenizer=tokenizer) From 69ab71ba182f26b37473b002122f9de8dc30d3b0 Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 27 Jun 2023 14:29:51 +0800 Subject: [PATCH 16/17] fix: move dataloader to fit fn of OnPolicyTrainer --- applications/Chat/benchmarks/benchmark_opt_lora_dummy.py | 6 +++--- applications/Chat/coati/trainer/base.py | 8 ++++++++ applications/Chat/coati/trainer/ppo.py | 7 ------- .../Chat/examples/community/peft/train_peft_prompts.py | 6 +++--- applications/Chat/examples/train_prompts.py | 6 +++--- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py b/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py index eeb9fcd08873..39f2f28eca16 100644 --- a/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py +++ b/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py @@ -150,8 +150,6 @@ def main(args): initial_model, actor_optim, critic_optim, - prompt_dataloader=dataloader, - pretrain_dataloader=None, ptx_coef=0, train_batch_size=args.train_batch_size, offload_inference_models=args.offload_inference_models, @@ -164,7 +162,9 @@ def main(args): eos_token_id=tokenizer.eos_token_id, callbacks=[performance_evaluator]) - trainer.fit(num_episodes=args.num_episodes, + trainer.fit(prompt_dataloader=dataloader, + pretrain_dataloader=None, + num_episodes=args.num_episodes, num_update_steps=args.num_update_steps, num_collect_steps=args.num_collect_steps) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index 570a839bd2e7..0e1f6136cafc 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -65,6 +65,8 @@ class OnPolicyTrainer(ABC): Args: strategy (Strategy):the strategy to use for training buffer (NaiveReplayBuffer): the buffer to collect experiences + sample_buffer (bool, defaults to False): whether to sample from buffer + dataloader_pin_memory (bool, defaults to True): whether to pin memory for data loader callbacks (List[Callback], defaults to []): the callbacks to call during training process """ @@ -153,6 +155,8 @@ def _update_phase(self, update_step: int): self._on_learn_epoch_end(update_step) def fit(self, + prompt_dataloader: DataLoader, + pretrain_dataloader: DataLoader, num_episodes: int, num_collect_steps: int, num_update_steps: int, @@ -161,10 +165,14 @@ def fit(self, The main training loop of on-policy rl trainers. Args: + prompt_dataloader (DataLoader): the dataloader to use for prompt data + pretrain_dataloader (DataLoader): the dataloader to use for pretrain data num_episodes (int): the number of episodes to train num_collect_steps (int): the number of collect steps per episode num_update_steps (int): the number of update steps per episode """ + self.prompt_dataloader = prompt_dataloader + self.pretrain_dataloader = pretrain_dataloader with self._fit_ctx(): for episode in tqdm.trange(num_episodes, diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index ae23b5194717..4b15520095d7 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -45,8 +45,6 @@ class PPOTrainer(OnPolicyTrainer): initial_model (Actor): the initial model in rlhf algorithm to generate reference logics to limit the update of actor actor_optim (Optimizer): the optimizer to use for actor model critic_optim (Optimizer): the optimizer to use for critic model - prompt_dataloader (DataLoader): the dataloader to use for prompt data - pretrain_dataloader (DataLoader): the dataloader to use for pretrain data kl_coef (float, defaults to 0.1): the coefficient of kl divergence loss train_batch_size (int, defaults to 8): the batch size to use for training buffer_limit (int, defaults to 0): the max_size limitation of buffer @@ -70,8 +68,6 @@ def __init__(self, initial_model: Actor, actor_optim: Optimizer, critic_optim: Optimizer, - prompt_dataloader: DataLoader, - pretrain_dataloader: DataLoader, kl_coef: float = 0.1, ptx_coef: float = 0.9, train_batch_size: int = 8, @@ -102,9 +98,6 @@ def __init__(self, self.experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef) self.offload_inference_models = offload_inference_models - self.prompt_dataloader = prompt_dataloader - self.pretrain_dataloader = pretrain_dataloader - self.actor = actor self.critic = critic diff --git a/applications/Chat/examples/community/peft/train_peft_prompts.py b/applications/Chat/examples/community/peft/train_peft_prompts.py index 2ab22ff289b2..00ed7aa36257 100644 --- a/applications/Chat/examples/community/peft/train_peft_prompts.py +++ b/applications/Chat/examples/community/peft/train_peft_prompts.py @@ -169,8 +169,6 @@ def tokenize_fn(texts): initial_model, actor_optim, critic_optim, - prompt_dataloader=prompt_dataloader, - pretrain_dataloader=pretrain_dataloader, kl_coef=args.kl_coef, ptx_coef=args.ptx_coef, train_batch_size=args.train_batch_size, @@ -184,7 +182,9 @@ def tokenize_fn(texts): eos_token_id=tokenizer.eos_token_id, ) - trainer.fit(num_episodes=args.num_episodes, + trainer.fit(prompt_dataloader=prompt_dataloader, + pretrain_dataloader=pretrain_dataloader, + num_episodes=args.num_episodes, num_update_steps=args.num_update_steps, num_collect_steps=args.num_collect_steps) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index 874b2fe630e1..a9bc0e532e5d 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -175,8 +175,6 @@ def main(args): initial_model, actor_optim, critic_optim, - prompt_dataloader=prompt_dataloader, - pretrain_dataloader=pretrain_dataloader, kl_coef=args.kl_coef, ptx_coef=args.ptx_coef, train_batch_size=args.train_batch_size, @@ -190,7 +188,9 @@ def main(args): offload_inference_models=args.strategy != 'colossalai_gemini' ) - trainer.fit(num_episodes=args.num_episodes, + trainer.fit(prompt_dataloader=prompt_dataloader, + pretrain_dataloader=pretrain_dataloader, + num_episodes=args.num_episodes, num_collect_steps=args.num_collect_steps, num_update_steps=args.num_update_steps) From 128ec4fa00858b2abbb75b95b6988f724db30c55 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 28 Jun 2023 18:49:24 +0800 Subject: [PATCH 17/17] fix: modify usage of prompt and pretrain dataloader --- applications/Chat/coati/trainer/base.py | 6 +++--- applications/Chat/coati/trainer/ppo.py | 4 ++-- applications/Chat/coati/trainer/utils.py | 27 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/applications/Chat/coati/trainer/base.py b/applications/Chat/coati/trainer/base.py index 0e1f6136cafc..13571cdcc23a 100644 --- a/applications/Chat/coati/trainer/base.py +++ b/applications/Chat/coati/trainer/base.py @@ -11,7 +11,7 @@ from .callbacks import Callback from .strategies import Strategy -from .utils import is_rank_0 +from .utils import CycledDataLoader, is_rank_0 class SLTrainer(ABC): @@ -171,8 +171,8 @@ def fit(self, num_collect_steps (int): the number of collect steps per episode num_update_steps (int): the number of update steps per episode """ - self.prompt_dataloader = prompt_dataloader - self.pretrain_dataloader = pretrain_dataloader + self.prompt_dataloader = CycledDataLoader(prompt_dataloader) + self.pretrain_dataloader = CycledDataLoader(pretrain_dataloader) with self._fit_ctx(): for episode in tqdm.trange(num_episodes, diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 4b15520095d7..451abe2a7438 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -112,7 +112,7 @@ def __init__(self, self.device = get_current_device() def _make_experience(self, collect_step: int) -> Experience: - prompts = next(iter(self.prompt_dataloader)) # sample a prompt + prompts = self.prompt_dataloader.next() if self.offload_inference_models: # TODO(ver217): this may be controlled by strategy if they are prepared by strategy self.experience_maker.initial_model.to(self.device) @@ -138,7 +138,7 @@ def _training_step(self, experience: Experience) -> Dict[str, float]: # ptx loss if self.ptx_coef != 0: - batch = next(iter(self.pretrain_dataloader)) + batch = self.pretrain_dataloader.next() batch = to_device(batch, self.device) ptx_log_probs = self.actor(batch['input_ids'], attention_mask=batch['attention_mask'])['logits'] diff --git a/applications/Chat/coati/trainer/utils.py b/applications/Chat/coati/trainer/utils.py index 9cccb5c92603..c9fc8d0fe19f 100644 --- a/applications/Chat/coati/trainer/utils.py +++ b/applications/Chat/coati/trainer/utils.py @@ -3,6 +3,33 @@ import torch import torch.distributed as dist from torch.utils._pytree import tree_map +from torch.utils.data import DataLoader + + +class CycledDataLoader: + """ + Why do we need this class? + In version 4da324cd60, "prompts = next(iter(self.prompt_dataloader))" is used to sample a batch of prompts/pretrain. + However, this may be inefficient due to frequent re-initialization of the dataloader. (re-initialize workers...) + NOTE: next(iter(dataloader)) is not equivalent to for batch in dataloader: break, it causes slightly different behavior. + """ + + def __init__(self, + dataloader: DataLoader, + ) -> None: + self.dataloader = dataloader + + self.count = 0 + self.dataloader_iter = iter(dataloader) + + def next(self): + self.count += 1 + try: + return next(self.dataloader_iter) + except StopIteration: + self.count = 0 + self.dataloader_iter = iter(self.dataloader) + return next(self.dataloader_iter) def is_rank_0() -> bool: