diff --git a/applications/Chat/coati/ray/detached_replay_buffer.py b/applications/Chat/coati/ray/detached_replay_buffer.py index 257b0b072493..6a4f5a6d67c2 100644 --- a/applications/Chat/coati/ray/detached_replay_buffer.py +++ b/applications/Chat/coati/ray/detached_replay_buffer.py @@ -26,20 +26,12 @@ class DetachedReplayBuffer: cpu_offload: Whether to offload experience to cpu when sampling. Defaults to True. ''' - def __init__(self, sample_batch_size: int, tp_world_size: int = 1, limit: int = 0) -> None: + def __init__(self, sample_batch_size: int, limit: int = 0) -> None: self.sample_batch_size = sample_batch_size self.limit = limit self.items = Queue(self.limit, actor_options={"num_cpus": 1}) self.batch_collector: List[BufferItem] = [] - ''' - Workers in the same tp group share this buffer and need same sample for one step. - Therefore a held_sample should be returned tp_world_size times before it could be dropped. - worker_state records wheter a worker got the held_sample - ''' - self.tp_world_size = tp_world_size - self.worker_state = [False] * self.tp_world_size - self.held_sample = None - self._worker_state_lock = Lock() + @torch.no_grad() def append(self, experience: Experience) -> None: @@ -70,16 +62,7 @@ def clear(self) -> None: @torch.no_grad() def sample(self, worker_rank=0, to_device="cpu") -> Experience: - self._worker_state_lock.acquire() - if not any(self.worker_state): - self.held_sample = self._sample_and_erase() - self.worker_state[worker_rank] = True - if all(self.worker_state): - self.worker_state = [False] * self.tp_world_size - ret = self.held_sample - else: - ret = copy.deepcopy(self.held_sample) - self._worker_state_lock.release() + ret = self._sample_and_erase() ret.to_device(to_device) return ret diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py index 347df3d84589..d30158019d65 100644 --- a/applications/Chat/coati/ray/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/detached_trainer_ppo.py @@ -110,6 +110,8 @@ def __init__( @torch.no_grad() def _update_remote_makers(self, fully_update: bool = False, **config): # TODO: balance duties + if not fully_update: + config['requires_grad_only'] = True self.update_target_holder_list() # mark start, ensure order tasks = [] @@ -197,9 +199,9 @@ def _get_unwrapped_critic(self): return self.critic def _get_model_state_dict_shard(self, model: torch.nn.Module, **config): - try: - self.strategy.merge_lora_weight(model) - except AttributeError: - pass + # try: + # self.strategy.merge_lora_weight(model) + # except AttributeError: + # pass for state_dict in self.strategy.get_model_state_dict_shard(model, **config): yield state_dict_to(state_dict) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index 996996400064..573771ad6258 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -192,6 +192,7 @@ def update_experience_maker(self, if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) + # the lock must be released after both actor and critic being updated if chunk_end: self._model_visit_lock.release() diff --git a/applications/Chat/coati/ray/utils.py b/applications/Chat/coati/ray/utils.py index bc38bd012d61..6cd7c564cc92 100644 --- a/applications/Chat/coati/ray/utils.py +++ b/applications/Chat/coati/ray/utils.py @@ -1,5 +1,6 @@ import os from typing import Any, Callable, Dict, List, Optional +from collections import OrderedDict import torch import torch.distributed as dist diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index d39092d5e7ad..238408b9f676 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -214,9 +214,10 @@ def get_model_state_dict_shard(self, model: nn.Module, **config): if self.stage != 3: yield from super().get_model_state_dict_shard(model, **config) else: - unwrapped_model = self._unwrap_model(model) - for module in unwrapped_model.modules(): - if isinstance(module, LoraLinear): - module.merge_weights = True - module.eval() + # unwrapped_model = self._unwrap_model(model) + # for module in unwrapped_model.modules(): + # if isinstance(module, LoraLinear): + # module.merge_weights = True + # module.eval() + model: ZeroDDP = model yield from model.state_dict_shard(max_shard_size=1024, only_rank_0=False) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index a761786e0f21..3b537fdde2d4 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -1,6 +1,7 @@ import os -from typing import Any, Optional - +import sys +from typing import Any, Optional, Dict +from collections import OrderedDict import torch import torch.distributed as dist import torch.nn as nn @@ -14,6 +15,14 @@ from .base import Strategy +# TODO Move this to a util.py (Moving to ray.util introduces ringed import) +def get_grad_required_state_dict(model: nn.Module): + state_dict = OrderedDict() + for name, parameter in model.named_parameters(): + if parameter.requires_grad: + state_dict[name] = parameter.detach() + return state_dict + class NaiveStrategy(Strategy): """ @@ -81,8 +90,26 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No def get_model_state_dict_shard(self, model: nn.Module, **config): # TODO: implement sharding on naive strategy - state_dict = model.state_dict() - yield state_dict + if 'requires_grad_only' in config and config['requires_grad_only'] == True: + state_dict = get_grad_required_state_dict(model) + else: + state_dict = model.state_dict() + + if 'shard_size' in config: + shard_size = config['shard_size'] + accumulate_size = 0 + state_dict_shard = OrderedDict() + for name, param in state_dict.items(): + state_dict_shard[name] = param + accumulate_size += param.numel() * param.element_size() + if accumulate_size >= shard_size: + accumulate_size = 0 + yield state_dict_shard + state_dict_shard = OrderedDict() + if accumulate_size > 0: + yield state_dict_shard + else: + yield state_dict def merge_lora_weight(self, model: nn.Module): unwrapped_model = self._unwrap_model(model) diff --git a/applications/Chat/examples/ray/1m1t.py b/applications/Chat/examples/ray/1m1t.py deleted file mode 100644 index 8c291abb1f8b..000000000000 --- a/applications/Chat/examples/ray/1m1t.py +++ /dev/null @@ -1,166 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.callbacks.performance_evaluator import ( - ExperienceMakerPerformanceEvaluator, - TrainerPerformaceEvaluator, -) -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': trainer_port, - 'master_addr': master_addr - } - - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': maker_port, - 'master_addr': master_addr - } - - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - eval_performance=True, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - env_info=env_info_maker, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - eval_performance=True, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - ray.get(trainer_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/1m1t.sh b/applications/Chat/examples/ray/1m1t.sh deleted file mode 100644 index f7c5054c800e..000000000000 --- a/applications/Chat/examples/ray/1m1t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 2 - -export RAY_NAMESPACE="admin" - -python 1m1t.py "/path/to/prompts.csv" \ - --trainer_strategy colossalai_zero2 --maker_strategy naive --lora_rank 2 --pretrain "facebook/opt-350m" --model 'opt' \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 --debug diff --git a/applications/Chat/examples/ray/1m1t_quantize.py b/applications/Chat/examples/ray/1m1t_quantize.py deleted file mode 100644 index cc54bd1905c6..000000000000 --- a/applications/Chat/examples/ray/1m1t_quantize.py +++ /dev/null @@ -1,156 +0,0 @@ -import argparse -import os -import socket - -import pandas as pd -import ray -import torch -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': trainer_port, - 'master_addr': master_addr - } - - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': maker_port, - 'master_addr': master_addr - } - - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - eval_performance=True, - ) - - # configure Experience Maker - experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - env_info=env_info_maker, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - eval_performance=True, - ) - - # a 'jump wire' to set quantized initial_model and reward_model - - # trainer send its actor and critic to experience holders. - # ray.get(trainer_ref.initialize_remote_makers.remote()) - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama', 'roberta']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/1m2t.py b/applications/Chat/examples/ray/1m2t.py deleted file mode 100644 index 1a35beb6221a..000000000000 --- a/applications/Chat/examples/ray/1m2t.py +++ /dev/null @@ -1,203 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - env_info_trainer_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1]) - ray.init(dashboard_port=1145) - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs * 2 + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - main(args) diff --git a/applications/Chat/examples/ray/1m2t.sh b/applications/Chat/examples/ray/1m2t.sh deleted file mode 100644 index 9608526ea7e7..000000000000 --- a/applications/Chat/examples/ray/1m2t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 2 - -export RAY_NAMESPACE="admin" - -python 1m2t.py "/path/to/prompts.csv" --model gpt2 \ - --maker_strategy naive --trainer_strategy ddp --lora_rank 2 \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 #--debug diff --git a/applications/Chat/examples/ray/1mmt_dummy.py b/applications/Chat/examples/ray/1mmt_dummy.py index d2e820680114..eba5213a83d3 100644 --- a/applications/Chat/examples/ray/1mmt_dummy.py +++ b/applications/Chat/examples/ray/1mmt_dummy.py @@ -61,17 +61,17 @@ def main(args): def model_fn(): actor_cfg = AutoConfig.from_pretrained(args.pretrain) critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain) - actor = get_actor_from_args(args.model, config=actor_cfg).half().cuda() - critic = get_critic_from_args(args.critic_model, config=critic_cfg).half().cuda() - reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).half().cuda() + actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() if args.initial_model_quant_ckpt is not None and args.model == 'llama': # quantize initial model with low_resource_init(), no_init_weights(): initial_model = get_actor_from_args(args.model, config=actor_cfg) initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, - args.quant_group_size).cuda() + args.quant_group_size).cuda().requires_grad_(False) else: - initial_model = get_actor_from_args(args.model, config=actor_cfg).half().cuda() + initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py index 5baf96eaa508..bd7224aae749 100644 --- a/applications/Chat/examples/ray/1mmt_prompt.py +++ b/applications/Chat/examples/ray/1mmt_prompt.py @@ -6,6 +6,7 @@ import pandas as pd import ray import torch +from coati.quant import llama_load_quant, low_resource_init from coati.ray.detached_trainer_ppo import DetachedPPOTrainer from coati.ray.experience_maker_holder import ExperienceMakerHolder from coati.ray.utils import ( @@ -17,6 +18,8 @@ ) from torch.utils.data import DataLoader +from transformers import AutoConfig +from transformers.modeling_utils import no_init_weights def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -74,10 +77,18 @@ def trainer_model_fn(): ] def model_fn(): - actor = get_actor_from_args(args.model, args.pretrain).half().cuda() - critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() - reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).half().cuda() - initial_model = get_actor_from_args(args.model, args.pretrain).half().cuda() + actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + if args.initial_model_quant_ckpt is not None and args.model == 'llama': + # quantize initial model + actor_cfg = AutoConfig.from_pretrained(args.pretrain) + with low_resource_init(), no_init_weights(): + initial_model = get_actor_from_args(args.model, config=actor_cfg) + initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, + args.quant_group_size).cuda().requires_grad_(False) + else: + initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker @@ -118,7 +129,6 @@ def model_fn(): dataset_size = args.experience_batch_size * 4 - from torch.utils.data import DataLoader def build_dataloader(): def tokenize_fn(texts): @@ -145,10 +155,14 @@ def tokenize_fn(texts): parser.add_argument('--prompt_path', type=str, default=None) parser.add_argument('--num_trainers', type=int, default=1) parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + choices=[ + 'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu', + 'colossalai_zero2_cpu' + ], default='naive') parser.add_argument('--maker_strategy', choices=['naive'], default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) parser.add_argument('--pretrain', type=str, default=None) parser.add_argument('--critic_pretrain', type=str, default=None) parser.add_argument('--experience_steps', type=int, default=4) @@ -158,6 +172,9 @@ def tokenize_fn(texts): parser.add_argument('--train_batch_size', type=int, default=8) parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) + parser.add_argument('--quant_bits', type=int, default=4) + parser.add_argument('--quant_group_size', type=int, default=128) parser.add_argument('--debug', action='store_true') args = parser.parse_args() ray.init(namespace=os.environ["RAY_NAMESPACE"]) diff --git a/applications/Chat/examples/ray/2m1t.py b/applications/Chat/examples/ray/2m1t.py deleted file mode 100644 index bed6246ed0d7..000000000000 --- a/applications/Chat/examples/ray/2m1t.py +++ /dev/null @@ -1,141 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def main(args): - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - ray.get(trainer_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs // 2 + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_1_done_ref, maker_2_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/2m1t.sh b/applications/Chat/examples/ray/2m1t.sh deleted file mode 100644 index a207d4118d60..000000000000 --- a/applications/Chat/examples/ray/2m1t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 3 - -export RAY_NAMESPACE="admin" - -python 2m1t.py "/path/to/prompts.csv" \ - --trainer_strategy naive --maker_strategy naive --lora_rank 2 --pretrain "facebook/opt-350m" --model 'opt' \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 # --debug diff --git a/applications/Chat/examples/ray/2m2t.py b/applications/Chat/examples/ray/2m2t.py deleted file mode 100644 index 05440032ce9f..000000000000 --- a/applications/Chat/examples/ray/2m2t.py +++ /dev/null @@ -1,230 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - env_info_trainer_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - env_info_maker_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1, env_info_maker_2]) - ray.init() - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_2, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref, maker_2_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - main(args) diff --git a/applications/Chat/examples/ray/2m2t.sh b/applications/Chat/examples/ray/2m2t.sh deleted file mode 100644 index bd8ca84a58fb..000000000000 --- a/applications/Chat/examples/ray/2m2t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 2 - -export RAY_NAMESPACE="admin" - -python 2m2t.py "path/to/prompts.csv" \ - --maker_strategy naive --trainer_strategy colossalai_zero2 --lora_rank 2 \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 --debug diff --git a/applications/Chat/examples/ray/mmmt_dummy.py b/applications/Chat/examples/ray/mmmt_dummy.py index 767fe37030f6..082f4851777e 100644 --- a/applications/Chat/examples/ray/mmmt_dummy.py +++ b/applications/Chat/examples/ray/mmmt_dummy.py @@ -61,17 +61,17 @@ def main(args): def model_fn(): actor_cfg = AutoConfig.from_pretrained(args.pretrain) critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain) - actor = get_actor_from_args(args.model, config=actor_cfg).half().cuda() - critic = get_critic_from_args(args.critic_model, config=critic_cfg).half().cuda() - reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).half().cuda() + actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() if args.initial_model_quant_ckpt is not None and args.model == 'llama': # quantize initial model with low_resource_init(), no_init_weights(): initial_model = get_actor_from_args(args.model, config=actor_cfg) initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, - args.quant_group_size).cuda() + args.quant_group_size).cuda().requires_grad_(False) else: - initial_model = get_actor_from_args(args.model, config=actor_cfg).half().cuda() + initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker diff --git a/applications/Chat/examples/ray/mmmt_prompt.py b/applications/Chat/examples/ray/mmmt_prompt.py new file mode 100644 index 000000000000..d2398d451c7b --- /dev/null +++ b/applications/Chat/examples/ray/mmmt_prompt.py @@ -0,0 +1,191 @@ +import argparse +import os +import socket +from functools import partial + +import ray +import torch +import pandas as pd +from coati.quant import llama_load_quant, low_resource_init +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder +from coati.ray.utils import ( + get_actor_from_args, + get_critic_from_args, + get_receivers_per_sender, + get_reward_model_from_args, + get_strategy_from_args, +) +from torch.utils.data import DataLoader +from transformers import AutoConfig, AutoTokenizer +from transformers.modeling_utils import no_init_weights + + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_trainers), + 'master_port': trainer_port, + 'master_addr': master_addr + } for rank in range(args.num_trainers)] + + # maker_env_info + maker_port = str(get_free_port()) + env_info_makers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_makers), + 'master_port': maker_port, + 'master_addr': master_addr + } for rank in range(args.num_makers)] + + # configure tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.pretrain) + tokenizer.pad_token = tokenizer.eos_token + + def model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + if args.initial_model_quant_ckpt is not None and args.model == 'llama': + # quantize initial model + actor_cfg = AutoConfig.from_pretrained(args.pretrain) + with low_resource_init(), no_init_weights(): + initial_model = get_actor_from_args(args.model, config=actor_cfg) + initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, + args.quant_group_size).cuda().requires_grad_(False) + else: + initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + return actor, critic, reward_model, initial_model + + # configure Experience Maker + experience_holder_refs = [ + ExperienceMakerHolder.options(name=f"maker{i}", num_gpus=1, max_concurrency=2).remote( + detached_trainer_name_list=[ + f'trainer{x}' + for x in get_receivers_per_sender(i, args.num_makers, args.num_trainers, allow_idle_sender=False) + ], + strategy_fn=partial(get_strategy_from_args, args.maker_strategy), + model_fn=model_fn, + env_info=env_info_maker, + kl_coef=0.1, + debug=args.debug, + # sync_models_from_trainers=True, + # generation kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + use_cache=True, + ) + for i, env_info_maker in enumerate(env_info_makers) + ] + + def trainer_model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + return actor, critic + + # configure Trainer + trainer_refs = [ + DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote( + experience_maker_holder_name_list=[ + f"maker{x}" + for x in get_receivers_per_sender(i, args.num_trainers, args.num_makers, allow_idle_sender=True) + ], + strategy_fn=partial(get_strategy_from_args, args.trainer_strategy), + model_fn=trainer_model_fn, + env_info=env_info_trainer, + train_batch_size=args.train_batch_size, + buffer_limit=16, + eval_performance=True, + debug=args.debug, + ) + for i, env_info_trainer in enumerate(env_info_trainers) + ] + + dataset_size = args.experience_batch_size * 4 + + def build_dataloader(): + def tokenize_fn(texts): + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + dataset = pd.read_csv(args.prompt_path)['prompt'] + dataloader = DataLoader(dataset=dataset, + batch_size=dataset_size, + shuffle=True, + collate_fn=tokenize_fn + ) + return dataloader + + # uncomment this function if sync_models_from_trainers is True + # ray.get([ + # trainer_ref.sync_models_to_remote_makers.remote() + # for trainer_ref in trainer_refs + # ]) + + wait_tasks = [] + + for experience_holder_ref in experience_holder_refs: + wait_tasks.append( + experience_holder_ref.workingloop.remote(build_dataloader, + num_steps=args.experience_steps)) + + total_steps = args.experience_batch_size * args.experience_steps * \ + args.num_makers // (args.num_trainers * args.train_batch_size) + for trainer_ref in trainer_refs: + wait_tasks.append(trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs)) + + ray.get(wait_tasks) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--prompt_path', type=str, default=None) + parser.add_argument('--num_makers', type=int, default=1) + parser.add_argument('--num_trainers', type=int, default=1) + parser.add_argument('--trainer_strategy', + choices=[ + 'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu', + 'colossalai_zero2_cpu' + ], + default='naive') + parser.add_argument('--maker_strategy', choices=['naive'], default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--critic_pretrain', type=str, default=None) + parser.add_argument('--experience_steps', type=int, default=4) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--train_epochs', type=int, default=1) + parser.add_argument('--update_steps', type=int, default=2) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) + parser.add_argument('--quant_bits', type=int, default=4) + parser.add_argument('--quant_group_size', type=int, default=128) + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + ray.init(namespace=os.environ["RAY_NAMESPACE"]) + main(args)