From 2ba1efdf1ec5f027c49559b5a88978995bb0e04d Mon Sep 17 00:00:00 2001 From: ver217 Date: Fri, 21 Apr 2023 11:57:13 +0800 Subject: [PATCH 1/6] [chat] add performance evaluator for ray --- applications/Chat/coati/ray/example/1m1t.py | 71 ++++++----- .../Chat/coati/ray/example/1m1t_quantize.py | 63 +++++---- .../coati/ray/src/detached_replay_buffer.py | 41 +++--- .../coati/ray/src/detached_trainer_base.py | 52 ++++++-- .../coati/ray/src/detached_trainer_ppo.py | 120 +++++++++++------- .../coati/ray/src/experience_maker_holder.py | 96 ++++++++++---- applications/Chat/coati/ray/src/utils.py | 43 ++++--- .../callbacks/performance_evaluator.py | 89 +++++++++++++ 8 files changed, 400 insertions(+), 175 deletions(-) diff --git a/applications/Chat/coati/ray/example/1m1t.py b/applications/Chat/coati/ray/example/1m1t.py index a6527370505b..4ad724c1e354 100644 --- a/applications/Chat/coati/ray/example/1m1t.py +++ b/applications/Chat/coati/ray/example/1m1t.py @@ -1,25 +1,26 @@ import argparse +import os +import socket from copy import deepcopy import pandas as pd +import ray import torch -from coati.trainer import PPOTrainer - - -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder +from coati.experience_maker import NaiveExperienceMaker from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer - +from coati.ray.src.experience_maker_holder import ExperienceMakerHolder +from coati.trainer import PPOTrainer +from coati.trainer.callbacks.performance_evaluator import ( + ExperienceMakerPerformanceEvaluator, + TrainerPerformaceEvaluator, +) from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from coati.experience_maker import NaiveExperienceMaker from torch.optim import Adam from transformers import AutoTokenizer, BloomTokenizerFast from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer from colossalai.nn.optimizer import HybridAdam -import ray -import os -import socket def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -31,24 +32,29 @@ def get_local_ip(): with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(('8.8.8.8', 80)) return s.getsockname()[0] - + + def main(args): master_addr = str(get_local_ip()) # trainer_env_info trainer_port = str(get_free_port()) - env_info_trainer = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '1', - 'master_port' : trainer_port, - 'master_addr' : master_addr} - + env_info_trainer = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': trainer_port, + 'master_addr': master_addr + } + # maker_env_info maker_port = str(get_free_port()) - env_info_maker = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '1', - 'master_port' : maker_port, - 'master_addr' : master_addr} + env_info_maker = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': maker_port, + 'master_addr': master_addr + } # configure tokenizer if args.model == 'gpt2': @@ -67,20 +73,21 @@ def main(args): experience_maker_holder_name_list=["maker1"], strategy=args.trainer_strategy, model=args.model, - env_info = env_info_trainer, + env_info=env_info_trainer, pretrained=args.pretrain, lora_rank=args.lora_rank, train_batch_size=args.train_batch_size, buffer_limit=16, experience_batch_size=args.experience_batch_size, max_epochs=args.max_epochs, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, top_k=50, pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, + eval_performance=True, debug=args.debug, ) 
@@ -88,16 +95,17 @@ def main(args): experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( detached_trainer_name_list=["trainer1"], strategy=args.maker_strategy, - env_info = env_info_maker, + env_info=env_info_maker, experience_batch_size=args.experience_batch_size, kl_coef=0.1, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, top_k=50, pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, + eval_performance=True, debug=args.debug, ) @@ -113,19 +121,24 @@ def tokenize_fn(texts): batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) return {k: v.cuda() for k, v in batch.items()} - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * args.max_epochs + 3 # +3 for fault tolerance + trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ + args.max_epochs + 3 # +3 for fault tolerance maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - + ray.get([trainer_done_ref, maker_done_ref]) # save model checkpoint after fitting trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), + trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), only_rank0=False) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('prompt_path') diff --git a/applications/Chat/coati/ray/example/1m1t_quantize.py b/applications/Chat/coati/ray/example/1m1t_quantize.py index 12a60fd65d8b..dc9c9bf9a1f3 100644 --- a/applications/Chat/coati/ray/example/1m1t_quantize.py +++ b/applications/Chat/coati/ray/example/1m1t_quantize.py @@ -1,16 +1,16 @@ import argparse -import pandas as pd -import torch -import ray import os import socket -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder +import pandas as pd +import ray +import torch from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer - +from coati.ray.src.experience_maker_holder import ExperienceMakerHolder from transformers import AutoTokenizer, BloomTokenizerFast from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer + def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) @@ -21,25 +21,29 @@ def get_local_ip(): with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(('8.8.8.8', 80)) return s.getsockname()[0] - + def main(args): master_addr = str(get_local_ip()) # trainer_env_info trainer_port = str(get_free_port()) - env_info_trainer = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '1', - 'master_port' : trainer_port, - 'master_addr' : master_addr} - + env_info_trainer = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': trainer_port, + 'master_addr': master_addr + } + # maker_env_info maker_port = str(get_free_port()) - env_info_maker = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '1', - 
'master_port' : maker_port, - 'master_addr' : master_addr} + env_info_maker = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': maker_port, + 'master_addr': master_addr + } # configure tokenizer if args.model == 'gpt2': @@ -58,14 +62,14 @@ def main(args): experience_maker_holder_name_list=["maker1"], strategy=args.trainer_strategy, model=args.model, - env_info = env_info_trainer, + env_info=env_info_trainer, pretrained=args.pretrain, lora_rank=args.lora_rank, train_batch_size=args.train_batch_size, buffer_limit=16, experience_batch_size=args.experience_batch_size, max_epochs=args.max_epochs, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, @@ -73,16 +77,17 @@ def main(args): pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, debug=args.debug, + eval_performance=True, ) # configure Experience Maker experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( detached_trainer_name_list=["trainer1"], strategy=args.maker_strategy, - env_info = env_info_maker, + env_info=env_info_maker, experience_batch_size=args.experience_batch_size, kl_coef=0.1, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, @@ -90,14 +95,13 @@ def main(args): pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, debug=args.debug, + eval_performance=True, ) # a 'jump wire' to set quantized initial_model and reward_model - # trainer send its actor and critic to experience holders. # ray.get(trainer_ref.initialize_remote_makers.remote()) - # configure sampler dataset = pd.read_csv(args.prompt_path)['prompt'] @@ -107,19 +111,24 @@ def tokenize_fn(texts): batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) return {k: v.cuda() for k, v in batch.items()} - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * args.max_epochs + 3 # +3 for fault tolerance + trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ + args.max_epochs + 3 # +3 for fault tolerance maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - + ray.get([trainer_done_ref, maker_done_ref]) # save model checkpoint after fitting trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), + trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), only_rank0=False) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('prompt_path') diff --git a/applications/Chat/coati/ray/src/detached_replay_buffer.py b/applications/Chat/coati/ray/src/detached_replay_buffer.py index 855eee48c5a5..4bc74bb878fd 100644 --- a/applications/Chat/coati/ray/src/detached_replay_buffer.py +++ b/applications/Chat/coati/ray/src/detached_replay_buffer.py @@ -1,22 +1,24 @@ -import torch +import asyncio +import copy import random -from typing import List, Any -# from torch.multiprocessing import 
Queue -from ray.util.queue import Queue +from threading import Lock +from typing import Any, List + import ray -import asyncio +import torch from coati.experience_maker.base import Experience -from coati.replay_buffer.utils import BufferItem, make_experience_batch, split_experience_batch from coati.replay_buffer import ReplayBuffer -from threading import Lock -import copy +from coati.replay_buffer.utils import BufferItem, make_experience_batch, split_experience_batch +# from torch.multiprocessing import Queue +from ray.util.queue import Queue + class DetachedReplayBuffer: ''' - Detached replay buffer. Share Experience across workers on the same node. - Therefore a trainer node is expected to have only one instance. + Detached replay buffer. Share Experience across workers on the same node. + Therefore a trainer node is expected to have only one instance. It is ExperienceMakerHolder's duty to call append(exp) method, remotely. - + Args: sample_batch_size: Batch size when sampling. Exp won't enqueue until they formed a batch. tp_world_size: Number of workers in the same tp group @@ -24,13 +26,16 @@ class DetachedReplayBuffer: cpu_offload: Whether to offload experience to cpu when sampling. Defaults to True. ''' - def __init__(self, sample_batch_size: int, tp_world_size: int = 1, limit : int = 0, cpu_offload: bool = True) -> None: + def __init__(self, + sample_batch_size: int, + tp_world_size: int = 1, + limit: int = 0, + cpu_offload: bool = True) -> None: self.cpu_offload = cpu_offload self.sample_batch_size = sample_batch_size self.limit = limit - self.items = Queue(self.limit, actor_options={"num_cpus":1}) - self.batch_collector : List[BufferItem] = [] - + self.items = Queue(self.limit, actor_options={"num_cpus": 1}) + self.batch_collector: List[BufferItem] = [] ''' Workers in the same tp group share this buffer and need same sample for one step. Therefore a held_sample should be returned tp_world_size times before it could be dropped. @@ -62,9 +67,9 @@ def clear(self) -> None: self.items = Queue(self.limit) self.worker_state = [False] * self.tp_world_size self.batch_collector = [] - + @torch.no_grad() - def sample(self, worker_rank = 0, to_device = "cpu") -> Experience: + def sample(self, worker_rank=0, to_device="cpu") -> Experience: self._worker_state_lock.acquire() if not any(self.worker_state): self.held_sample = self._sample_and_erase() @@ -85,4 +90,4 @@ def _sample_and_erase(self) -> Experience: def get_length(self) -> int: ret = self.items.qsize() - return ret \ No newline at end of file + return ret diff --git a/applications/Chat/coati/ray/src/detached_trainer_base.py b/applications/Chat/coati/ray/src/detached_trainer_base.py index f5e52e8a3b3a..1d6b2e99df4b 100644 --- a/applications/Chat/coati/ray/src/detached_trainer_base.py +++ b/applications/Chat/coati/ray/src/detached_trainer_base.py @@ -1,17 +1,19 @@ +import os from abc import ABC, abstractmethod from typing import Any, Callable, Dict, List, Optional, Union -from tqdm import tqdm -from coati.trainer.callbacks import Callback -from coati.experience_maker import Experience + import ray -import os +from coati.experience_maker import Experience +from coati.trainer.callbacks import Callback +from tqdm import tqdm from .detached_replay_buffer import DetachedReplayBuffer from .utils import is_rank_0 + class DetachedTrainer(ABC): ''' - Base class for detached rlhf trainers. + Base class for detached rlhf trainers. 'detach' means that the experience maker is detached compared to a normal Trainer. 
Please set name attribute during init: >>> trainer = DetachedTrainer.options(..., name = "xxx", ...).remote() @@ -38,7 +40,9 @@ def __init__(self, callbacks: List[Callback] = [], **generate_kwargs) -> None: super().__init__() - self.detached_replay_buffer = DetachedReplayBuffer(train_batch_size, limit=buffer_limit, cpu_offload=buffer_cpu_offload) + self.detached_replay_buffer = DetachedReplayBuffer(train_batch_size, + limit=buffer_limit, + cpu_offload=buffer_cpu_offload) self.experience_batch_size = experience_batch_size self.max_epochs = max_epochs self.dataloader_pin_memory = dataloader_pin_memory @@ -46,7 +50,7 @@ def __init__(self, self.generate_kwargs = generate_kwargs self.target_holder_name_list = experience_maker_holder_name_list self.target_holder_list = [] - + if 'debug' in self.generate_kwargs and self.generate_kwargs['debug'] == True: self._debug = True else: @@ -69,13 +73,15 @@ def training_step(self, experience: Experience) -> Dict[str, Any]: def _learn(self): pbar = tqdm(range(self.max_epochs), desc='Train epoch', disable=not is_rank_0()) for _ in pbar: - if self._debug: + if self._debug: print("[trainer] sampling exp") experience = self._buffer_sample() - if self._debug: + if self._debug: print("[trainer] training step") + self._on_learn_batch_start() metrics = self.training_step(experience) - if self._debug: + self._on_learn_batch_end(metrics, experience) + if self._debug: print("[trainer] step over") pbar.set_postfix(metrics) @@ -90,18 +96,19 @@ def fit(self, num_episodes: int = 50000, max_timesteps: int = 500, update_timest self._update_remote_makers() self._on_episode_end(episode) self._on_fit_end() + self._on_finish() @ray.method(concurrency_group="buffer_length") def buffer_get_length(self): # called by ExperienceMakerHolder - if self._debug: + if self._debug: print("[trainer] telling length") return self.detached_replay_buffer.get_length() @ray.method(concurrency_group="buffer_append") def buffer_append(self, experience: Experience): # called by ExperienceMakerHolder - if self._debug: + if self._debug: print(f"[trainer] receiving exp.") self.detached_replay_buffer.append(experience) @@ -124,3 +131,24 @@ def _on_episode_start(self, episode: int) -> None: def _on_episode_end(self, episode: int) -> None: for callback in self.callbacks: callback.on_episode_end(episode) + + def _on_learn_epoch_start(self, epoch: int) -> None: + for callback in self.callbacks: + callback.on_learn_epoch_start(epoch) + + def _on_learn_epoch_end(self, epoch: int) -> None: + for callback in self.callbacks: + callback.on_learn_epoch_end(epoch) + + def _on_learn_batch_start(self) -> None: + for callback in self.callbacks: + callback.on_learn_batch_start() + + def _on_learn_batch_end(self, metrics: dict, experience: Experience) -> None: + for callback in self.callbacks: + callback.on_learn_batch_end(metrics, experience) + + def _on_finish(self) -> None: + for callback in self.callbacks: + if hasattr(callback, 'on_finish'): + callback.on_finish() diff --git a/applications/Chat/coati/ray/src/detached_trainer_ppo.py b/applications/Chat/coati/ray/src/detached_trainer_ppo.py index 071f0ddab2b9..3956672bd9da 100644 --- a/applications/Chat/coati/ray/src/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/src/detached_trainer_ppo.py @@ -1,26 +1,37 @@ from typing import Any, Callable, Dict, List, Optional -import torch -from torch.optim import Adam +import ray +import torch from coati.experience_maker import Experience, NaiveExperienceMaker from coati.models.base import Actor, Critic from 
coati.models.generation_utils import update_model_kwargs_fn from coati.models.loss import PolicyLoss, ValueLoss -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy, Strategy from coati.trainer.callbacks import Callback +from coati.trainer.callbacks.performance_evaluator import TrainerPerformaceEvaluator +from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy, Strategy +from torch.optim import Adam from colossalai.nn.optimizer import HybridAdam -import ray - - -from .utils import is_rank_0, get_actor_from_args, get_critic_from_args, get_strategy_from_args, set_dist_env, \ - state_dict_to - from .detached_trainer_base import DetachedTrainer - - -@ray.remote(concurrency_groups={"buffer_length": 1, "buffer_append": 1, "buffer_sample": 1, "model_io": 1, "compute": 1}) +from .utils import ( + get_actor_from_args, + get_critic_from_args, + get_model_numel, + get_strategy_from_args, + is_rank_0, + set_dist_env, + state_dict_to, +) + + +@ray.remote(concurrency_groups={ + "buffer_length": 1, + "buffer_append": 1, + "buffer_sample": 1, + "model_io": 1, + "compute": 1 +}) class DetachedPPOTrainer(DetachedTrainer): ''' Detached Trainer for PPO algorithm @@ -42,26 +53,28 @@ class DetachedPPOTrainer(DetachedTrainer): generate_kwargs (dict, optional): the kwargs to use while model generating ''' - def __init__(self, - experience_maker_holder_name_list: List[str], - strategy: str, - model: str, - pretrained: str = None, - lora_rank: int = 0, - cr_model: str = None, # if not None, use below cr settings for critic - cr_pretrained: str = None, - cr_lora_rank: int = 0, - env_info: Dict[str, str] = None, - train_batch_size: int = 8, - buffer_limit: int = 0, - buffer_cpu_offload: bool = True, - eps_clip: float = 0.2, - value_clip: float = 0.4, - experience_batch_size: int = 8, - max_epochs: int = 10, - dataloader_pin_memory: bool = True, - callbacks: List[Callback] = [], - **generate_kwargs) -> None: + def __init__( + self, + experience_maker_holder_name_list: List[str], + strategy: str, + model: str, + pretrained: str = None, + lora_rank: int = 0, + cr_model: str = None, # if not None, use below cr settings for critic + cr_pretrained: str = None, + cr_lora_rank: int = 0, + env_info: Dict[str, str] = None, + train_batch_size: int = 8, + buffer_limit: int = 0, + buffer_cpu_offload: bool = True, + eps_clip: float = 0.2, + value_clip: float = 0.4, + experience_batch_size: int = 8, + max_epochs: int = 10, + dataloader_pin_memory: bool = True, + callbacks: List[Callback] = [], + eval_performance: bool = False, + **generate_kwargs) -> None: # set environment variables if env_info: set_dist_env(env_info=env_info) @@ -77,10 +90,15 @@ def __init__(self, self.actor = get_actor_from_args(model, pretrained, lora_rank) self.critic = get_critic_from_args(cr_model, cr_pretrained, cr_lora_rank) - if strategy != 'colossalai_gemini': - self.actor.to(torch.cuda.current_device()) #.to(torch.float16) - self.critic.to(torch.cuda.current_device()) #.to(torch.float16) + if eval_performance: + actor_numel = get_model_numel(self.actor) + critic_numel = get_model_numel(self.critic) + evaluator = TrainerPerformaceEvaluator(actor_numel, critic_numel) + callbacks = callbacks + [evaluator] + if strategy != 'colossalai_gemini': + self.actor.to(torch.cuda.current_device()) # .to(torch.float16) + self.critic.to(torch.cuda.current_device()) # .to(torch.float16) if strategy.startswith('colossalai'): self.actor_optim = HybridAdam(self.actor.parameters(), lr=1e-7) @@ -119,25 +137,27 @@ 
def _update_remote_makers(self, **config): if is_rank_0(): self.update_target_holder_list(self.target_holder_name_list) with torch.no_grad(): - # actor: + # actor: # mark start for target_holder in self.target_holder_list: target_holder.update_experience_maker.remote(chunk_start=True) # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_actor(self.actor), **config): + for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_actor(self.actor), + **config): for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(new_actor_state_dict = state_dict_shard) + target_holder.update_experience_maker.remote(new_actor_state_dict=state_dict_shard) # mark end for target_holder in self.target_holder_list: target_holder.update_experience_maker.remote(chunk_end=True) # critic - # mark start + # mark start for target_holder in self.target_holder_list: target_holder.update_experience_maker.remote(chunk_start=True) # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), **config): + for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), + **config): for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(new_critic_state_dict = state_dict_shard) + target_holder.update_experience_maker.remote(new_critic_state_dict=state_dict_shard) # mark end for target_holder in self.target_holder_list: target_holder.update_experience_maker.remote(chunk_end=True) @@ -148,23 +168,29 @@ def initialize_remote_makers(self, **config): if is_rank_0(): self.update_target_holder_list(self.target_holder_name_list) with torch.no_grad(): - # actor / initial_model: + # actor / initial_model: # mark start for target_holder in self.target_holder_list: - target_holder.initialize_experience_maker.remote(actor_model=self._model_str,actor_pretrained=self._pretrained,chunk_start=True) + target_holder.initialize_experience_maker.remote(actor_model=self._model_str, + actor_pretrained=self._pretrained, + chunk_start=True) # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_actor(self.actor), **config): + for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_actor(self.actor), + **config): for target_holder in self.target_holder_list: target_holder.initialize_experience_maker.remote(actor_state_dict=state_dict_shard) # mark end for target_holder in self.target_holder_list: target_holder.initialize_experience_maker.remote(actor_model=self._model_str, chunk_end=True) # critic / reward_model: - # mark start + # mark start for target_holder in self.target_holder_list: - target_holder.initialize_experience_maker.remote(critic_model=self._cr_model_str,critic_pretrained=self._cr_pretrained,chunk_start=True) + target_holder.initialize_experience_maker.remote(critic_model=self._cr_model_str, + critic_pretrained=self._cr_pretrained, + chunk_start=True) # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), **config): + for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), + **config): for target_holder in self.target_holder_list: target_holder.initialize_experience_maker.remote(critic_state_dict=state_dict_shard) # mark end diff --git a/applications/Chat/coati/ray/src/experience_maker_holder.py 
b/applications/Chat/coati/ray/src/experience_maker_holder.py index 67b89a68119a..5a0fe28e2732 100644 --- a/applications/Chat/coati/ray/src/experience_maker_holder.py +++ b/applications/Chat/coati/ray/src/experience_maker_holder.py @@ -1,22 +1,31 @@ -import torch +import os +import time +import tracemalloc +from copy import deepcopy +from threading import Lock from typing import Any, Callable, Dict, List, Optional, Union + import ray -from ray.exceptions import GetTimeoutError -from torch import Tensor +import torch import torch.nn as nn +from coati.experience_maker import Experience, ExperienceMaker, NaiveExperienceMaker from coati.models.base import Actor, Critic, RewardModel -from coati.trainer.strategies.sampler import DistributedSampler +from coati.trainer.callbacks import Callback +from coati.trainer.callbacks.performance_evaluator import ExperienceMakerPerformanceEvaluator from coati.trainer.strategies import Strategy -from coati.experience_maker import NaiveExperienceMaker, Experience, ExperienceMaker - -from copy import deepcopy -from threading import Lock -import time -import os -import tracemalloc +from coati.trainer.strategies.sampler import DistributedSampler +from ray.exceptions import GetTimeoutError +from torch import Tensor -from .utils import is_rank_0, get_strategy_from_args, set_dist_env, get_actor_from_args, get_critic_from_args, \ - get_reward_model_from_args +from .utils import ( + get_actor_from_args, + get_critic_from_args, + get_model_numel, + get_reward_model_from_args, + get_strategy_from_args, + is_rank_0, + set_dist_env, +) @ray.remote(concurrency_groups={"experience_io": 1, "model_io": 1, "compute": 1}) @@ -24,7 +33,7 @@ class ExperienceMakerHolder: ''' Args: detached_trainer_name_list: str list to get ray actor handles - strategy: + strategy: experience_batch_size: batch size of generated experience kl_coef: the coefficient of kl divergence loss ''' @@ -35,6 +44,8 @@ def __init__(self, env_info: Dict[str, str] = None, experience_batch_size: int = 8, kl_coef: float = 0.1, + callbacks: List[Callback] = [], + eval_performance: bool = False, **generate_kwargs): # set environment variables if env_info: @@ -49,6 +60,8 @@ def __init__(self, self.generate_kwargs = generate_kwargs actor, critic, reward_model, initial_model = None, None, None, None self.experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, self.kl_coef) + self.callbacks = callbacks + self.eval_performance = eval_performance self._model_visit_lock = Lock() self._initial_model_initialized = False @@ -68,6 +81,15 @@ def __init__(self, def _get_ready(self): while not self._fully_initialized(): time.sleep(1.0) + # setup performance evaluator + if self.eval_performance: + actor_numel = get_model_numel(self.experience_maker.actor) + critic_numel = get_model_numel(self.experience_maker.critic) + initial_model_numel = get_model_numel(self.experience_maker.initial_model) + reward_model_numel = get_model_numel(self.experience_maker.reward_model) + evaluator = ExperienceMakerPerformanceEvaluator(actor_numel, critic_numel, initial_model_numel, + reward_model_numel) + self.callbacks.append(evaluator) def _fully_initialized(self): if not self._initial_model_initialized: @@ -139,9 +161,12 @@ def workingloop(self, dataset, tokenizer: Optional[Callable[[Any], dict]] = None else: inputs = rand_prompts self._model_visit_lock.acquire() + self._on_make_experience_start() experience = self._make_experience(inputs=inputs) + self._on_make_experience_end(experience) self._model_visit_lock.release() 
self._send_experience(experience=experience) + self._on_finish() @ray.method(concurrency_group="model_io") def initialize_experience_maker(self, @@ -158,7 +183,7 @@ def initialize_experience_maker(self, chunk_start: Set True at the first call. Before sending state_dict calls chunk_end: Set True at the last call. After sending state_dict calls. - TODO: load_state_dict integrate with model-sharding strategy + TODO: load_state_dict integrate with model-sharding strategy ''' if self._fully_initialized(): return @@ -170,13 +195,17 @@ def initialize_experience_maker(self, # (csric) any better way to get model structure? with self.strategy.model_init_context(): if not self._actor_initialized and actor_model is not None: - self.experience_maker.actor = get_actor_from_args(actor_model, actor_pretrained).half().requires_grad_(False) + self.experience_maker.actor = get_actor_from_args(actor_model, + actor_pretrained).half().requires_grad_(False) if not self._critic_initialized and critic_model is not None: - self.experience_maker.critic = get_critic_from_args(critic_model, critic_pretrained).half().requires_grad_(False) + self.experience_maker.critic = get_critic_from_args( + critic_model, critic_pretrained).half().requires_grad_(False) if not self._initial_model_initialized and actor_model is not None: - self.experience_maker.initial_model = get_actor_from_args(actor_model, actor_pretrained).half().requires_grad_(False) + self.experience_maker.initial_model = get_actor_from_args( + actor_model, actor_pretrained).half().requires_grad_(False) if not self._reward_model_initialized and critic_model is not None: - self.experience_maker.reward_model = get_reward_model_from_args(critic_model, critic_pretrained).half().requires_grad_(False) + self.experience_maker.reward_model = get_reward_model_from_args( + critic_model, critic_pretrained).half().requires_grad_(False) with torch.no_grad(): if not self._actor_initialized and actor_state_dict is not None: @@ -188,25 +217,27 @@ def initialize_experience_maker(self, if not self._reward_model_initialized and critic_state_dict is not None: self.experience_maker.reward_model.load_state_dict(critic_state_dict, strict=False) - if chunk_end: with torch.no_grad(): if actor_model is not None: if not self._actor_initialized: - self.experience_maker.actor = self.strategy.prepare(self.experience_maker.actor.to(torch.cuda.current_device())) + self.experience_maker.actor = self.strategy.prepare( + self.experience_maker.actor.to(torch.cuda.current_device())) if not self._initial_model_initialized: - self.experience_maker.initial_model = self.strategy.prepare(self.experience_maker.initial_model.to(torch.cuda.current_device())) + self.experience_maker.initial_model = self.strategy.prepare( + self.experience_maker.initial_model.to(torch.cuda.current_device())) self._actor_initialized = True self._initial_model_initialized = True if critic_model is not None: if not self._critic_initialized: - self.experience_maker.critic = self.strategy.prepare(self.experience_maker.critic.to(torch.cuda.current_device())) + self.experience_maker.critic = self.strategy.prepare( + self.experience_maker.critic.to(torch.cuda.current_device())) if not self._reward_model_initialized: - self.experience_maker.reward_model = self.strategy.prepare(self.experience_maker.reward_model.to(torch.cuda.current_device())) + self.experience_maker.reward_model = self.strategy.prepare( + self.experience_maker.reward_model.to(torch.cuda.current_device())) self._critic_initialized = True 
self._reward_model_initialized = True - def initialize_experience_maker_local(self, initial_model_func=None, reward_model_func=None, @@ -241,7 +272,7 @@ def update_experience_maker(self, called by trainer chunk_start: Set True at the first call. Before sending state_dict calls chunk_end: Set True at the last call. After sending state_dict calls. - + TODO: load_state_dict integrate with model-sharding strategy ''' _watch_memory = True @@ -264,3 +295,16 @@ def update_experience_maker(self, current, peak = tracemalloc.get_traced_memory() print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB") tracemalloc.stop() + + def _on_make_experience_start(self) -> None: + for callback in self.callbacks: + callback.on_make_experience_start() + + def _on_make_experience_end(self, experience: Experience) -> None: + for callback in self.callbacks: + callback.on_make_experience_end(experience) + + def _on_finish(self) -> None: + for callback in self.callbacks: + if hasattr(callback, 'on_finish'): + callback.on_finish() diff --git a/applications/Chat/coati/ray/src/utils.py b/applications/Chat/coati/ray/src/utils.py index 827c2b8c6dc9..80848a5b9cee 100644 --- a/applications/Chat/coati/ray/src/utils.py +++ b/applications/Chat/coati/ray/src/utils.py @@ -1,21 +1,24 @@ -import torch.distributed as dist +import os from typing import Any, Callable, Dict, List, Optional -from coati.models.bloom import BLOOMActor, BLOOMCritic, BLOOMRM -from coati.models.gpt import GPTActor, GPTCritic, GPTRM -from coati.models.opt import OPTActor, OPTCritic, OPTRM -from coati.models.roberta import RoBERTaRM, RoBERTaActor, RoBERTaCritic -from coati.models.llama import LlamaActor, LlamaCritic, LlamaRM -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy import torch -import os +import torch.distributed as dist +import torch.nn as nn +from coati.models.bloom import BLOOMRM, BLOOMActor, BLOOMCritic +from coati.models.gpt import GPTRM, GPTActor, GPTCritic +from coati.models.llama import LlamaActor, LlamaCritic, LlamaRM +from coati.models.opt import OPTRM, OPTActor, OPTCritic +from coati.models.roberta import RoBERTaActor, RoBERTaCritic, RoBERTaRM +from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy +from coati.utils import prepare_llama_tokenizer_and_embedding +from transformers import AutoTokenizer, BloomTokenizerFast, GPT2Tokenizer, LlamaTokenizer, RobertaTokenizer def is_rank_0() -> bool: return not dist.is_initialized() or dist.get_rank() == 0 -def get_actor_from_args(model: str, pretrained: str = None, lora_rank = 0): +def get_actor_from_args(model: str, pretrained: str = None, lora_rank=0): if model == 'gpt2': actor = GPTActor(pretrained=pretrained, lora_rank=lora_rank) elif model == 'bloom': @@ -30,7 +33,8 @@ def get_actor_from_args(model: str, pretrained: str = None, lora_rank = 0): raise ValueError(f'Unsupported actor model "{model}"') return actor -def get_critic_from_args(model: str, pretrained: str = None, lora_rank = 0): + +def get_critic_from_args(model: str, pretrained: str = None, lora_rank=0): if model == 'gpt2': critic = GPTCritic(pretrained=pretrained, lora_rank=lora_rank, use_action_mask=True) elif model == 'bloom': @@ -45,6 +49,7 @@ def get_critic_from_args(model: str, pretrained: str = None, lora_rank = 0): raise ValueError(f'Unsupported reward model "{model}"') return critic + def get_reward_model_from_args(model: str, pretrained: str = None): if model == 'gpt2': reward_model = GPTRM(pretrained=pretrained) @@ -60,6 +65,7 @@ def 
get_reward_model_from_args(model: str, pretrained: str = None): raise ValueError(f'Unsupported reward model "{model}"') return reward_model + def get_strategy_from_args(strategy: str): if strategy == 'naive': strategy_ = NaiveStrategy() @@ -74,9 +80,6 @@ def get_strategy_from_args(strategy: str): return strategy_ -from transformers import AutoTokenizer, BloomTokenizerFast, GPT2Tokenizer, LlamaTokenizer, RobertaTokenizer -from coati.utils import prepare_llama_tokenizer_and_embedding - def get_tokenizer_from_args(model: str, **kwargs): if model == 'gpt2': tokenizer = GPT2Tokenizer.from_pretrained('gpt2') @@ -95,6 +98,7 @@ def get_tokenizer_from_args(model: str, **kwargs): tokenizer.pad_token = tokenizer.eos_token return tokenizer + def set_dist_env(env_info: Dict[str, str]): os.environ["RANK"] = env_info['rank'] os.environ["LOCAL_RANK"] = env_info['local_rank'] @@ -103,11 +107,18 @@ def set_dist_env(env_info: Dict[str, str]): os.environ['MASTER_ADDR'] = env_info['master_addr'] -def state_dict_to(state_dict: Dict[str, Any], dtype: torch.dtype = torch.float16, device: torch.device = torch.device('cpu')): +def state_dict_to(state_dict: Dict[str, Any], + dtype: torch.dtype = torch.float16, + device: torch.device = torch.device('cpu')): ''' keep state_dict intact ''' new_state_dict = {} for k, v in state_dict.items(): - new_state_dict[k] = v.to(dtype = dtype, device = device) - return new_state_dict \ No newline at end of file + new_state_dict[k] = v.to(dtype=dtype, device=device) + return new_state_dict + + +def get_model_numel(model: nn.Module) -> int: + numel = sum(p.numel() for p in model.parameters()) + return numel diff --git a/applications/Chat/coati/trainer/callbacks/performance_evaluator.py b/applications/Chat/coati/trainer/callbacks/performance_evaluator.py index 5ca44a52d6e7..0aebd2bf6280 100644 --- a/applications/Chat/coati/trainer/callbacks/performance_evaluator.py +++ b/applications/Chat/coati/trainer/callbacks/performance_evaluator.py @@ -29,6 +29,95 @@ def all_reduce_mean(x: float, world_size: int) -> float: return tensor.item() +class ExperienceMakerPerformanceEvaluator(Callback): + + def __init__(self, actor_num_params: int, critic_num_params: int, initial_model_num_params: int, + reward_model_num_params: int) -> None: + super().__init__() + self.world_size = get_world_size() + self.actor_num_params = actor_num_params + self.critic_num_params = critic_num_params + self.initial_model_num_params = initial_model_num_params + self.reward_model_num_params = reward_model_num_params + + self.make_experience_duration: float = 0. 
+ self.make_experience_start_time: Optional[float] = None + self.make_experience_num_samples: int = 0 + self.make_experience_flop: int = 0 + + def on_make_experience_start(self) -> None: + self.make_experience_start_time = time() + + def on_make_experience_end(self, experience: Experience) -> None: + self.make_experience_duration += time() - self.make_experience_start_time + + batch_size, seq_len = experience.sequences.shape + + self.make_experience_num_samples += batch_size + + # actor generate + num_actions = experience.action_mask.size(1) + input_len = seq_len - num_actions + total_seq_len = (input_len + seq_len - 1) * num_actions / 2 + self.make_experience_flop += self.actor_num_params * batch_size * total_seq_len * 2 + # actor forward + self.make_experience_flop += self.actor_num_params * batch_size * seq_len * 2 + # critic forward + self.make_experience_flop += self.critic_num_params * batch_size * seq_len * 2 + # initial model forward + self.make_experience_flop += self.initial_model_num_params * batch_size * seq_len * 2 + # reward model forward + self.make_experience_flop += self.reward_model_num_params * batch_size * seq_len * 2 + + def on_finish(self) -> None: + avg_make_experience_duration = all_reduce_mean(self.make_experience_duration, self.world_size) + + avg_make_experience_throughput = self.make_experience_num_samples / (avg_make_experience_duration + 1e-12) + avg_make_experience_tflops = self.make_experience_flop / 1e12 / (avg_make_experience_duration + 1e-12) + + print_rank_0( + f'Making experience throughput: {avg_make_experience_throughput:.3f} samples/sec, TFLOPS: {avg_make_experience_tflops:.3f}' + ) + + +class TrainerPerformaceEvaluator(Callback): + + def __init__(self, actor_num_params: int, critic_num_params: int, enable_grad_checkpoint: bool = False) -> None: + super().__init__() + self.world_size = get_world_size() + self.actor_num_params = actor_num_params + self.critic_num_params = critic_num_params + self.enable_grad_checkpoint = enable_grad_checkpoint + + self.learn_duration: float = 0. + self.learn_start_time: Optional[float] = None + self.learn_num_samples: int = 0 + self.learn_flop: int = 0 + + def on_learn_batch_start(self) -> None: + self.learn_start_time = time() + + def on_learn_batch_end(self, metrics: dict, experience: Experience) -> None: + self.learn_duration += time() - self.learn_start_time + + batch_size, seq_len = experience.sequences.shape + + self.learn_num_samples += batch_size + + # actor forward-backward, 3 means forward(1) + backward(2) + self.learn_flop += self.actor_num_params * batch_size * seq_len * 2 * (3 + int(self.enable_grad_checkpoint)) + # critic forward-backward + self.learn_flop += self.critic_num_params * batch_size * seq_len * 2 * (3 + int(self.enable_grad_checkpoint)) + + def on_finish(self) -> None: + avg_learn_duration = all_reduce_mean(self.learn_duration, self.world_size) + + avg_learn_throughput = self.learn_num_samples / (avg_learn_duration + 1e-12) + avg_learn_tflops = self.learn_flop / 1e12 / (avg_learn_duration + 1e-12) + + print_rank_0(f'Learning throughput: {avg_learn_throughput:.3f} samples/sec, TFLOPS: {avg_learn_tflops:.3f}') + + class PerformanceEvaluator(Callback): """ Callback for valuate the performance of the model. 
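Note on the FLOP model behind the two evaluator callbacks added in this patch: a dense transformer forward pass is costed at roughly 2 * num_params * num_tokens FLOPs, autoregressive generation is costed by summing the arithmetic series of growing context lengths, and a training step is costed as about three forward-equivalents (four when activation checkpointing recomputes the forward). The sketch below restates that bookkeeping as standalone Python; the function names and the numbers in the usage example are illustrative only and are not part of the patch.

# Standalone sketch of the evaluators' FLOP/throughput accounting (illustrative names).

def forward_flop(num_params: int, batch_size: int, seq_len: int) -> float:
    """One dense forward pass costs roughly 2 * params * tokens FLOPs."""
    return 2 * num_params * batch_size * seq_len


def generation_flop(num_params: int, batch_size: int, input_len: int, num_actions: int) -> float:
    """Incremental decoding of num_actions tokens: step t processes a context of
    length input_len + t, so the per-step context lengths form an arithmetic series."""
    seq_len = input_len + num_actions
    total_context = (input_len + seq_len - 1) * num_actions / 2
    return 2 * num_params * batch_size * total_context


def learn_step_flop(num_params: int, batch_size: int, seq_len: int,
                    grad_checkpoint: bool = False) -> float:
    """Forward + backward is ~3x a forward; activation checkpointing adds one more forward."""
    return forward_flop(num_params, batch_size, seq_len) * (3 + int(grad_checkpoint))


if __name__ == '__main__':
    # e.g. a 1.5B-parameter actor generating 32 tokens from 96-token prompts at batch size 8
    flop = generation_flop(1.5e9, batch_size=8, input_len=96, num_actions=32)
    duration = 2.0    # seconds, as measured between the start/end callbacks
    print(f'~{flop / 1e12 / duration:.2f} TFLOPS during generation')

Dividing the accumulated FLOPs by the (all-reduced mean) wall-clock duration, as the callbacks do in on_finish, then gives samples/sec and TFLOPS per device.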
From a314e6763e05808ef411e89c7628d2fb3a8d9c1b Mon Sep 17 00:00:00 2001 From: ver217 Date: Fri, 21 Apr 2023 12:02:27 +0800 Subject: [PATCH 2/6] [chat] refactor debug arg --- applications/Chat/coati/ray/src/detached_trainer_base.py | 6 ++---- applications/Chat/coati/ray/src/detached_trainer_ppo.py | 2 ++ .../Chat/coati/ray/src/experience_maker_holder.py | 8 +++----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/applications/Chat/coati/ray/src/detached_trainer_base.py b/applications/Chat/coati/ray/src/detached_trainer_base.py index 1d6b2e99df4b..3558f58017a6 100644 --- a/applications/Chat/coati/ray/src/detached_trainer_base.py +++ b/applications/Chat/coati/ray/src/detached_trainer_base.py @@ -38,6 +38,7 @@ def __init__(self, max_epochs: int = 1, dataloader_pin_memory: bool = True, callbacks: List[Callback] = [], + debug: bool = False, **generate_kwargs) -> None: super().__init__() self.detached_replay_buffer = DetachedReplayBuffer(train_batch_size, @@ -51,10 +52,7 @@ def __init__(self, self.target_holder_name_list = experience_maker_holder_name_list self.target_holder_list = [] - if 'debug' in self.generate_kwargs and self.generate_kwargs['debug'] == True: - self._debug = True - else: - self._debug = False + self._debug = debug def update_target_holder_list(self, experience_maker_holder_name_list): self.target_holder_name_list = experience_maker_holder_name_list diff --git a/applications/Chat/coati/ray/src/detached_trainer_ppo.py b/applications/Chat/coati/ray/src/detached_trainer_ppo.py index 3956672bd9da..8e115557b0bc 100644 --- a/applications/Chat/coati/ray/src/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/src/detached_trainer_ppo.py @@ -74,6 +74,7 @@ def __init__( dataloader_pin_memory: bool = True, callbacks: List[Callback] = [], eval_performance: bool = False, + debug: bool = False, **generate_kwargs) -> None: # set environment variables if env_info: @@ -123,6 +124,7 @@ def __init__( max_epochs=max_epochs, dataloader_pin_memory=dataloader_pin_memory, callbacks=callbacks, + debug=debug, **generate_kwargs) # for remote maker initialization diff --git a/applications/Chat/coati/ray/src/experience_maker_holder.py b/applications/Chat/coati/ray/src/experience_maker_holder.py index 5a0fe28e2732..d2c2a7fab3a6 100644 --- a/applications/Chat/coati/ray/src/experience_maker_holder.py +++ b/applications/Chat/coati/ray/src/experience_maker_holder.py @@ -46,6 +46,7 @@ def __init__(self, kl_coef: float = 0.1, callbacks: List[Callback] = [], eval_performance: bool = False, + debug: bool = False, **generate_kwargs): # set environment variables if env_info: @@ -69,10 +70,7 @@ def __init__(self, self._actor_initialized = False self._critic_initialized = False - if 'debug' in self.generate_kwargs and self.generate_kwargs['debug'] == True: - self._debug = True - else: - self._debug = False + self._debug = debug self.target_auto_balance = False if self._debug: @@ -275,7 +273,7 @@ def update_experience_maker(self, TODO: load_state_dict integrate with model-sharding strategy ''' - _watch_memory = True + _watch_memory = self._debug if chunk_start: if self._debug: print("[maker] UPDATE ") From 4691dea86f538aaaa7d4dbc400bb76fefce20240 Mon Sep 17 00:00:00 2001 From: ver217 Date: Fri, 21 Apr 2023 14:41:19 +0800 Subject: [PATCH 3/6] [chat] support hf config --- applications/Chat/coati/ray/src/utils.py | 36 ++++++++++++------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/applications/Chat/coati/ray/src/utils.py b/applications/Chat/coati/ray/src/utils.py index 
80848a5b9cee..1b14e1c3f1cb 100644 --- a/applications/Chat/coati/ray/src/utils.py +++ b/applications/Chat/coati/ray/src/utils.py @@ -18,49 +18,49 @@ def is_rank_0() -> bool: return not dist.is_initialized() or dist.get_rank() == 0 -def get_actor_from_args(model: str, pretrained: str = None, lora_rank=0): +def get_actor_from_args(model: str, pretrained: str = None, config=None, lora_rank=0): if model == 'gpt2': - actor = GPTActor(pretrained=pretrained, lora_rank=lora_rank) + actor = GPTActor(pretrained=pretrained, config=config, lora_rank=lora_rank) elif model == 'bloom': - actor = BLOOMActor(pretrained=pretrained, lora_rank=lora_rank) + actor = BLOOMActor(pretrained=pretrained, config=config, lora_rank=lora_rank) elif model == 'opt': - actor = OPTActor(pretrained=pretrained, lora_rank=lora_rank) + actor = OPTActor(pretrained=pretrained, config=config, lora_rank=lora_rank) elif model == 'llama': - actor = LlamaActor(pretrained=pretrained, lora_rank=lora_rank) + actor = LlamaActor(pretrained=pretrained, config=config, lora_rank=lora_rank) elif model == 'roberta': - actor = RoBERTaActor(pretrained=pretrained, lora_rank=lora_rank) + actor = RoBERTaActor(pretrained=pretrained, config=config, lora_rank=lora_rank) else: raise ValueError(f'Unsupported actor model "{model}"') return actor -def get_critic_from_args(model: str, pretrained: str = None, lora_rank=0): +def get_critic_from_args(model: str, pretrained: str = None, config=None, lora_rank=0): if model == 'gpt2': - critic = GPTCritic(pretrained=pretrained, lora_rank=lora_rank, use_action_mask=True) + critic = GPTCritic(pretrained=pretrained, lora_rank=lora_rank, config=config, use_action_mask=True) elif model == 'bloom': - critic = BLOOMCritic(pretrained=pretrained, lora_rank=lora_rank, use_action_mask=True) + critic = BLOOMCritic(pretrained=pretrained, lora_rank=lora_rank, config=config, use_action_mask=True) elif model == 'opt': - critic = OPTCritic(pretrained=pretrained, lora_rank=lora_rank, use_action_mask=True) + critic = OPTCritic(pretrained=pretrained, lora_rank=lora_rank, config=config, use_action_mask=True) elif model == 'llama': - critic = LlamaCritic(pretrained=pretrained, lora_rank=lora_rank, use_action_mask=True) + critic = LlamaCritic(pretrained=pretrained, lora_rank=lora_rank, config=config, use_action_mask=True) elif model == 'roberta': - critic = RoBERTaCritic(pretrained=pretrained, lora_rank=lora_rank, use_action_mask=True) + critic = RoBERTaCritic(pretrained=pretrained, lora_rank=lora_rank, config=config, use_action_mask=True) else: raise ValueError(f'Unsupported reward model "{model}"') return critic -def get_reward_model_from_args(model: str, pretrained: str = None): +def get_reward_model_from_args(model: str, pretrained: str = None, config=None): if model == 'gpt2': - reward_model = GPTRM(pretrained=pretrained) + reward_model = GPTRM(pretrained=pretrained, config=config) elif model == 'bloom': - reward_model = BLOOMRM(pretrained=pretrained) + reward_model = BLOOMRM(pretrained=pretrained, config=config) elif model == 'opt': - reward_model = OPTRM(pretrained=pretrained) + reward_model = OPTRM(pretrained=pretrained, config=config) elif model == 'llama': - reward_model = LlamaRM(pretrained=pretrained) + reward_model = LlamaRM(pretrained=pretrained, config=config) elif model == 'roberta': - reward_model = RoBERTaRM(pretrained=pretrained) + reward_model = RoBERTaRM(pretrained=pretrained, config=config) else: raise ValueError(f'Unsupported reward model "{model}"') return reward_model From 
5f913c98fe9c8851e4f043222682e6daadea33b1 Mon Sep 17 00:00:00 2001 From: ver217 Date: Fri, 21 Apr 2023 14:41:50 +0800 Subject: [PATCH 4/6] [chat] fix generation --- applications/Chat/coati/models/generation.py | 3 +- .../coati/ray/src/experience_maker_holder.py | 15 ++++++++ applications/Chat/coati/trainer/ppo.py | 37 +++++++++---------- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/applications/Chat/coati/models/generation.py b/applications/Chat/coati/models/generation.py index eb30c36d0f84..961f2aec677d 100644 --- a/applications/Chat/coati/models/generation.py +++ b/applications/Chat/coati/models/generation.py @@ -76,8 +76,7 @@ def sample(model: nn.Module, # update generated ids, model inputs for next step input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1) if update_model_kwargs_fn is not None: - model_kwargs = update_model_kwargs_fn(outputs, **model_kwargs) - + model_kwargs = update_model_kwargs_fn(outputs, model_kwargs) # if eos_token was found in one sentence, set sentence to finished if eos_token_id is not None: unfinished_sequences = unfinished_sequences.mul((next_tokens != eos_token_id).long()) diff --git a/applications/Chat/coati/ray/src/experience_maker_holder.py b/applications/Chat/coati/ray/src/experience_maker_holder.py index d2c2a7fab3a6..b1acdbb5494d 100644 --- a/applications/Chat/coati/ray/src/experience_maker_holder.py +++ b/applications/Chat/coati/ray/src/experience_maker_holder.py @@ -89,6 +89,8 @@ def _get_ready(self): reward_model_numel) self.callbacks.append(evaluator) + self.generate_kwargs = _set_default_generate_kwargs(self.generate_kwargs, self.experience_maker.actor) + def _fully_initialized(self): if not self._initial_model_initialized: return False @@ -306,3 +308,16 @@ def _on_finish(self) -> None: for callback in self.callbacks: if hasattr(callback, 'on_finish'): callback.on_finish() + + +def _set_default_generate_kwargs(generate_kwargs: dict, actor: Actor) -> None: + origin_model = actor.model + new_kwargs = {**generate_kwargs} + # use huggingface models method directly + if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'): + new_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation + + if 'update_model_kwargs_fn' not in generate_kwargs and hasattr(origin_model, '_update_model_kwargs_for_generation'): + new_kwargs['update_model_kwargs_fn'] = origin_model._update_model_kwargs_for_generation + + return new_kwargs diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 2db604fc9b74..89d708456c61 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -4,14 +4,13 @@ import torch.nn as nn from coati.experience_maker import Experience, NaiveExperienceMaker from coati.models.base import Actor, Critic -from coati.models.generation_utils import update_model_kwargs_fn from coati.models.loss import PolicyLoss, ValueLoss from coati.replay_buffer import NaiveReplayBuffer from torch import Tensor from torch.optim import Optimizer from torch.utils.data import DistributedSampler -from transformers.tokenization_utils_base import PreTrainedTokenizerBase from tqdm import tqdm +from transformers.tokenization_utils_base import PreTrainedTokenizerBase from .base import Trainer from .callbacks import Callback @@ -102,19 +101,16 @@ def _make_experience(self, inputs: Union[Tensor, Dict[str, Tensor]]) -> Experien def _sample_prompts(self, prompts) -> list: indices = list(range(len(prompts))) - 
sampled_indices = self.strategy.experience_sampler.choice( - indices, self.experience_batch_size, replace=False) + sampled_indices = self.strategy.experience_sampler.choice(indices, self.experience_batch_size, replace=False) return [prompts[i] for i in sampled_indices] def _learn(self): # replay buffer may be empty at first, we should rebuild at each training if not self.sample_replay_buffer: - dataloader = self.strategy.setup_dataloader( - self.replay_buffer, self.dataloader_pin_memory) + dataloader = self.strategy.setup_dataloader(self.replay_buffer, self.dataloader_pin_memory) device = torch.cuda.current_device() if self.sample_replay_buffer: - pbar = tqdm(range(self.max_epochs), desc='Train epoch', - disable=not is_rank_0()) + pbar = tqdm(range(self.max_epochs), desc='Train epoch', disable=not is_rank_0()) for _ in pbar: experience = self.replay_buffer.sample() metrics = self.training_step(experience) @@ -124,8 +120,7 @@ def _learn(self): self._on_learn_epoch_start(epoch) if isinstance(dataloader.sampler, DistributedSampler): dataloader.sampler.set_epoch(epoch) - pbar = tqdm( - dataloader, desc=f'Train epoch [{epoch+1}/{self.max_epochs}]', disable=not is_rank_0()) + pbar = tqdm(dataloader, desc=f'Train epoch [{epoch+1}/{self.max_epochs}]', disable=not is_rank_0()) for experience in pbar: self._on_learn_batch_start() experience.to_device(device) @@ -152,10 +147,8 @@ def fit(self, time += 1 prompts = next(iter(self.prompt_dataloader)) self._on_make_experience_start() - self.experience_maker.initial_model.to( - torch.cuda.current_device()) - self.experience_maker.reward_model.to( - torch.cuda.current_device()) + self.experience_maker.initial_model.to(torch.cuda.current_device()) + self.experience_maker.reward_model.to(torch.cuda.current_device()) experience = self._make_experience(prompts) self._on_make_experience_end(experience) self.replay_buffer.append(experience) @@ -206,11 +199,17 @@ def training_step(self, experience: Experience) -> Dict[str, float]: self.critic_optim.zero_grad() return {'reward': experience.reward.mean().item()} - - def save_model(self, path: str, only_rank0: bool = False, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: + + def save_model(self, + path: str, + only_rank0: bool = False, + tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer) - def save_model(self, path: str, only_rank0: bool = False, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: + def save_model(self, + path: str, + only_rank0: bool = False, + tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer) @@ -221,7 +220,7 @@ def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, acto if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'): new_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation - if 'update_model_kwargs_fn' not in generate_kwargs: - new_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fn + if 'update_model_kwargs_fn' not in generate_kwargs and hasattr(origin_model, '_update_model_kwargs_for_generation'): + new_kwargs['update_model_kwargs_fn'] = origin_model._update_model_kwargs_for_generation return new_kwargs From f712ed614e6c1d6828ff776778cf645bc9ee7f49 Mon Sep 17 00:00:00 2001 From: ver217 Date: Fri, 21 Apr 2023 15:00:08 +0800 Subject: [PATCH 5/6] [chat] 
add 1mmt dummy example --- .../Chat/coati/ray/example/1mmt_dummy.py | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 applications/Chat/coati/ray/example/1mmt_dummy.py diff --git a/applications/Chat/coati/ray/example/1mmt_dummy.py b/applications/Chat/coati/ray/example/1mmt_dummy.py new file mode 100644 index 000000000000..1f1f2e08e2f3 --- /dev/null +++ b/applications/Chat/coati/ray/example/1mmt_dummy.py @@ -0,0 +1,186 @@ +import argparse +import os +import socket +from copy import deepcopy +from functools import partial + +import ray +import torch +from coati.models.base import RewardModel +from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.src.experience_maker_holder import ExperienceMakerHolder +from coati.ray.src.utils import get_actor_from_args, get_critic_from_args, get_reward_model_from_args +from transformers import AutoTokenizer, BloomTokenizerFast +from transformers.models.gpt2.configuration_gpt2 import GPT2Config +from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer + + +def get_gpt_config(model_name: str) -> GPT2Config: + model_map = { + 's': GPT2Config(), + 'm': GPT2Config(n_embd=1024, n_layer=24, n_head=16), + 'l': GPT2Config(n_embd=1280, n_layer=36, n_head=20), + 'xl': GPT2Config(n_embd=1600, n_layer=48, n_head=25), + '2b': GPT2Config(n_embd=2048, n_layer=40, n_head=16), + '4b': GPT2Config(n_embd=2304, n_layer=64, n_head=16), + '6b': GPT2Config(n_embd=4096, n_layer=30, n_head=16), + '8b': GPT2Config(n_embd=4096, n_layer=40, n_head=16), + '10b': GPT2Config(n_embd=4096, n_layer=50, n_head=16), + '12b': GPT2Config(n_embd=4096, n_layer=60, n_head=16), + '15b': GPT2Config(n_embd=4096, n_layer=78, n_head=16), + '18b': GPT2Config(n_embd=4096, n_layer=90, n_head=16), + '20b': GPT2Config(n_embd=8192, n_layer=25, n_head=16), + '24b': GPT2Config(n_embd=8192, n_layer=30, n_head=16), + '28b': GPT2Config(n_embd=8192, n_layer=35, n_head=16), + '32b': GPT2Config(n_embd=8192, n_layer=40, n_head=16), + '36b': GPT2Config(n_embd=8192, n_layer=45, n_head=16), + '40b': GPT2Config(n_embd=8192, n_layer=50, n_head=16), + '175b': GPT2Config(n_positions=2048, n_embd=12288, n_layer=96, n_head=96), + } + try: + return model_map[model_name] + except KeyError: + raise ValueError(f'Unknown model "{model_name}"') + + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_trainers), + 'master_port': trainer_port, + 'master_addr': master_addr + } for rank in range(args.num_trainers)] + + # maker_env_info + maker_port = str(get_free_port()) + env_info_maker = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': maker_port, + 'master_addr': master_addr + } + + # configure tokenizer + tokenizer = GPT2Tokenizer.from_pretrained('gpt2') + tokenizer.pad_token = tokenizer.eos_token + + # configure Trainer + trainer_refs = [ + DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1"], + strategy=args.trainer_strategy, + model=args.model, + env_info=env_info_trainer, + pretrained=args.pretrain, + lora_rank=args.lora_rank, + 
train_batch_size=args.train_batch_size, + buffer_limit=16, + experience_batch_size=args.experience_batch_size, + max_epochs=args.max_epochs, + # kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + debug=args.debug, + ) for i, env_info_trainer in enumerate(env_info_trainers) + ] + + # configure Experience Maker + experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( + detached_trainer_name_list=[f'trainer{i}' for i in range(args.num_trainers)], + strategy=args.maker_strategy, + env_info=env_info_maker, + experience_batch_size=args.experience_batch_size, + kl_coef=0.1, + # kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + use_cache=True, + debug=args.debug, + ) + + def init_inference_model(fn, model_name, pretrained): + model = fn(model_name, pretrained) + return model.half().cuda() + + # init maker locally + ray.get( + experience_holder_ref.initialize_experience_maker_local.remote( + initial_model_func=partial(init_inference_model, get_actor_from_args, args.model, args.pretrain), + reward_model_func=partial(init_inference_model, get_reward_model_from_args, args.model, args.pretrain), + actor_func=partial(init_inference_model, get_actor_from_args, args.model, args.pretrain), + critic_func=partial(init_inference_model, get_critic_from_args, args.model, args.pretrain), + )) + + # configure sampler + random_prompts = torch.randint(tokenizer.vocab_size, (1000, 400)) + + def tokenize_fn(texts): + # print(texts) + input_ids = torch.stack(texts).cuda() + # print(input_ids.shape) + attn_mask = torch.ones_like(input_ids) + return {'input_ids': input_ids, 'attention_mask': attn_mask} + + wait_tasks = [] + + for trainer_ref in trainer_refs: + wait_tasks.append( + trainer_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps)) + + num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ + args.max_epochs + 3 # +3 for fault tolerance + wait_tasks.append(experience_holder_ref.workingloop.remote(random_prompts, tokenize_fn, times=num_exp_per_maker)) + + ray.get(wait_tasks) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--num_trainers', type=int, default=1) + parser.add_argument('--trainer_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--maker_strategy', choices=['naive'], default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--num_episodes', type=int, default=10) + parser.add_argument('--max_timesteps', type=int, default=10) + parser.add_argument('--update_timesteps', type=int, default=10) + parser.add_argument('--max_epochs', type=int, default=5) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + ray.init(namespace=os.environ["RAY_NAMESPACE"]) + main(args) From 
e4546f4c987ae69592cfa52985e2ea1af65ceb66 Mon Sep 17 00:00:00 2001 From: ver217 Date: Fri, 21 Apr 2023 16:34:46 +0800 Subject: [PATCH 6/6] [chat] fix gemini ckpt --- .../Chat/coati/ray/example/1mmt_dummy.py | 2 +- .../coati/ray/src/detached_trainer_ppo.py | 48 ++++++++++--------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/applications/Chat/coati/ray/example/1mmt_dummy.py b/applications/Chat/coati/ray/example/1mmt_dummy.py index 1f1f2e08e2f3..68b666663b12 100644 --- a/applications/Chat/coati/ray/example/1mmt_dummy.py +++ b/applications/Chat/coati/ray/example/1mmt_dummy.py @@ -157,7 +157,7 @@ def tokenize_fn(texts): update_timesteps=args.update_timesteps)) num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance + args.max_epochs * args.num_trainers + 3 # +3 for fault tolerance wait_tasks.append(experience_holder_ref.workingloop.remote(random_prompts, tokenize_fn, times=num_exp_per_maker)) ray.get(wait_tasks) diff --git a/applications/Chat/coati/ray/src/detached_trainer_ppo.py b/applications/Chat/coati/ray/src/detached_trainer_ppo.py index 8e115557b0bc..2850f1cf1d37 100644 --- a/applications/Chat/coati/ray/src/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/src/detached_trainer_ppo.py @@ -134,35 +134,39 @@ def __init__( self._cr_pretrained = cr_pretrained @ray.method(concurrency_group="model_io") + @torch.no_grad() def _update_remote_makers(self, **config): # TODO: balance duties if is_rank_0(): self.update_target_holder_list(self.target_holder_name_list) - with torch.no_grad(): - # actor: - # mark start - for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(chunk_start=True) - # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_actor(self.actor), - **config): - for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(new_actor_state_dict=state_dict_shard) - # mark end - for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(chunk_end=True) - # critic + # actor: + if is_rank_0(): # mark start + for target_holder in self.target_holder_list: + target_holder.update_experience_maker.remote(chunk_start=True) + # sending loop + for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_model(self.actor), **config): + if is_rank_0(): for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(chunk_start=True) - # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), - **config): - for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(new_critic_state_dict=state_dict_shard) - # mark end + target_holder.update_experience_maker.remote(new_actor_state_dict=state_dict_shard) + if is_rank_0(): + # mark end + for target_holder in self.target_holder_list: + target_holder.update_experience_maker.remote(chunk_end=True) + # critic + if is_rank_0(): + # mark start + for target_holder in self.target_holder_list: + target_holder.update_experience_maker.remote(chunk_start=True) + # sending loop + for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), **config): + if is_rank_0(): for target_holder in self.target_holder_list: - target_holder.update_experience_maker.remote(chunk_end=True) + 
target_holder.update_experience_maker.remote(new_critic_state_dict=state_dict_shard) + if is_rank_0(): + # mark end + for target_holder in self.target_holder_list: + target_holder.update_experience_maker.remote(chunk_end=True) @ray.method(concurrency_group="model_io") def initialize_remote_makers(self, **config):