From 975f4b37194387ff3f52daf421a426c5be7fea9d Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 20 Sep 2023 17:38:15 +0800 Subject: [PATCH 01/20] feat: modify lora merge weights fn --- applications/Chat/coati/models/lora.py | 70 ++++++++++++++------------ 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/applications/Chat/coati/models/lora.py b/applications/Chat/coati/models/lora.py index 2114913e107b..e9bd7b2ed8f0 100644 --- a/applications/Chat/coati/models/lora.py +++ b/applications/Chat/coati/models/lora.py @@ -1,4 +1,6 @@ +import dataclasses import math +import warnings from typing import Optional import loralib as lora @@ -7,6 +9,14 @@ import torch.nn.functional as F +@dataclasses.dataclass +class LoRAManager: + merge_weights: bool = False + + +LORA_MANAGER = LoRAManager() + + class LoraLinear(lora.LoRALayer, nn.Module): """Replace in-place ops to out-of-place ops to fit gemini. Convert a torch.nn.Linear to LoraLinear.""" @@ -17,13 +27,11 @@ def __init__( r: int = 0, lora_alpha: int = 1, lora_dropout: float = 0.0, - fan_in_fan_out: bool = False, # Set this to True if the layer to replace stores weight like (fan_in, fan_out) - merge_weights: bool = True, + # Set this to True if the layer to replace stores weight like (fan_in, fan_out) + fan_in_fan_out: bool = False, ): nn.Module.__init__(self) - lora.LoRALayer.__init__( - self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, merge_weights=merge_weights - ) + lora.LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, merge_weights=False) self.weight = weight self.bias = bias @@ -53,31 +61,31 @@ def train(self, mode: bool = True): def T(w): return w.T if self.fan_in_fan_out else w - nn.Module.train(self, mode) - if self.merge_weights and self.merged: - # Make sure that the weights are not merged - if self.r > 0: - if not hasattr(self, "lora_A") or not hasattr(self, "lora_B"): - # FIXME(csric): temporary fix - self.lora_A = nn.Parameter(self.weight.new_empty((self.r, self.in_features))) - self.lora_B = nn.Parameter(self.weight.new_empty((self.out_features, self.r))) - self.reset_parameters() - else: - self.weight.data -= T(self.lora_B @ self.lora_A) * self.scaling - self.merged = False - - def eval(self): - def T(w): - return w.T if self.fan_in_fan_out else w - - nn.Module.eval(self) - if self.merge_weights and not self.merged: - # Merge the weights and mark it - if self.r > 0: - self.weight.data += T(self.lora_B @ self.lora_A) * self.scaling - delattr(self, "lora_A") - delattr(self, "lora_B") - self.merged = True + self.training = mode + if LORA_MANAGER.merge_weights: + if mode and self.merged: + warnings.warn("Invoke module.train() would unmerge LoRA weights.") + raise NotImplementedError("LoRA unmerge is not tested.") + # Make sure that the weights are not merged + if self.r > 0: + if not hasattr(self, "lora_A") or not hasattr(self, "lora_B"): + # FIXME(csric): temporary fix + self.lora_A = nn.Parameter(self.weight.new_empty((self.r, self.in_features))) + self.lora_B = nn.Parameter(self.weight.new_empty((self.out_features, self.r))) + self.reset_parameters() + else: + self.weight.data -= T(self.lora_B @ self.lora_A) * self.scaling + self.merged = False + elif not mode and not self.merged: + warnings.warn("Invoke module.eval() would merge LoRA weights.") + # Merge the weights and mark it + if self.r > 0: + self.weight.data += T(self.lora_B @ self.lora_A) * self.scaling + delattr(self, "lora_A") + delattr(self, "lora_B") + self.merged = True + + return self def forward(self, x: torch.Tensor): 
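With the global switch above, merging is no longer driven by a per-layer `merge_weights` flag; the caller opts in once and the next mode change performs the merge. A minimal usage sketch of the intended call sequence (mirroring the training-script changes in the next patch; `model` stands for any module whose linears were wrapped with `LoraLinear`):

    from coati.models.lora import LORA_MANAGER

    LORA_MANAGER.merge_weights = True  # opt in to merging on the next mode switch
    model.eval()  # train(False) folds lora_B @ lora_A * scaling into weight and drops lora_A / lora_B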
def T(w): @@ -96,7 +104,7 @@ def _lora_linear_wrapper(linear: nn.Linear, lora_rank: int) -> LoraLinear: assert ( lora_rank <= linear.in_features ), f"LoRA rank ({lora_rank}) must be less than or equal to in features ({linear.in_features})" - lora_linear = LoraLinear(linear.weight, linear.bias, r=lora_rank, merge_weights=False) + lora_linear = LoraLinear(linear.weight, linear.bias, r=lora_rank) return lora_linear From 352c5bd575d9080a54eb6645c38e77ff6b3afd4e Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 20 Sep 2023 17:38:53 +0800 Subject: [PATCH 02/20] feat: add lora merge weights config --- applications/Chat/examples/train_prompts.py | 7 +++++++ applications/Chat/examples/train_reward_model.py | 8 ++++++++ applications/Chat/examples/train_sft.py | 7 +++++++ 3 files changed, 22 insertions(+) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index de2a33263040..a8ab15eebfa5 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -192,6 +192,12 @@ def main(args): use_wandb=args.use_wandb, ) + if args.lora_rank > 0 and args.merge_lora_weights: + from coati.models.lora import LORA_MANAGER + + # NOTE: set model to eval to merge LoRA weights + LORA_MANAGER.merge_weights = True + actor.eval() # save model checkpoint after fitting strategy.save_model(actor, args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks @@ -227,6 +233,7 @@ def main(args): parser.add_argument("--ptx_batch_size", type=int, default=1) parser.add_argument("--experience_batch_size", type=int, default=8) parser.add_argument("--lora_rank", type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument("--merge_lora_weights", type=bool, default=True) parser.add_argument("--lr", type=float, default=1e-7) parser.add_argument("--kl_coef", type=float, default=0.1) parser.add_argument("--ptx_coef", type=float, default=0.9) diff --git a/applications/Chat/examples/train_reward_model.py b/applications/Chat/examples/train_reward_model.py index c9095b365884..c1be51f2f587 100644 --- a/applications/Chat/examples/train_reward_model.py +++ b/applications/Chat/examples/train_reward_model.py @@ -157,6 +157,13 @@ def train(args): log_dir=args.log_dir, use_wandb=args.use_wandb, ) + + if args.lora_rank > 0 and args.merge_lora_weights: + from coati.models.lora import LORA_MANAGER + + # NOTE: set model to eval to merge LoRA weights + LORA_MANAGER.merge_weights = True + model.eval() # save model checkpoint after fitting on only rank0 strategy.save_model(model, args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks @@ -186,6 +193,7 @@ def train(args): parser.add_argument("--batch_size", type=int, default=1) parser.add_argument("--max_len", type=int, default=512) parser.add_argument("--lora_rank", type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument("--merge_lora_weights", type=bool, default=True) parser.add_argument("--lr", type=float, default=9e-6) parser.add_argument("--loss_fn", type=str, default="log_sig", choices=["log_sig", "log_exp"]) parser.add_argument("--log_dir", default="logs", type=str) diff --git a/applications/Chat/examples/train_sft.py b/applications/Chat/examples/train_sft.py index a34661762258..4f36791be3cf 100644 --- a/applications/Chat/examples/train_sft.py +++ b/applications/Chat/examples/train_sft.py @@ -177,6 +177,12 @@ def train(args): use_wandb=args.use_wandb, ) + if args.lora_rank > 0 and args.merge_lora_weights: + from 
coati.models.lora import LORA_MANAGER + + # NOTE: set model to eval to merge LoRA weights + LORA_MANAGER.merge_weights = True + model.eval() # save model checkpoint after fitting on only rank0 strategy.save_pretrained(model, path=args.save_path, only_rank0=True, tokenizer=tokenizer) # save optimizer checkpoint on all ranks @@ -204,6 +210,7 @@ def train(args): parser.add_argument("--batch_size", type=int, default=4) parser.add_argument("--max_len", type=int, default=512) parser.add_argument("--lora_rank", type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument("--merge_lora_weights", type=bool, default=True) parser.add_argument("--lr", type=float, default=5e-6) parser.add_argument("--accumulation_steps", type=int, default=8) parser.add_argument("--log_dir", default="logs", type=str) From a57287a6066aa46bb630171b0c6708b572c4693d Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 11:29:19 +0800 Subject: [PATCH 03/20] feat: update Experience class --- .../Chat/coati/experience_buffer/utils.py | 58 +++++++----------- .../Chat/coati/experience_maker/base.py | 60 ++++++++++++------- 2 files changed, 58 insertions(+), 60 deletions(-) diff --git a/applications/Chat/coati/experience_buffer/utils.py b/applications/Chat/coati/experience_buffer/utils.py index baedbebd184f..23ff205c1fe0 100644 --- a/applications/Chat/coati/experience_buffer/utils.py +++ b/applications/Chat/coati/experience_buffer/utils.py @@ -1,47 +1,45 @@ from dataclasses import dataclass -from typing import List, Optional +from typing import List import torch -import torch.nn.functional as F from coati.experience_maker.base import Experience @dataclass class BufferItem: - """BufferItem is an item of experience data. + """BufferItem is an item of `Experience` data. Shapes of each tensor: - sequences: (S) - action_log_probs: (A) - values: (1) - reward: (1) - advantages: (1) - attention_mask: (S) - action_mask: (A) + sequences: (S) + attention_mask: (S) + action_mask: (A) + step_mask: (N) + action_log_probs: (A) + values: (N) + returns: (N) + advantages: (N) - "A" is the number of actions. 
""" sequences: torch.Tensor + attention_mask: torch.LongTensor + action_mask: torch.BoolTensor + step_mask: torch.BoolTensor action_log_probs: torch.Tensor values: torch.Tensor - reward: torch.Tensor + returns: torch.Tensor advantages: torch.Tensor - attention_mask: Optional[torch.LongTensor] - action_mask: Optional[torch.BoolTensor] def split_experience_batch(experience: Experience) -> List[BufferItem]: batch_size = experience.sequences.size(0) batch_kwargs = [{} for _ in range(batch_size)] - keys = ("sequences", "action_log_probs", "values", "reward", "advantages", "attention_mask", "action_mask") + keys = ('sequences', 'attention_mask', 'action_mask', 'step_mask', + 'action_log_probs', 'values', 'returns', 'advantages') for key in keys: value = getattr(experience, key) - if isinstance(value, torch.Tensor): - vals = torch.unbind(value) - else: - # None - vals = [value for _ in range(batch_size)] + assert isinstance(value, torch.Tensor) + vals = torch.unbind(value) assert batch_size == len(vals) for i, v in enumerate(vals): batch_kwargs[i][key] = v @@ -49,26 +47,12 @@ def split_experience_batch(experience: Experience) -> List[BufferItem]: return items -def _zero_pad_sequences(sequences: List[torch.Tensor], side: str = "left") -> torch.Tensor: - assert side in ("left", "right") - max_len = max(seq.size(0) for seq in sequences) - padded_sequences = [] - for seq in sequences: - pad_len = max_len - seq.size(0) - padding = (pad_len, 0) if side == "left" else (0, pad_len) - padded_sequences.append(F.pad(seq, padding)) - return torch.stack(padded_sequences, dim=0) - - def make_experience_batch(items: List[BufferItem]) -> Experience: kwargs = {} - to_pad_keys = set(("action_log_probs", "action_mask")) - keys = ("sequences", "action_log_probs", "values", "reward", "advantages", "attention_mask", "action_mask") + keys = ('sequences', 'attention_mask', 'action_mask', 'step_mask', + 'action_log_probs', 'values', 'returns', 'advantages') for key in keys: vals = [getattr(item, key) for item in items] - if key in to_pad_keys: - batch_data = _zero_pad_sequences(vals) - else: - batch_data = torch.stack(vals, dim=0) + batch_data = torch.stack(vals, dim=0) kwargs[key] = batch_data return Experience(**kwargs) diff --git a/applications/Chat/coati/experience_maker/base.py b/applications/Chat/coati/experience_maker/base.py index 0731f6e0f97f..96b4b00932f8 100644 --- a/applications/Chat/coati/experience_maker/base.py +++ b/applications/Chat/coati/experience_maker/base.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Optional import torch from coati.models.base import Actor, Critic, RewardModel @@ -9,51 +8,66 @@ @dataclass class Experience: """Experience is a batch of data. - These data should have the sequence length and number of actions. Left padding for sequences is applied. + "B" is the batch size. + "S" is the sequence length. + "A" is the number of actions. + "C" is the chunk size. + "N" is the number of MDP steps. + NOTE: N = A / C, each Experience contains N MDP steps ([s0, a0], [s1, a1], ...), + sequences = |pad|prompt|a0|a1|a2|...|pad|, + s0 = prompt, s1 = prompt + a0, s2 = prompt + a0 + a1, ... + FIXME(cwher): store N steps in a Experience can be computationally efficient, + but may be different from uniform sampling (shuffle all steps and sample). 
+ Shapes of each tensor: - sequences: (B, S) - action_log_probs: (B, A) - values: (B) - reward: (B) - advantages: (B) - attention_mask: (B, S) - action_mask: (B, A) + sequences: (B, S) + attention_mask: (B, S) + action_mask: (B, A) + step_mask: (B, N) + action_log_probs: (B, A) + values: (B, N), output of old critic model + returns: (B, N), result of GAE + advantages: (B, N), result of GAE - "A" is the number of actions. + e.g., + sequences = |pad|prompt|response|pad| + attention_mask = |0|1|1|0| + action_mask = |1|0| (for response) + + NOTE: `Experience` are split into `BufferItem`s when added to buffer. """ sequences: torch.Tensor + attention_mask: torch.LongTensor + action_mask: torch.BoolTensor + step_mask: torch.BoolTensor action_log_probs: torch.Tensor values: torch.Tensor - reward: torch.Tensor + returns: torch.Tensor advantages: torch.Tensor - attention_mask: Optional[torch.LongTensor] - action_mask: Optional[torch.BoolTensor] @torch.no_grad() def to_device(self, device: torch.device) -> None: self.sequences = self.sequences.to(device) + self.attention_mask = self.attention_mask.to(device) + self.action_mask = self.action_mask.to(device) + self.step_mask = self.step_mask.to(device) self.action_log_probs = self.action_log_probs.to(device) self.values = self.values.to(device) - self.reward = self.reward.to(device) + self.returns = self.returns.to(device) self.advantages = self.advantages.to(device) - if self.attention_mask is not None: - self.attention_mask = self.attention_mask.to(device) - if self.action_mask is not None: - self.action_mask = self.action_mask.to(device) def pin_memory(self): self.sequences = self.sequences.pin_memory() + self.attention_mask = self.attention_mask.pin_memory() + self.action_mask = self.action_mask.pin_memory() + self.step_mask = self.step_mask.pin_memory() self.action_log_probs = self.action_log_probs.pin_memory() self.values = self.values.pin_memory() - self.reward = self.reward.pin_memory() + self.returns = self.returns.pin_memory() self.advantages = self.advantages.pin_memory() - if self.attention_mask is not None: - self.attention_mask = self.attention_mask.pin_memory() - if self.action_mask is not None: - self.action_mask = self.action_mask.pin_memory() return self From 326a3e33523e2e1a22196df876b6462662aa6a7b Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 11:31:13 +0800 Subject: [PATCH 04/20] to: remove NaiveExperienceMaker --- .../Chat/coati/experience_maker/__init__.py | 7 +- .../Chat/coati/experience_maker/naive.py | 71 ------------------- 2 files changed, 5 insertions(+), 73 deletions(-) delete mode 100644 applications/Chat/coati/experience_maker/naive.py diff --git a/applications/Chat/coati/experience_maker/__init__.py b/applications/Chat/coati/experience_maker/__init__.py index 06452292e77c..abe8b92c3ceb 100644 --- a/applications/Chat/coati/experience_maker/__init__.py +++ b/applications/Chat/coati/experience_maker/__init__.py @@ -1,4 +1,7 @@ from .base import Experience, ExperienceMaker -from .naive import NaiveExperienceMaker +from .chunked import ChunkedExperienceMaker -__all__ = ["Experience", "ExperienceMaker", "NaiveExperienceMaker"] +__all__ = [ + 'Experience', 'ExperienceMaker', + 'ChunkedExperienceMaker', +] diff --git a/applications/Chat/coati/experience_maker/naive.py b/applications/Chat/coati/experience_maker/naive.py deleted file mode 100644 index 941e1994b148..000000000000 --- a/applications/Chat/coati/experience_maker/naive.py +++ /dev/null @@ -1,71 +0,0 @@ -import torch -import torch.nn.functional as F -from 
coati.models.base import Actor, Critic, RewardModel -from coati.models.generation import generate -from coati.models.utils import calc_action_log_probs, compute_reward -from transformers import PreTrainedTokenizer - -from .base import Experience, ExperienceMaker - - -class NaiveExperienceMaker(ExperienceMaker): - """ - Naive experience maker. - """ - - def __init__( - self, - actor: Actor, - critic: Critic, - reward_model: RewardModel, - initial_model: Actor, - tokenizer: PreTrainedTokenizer, - kl_coef: float = 0.1, - ) -> None: - super().__init__(actor, critic, reward_model, initial_model) - self.tokenizer = tokenizer - self.kl_coef = kl_coef - - @torch.no_grad() - def make_experience(self, input_ids: torch.Tensor, **generate_kwargs) -> Experience: - self.actor.eval() - self.critic.eval() - self.initial_model.eval() - self.reward_model.eval() - - # generate sequences - sequences = generate(self.actor, input_ids, self.tokenizer, **generate_kwargs) - - # calculate auxiliary tensors - attention_mask = None - pad_token_id = self.tokenizer.pad_token_id - if pad_token_id is not None: - attention_mask = sequences.not_equal(pad_token_id).to(dtype=torch.long, device=sequences.device) - - input_len = input_ids.size(1) - eos_token_id = self.tokenizer.eos_token_id - if eos_token_id is None: - action_mask = torch.ones_like(sequences, dtype=torch.bool) - else: - # left padding may be applied, only mask action - action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0 - action_mask = F.pad(action_mask, (1 + input_len, -1), value=True) # include eos token and input - action_mask[:, :input_len] = False - action_mask = action_mask[:, 1:] - action_mask = action_mask[:, -(sequences.size(1) - input_len) :] - num_actions = action_mask.size(1) - - actor_output = self.actor(sequences, attention_mask)["logits"] - action_log_probs = calc_action_log_probs(actor_output, sequences, num_actions) - base_model_output = self.initial_model(sequences, attention_mask)["logits"] - base_action_log_probs = calc_action_log_probs(base_model_output, sequences, num_actions) - value = self.critic(sequences, attention_mask) - r = self.reward_model(sequences, attention_mask) - reward = compute_reward(r, self.kl_coef, action_log_probs, base_action_log_probs, action_mask=action_mask) - - advantage = reward - value - # TODO(ver217): maybe normalize adv - if advantage.ndim == 1: - advantage = advantage.unsqueeze(-1) - - return Experience(sequences, action_log_probs, value, reward, advantage, attention_mask, action_mask) From 8c4d9d7748f410603285514ecc2541908671efc6 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 12:15:39 +0800 Subject: [PATCH 05/20] test: update experience tests --- applications/Chat/tests/test_experience.py | 30 +++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/applications/Chat/tests/test_experience.py b/applications/Chat/tests/test_experience.py index a9591259800d..ef97e251ac7f 100644 --- a/applications/Chat/tests/test_experience.py +++ b/applications/Chat/tests/test_experience.py @@ -5,7 +5,7 @@ import torch import torch.distributed as dist from coati.experience_buffer import NaiveExperienceBuffer -from coati.experience_maker import NaiveExperienceMaker +from coati.experience_maker import ChunkedExperienceMaker from coati.models.base import RewardModel from coati.models.gpt import GPTActor, GPTCritic from coati.trainer.ppo import _set_default_generate_kwargs @@ -63,7 +63,7 @@ def __init__(self): self.pad_token_id = 0 tokenizer = MockTokenizer() - 
experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, tokenizer) + experience_maker = ChunkedExperienceMaker(actor, critic, reward_model, initial_model, tokenizer) data_buffer = NaiveExperienceBuffer(SAMPLE_BATCH_SIZE, cpu_offload=False) generate_kwargs = dict(do_sample=True, max_length=16) @@ -72,16 +72,21 @@ def __init__(self): # experience of all ranks should be the same for _ in range(2): data = get_data(EXPERIENCE_BATCH_SIZE) - assert gather_and_equal(data["input_ids"]) - assert gather_and_equal(data["attention_mask"]) - experience = experience_maker.make_experience(**data, do_sample=True, max_length=16) + assert gather_and_equal(data['input_ids']) + assert gather_and_equal(data['attention_mask']) + experience = experience_maker.make_experience(**data, **generate_kwargs) + num_actions = experience.action_log_probs.size(1) + chunk_size = experience_maker.chunk_size + num_steps = experience.advantages.size(1) + assert num_steps == (num_actions + chunk_size - 1) // chunk_size assert gather_and_equal(experience.sequences) + assert gather_and_equal(experience.attention_mask) + assert gather_and_equal(experience.action_mask) + assert gather_and_equal(experience.step_mask) assert gather_and_equal(experience.action_log_probs) assert gather_and_equal(experience.values) - assert gather_and_equal(experience.reward) + assert gather_and_equal(experience.returns) assert gather_and_equal(experience.advantages) - assert gather_and_equal(experience.action_mask) - assert gather_and_equal(experience.attention_mask) data_buffer.append(experience) # data buffer's data should be the same @@ -89,12 +94,13 @@ def __init__(self): assert gather_and_equal(buffer_size) for item in data_buffer.items: assert gather_and_equal(item.sequences) + assert gather_and_equal(item.attention_mask) + assert gather_and_equal(item.action_mask) + assert gather_and_equal(item.step_mask) assert gather_and_equal(item.action_log_probs) assert gather_and_equal(item.values) - assert gather_and_equal(item.reward) + assert gather_and_equal(item.returns) assert gather_and_equal(item.advantages) - assert gather_and_equal(item.action_mask) - assert gather_and_equal(item.attention_mask) # dataloader of each rank should have the same size and different batch dataloader = strategy.setup_dataloader(data_buffer) @@ -104,7 +110,7 @@ def __init__(self): assert not gather_and_equal(experience.sequences) assert not gather_and_equal(experience.action_log_probs) assert not gather_and_equal(experience.values) - assert not gather_and_equal(experience.reward) + assert not gather_and_equal(experience.returns) assert not gather_and_equal(experience.advantages) # action mask and attention mask may be same From 0a1ec076814a8bdf079a7189af6ca1ad924dc5bf Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:02:29 +0800 Subject: [PATCH 06/20] feat: add ChunkedExperienceMaker --- .../Chat/coati/experience_maker/chunked.py | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 applications/Chat/coati/experience_maker/chunked.py diff --git a/applications/Chat/coati/experience_maker/chunked.py b/applications/Chat/coati/experience_maker/chunked.py new file mode 100644 index 000000000000..6f859c4478e1 --- /dev/null +++ b/applications/Chat/coati/experience_maker/chunked.py @@ -0,0 +1,186 @@ +from typing import Dict, Tuple + +import torch +import torch.nn.functional as F +from coati.models.base import Actor, Critic, RewardModel +from coati.models.generation import generate +from coati.models.utils import 
calc_action_log_probs +from transformers import PreTrainedTokenizer + +from .base import Experience, ExperienceMaker + + +class ChunkedExperienceMaker(ExperienceMaker): + """ + Chunked experience maker. + NOTE: Treat every `chunk_size` tokens chunk as a step in MDP. + chunk_size = 1: every token is a step + chunk_size = seq_len: all the tokens constitute a giant step + """ + + def __init__(self, + actor: Actor, + critic: Critic, + reward_model: RewardModel, + initial_model: Actor, + tokenizer: PreTrainedTokenizer, + chunk_size: int = 8, + kl_coef: float = 0.1, + gamma: float = 0.99, + gae_lambda: float = 0.95, + ) -> None: + super().__init__(actor, critic, reward_model, initial_model) + self.tokenizer = tokenizer + self.chunk_size = chunk_size + self.kl_coef = kl_coef + self.gamma = gamma + self.gae_lambda = gae_lambda + + @staticmethod + @torch.no_grad() + def compute_advantages_and_returns( + values: torch.Tensor, + rewards: torch.Tensor, + end_flags: torch.Tensor, + num_steps: int, + gamma: float, + gae_lambda: float, + bootstrap_value: bool = True, + ) -> Tuple[torch.Tensor, torch.Tensor]: + """ + This is modified from https://github.com/CarperAI/trlx/blob/main/trlx/models/modeling_ppo.py. + + Function that computes advantages and returns from rewards and values. + Calculated as in the original PPO paper: https://arxiv.org/abs/1707.06347 + Note that rewards may include a KL divergence loss term. + + Advantages looks like this: + Adv1 = R1 + γ * λ * R2 + γ^2 * λ^2 * R3 + ... + - V1 + γ * (1 - λ) V2 + γ^2 * λ * (1 - λ) V3 + ... + + Returns looks like this: + Ret1 = R1 + γ * λ * R2 + γ^2 * λ^2 * R3 + ... + + γ * (1 - λ) V2 + γ^2 * λ * (1 - λ) V3 + ... + + Args: + values: Tensor of shape (batch_size, num_steps + 1), + V(s_0), V(s_1), ..., V(s_N), V(s_N+1) + rewards: Tensor of shape (batch_size, num_steps), + r(s_0, a_0), r(s_1, a_1), ..., r(s_N, a_N) + end_flags: Tensor of shape (batch_size, num_steps), + NOTE: end_flags[i] indicates whether the s_{i+1} is the terminal state. + num_steps: Length of MDP + gamma: γ, discount factor + gae_lambda: λ, GAE parameter + bootstrap_value: Whether to bootstrap the terminal value or not. + i.e., V(s_T+1) = 0 if bootstrap_value is False, otherwise V(s_T+1) = Critic(s_T+1) + This may be useful when the MDP is truncated by the max length. 
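As a concrete check of the recursion implemented below, a minimal sketch with made-up numbers (one sample, two steps, no bootstrapping of the terminal value):

    import torch
    from coati.experience_maker import ChunkedExperienceMaker

    values = torch.tensor([[0.5, 0.6, 0.7]])    # V(s_0), V(s_1), V(s_2): shape (1, num_steps + 1)
    rewards = torch.tensor([[0.1, 1.0]])        # r(s_0, a_0), r(s_1, a_1): shape (1, num_steps)
    end_flags = torch.tensor([[False, True]])   # s_2 is the terminal state
    advantages, returns = ChunkedExperienceMaker.compute_advantages_and_returns(
        values, rewards, end_flags, num_steps=2, gamma=0.99, gae_lambda=0.95, bootstrap_value=False)
    # advantages ≈ [[0.5702, 0.4000]], returns ≈ [[1.0702, 1.0000]];
    # the terminal step's return equals its reward because V(s_2) is not bootstrapped.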
+ """ + + last_gae_lambda = 0 + advantages = torch.zeros_like(rewards) + for t in reversed(range(num_steps)): + next_non_terminal = torch.logical_not(end_flags[:, t]) + next_values = values[:, t + 1] * (next_non_terminal + bootstrap_value).bool() + delta = rewards[:, t] + gamma * next_values - values[:, t] + last_gae_lambda = delta + gamma * gae_lambda * next_non_terminal * last_gae_lambda + advantages[:, t] = last_gae_lambda + returns = advantages + values[:, :-1] + return advantages, returns + + @torch.no_grad() + def make_experience(self, + input_ids: torch.Tensor, + attention_mask: torch.Tensor, + **generate_kwargs + ) -> Tuple[Experience, Dict]: + self.actor.eval() + self.critic.eval() + self.initial_model.eval() + self.reward_model.eval() + + # generate sequences + sequences = generate(self.actor, + input_ids, + tokenizer=self.tokenizer, + attention_mask=attention_mask, + **generate_kwargs) + + # calculate auxiliary tensors + eos_token_id = self.tokenizer.eos_token_id + pad_token_id = self.tokenizer.pad_token_id + assert eos_token_id is not None and pad_token_id is not None, \ + "eos_token_id and pad_token_id must be specified in generate_kwargs" + input_len = input_ids.size(1) + num_actions = sequences.size(1) - input_len + num_steps = (num_actions + self.chunk_size - 1) // self.chunk_size + + # if action is |action|eos|pad|, then action_mask is |1|1|0| + action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0 + action_mask = F.pad(action_mask, (1, -1), value=True) # shift right by 1 to include eos token + step_mask = F.pad(action_mask, (0, (self.chunk_size - num_actions) % + self.chunk_size), value=False).view(-1, self.chunk_size) + step_mask = (step_mask.sum(dim=-1) > 0).view(-1, num_steps) + attention_mask = torch.cat([attention_mask, action_mask], dim=-1) + + # compute action log probs + actor_logits = self.actor(sequences, attention_mask)["logits"] + action_log_probs = calc_action_log_probs(actor_logits, sequences, num_actions) + base_model_logits = self.initial_model(sequences, attention_mask)["logits"] + base_log_probs = calc_action_log_probs(base_model_logits, sequences, num_actions) + + log_ratio = action_log_probs - base_log_probs + log_ratio = F.pad(log_ratio, (0, (self.chunk_size - num_actions) % + self.chunk_size), value=0).view(-1, self.chunk_size) + log_ratio_mask = F.pad(action_mask, (0, (self.chunk_size - num_actions) % + self.chunk_size), value=False).view(-1, self.chunk_size) + chunk_log_ratio = torch.sum(log_ratio * log_ratio_mask, dim=-1).view(-1, num_steps) + + # compute V(s_i) + values = torch.zeros((sequences.size(0), num_steps + 1), device=sequences.device) + # TODO(cwher): employ kv cache? + # TODO(cwher): is it necessary to add ? + for i in range(num_steps + 1): + seq_len = input_len + min(i * self.chunk_size, num_actions) + sequence_with_eos = F.pad(sequences[:, :seq_len], (0, 1), value=eos_token_id) + sequence_with_eos_mask = F.pad(attention_mask[:, :seq_len], (0, 1), value=False) + # NOTE: sequences[:, :seq_len] must contain if sequences[:, seq_len - 1] is {, padding token} + sequence_with_eos_mask[:, -1] = torch.logical_and( + sequences[:, seq_len - 1] != pad_token_id, + sequences[:, seq_len - 1] != eos_token_id + ) + values[:, i] = self.critic(sequence_with_eos, sequence_with_eos_mask) + final_rewards = self.reward_model(sequence_with_eos, sequence_with_eos_mask) + + # NOTE: reward is calculated according to the following rules: + # 1. reward[i] = -kl_coef * chunk_log_ratio[i] + # 2. 
reward[-1] += final_reward + rewards = -self.kl_coef * chunk_log_ratio * step_mask + # NOTE: actions may contain padding tokens + num_valid_actions = action_mask.sum(dim=-1) + num_valid_steps = (num_valid_actions + self.chunk_size - 1) // self.chunk_size + rewards[torch.arange(rewards.size(0)), num_valid_steps - 1] += final_rewards + + end_flags = torch.zeros_like(rewards, dtype=torch.bool) + end_flags[torch.arange(rewards.size(0)), num_valid_steps - 1] = True + + advantages, returns = self.compute_advantages_and_returns( + values, rewards, end_flags, num_steps, self.gamma, self.gae_lambda) + + experience = Experience(sequences, + attention_mask, + action_mask, + step_mask, + action_log_probs, + values[:, :-1], + returns, + advantages) + + metrics = { + "episode_rewards": rewards.sum(dim=-1).mean().item(), + "final_rewards": final_rewards.mean().item(), + "episode_steps": num_valid_steps.float().mean().item(), + "episode_tokens": num_valid_actions.float().mean().item(), + } + + return experience, metrics From c639f5f489a8d54879ef92f366447c8edeeccabc Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:03:02 +0800 Subject: [PATCH 07/20] feat: remove compute_reward fn --- applications/Chat/coati/models/utils.py | 33 ++++--------------------- 1 file changed, 5 insertions(+), 28 deletions(-) diff --git a/applications/Chat/coati/models/utils.py b/applications/Chat/coati/models/utils.py index 1aaef16620d2..c345fe2be083 100644 --- a/applications/Chat/coati/models/utils.py +++ b/applications/Chat/coati/models/utils.py @@ -1,12 +1,10 @@ -from typing import Optional, Union - import torch import torch.nn.functional as F -def _compute_approx_kl( - log_probs: torch.Tensor, log_probs_base: torch.Tensor, action_mask: Optional[torch.Tensor] = None -) -> torch.Tensor: +def compute_approx_kl(log_probs: torch.Tensor, + log_probs_base: torch.Tensor, + ) -> torch.Tensor: """ Compute the approximate KL divergence between two distributions. Schulman blog: http://joschu.net/blog/kl-approx.html @@ -14,30 +12,9 @@ def _compute_approx_kl( Args: log_probs: Log probabilities of the new distribution. log_probs_base: Log probabilities of the base distribution. - action_mask: Mask for actions. 
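To make the chunk bookkeeping in ChunkedExperienceMaker above concrete, a small sketch with made-up token counts:

    import torch
    import torch.nn.functional as F

    chunk_size, num_actions = 4, 10
    num_steps = (num_actions + chunk_size - 1) // chunk_size    # ceil(10 / 4) = 3 MDP steps
    # suppose <eos> is the 7th generated token, so the last 3 action slots are padding
    action_mask = torch.tensor([[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]], dtype=torch.bool)
    pad = (chunk_size - num_actions) % chunk_size               # pad actions to a multiple of chunk_size
    chunks = F.pad(action_mask, (0, pad), value=False).view(-1, chunk_size)
    step_mask = (chunks.sum(dim=-1) > 0).view(-1, num_steps)    # [[True, True, False]]
    num_valid_steps = (action_mask.sum(dim=-1) + chunk_size - 1) // chunk_size   # tensor([2])
    # the scalar reward-model score is added at step index num_valid_steps - 1 (here: 1)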
""" - - log_ratio = log_probs_base - log_probs - approx_kl = (log_ratio.exp() - 1) - log_ratio - if action_mask is not None: - approx_kl = masked_mean(approx_kl, action_mask, dim=1) - return approx_kl - approx_kl = approx_kl.mean(dim=1) - return approx_kl - - -def compute_reward( - r: Union[torch.Tensor, float], - kl_coef: float, - log_probs: torch.Tensor, - log_probs_base: torch.Tensor, - action_mask: Optional[torch.Tensor] = None, -) -> torch.Tensor: - if kl_coef <= 0.0: - return r - kl = _compute_approx_kl(log_probs, log_probs_base, action_mask=action_mask) - reward = r - kl_coef * kl - return reward + # FIXME: this fn is not used for now + raise NotImplementedError def _log_probs_from_logits(logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor: From 14ad5daab0222ad74504ff692d1f9c57ff679cec Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:05:26 +0800 Subject: [PATCH 08/20] feat: update ppo loss --- applications/Chat/coati/models/loss.py | 57 +++++++++++++------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/applications/Chat/coati/models/loss.py b/applications/Chat/coati/models/loss.py index 687bd0f7bfe7..c1b91e899b6e 100644 --- a/applications/Chat/coati/models/loss.py +++ b/applications/Chat/coati/models/loss.py @@ -1,9 +1,6 @@ -from typing import Optional - import torch import torch.nn as nn - -from .utils import masked_mean +import torch.nn.functional as F class GPTLMLoss(nn.Module): @@ -32,20 +29,23 @@ def __init__(self, clip_eps: float = 0.2) -> None: super().__init__() self.clip_eps = clip_eps - def forward( - self, - log_probs: torch.Tensor, - old_log_probs: torch.Tensor, - advantages: torch.Tensor, - action_mask: Optional[torch.Tensor] = None, - ) -> torch.Tensor: - ratio = (log_probs - old_log_probs).exp() - surr1 = ratio * advantages - surr2 = ratio.clamp(1 - self.clip_eps, 1 + self.clip_eps) * advantages + def forward(self, + log_probs: torch.Tensor, + old_log_probs: torch.Tensor, + advantages: torch.Tensor, + action_mask: torch.Tensor, + chunk_size: int + ) -> torch.Tensor: + log_ratio = log_probs - old_log_probs + num_steps = (log_ratio.size(1) + chunk_size - 1) // chunk_size + log_ratio = F.pad(log_ratio, (0, (chunk_size - log_ratio.size(1)) % chunk_size)).view(-1, chunk_size) + action_mask = F.pad(action_mask, (0, (chunk_size - action_mask.size(1)) % chunk_size)).view(-1, chunk_size) + chunk_ratio = torch.sum(log_ratio * action_mask, dim=1).exp().view(-1, num_steps) + surr1 = chunk_ratio * advantages + surr2 = chunk_ratio.clamp(1 - self.clip_eps, 1 + self.clip_eps) * advantages loss = -torch.min(surr1, surr2) - if action_mask is not None: - loss = masked_mean(loss, action_mask) - loss = loss.mean() + # NOTE: loss is likely to be a tensor, not a scalar + # requires further reduction to get the final loss return loss @@ -58,19 +58,18 @@ def __init__(self, clip_eps: float = 0.4) -> None: super().__init__() self.clip_eps = clip_eps - def forward( - self, - values: torch.Tensor, - old_values: torch.Tensor, - reward: torch.Tensor, - action_mask: Optional[torch.Tensor] = None, - ) -> torch.Tensor: + def forward(self, + values: torch.Tensor, + old_values: torch.Tensor, + returns: torch.Tensor, + ) -> torch.Tensor: values_clipped = old_values + (values - old_values).clamp(-self.clip_eps, self.clip_eps) - surr1 = (values_clipped - reward) ** 2 - surr2 = (values - reward) ** 2 - loss = torch.max(surr1, surr2) - loss = loss.mean() - return 0.5 * loss + surr1 = (values_clipped - returns) ** 2 + surr2 = (values - returns) ** 2 + loss = 0.5 * 
torch.max(surr1, surr2) + # NOTE: loss is likely to be a tensor, not a scalar + # requires further reduction to get the final loss + return loss class LogSigLoss(nn.Module): From f7c749527eee3e4e4c60c672e262859daf1bbd9b Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:11:02 +0800 Subject: [PATCH 09/20] to: update ppo trainer --- applications/Chat/coati/trainer/ppo.py | 91 ++++++++++++++++---------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index d6966689885e..751819565a11 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -1,7 +1,7 @@ from typing import Dict, List, Optional from coati.experience_buffer import NaiveExperienceBuffer -from coati.experience_maker import Experience, NaiveExperienceMaker +from coati.experience_maker import ChunkedExperienceMaker, Experience from coati.models.base import Actor, Critic, RewardModel, get_base_model from coati.models.loss import GPTLMLoss, PolicyLoss, ValueLoss from coati.models.utils import calc_action_log_probs @@ -45,6 +45,7 @@ class PPOTrainer(OnPolicyTrainer): actor_optim (Optimizer): the optimizer to use for actor model critic_optim (Optimizer): the optimizer to use for critic model kl_coef (float, defaults to 0.1): the coefficient of kl divergence loss + chunk_size (int, defaults to 8): the chunk size to use for chunked experience maker train_batch_size (int, defaults to 8): the batch size to use for training buffer_limit (int, defaults to 0): the max_size limitation of buffer buffer_cpu_offload (bool, defaults to True): whether to offload buffer to cpu @@ -59,30 +60,30 @@ class PPOTrainer(OnPolicyTrainer): generate_kwargs (dict, optional): the kwargs to use while model generating """ - def __init__( - self, - strategy: Strategy, - actor: Actor, - critic: Critic, - reward_model: RewardModel, - initial_model: Actor, - actor_optim: Optimizer, - critic_optim: Optimizer, - tokenizer: PreTrainedTokenizerBase, - kl_coef: float = 0.1, - ptx_coef: float = 0.9, - train_batch_size: int = 8, - buffer_limit: int = 0, - buffer_cpu_offload: bool = True, - eps_clip: float = 0.2, - vf_coef: float = 1.0, - value_clip: float = 0.4, - sample_buffer: bool = False, - dataloader_pin_memory: bool = True, - offload_inference_models: bool = True, - callbacks: List[Callback] = [], - **generate_kwargs, - ) -> None: + def __init__(self, + strategy: Strategy, + actor: Actor, + critic: Critic, + reward_model: RewardModel, + initial_model: Actor, + actor_optim: Optimizer, + critic_optim: Optimizer, + tokenizer: PreTrainedTokenizerBase, + kl_coef: float = 0.1, + chunk_size: int = 8, + ptx_coef: float = 0.9, + train_batch_size: int = 8, + buffer_limit: int = 0, + buffer_cpu_offload: bool = True, + eps_clip: float = 0.2, + vf_coef: float = 1.0, + value_clip: float = 0.4, + sample_buffer: bool = False, + dataloader_pin_memory: bool = True, + offload_inference_models: bool = True, + callbacks: List[Callback] = [], + **generate_kwargs + ) -> None: if isinstance(strategy, GeminiStrategy): assert not offload_inference_models, "GeminiPlugin is not compatible with manual model.to('cpu')" @@ -90,7 +91,10 @@ def __init__( super().__init__(strategy, data_buffer, sample_buffer, dataloader_pin_memory, callbacks) self.generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor) - self.experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, tokenizer, kl_coef) + 
self.experience_maker = ChunkedExperienceMaker( + actor, critic, reward_model, initial_model, + tokenizer, chunk_size, kl_coef + ) self.actor = actor self.critic = critic @@ -98,6 +102,7 @@ def __init__( self.actor_loss_fn = PolicyLoss(eps_clip) self.critic_loss_fn = ValueLoss(value_clip) + self.chunk_size = chunk_size self.vf_coef = vf_coef self.ptx_loss_fn = GPTLMLoss() self.ptx_coef = ptx_coef @@ -150,14 +155,20 @@ def _make_experience(self, collect_step: int) -> Experience: def _training_step(self, experience: Experience): self.actor.train() self.critic.train() + + step_mask = experience.step_mask + num_samples = torch.sum(step_mask) + # policy loss num_actions = experience.action_log_probs.size(1) actor_logits = self.actor(experience.sequences, experience.attention_mask)["logits"] action_log_probs = calc_action_log_probs(actor_logits, experience.sequences, num_actions) - actor_loss = self.actor_loss_fn( - action_log_probs, experience.action_log_probs, experience.advantages, action_mask=experience.action_mask - ) - actor_loss = (1 - self.ptx_coef) * actor_loss + actor_loss = self.actor_loss_fn(action_log_probs, + experience.action_log_probs, + experience.advantages, + action_mask=experience.action_mask, + chunk_size=self.experience_maker.chunk_size) + actor_loss = (1 - self.ptx_coef) * torch.sum(actor_loss * step_mask) / num_samples self.strategy.backward(actor_loss, self.actor, self.actor_optim) # ptx loss @@ -172,10 +183,22 @@ def _training_step(self, experience: Experience): self.actor_optim.zero_grad() # value loss - values = self.critic(experience.sequences, attention_mask=experience.attention_mask) - critic_loss = self.critic_loss_fn(values, experience.values, experience.reward) - critic_loss = critic_loss * self.vf_coef - self.strategy.backward(critic_loss, self.critic, self.critic_optim) + input_len = experience.sequences.size(1) - num_actions + num_steps = (num_actions + self.chunk_size - 1) // self.chunk_size + for i in range(num_steps): + seq_len = input_len + min(i * self.chunk_size, num_actions) + sequence_with_eos = F.pad(experience.sequences[:, :seq_len], (0, 1), value=self.tokenizer.eos_token_id) + sequence_with_eos_mask = F.pad(experience.attention_mask[:, :seq_len], (0, 1), value=False) + # NOTE: sequences[:, :seq_len] must contain if sequences[:, seq_len - 1] is {, padding token} + sequence_with_eos_mask[:, -1] = torch.logical_and( + experience.sequences[:, seq_len - 1] != self.tokenizer.pad_token_id, + experience.sequences[:, seq_len - 1] != self.tokenizer.eos_token_id + ) + values = self.critic(sequence_with_eos, sequence_with_eos_mask) + critic_loss = self.critic_loss_fn(values, experience.values[:, i], experience.returns[:, i]) + critic_loss = torch.sum(critic_loss * step_mask[:, i]) / num_samples + self.strategy.backward(critic_loss, self.critic, self.critic_optim) + self.strategy.optimizer_step(self.critic_optim) self.critic_optim.zero_grad() From fd7f8e7a999dd8f27a6cb1c706ffc68328ed90e4 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:11:38 +0800 Subject: [PATCH 10/20] feat: add chunk_size arg --- applications/Chat/examples/train_prompts.py | 63 ++++++++++----------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index a8ab15eebfa5..815f7d399209 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -175,6 +175,7 @@ def main(args): ptx_coef=args.ptx_coef, 
train_batch_size=args.train_batch_size, max_length=args.max_seq_len, + chunk_size=args.seq_chunk_size, use_cache=True, do_sample=True, temperature=1.0, @@ -209,37 +210,35 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--prompt_dataset", type=str, default=None, help="path to the prompt dataset") - parser.add_argument("--pretrain_dataset", type=str, default=None, help="path to the pretrained dataset") - parser.add_argument("--max_datasets_size", type=int, default=50000) - parser.add_argument( - "--strategy", - choices=["ddp", "colossalai_gemini", "colossalai_zero2"], - default="colossalai_zero2", - help="strategy to use", - ) - parser.add_argument("--model", default="gpt2", choices=["gpt2", "bloom", "opt", "llama"]) - parser.add_argument("--tokenizer", type=str, default=None) - parser.add_argument("--pretrain", type=str, default=None) - parser.add_argument("--rm_model", default=None, choices=["gpt2", "bloom", "opt", "llama"]) - parser.add_argument("--rm_path", type=str, default=None) - parser.add_argument("--rm_pretrain", type=str, default=None) - parser.add_argument("--save_path", type=str, default="actor_checkpoint_prompts") - parser.add_argument("--need_optim_ckpt", type=bool, default=False) - parser.add_argument("--num_episodes", type=int, default=10) - parser.add_argument("--num_collect_steps", type=int, default=10) - parser.add_argument("--num_update_steps", type=int, default=5) - parser.add_argument("--train_batch_size", type=int, default=8) - parser.add_argument("--ptx_batch_size", type=int, default=1) - parser.add_argument("--experience_batch_size", type=int, default=8) - parser.add_argument("--lora_rank", type=int, default=0, help="low-rank adaptation matrices rank") - parser.add_argument("--merge_lora_weights", type=bool, default=True) - parser.add_argument("--lr", type=float, default=1e-7) - parser.add_argument("--kl_coef", type=float, default=0.1) - parser.add_argument("--ptx_coef", type=float, default=0.9) - parser.add_argument("--max_input_len", type=int, default=96) - parser.add_argument("--max_seq_len", type=int, default=128) - parser.add_argument("--log_dir", default="logs", type=str) - parser.add_argument("--use_wandb", default=False, action="store_true") + parser.add_argument('--prompt_dataset', type=str, default=None, help='path to the prompt dataset') + parser.add_argument('--pretrain_dataset', type=str, default=None, help='path to the pretrained dataset') + parser.add_argument('--max_datasets_size', type=int, default=50000) + parser.add_argument('--strategy', + choices=['ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='colossalai_zero2', + help='strategy to use') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--tokenizer', type=str, default=None) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--rm_model', default=None, choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--rm_path', type=str, default=None) + parser.add_argument('--rm_pretrain', type=str, default=None) + parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts') + parser.add_argument('--need_optim_ckpt', type=bool, default=False) + parser.add_argument('--num_episodes', type=int, default=10) + parser.add_argument('--num_collect_steps', type=int, default=10) + parser.add_argument('--num_update_steps', type=int, default=5) + parser.add_argument('--train_batch_size', type=int, default=8) + 
parser.add_argument('--ptx_batch_size', type=int, default=1) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument('--lr', type=float, default=1e-7) + parser.add_argument('--kl_coef', type=float, default=0.1) + parser.add_argument('--ptx_coef', type=float, default=0.9) + parser.add_argument('--max_input_len', type=int, default=96) + parser.add_argument('--max_seq_len', type=int, default=128) + parser.add_argument('--seq_chunk_size', type=int, default=8) + parser.add_argument('--log_dir', default='logs', type=str) + parser.add_argument('--use_wandb', default=False, action='store_true') args = parser.parse_args() main(args) From 564b1a9a0386aab28b103fe094f5a1a419c54f10 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:12:06 +0800 Subject: [PATCH 11/20] test: modify loss tests --- applications/Chat/tests/test_models.py | 42 +++++++++++++------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/applications/Chat/tests/test_models.py b/applications/Chat/tests/test_models.py index b2c22ac6a3b9..94d045361ad4 100644 --- a/applications/Chat/tests/test_models.py +++ b/applications/Chat/tests/test_models.py @@ -173,7 +173,15 @@ def test_models(models_maker: Callable[[], Tuple[Actor, Critic, RewardModel]], b @pytest.mark.parametrize("batch_size", [16]) @pytest.mark.parametrize("seq_len", [128]) @pytest.mark.parametrize("num_labels", [100]) -def test_loss(batch_size: int, seq_len: int, num_labels: int): +@pytest.mark.parametrize("num_actions", [10]) +@pytest.mark.parametrize("chunk_size", [4]) +def test_loss(batch_size: int, + seq_len: int, + num_labels: int, + num_actions: int, + chunk_size: int): + num_steps = (num_actions + chunk_size - 1) // chunk_size + loss = GPTLMLoss() loss_input = { "logits": torch.randn(batch_size, seq_len, num_labels), @@ -183,31 +191,23 @@ def test_loss(batch_size: int, seq_len: int, num_labels: int): loss = PolicyLoss() loss_input = { - "log_probs": torch.randn( - batch_size, - ), - "old_log_probs": torch.randn( - batch_size, - ), - "advantages": torch.randn( - batch_size, - ), + "log_probs": torch.randn(batch_size, num_actions), + "old_log_probs": torch.randn(batch_size, num_actions), + "advantages": torch.randn(batch_size, num_steps), + "action_mask": torch.randint(0, 2, (batch_size, num_actions)), + "chunk_size": chunk_size } - loss(**loss_input) + loss_output = loss(**loss_input) + assert loss_output.shape == (batch_size, num_steps) loss = ValueLoss() loss_input = { - "values": torch.randn( - batch_size, - ), - "old_values": torch.randn( - batch_size, - ), - "reward": torch.randn( - batch_size, - ), + "values": torch.randn(batch_size, num_steps), + "old_values": torch.randn(batch_size, num_steps), + "returns": torch.randn(batch_size, num_steps), } - loss(**loss_input) + loss_output = loss(**loss_input) + assert loss_output.shape == (batch_size, num_steps) loss = LogSigLoss() loss_input = { From ac1ce1dc9581bccf51d768cf0e2727da9814b3b0 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:20:06 +0800 Subject: [PATCH 12/20] fix: avoid overflow and add metrics --- applications/Chat/coati/trainer/ppo.py | 37 +++++++++++++++++++------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 751819565a11..3a4330f9a22f 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ 
b/applications/Chat/coati/trainer/ppo.py @@ -112,13 +112,14 @@ def __init__(self, self.offload_inference_models = offload_inference_models self.device = get_current_device() - def _before_fit( - self, - prompt_dataloader: DataLoader, - pretrain_dataloader: DataLoader, - log_dir: Optional[str] = None, - use_wandb: bool = False, - ): + self.num_collect_step = 0 + self.num_update_step = 0 + + def _before_fit(self, + prompt_dataloader: DataLoader, + pretrain_dataloader: DataLoader, + log_dir: Optional[str] = None, + use_wandb: bool = False): """ Args: prompt_dataloader (DataLoader): the dataloader to use for prompt data @@ -150,7 +151,12 @@ def _make_experience(self, collect_step: int) -> Experience: self.experience_maker.initial_model.to(self.device) self.experience_maker.reward_model.to(self.device) assert isinstance(prompts, dict), f'Unsupported input type "{type(prompts)}"' - return self.experience_maker.make_experience(**prompts, **self.generate_kwargs) + experience, metrics = self.experience_maker.make_experience(**prompts, **self.generate_kwargs) + if self.writer: + for k, v in metrics.items(): + self.writer.add_scalar(f'collect/{k}', v, self.num_collect_step) + self.num_collect_step += 1 + return experience def _training_step(self, experience: Experience): self.actor.train() @@ -158,6 +164,8 @@ def _training_step(self, experience: Experience): step_mask = experience.step_mask num_samples = torch.sum(step_mask) + assert self.chunk_size == self.experience_maker.chunk_size, \ + "chunk_size of trainer and experience_maker must be the same" # policy loss num_actions = experience.action_log_probs.size(1) @@ -168,7 +176,7 @@ def _training_step(self, experience: Experience): experience.advantages, action_mask=experience.action_mask, chunk_size=self.experience_maker.chunk_size) - actor_loss = (1 - self.ptx_coef) * torch.sum(actor_loss * step_mask) / num_samples + actor_loss = (1 - self.ptx_coef) * torch.sum(actor_loss * step_mask / num_samples) self.strategy.backward(actor_loss, self.actor, self.actor_optim) # ptx loss @@ -196,12 +204,21 @@ def _training_step(self, experience: Experience): ) values = self.critic(sequence_with_eos, sequence_with_eos_mask) critic_loss = self.critic_loss_fn(values, experience.values[:, i], experience.returns[:, i]) - critic_loss = torch.sum(critic_loss * step_mask[:, i]) / num_samples + critic_loss = torch.sum(critic_loss * step_mask[:, i] / num_samples) self.strategy.backward(critic_loss, self.critic, self.critic_optim) self.strategy.optimizer_step(self.critic_optim) self.critic_optim.zero_grad() + if self.writer: + self.writer.add_scalar('update/actor_loss', actor_loss.item(), self.num_update_step) + self.writer.add_scalar('update/critic_loss', critic_loss.item(), self.num_update_step) + if self.ptx_coef != 0: + self.writer.add_scalar('update/ptx_loss', ptx_loss.item(), self.num_update_step) + self.num_update_step += 1 + + return {'reward': experience.reward.mean().item()} + def _learn(self, update_step: int): if self.offload_inference_models: self.experience_maker.initial_model.to("cpu") From 429784b19777c84a5bba58aca332b07da8eff7d2 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:20:57 +0800 Subject: [PATCH 13/20] test: add chunk_size --- applications/Chat/tests/test_train.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/applications/Chat/tests/test_train.sh b/applications/Chat/tests/test_train.sh index 55de269005ed..9170e3007ae3 100755 --- a/applications/Chat/tests/test_train.sh +++ b/applications/Chat/tests/test_train.sh 
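For reference, the chunked policy loss now returns one value per MDP step rather than a scalar, which is why _training_step reduces it with step_mask; a minimal sketch of the call (shapes follow the updated test_models.py shown further below):

    import torch
    from coati.models.loss import PolicyLoss

    batch_size, num_actions, chunk_size = 2, 10, 4
    num_steps = (num_actions + chunk_size - 1) // chunk_size    # 3
    loss_fn = PolicyLoss()
    loss = loss_fn(log_probs=torch.randn(batch_size, num_actions),
                   old_log_probs=torch.randn(batch_size, num_actions),
                   advantages=torch.randn(batch_size, num_steps),
                   action_mask=torch.randint(0, 2, (batch_size, num_actions)),
                   chunk_size=chunk_size)
    assert loss.shape == (batch_size, num_steps)   # reduced with step_mask / num_samples in _training_step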
@@ -219,8 +219,8 @@ for model in ${MODELS[@]}; do torchrun --standalone --nproc_per_node=4 $EXAMPLES_DIR/train_prompts.py \ --prompt_dataset $PROMPT_DATASET --pretrain_dataset $PRETRAIN_DATASET --max_datasets_size 32 \ --strategy $strategy --model $model --tokenizer $MODELS_DIR/$model \ - --num_episodes 1 --num_collect_steps 1 --num_update_steps 1 --lr 1e-8 \ - --experience_batch_size 2 --train_batch_size 1 --lora_rank $lora_rank \ + --num_episodes 1 --num_collect_steps 1 --num_update_steps 1 --seq_chunk_size $((RANDOM % 16 + 1)) \ + --experience_batch_size 2 --train_batch_size 1 --lora_rank $lora_rank --lr 1e-8 \ --pretrain $EXAMPLES_DIR/rlhf_models/sft_ckpt_${model}_${lora_rank} \ $rm_pretrain_model --rm_path $EXAMPLES_DIR/rlhf_models/rm_ckpt_${model}_${lora_rank}.pt \ --save_path $EXAMPLES_DIR/rlhf_models/actor_checkpoint_prompts.pt From edb1b3f8afd7a69ccbb871fd1ae84b4e51ec0049 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:39:17 +0800 Subject: [PATCH 14/20] test: use mock tokenizer --- applications/Chat/tests/test_experience.py | 2 +- applications/Chat/tests/test_models.py | 45 ++++++++++------------ 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/applications/Chat/tests/test_experience.py b/applications/Chat/tests/test_experience.py index ef97e251ac7f..48cb1c33de2b 100644 --- a/applications/Chat/tests/test_experience.py +++ b/applications/Chat/tests/test_experience.py @@ -56,7 +56,7 @@ def make_and_consume_experience(strategy): actor, critic, initial_model, reward_model = strategy.prepare(actor, critic, initial_model, reward_model) - class MockTokenizer: + class MockTokenizer(): def __init__(self): self.padding_side = "left" self.eos_token_id = 0 diff --git a/applications/Chat/tests/test_models.py b/applications/Chat/tests/test_models.py index 94d045361ad4..47c7ac6dd77e 100644 --- a/applications/Chat/tests/test_models.py +++ b/applications/Chat/tests/test_models.py @@ -19,30 +19,27 @@ @pytest.mark.parametrize("batch_size", [4]) @pytest.mark.parametrize("seq_len", [32]) -@pytest.mark.parametrize( - "actor_maker", - [ - lambda: BLOOMActor(), - lambda: GPTActor(), - # HACK: skip llama due to long execution time - # lambda: LlamaActor(), - lambda: OPTActor(), - ], -) -@pytest.mark.parametrize( - "generate_kwargs", - [ - { - "max_length": 64, - "use_cache": True, - "do_sample": True, - "temperature": 1.0, - "top_k": 50, - } - ], -) -def test_generation(actor_maker: Callable[[], Actor], batch_size: int, seq_len: int, generate_kwargs: Dict[str, Any]): - class MockTokenizer: +@pytest.mark.parametrize("actor_maker", [ + lambda: BLOOMActor(), + lambda: GPTActor(), + # HACK: skip llama due to long execution time + # lambda: LlamaActor(), + lambda: OPTActor() +]) +@pytest.mark.parametrize("generate_kwargs", [{ + "max_length": 64, + "use_cache": True, + "do_sample": True, + "temperature": 1.0, + "top_k": 50, +}]) +def test_generation(actor_maker: Callable[[], Actor], + batch_size: int, + seq_len: int, + generate_kwargs: Dict[str, Any] + ): + + class MockTokenizer(): def __init__(self): self.padding_side = "left" self.eos_token_id = 0 From ff73b6e048b674800be8701e6030614c93e35b8c Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:45:28 +0800 Subject: [PATCH 15/20] fix: fix experience_maker returns --- applications/Chat/tests/test_experience.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/applications/Chat/tests/test_experience.py b/applications/Chat/tests/test_experience.py index 48cb1c33de2b..1a80decd2260 100644 --- 
a/applications/Chat/tests/test_experience.py +++ b/applications/Chat/tests/test_experience.py @@ -74,7 +74,7 @@ def __init__(self): data = get_data(EXPERIENCE_BATCH_SIZE) assert gather_and_equal(data['input_ids']) assert gather_and_equal(data['attention_mask']) - experience = experience_maker.make_experience(**data, **generate_kwargs) + experience, _ = experience_maker.make_experience(**data, **generate_kwargs) num_actions = experience.action_log_probs.size(1) chunk_size = experience_maker.chunk_size num_steps = experience.advantages.size(1) From 7698226d0313b04a83f19ded367f74d4a72d39a2 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 15:51:42 +0800 Subject: [PATCH 16/20] fix: fix inference --- applications/Chat/examples/inference.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/applications/Chat/examples/inference.py b/applications/Chat/examples/inference.py index 62e06bf7b3bb..ccc8ee954bd4 100644 --- a/applications/Chat/examples/inference.py +++ b/applications/Chat/examples/inference.py @@ -46,17 +46,17 @@ def eval(args): actor.eval() tokenizer.padding_side = "left" - input_ids = tokenizer.encode(args.input, return_tensors="pt").to(torch.cuda.current_device()) - outputs = generate( - actor, - input_ids, - tokenizer=tokenizer, - max_length=args.max_length, - do_sample=True, - top_k=50, - top_p=0.95, - num_return_sequences=1, - ) + input_ids = tokenizer.encode(args.input, + return_tensors='pt')\ + .to(torch.cuda.current_device()) + outputs = generate(actor, + input_ids, + tokenizer=tokenizer, + max_length=args.max_length, + do_sample=True, + top_k=50, + top_p=0.95, + num_return_sequences=1) output = tokenizer.batch_decode(outputs[0], skip_special_tokens=True) print(f"[Output]: {''.join(output)}") From e2ca75d82fda7c4d6883353f9a5a2cb456523117 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 16:17:19 +0800 Subject: [PATCH 17/20] style: modify _on_learn_batch_end --- applications/Chat/coati/trainer/ppo.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 3a4330f9a22f..76886e3680b0 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -217,8 +217,6 @@ def _training_step(self, experience: Experience): self.writer.add_scalar('update/ptx_loss', ptx_loss.item(), self.num_update_step) self.num_update_step += 1 - return {'reward': experience.reward.mean().item()} - def _learn(self, update_step: int): if self.offload_inference_models: self.experience_maker.initial_model.to("cpu") From 49e3fe5ead41a4e25dd0f58e713ad35e0c6a46a8 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 18 Aug 2023 16:22:14 +0800 Subject: [PATCH 18/20] chore: modify train prompts script --- applications/Chat/examples/train_prompts.sh | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/applications/Chat/examples/train_prompts.sh b/applications/Chat/examples/train_prompts.sh index d04c416015b1..18940debb4e1 100755 --- a/applications/Chat/examples/train_prompts.sh +++ b/applications/Chat/examples/train_prompts.sh @@ -15,11 +15,20 @@ set_n_least_used_CUDA_VISIBLE_DEVICES() { set_n_least_used_CUDA_VISIBLE_DEVICES 2 -# torchrun --standalone --nproc_per_node=2 train_prompts.py prompts.csv --strategy colossalai_zero2 - torchrun --standalone --nproc_per_node=2 train_prompts.py \ - --pretrain_dataset /path/to/data.json \ - --prompt_dataset /path/to/data.json \ + --model "gpt2" \ + --pretrain 
"/path/to/sft_gpt2" \ + --rm_model "gpt2" \ + --rm_path "/path/to/gpt2_rm" \ + --pretrain_dataset /path/to/pretrain_data.json \ + --prompt_dataset /path/to/prompt_data.json \ --strategy colossalai_zero2 \ - --num_episodes 1 --num_collect_steps 2 --num_update_steps 1 \ - --train_batch_size 2 + --num_episodes 10 \ + --num_collect_steps 2 \ + --num_update_steps 2 \ + --train_batch_size 4 \ + --experience_batch_size 8 \ + --ptx_batch_size 4 \ + --max_input_len 96 \ + --max_seq_len 256 \ + --seq_chunk_size 8 From ba9f613783d6de9ecda096bf5c76f832fe52df52 Mon Sep 17 00:00:00 2001 From: CWHer Date: Thu, 21 Sep 2023 11:10:25 +0800 Subject: [PATCH 19/20] style: apply pre-commit --- .../Chat/coati/experience_buffer/utils.py | 24 ++++- .../Chat/coati/experience_maker/__init__.py | 5 +- .../Chat/coati/experience_maker/base.py | 4 +- .../Chat/coati/experience_maker/chunked.py | 78 ++++++++-------- applications/Chat/coati/models/loss.py | 26 +++--- applications/Chat/coati/models/utils.py | 7 +- applications/Chat/coati/trainer/ppo.py | 91 ++++++++++--------- applications/Chat/examples/inference.py | 22 ++--- applications/Chat/examples/train_prompts.py | 62 +++++++------ applications/Chat/tests/test_experience.py | 6 +- applications/Chat/tests/test_models.py | 53 ++++++----- 11 files changed, 200 insertions(+), 178 deletions(-) diff --git a/applications/Chat/coati/experience_buffer/utils.py b/applications/Chat/coati/experience_buffer/utils.py index 23ff205c1fe0..2c53f7b3d812 100644 --- a/applications/Chat/coati/experience_buffer/utils.py +++ b/applications/Chat/coati/experience_buffer/utils.py @@ -34,8 +34,16 @@ class BufferItem: def split_experience_batch(experience: Experience) -> List[BufferItem]: batch_size = experience.sequences.size(0) batch_kwargs = [{} for _ in range(batch_size)] - keys = ('sequences', 'attention_mask', 'action_mask', 'step_mask', - 'action_log_probs', 'values', 'returns', 'advantages') + keys = ( + "sequences", + "attention_mask", + "action_mask", + "step_mask", + "action_log_probs", + "values", + "returns", + "advantages", + ) for key in keys: value = getattr(experience, key) assert isinstance(value, torch.Tensor) @@ -49,8 +57,16 @@ def split_experience_batch(experience: Experience) -> List[BufferItem]: def make_experience_batch(items: List[BufferItem]) -> Experience: kwargs = {} - keys = ('sequences', 'attention_mask', 'action_mask', 'step_mask', - 'action_log_probs', 'values', 'returns', 'advantages') + keys = ( + "sequences", + "attention_mask", + "action_mask", + "step_mask", + "action_log_probs", + "values", + "returns", + "advantages", + ) for key in keys: vals = [getattr(item, key) for item in items] batch_data = torch.stack(vals, dim=0) diff --git a/applications/Chat/coati/experience_maker/__init__.py b/applications/Chat/coati/experience_maker/__init__.py index abe8b92c3ceb..ae2d0e8c570c 100644 --- a/applications/Chat/coati/experience_maker/__init__.py +++ b/applications/Chat/coati/experience_maker/__init__.py @@ -2,6 +2,7 @@ from .chunked import ChunkedExperienceMaker __all__ = [ - 'Experience', 'ExperienceMaker', - 'ChunkedExperienceMaker', + "Experience", + "ExperienceMaker", + "ChunkedExperienceMaker", ] diff --git a/applications/Chat/coati/experience_maker/base.py b/applications/Chat/coati/experience_maker/base.py index 96b4b00932f8..82f40b561227 100644 --- a/applications/Chat/coati/experience_maker/base.py +++ b/applications/Chat/coati/experience_maker/base.py @@ -16,7 +16,7 @@ class Experience: "C" is the chunk size. "N" is the number of MDP steps. 
NOTE: N = A / C, each Experience contains N MDP steps ([s0, a0], [s1, a1], ...), - sequences = |pad|prompt|a0|a1|a2|...|pad|, + sequences = |pad|prompt|a0|a1|a2|...|pad|, s0 = prompt, s1 = prompt + a0, s2 = prompt + a0 + a1, ... FIXME(cwher): store N steps in a Experience can be computationally efficient, but may be different from uniform sampling (shuffle all steps and sample). @@ -31,7 +31,7 @@ class Experience: returns: (B, N), result of GAE advantages: (B, N), result of GAE - e.g., + e.g., sequences = |pad|prompt|response|pad| attention_mask = |0|1|1|0| action_mask = |1|0| (for response) diff --git a/applications/Chat/coati/experience_maker/chunked.py b/applications/Chat/coati/experience_maker/chunked.py index 6f859c4478e1..5f054ee6dc12 100644 --- a/applications/Chat/coati/experience_maker/chunked.py +++ b/applications/Chat/coati/experience_maker/chunked.py @@ -18,17 +18,18 @@ class ChunkedExperienceMaker(ExperienceMaker): chunk_size = seq_len: all the tokens constitute a giant step """ - def __init__(self, - actor: Actor, - critic: Critic, - reward_model: RewardModel, - initial_model: Actor, - tokenizer: PreTrainedTokenizer, - chunk_size: int = 8, - kl_coef: float = 0.1, - gamma: float = 0.99, - gae_lambda: float = 0.95, - ) -> None: + def __init__( + self, + actor: Actor, + critic: Critic, + reward_model: RewardModel, + initial_model: Actor, + tokenizer: PreTrainedTokenizer, + chunk_size: int = 8, + kl_coef: float = 0.1, + gamma: float = 0.99, + gae_lambda: float = 0.95, + ) -> None: super().__init__(actor, critic, reward_model, initial_model) self.tokenizer = tokenizer self.chunk_size = chunk_size @@ -89,28 +90,25 @@ def compute_advantages_and_returns( return advantages, returns @torch.no_grad() - def make_experience(self, - input_ids: torch.Tensor, - attention_mask: torch.Tensor, - **generate_kwargs - ) -> Tuple[Experience, Dict]: + def make_experience( + self, input_ids: torch.Tensor, attention_mask: torch.Tensor, **generate_kwargs + ) -> Tuple[Experience, Dict]: self.actor.eval() self.critic.eval() self.initial_model.eval() self.reward_model.eval() # generate sequences - sequences = generate(self.actor, - input_ids, - tokenizer=self.tokenizer, - attention_mask=attention_mask, - **generate_kwargs) + sequences = generate( + self.actor, input_ids, tokenizer=self.tokenizer, attention_mask=attention_mask, **generate_kwargs + ) # calculate auxiliary tensors eos_token_id = self.tokenizer.eos_token_id pad_token_id = self.tokenizer.pad_token_id - assert eos_token_id is not None and pad_token_id is not None, \ - "eos_token_id and pad_token_id must be specified in generate_kwargs" + assert ( + eos_token_id is not None and pad_token_id is not None + ), "eos_token_id and pad_token_id must be specified in generate_kwargs" input_len = input_ids.size(1) num_actions = sequences.size(1) - input_len num_steps = (num_actions + self.chunk_size - 1) // self.chunk_size @@ -118,8 +116,9 @@ def make_experience(self, # if action is |action|eos|pad|, then action_mask is |1|1|0| action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0 action_mask = F.pad(action_mask, (1, -1), value=True) # shift right by 1 to include eos token - step_mask = F.pad(action_mask, (0, (self.chunk_size - num_actions) % - self.chunk_size), value=False).view(-1, self.chunk_size) + step_mask = F.pad(action_mask, (0, (self.chunk_size - num_actions) % self.chunk_size), value=False).view( + -1, self.chunk_size + ) step_mask = (step_mask.sum(dim=-1) > 0).view(-1, num_steps) attention_mask = torch.cat([attention_mask, 
action_mask], dim=-1) @@ -130,10 +129,12 @@ def make_experience(self, base_log_probs = calc_action_log_probs(base_model_logits, sequences, num_actions) log_ratio = action_log_probs - base_log_probs - log_ratio = F.pad(log_ratio, (0, (self.chunk_size - num_actions) % - self.chunk_size), value=0).view(-1, self.chunk_size) - log_ratio_mask = F.pad(action_mask, (0, (self.chunk_size - num_actions) % - self.chunk_size), value=False).view(-1, self.chunk_size) + log_ratio = F.pad(log_ratio, (0, (self.chunk_size - num_actions) % self.chunk_size), value=0).view( + -1, self.chunk_size + ) + log_ratio_mask = F.pad(action_mask, (0, (self.chunk_size - num_actions) % self.chunk_size), value=False).view( + -1, self.chunk_size + ) chunk_log_ratio = torch.sum(log_ratio * log_ratio_mask, dim=-1).view(-1, num_steps) # compute V(s_i) @@ -146,8 +147,7 @@ def make_experience(self, sequence_with_eos_mask = F.pad(attention_mask[:, :seq_len], (0, 1), value=False) # NOTE: sequences[:, :seq_len] must contain if sequences[:, seq_len - 1] is {, padding token} sequence_with_eos_mask[:, -1] = torch.logical_and( - sequences[:, seq_len - 1] != pad_token_id, - sequences[:, seq_len - 1] != eos_token_id + sequences[:, seq_len - 1] != pad_token_id, sequences[:, seq_len - 1] != eos_token_id ) values[:, i] = self.critic(sequence_with_eos, sequence_with_eos_mask) final_rewards = self.reward_model(sequence_with_eos, sequence_with_eos_mask) @@ -165,16 +165,12 @@ def make_experience(self, end_flags[torch.arange(rewards.size(0)), num_valid_steps - 1] = True advantages, returns = self.compute_advantages_and_returns( - values, rewards, end_flags, num_steps, self.gamma, self.gae_lambda) - - experience = Experience(sequences, - attention_mask, - action_mask, - step_mask, - action_log_probs, - values[:, :-1], - returns, - advantages) + values, rewards, end_flags, num_steps, self.gamma, self.gae_lambda + ) + + experience = Experience( + sequences, attention_mask, action_mask, step_mask, action_log_probs, values[:, :-1], returns, advantages + ) metrics = { "episode_rewards": rewards.sum(dim=-1).mean().item(), diff --git a/applications/Chat/coati/models/loss.py b/applications/Chat/coati/models/loss.py index c1b91e899b6e..79b4aeeb0a98 100644 --- a/applications/Chat/coati/models/loss.py +++ b/applications/Chat/coati/models/loss.py @@ -29,13 +29,14 @@ def __init__(self, clip_eps: float = 0.2) -> None: super().__init__() self.clip_eps = clip_eps - def forward(self, - log_probs: torch.Tensor, - old_log_probs: torch.Tensor, - advantages: torch.Tensor, - action_mask: torch.Tensor, - chunk_size: int - ) -> torch.Tensor: + def forward( + self, + log_probs: torch.Tensor, + old_log_probs: torch.Tensor, + advantages: torch.Tensor, + action_mask: torch.Tensor, + chunk_size: int, + ) -> torch.Tensor: log_ratio = log_probs - old_log_probs num_steps = (log_ratio.size(1) + chunk_size - 1) // chunk_size log_ratio = F.pad(log_ratio, (0, (chunk_size - log_ratio.size(1)) % chunk_size)).view(-1, chunk_size) @@ -58,11 +59,12 @@ def __init__(self, clip_eps: float = 0.4) -> None: super().__init__() self.clip_eps = clip_eps - def forward(self, - values: torch.Tensor, - old_values: torch.Tensor, - returns: torch.Tensor, - ) -> torch.Tensor: + def forward( + self, + values: torch.Tensor, + old_values: torch.Tensor, + returns: torch.Tensor, + ) -> torch.Tensor: values_clipped = old_values + (values - old_values).clamp(-self.clip_eps, self.clip_eps) surr1 = (values_clipped - returns) ** 2 surr2 = (values - returns) ** 2 diff --git 
a/applications/Chat/coati/models/utils.py b/applications/Chat/coati/models/utils.py index c345fe2be083..9e55a4fffcd9 100644 --- a/applications/Chat/coati/models/utils.py +++ b/applications/Chat/coati/models/utils.py @@ -2,9 +2,10 @@ import torch.nn.functional as F -def compute_approx_kl(log_probs: torch.Tensor, - log_probs_base: torch.Tensor, - ) -> torch.Tensor: +def compute_approx_kl( + log_probs: torch.Tensor, + log_probs_base: torch.Tensor, +) -> torch.Tensor: """ Compute the approximate KL divergence between two distributions. Schulman blog: http://joschu.net/blog/kl-approx.html diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 76886e3680b0..444a85fdc23a 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -60,30 +60,31 @@ class PPOTrainer(OnPolicyTrainer): generate_kwargs (dict, optional): the kwargs to use while model generating """ - def __init__(self, - strategy: Strategy, - actor: Actor, - critic: Critic, - reward_model: RewardModel, - initial_model: Actor, - actor_optim: Optimizer, - critic_optim: Optimizer, - tokenizer: PreTrainedTokenizerBase, - kl_coef: float = 0.1, - chunk_size: int = 8, - ptx_coef: float = 0.9, - train_batch_size: int = 8, - buffer_limit: int = 0, - buffer_cpu_offload: bool = True, - eps_clip: float = 0.2, - vf_coef: float = 1.0, - value_clip: float = 0.4, - sample_buffer: bool = False, - dataloader_pin_memory: bool = True, - offload_inference_models: bool = True, - callbacks: List[Callback] = [], - **generate_kwargs - ) -> None: + def __init__( + self, + strategy: Strategy, + actor: Actor, + critic: Critic, + reward_model: RewardModel, + initial_model: Actor, + actor_optim: Optimizer, + critic_optim: Optimizer, + tokenizer: PreTrainedTokenizerBase, + kl_coef: float = 0.1, + chunk_size: int = 8, + ptx_coef: float = 0.9, + train_batch_size: int = 8, + buffer_limit: int = 0, + buffer_cpu_offload: bool = True, + eps_clip: float = 0.2, + vf_coef: float = 1.0, + value_clip: float = 0.4, + sample_buffer: bool = False, + dataloader_pin_memory: bool = True, + offload_inference_models: bool = True, + callbacks: List[Callback] = [], + **generate_kwargs, + ) -> None: if isinstance(strategy, GeminiStrategy): assert not offload_inference_models, "GeminiPlugin is not compatible with manual model.to('cpu')" @@ -92,8 +93,7 @@ def __init__(self, self.generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor) self.experience_maker = ChunkedExperienceMaker( - actor, critic, reward_model, initial_model, - tokenizer, chunk_size, kl_coef + actor, critic, reward_model, initial_model, tokenizer, chunk_size, kl_coef ) self.actor = actor @@ -115,11 +115,13 @@ def __init__(self, self.num_collect_step = 0 self.num_update_step = 0 - def _before_fit(self, - prompt_dataloader: DataLoader, - pretrain_dataloader: DataLoader, - log_dir: Optional[str] = None, - use_wandb: bool = False): + def _before_fit( + self, + prompt_dataloader: DataLoader, + pretrain_dataloader: DataLoader, + log_dir: Optional[str] = None, + use_wandb: bool = False, + ): """ Args: prompt_dataloader (DataLoader): the dataloader to use for prompt data @@ -154,7 +156,7 @@ def _make_experience(self, collect_step: int) -> Experience: experience, metrics = self.experience_maker.make_experience(**prompts, **self.generate_kwargs) if self.writer: for k, v in metrics.items(): - self.writer.add_scalar(f'collect/{k}', v, self.num_collect_step) + self.writer.add_scalar(f"collect/{k}", v, self.num_collect_step) 
self.num_collect_step += 1 return experience @@ -164,18 +166,21 @@ def _training_step(self, experience: Experience): step_mask = experience.step_mask num_samples = torch.sum(step_mask) - assert self.chunk_size == self.experience_maker.chunk_size, \ - "chunk_size of trainer and experience_maker must be the same" + assert ( + self.chunk_size == self.experience_maker.chunk_size + ), "chunk_size of trainer and experience_maker must be the same" # policy loss num_actions = experience.action_log_probs.size(1) actor_logits = self.actor(experience.sequences, experience.attention_mask)["logits"] action_log_probs = calc_action_log_probs(actor_logits, experience.sequences, num_actions) - actor_loss = self.actor_loss_fn(action_log_probs, - experience.action_log_probs, - experience.advantages, - action_mask=experience.action_mask, - chunk_size=self.experience_maker.chunk_size) + actor_loss = self.actor_loss_fn( + action_log_probs, + experience.action_log_probs, + experience.advantages, + action_mask=experience.action_mask, + chunk_size=self.experience_maker.chunk_size, + ) actor_loss = (1 - self.ptx_coef) * torch.sum(actor_loss * step_mask / num_samples) self.strategy.backward(actor_loss, self.actor, self.actor_optim) @@ -200,7 +205,7 @@ def _training_step(self, experience: Experience): # NOTE: sequences[:, :seq_len] must contain if sequences[:, seq_len - 1] is {, padding token} sequence_with_eos_mask[:, -1] = torch.logical_and( experience.sequences[:, seq_len - 1] != self.tokenizer.pad_token_id, - experience.sequences[:, seq_len - 1] != self.tokenizer.eos_token_id + experience.sequences[:, seq_len - 1] != self.tokenizer.eos_token_id, ) values = self.critic(sequence_with_eos, sequence_with_eos_mask) critic_loss = self.critic_loss_fn(values, experience.values[:, i], experience.returns[:, i]) @@ -211,10 +216,10 @@ def _training_step(self, experience: Experience): self.critic_optim.zero_grad() if self.writer: - self.writer.add_scalar('update/actor_loss', actor_loss.item(), self.num_update_step) - self.writer.add_scalar('update/critic_loss', critic_loss.item(), self.num_update_step) + self.writer.add_scalar("update/actor_loss", actor_loss.item(), self.num_update_step) + self.writer.add_scalar("update/critic_loss", critic_loss.item(), self.num_update_step) if self.ptx_coef != 0: - self.writer.add_scalar('update/ptx_loss', ptx_loss.item(), self.num_update_step) + self.writer.add_scalar("update/ptx_loss", ptx_loss.item(), self.num_update_step) self.num_update_step += 1 def _learn(self, update_step: int): diff --git a/applications/Chat/examples/inference.py b/applications/Chat/examples/inference.py index ccc8ee954bd4..62e06bf7b3bb 100644 --- a/applications/Chat/examples/inference.py +++ b/applications/Chat/examples/inference.py @@ -46,17 +46,17 @@ def eval(args): actor.eval() tokenizer.padding_side = "left" - input_ids = tokenizer.encode(args.input, - return_tensors='pt')\ - .to(torch.cuda.current_device()) - outputs = generate(actor, - input_ids, - tokenizer=tokenizer, - max_length=args.max_length, - do_sample=True, - top_k=50, - top_p=0.95, - num_return_sequences=1) + input_ids = tokenizer.encode(args.input, return_tensors="pt").to(torch.cuda.current_device()) + outputs = generate( + actor, + input_ids, + tokenizer=tokenizer, + max_length=args.max_length, + do_sample=True, + top_k=50, + top_p=0.95, + num_return_sequences=1, + ) output = tokenizer.batch_decode(outputs[0], skip_special_tokens=True) print(f"[Output]: {''.join(output)}") diff --git a/applications/Chat/examples/train_prompts.py 
b/applications/Chat/examples/train_prompts.py index 815f7d399209..5881aa18c061 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -210,35 +210,37 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--prompt_dataset', type=str, default=None, help='path to the prompt dataset') - parser.add_argument('--pretrain_dataset', type=str, default=None, help='path to the pretrained dataset') - parser.add_argument('--max_datasets_size', type=int, default=50000) - parser.add_argument('--strategy', - choices=['ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='colossalai_zero2', - help='strategy to use') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) - parser.add_argument('--tokenizer', type=str, default=None) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--rm_model', default=None, choices=['gpt2', 'bloom', 'opt', 'llama']) - parser.add_argument('--rm_path', type=str, default=None) - parser.add_argument('--rm_pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--num_collect_steps', type=int, default=10) - parser.add_argument('--num_update_steps', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--ptx_batch_size', type=int, default=1) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - parser.add_argument('--lr', type=float, default=1e-7) - parser.add_argument('--kl_coef', type=float, default=0.1) - parser.add_argument('--ptx_coef', type=float, default=0.9) - parser.add_argument('--max_input_len', type=int, default=96) - parser.add_argument('--max_seq_len', type=int, default=128) - parser.add_argument('--seq_chunk_size', type=int, default=8) - parser.add_argument('--log_dir', default='logs', type=str) - parser.add_argument('--use_wandb', default=False, action='store_true') + parser.add_argument("--prompt_dataset", type=str, default=None, help="path to the prompt dataset") + parser.add_argument("--pretrain_dataset", type=str, default=None, help="path to the pretrained dataset") + parser.add_argument("--max_datasets_size", type=int, default=50000) + parser.add_argument( + "--strategy", + choices=["ddp", "colossalai_gemini", "colossalai_zero2"], + default="colossalai_zero2", + help="strategy to use", + ) + parser.add_argument("--model", default="gpt2", choices=["gpt2", "bloom", "opt", "llama"]) + parser.add_argument("--tokenizer", type=str, default=None) + parser.add_argument("--pretrain", type=str, default=None) + parser.add_argument("--rm_model", default=None, choices=["gpt2", "bloom", "opt", "llama"]) + parser.add_argument("--rm_path", type=str, default=None) + parser.add_argument("--rm_pretrain", type=str, default=None) + parser.add_argument("--save_path", type=str, default="actor_checkpoint_prompts") + parser.add_argument("--need_optim_ckpt", type=bool, default=False) + parser.add_argument("--num_episodes", type=int, default=10) + parser.add_argument("--num_collect_steps", type=int, default=10) + parser.add_argument("--num_update_steps", type=int, default=5) + parser.add_argument("--train_batch_size", type=int, 
default=8) + parser.add_argument("--ptx_batch_size", type=int, default=1) + parser.add_argument("--experience_batch_size", type=int, default=8) + parser.add_argument("--lora_rank", type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument("--lr", type=float, default=1e-7) + parser.add_argument("--kl_coef", type=float, default=0.1) + parser.add_argument("--ptx_coef", type=float, default=0.9) + parser.add_argument("--max_input_len", type=int, default=96) + parser.add_argument("--max_seq_len", type=int, default=128) + parser.add_argument("--seq_chunk_size", type=int, default=8) + parser.add_argument("--log_dir", default="logs", type=str) + parser.add_argument("--use_wandb", default=False, action="store_true") args = parser.parse_args() main(args) diff --git a/applications/Chat/tests/test_experience.py b/applications/Chat/tests/test_experience.py index 1a80decd2260..8238a3f27cc1 100644 --- a/applications/Chat/tests/test_experience.py +++ b/applications/Chat/tests/test_experience.py @@ -56,7 +56,7 @@ def make_and_consume_experience(strategy): actor, critic, initial_model, reward_model = strategy.prepare(actor, critic, initial_model, reward_model) - class MockTokenizer(): + class MockTokenizer: def __init__(self): self.padding_side = "left" self.eos_token_id = 0 @@ -72,8 +72,8 @@ def __init__(self): # experience of all ranks should be the same for _ in range(2): data = get_data(EXPERIENCE_BATCH_SIZE) - assert gather_and_equal(data['input_ids']) - assert gather_and_equal(data['attention_mask']) + assert gather_and_equal(data["input_ids"]) + assert gather_and_equal(data["attention_mask"]) experience, _ = experience_maker.make_experience(**data, **generate_kwargs) num_actions = experience.action_log_probs.size(1) chunk_size = experience_maker.chunk_size diff --git a/applications/Chat/tests/test_models.py b/applications/Chat/tests/test_models.py index 47c7ac6dd77e..b6e839ca14f6 100644 --- a/applications/Chat/tests/test_models.py +++ b/applications/Chat/tests/test_models.py @@ -19,27 +19,30 @@ @pytest.mark.parametrize("batch_size", [4]) @pytest.mark.parametrize("seq_len", [32]) -@pytest.mark.parametrize("actor_maker", [ - lambda: BLOOMActor(), - lambda: GPTActor(), - # HACK: skip llama due to long execution time - # lambda: LlamaActor(), - lambda: OPTActor() -]) -@pytest.mark.parametrize("generate_kwargs", [{ - "max_length": 64, - "use_cache": True, - "do_sample": True, - "temperature": 1.0, - "top_k": 50, -}]) -def test_generation(actor_maker: Callable[[], Actor], - batch_size: int, - seq_len: int, - generate_kwargs: Dict[str, Any] - ): - - class MockTokenizer(): +@pytest.mark.parametrize( + "actor_maker", + [ + lambda: BLOOMActor(), + lambda: GPTActor(), + # HACK: skip llama due to long execution time + # lambda: LlamaActor(), + lambda: OPTActor(), + ], +) +@pytest.mark.parametrize( + "generate_kwargs", + [ + { + "max_length": 64, + "use_cache": True, + "do_sample": True, + "temperature": 1.0, + "top_k": 50, + } + ], +) +def test_generation(actor_maker: Callable[[], Actor], batch_size: int, seq_len: int, generate_kwargs: Dict[str, Any]): + class MockTokenizer: def __init__(self): self.padding_side = "left" self.eos_token_id = 0 @@ -172,11 +175,7 @@ def test_models(models_maker: Callable[[], Tuple[Actor, Critic, RewardModel]], b @pytest.mark.parametrize("num_labels", [100]) @pytest.mark.parametrize("num_actions", [10]) @pytest.mark.parametrize("chunk_size", [4]) -def test_loss(batch_size: int, - seq_len: int, - num_labels: int, - num_actions: int, - chunk_size: int): +def 
test_loss(batch_size: int, seq_len: int, num_labels: int, num_actions: int, chunk_size: int): num_steps = (num_actions + chunk_size - 1) // chunk_size loss = GPTLMLoss() @@ -192,7 +191,7 @@ def test_loss(batch_size: int, "old_log_probs": torch.randn(batch_size, num_actions), "advantages": torch.randn(batch_size, num_steps), "action_mask": torch.randint(0, 2, (batch_size, num_actions)), - "chunk_size": chunk_size + "chunk_size": chunk_size, } loss_output = loss(**loss_input) assert loss_output.shape == (batch_size, num_steps) From a1426aa7b9ac538bdcc38a1140376bdff25b7811 Mon Sep 17 00:00:00 2001 From: CWHer Date: Thu, 21 Sep 2023 11:51:16 +0800 Subject: [PATCH 20/20] fix: fix missing arg and import --- applications/Chat/coati/trainer/ppo.py | 2 ++ applications/Chat/examples/train_prompts.py | 1 + 2 files changed, 3 insertions(+) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 444a85fdc23a..96c619241c98 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -1,5 +1,7 @@ from typing import Dict, List, Optional +import torch +import torch.nn.functional as F from coati.experience_buffer import NaiveExperienceBuffer from coati.experience_maker import ChunkedExperienceMaker, Experience from coati.models.base import Actor, Critic, RewardModel, get_base_model diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index 5881aa18c061..2a19bfbb6b66 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -234,6 +234,7 @@ def main(args): parser.add_argument("--ptx_batch_size", type=int, default=1) parser.add_argument("--experience_batch_size", type=int, default=8) parser.add_argument("--lora_rank", type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument("--merge_lora_weights", type=bool, default=True) parser.add_argument("--lr", type=float, default=1e-7) parser.add_argument("--kl_coef", type=float, default=0.1) parser.add_argument("--ptx_coef", type=float, default=0.9)
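The chunk bookkeeping that recurs throughout patch 19 -- in ChunkedExperienceMaker, PolicyLoss, and test_loss -- pads the action dimension up to a multiple of chunk_size and then views it as num_steps = (num_actions + chunk_size - 1) // chunk_size chunks. The standalone sketch below only illustrates that bookkeeping with made-up shapes (a batch of 2, 10 actions, chunk size 4); it is not code from the patches.

import torch
import torch.nn.functional as F

batch_size, num_actions, chunk_size = 2, 10, 4
num_steps = (num_actions + chunk_size - 1) // chunk_size      # ceil(10 / 4) = 3

# Pretend every generated token is a valid (non-padding) action.
action_mask = torch.ones(batch_size, num_actions, dtype=torch.bool)

# Pad the action dimension to a multiple of chunk_size and view it chunk by chunk,
# mirroring the F.pad(...).view(-1, chunk_size) pattern used in chunked.py and loss.py.
pad = (chunk_size - num_actions) % chunk_size                  # 2 padding positions
step_mask = F.pad(action_mask, (0, pad), value=False).view(-1, chunk_size)

# A step counts as valid if any action inside its chunk is valid.
step_mask = (step_mask.sum(dim=-1) > 0).view(-1, num_steps)
print(step_mask.shape)                                         # torch.Size([2, 3])

This is also why the per-step tensors exercised in test_loss (advantages, and the PolicyLoss output) have shape (batch_size, num_steps) rather than (batch_size, num_actions).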