diff --git a/applications/Chat/coati/ray/__init__.py b/applications/Chat/coati/ray/__init__.py index 5802c05bc03f..e69de29bb2d1 100644 --- a/applications/Chat/coati/ray/__init__.py +++ b/applications/Chat/coati/ray/__init__.py @@ -1,2 +0,0 @@ -from .src.detached_replay_buffer import DetachedReplayBuffer -from .src.detached_trainer_ppo import DetachedPPOTrainer diff --git a/applications/Chat/coati/ray/src/detached_replay_buffer.py b/applications/Chat/coati/ray/detached_replay_buffer.py similarity index 100% rename from applications/Chat/coati/ray/src/detached_replay_buffer.py rename to applications/Chat/coati/ray/detached_replay_buffer.py diff --git a/applications/Chat/coati/ray/src/detached_trainer_base.py b/applications/Chat/coati/ray/detached_trainer_base.py similarity index 100% rename from applications/Chat/coati/ray/src/detached_trainer_base.py rename to applications/Chat/coati/ray/detached_trainer_base.py diff --git a/applications/Chat/coati/ray/src/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py similarity index 100% rename from applications/Chat/coati/ray/src/detached_trainer_ppo.py rename to applications/Chat/coati/ray/detached_trainer_ppo.py diff --git a/applications/Chat/coati/ray/example/1m2t.py b/applications/Chat/coati/ray/example/1m2t.py deleted file mode 100644 index 3883c364a8e0..000000000000 --- a/applications/Chat/coati/ray/example/1m2t.py +++ /dev/null @@ -1,186 +0,0 @@ -import argparse -from copy import deepcopy - -import pandas as pd -import torch -from coati.trainer import PPOTrainer - - -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder -from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer - -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from coati.experience_maker import NaiveExperienceMaker -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - -import ray -import os -import socket - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '2', - 'master_port' : trainer_port, - 'master_addr' : master_addr} - env_info_trainer_2 = {'local_rank' : '0', - 'rank' : '1', - 'world_size' : '2', - 'master_port' : trainer_port, - 'master_addr' : master_addr} - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '2', - 'master_port' : maker_port, - 'master_addr' : master_addr} - print([env_info_trainer_1, - env_info_trainer_2, - env_info_maker_1]) - ray.init(dashboard_port = 1145) - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - 
trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug= args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. 
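
These scripts coordinate workers through Ray's named actors: each trainer and maker registers under a string name in a shared namespace, and peers resolve each other later by name alone. A minimal, self-contained sketch of that pattern (the `Trainer` class, its methods, and the actor names here are illustrative stand-ins, not the coati API):

```python
import os

import ray

# Named actors are only visible inside a namespace; the example scripts
# take it from the RAY_NAMESPACE environment variable.
ray.init(namespace=os.environ.get("RAY_NAMESPACE", "admin"))


@ray.remote(max_concurrency=2)    # >1 so one long-running call can't block other RPCs
class Trainer:    # hypothetical stand-in for DetachedPPOTrainer
    def __init__(self, maker_names):
        self.maker_names = maker_names

    def ping(self):
        return "ok"


# `name=` registers the actor under "trainer1"; keep a handle so it stays alive.
trainer_ref = Trainer.options(name="trainer1").remote(maker_names=["maker1"])

# Any other process in the same namespace can now resolve it by name alone,
# which is how makers and trainers find each other in these scripts.
peer = ray.get_actor("trainer1")
print(ray.get(peer.ping.remote()))    # -> "ok"
```

The `max_concurrency=2` in the real scripts presumably serves the same purpose as in the sketch: it lets a trainer answer model-sync RPCs from the maker while `fit` is still executing.
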
- # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * args.max_epochs * 2 + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - main(args) diff --git a/applications/Chat/coati/ray/example/2m2t.py b/applications/Chat/coati/ray/example/2m2t.py deleted file mode 100644 index 435c71915fc2..000000000000 --- a/applications/Chat/coati/ray/example/2m2t.py +++ /dev/null @@ -1,209 +0,0 @@ -import argparse -from copy import deepcopy - -import pandas as pd -import torch -from coati.trainer import PPOTrainer - - -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder -from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer - -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from coati.experience_maker 
import NaiveExperienceMaker -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - -import ray -import os -import socket - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '2', - 'master_port' : trainer_port, - 'master_addr' : master_addr} - env_info_trainer_2 = {'local_rank' : '0', - 'rank' : '1', - 'world_size' : '2', - 'master_port' : trainer_port, - 'master_addr' : master_addr} - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = {'local_rank' : '0', - 'rank' : '0', - 'world_size' : '2', - 'master_port' : maker_port, - 'master_addr' : master_addr} - env_info_maker_2 = {'local_rank' : '0', - 'rank' : '1', - 'world_size' : '2', - 'master_port': maker_port, - 'master_addr' : master_addr} - print([env_info_trainer_1, - env_info_trainer_2, - env_info_maker_1, - env_info_maker_2]) - ray.init() - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - 
kl_coef=0.1, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", namespace=os.environ["RAY_NAMESPACE"], num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_2, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - #kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * args.max_epochs + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref, maker_2_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, 
default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - main(args) diff --git a/applications/Chat/coati/ray/src/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py similarity index 100% rename from applications/Chat/coati/ray/src/experience_maker_holder.py rename to applications/Chat/coati/ray/experience_maker_holder.py diff --git a/applications/Chat/coati/ray/src/pipeline_strategy.py b/applications/Chat/coati/ray/pipeline_strategy.py similarity index 81% rename from applications/Chat/coati/ray/src/pipeline_strategy.py rename to applications/Chat/coati/ray/pipeline_strategy.py index 1780839c62ee..4b01a45b176e 100644 --- a/applications/Chat/coati/ray/src/pipeline_strategy.py +++ b/applications/Chat/coati/ray/pipeline_strategy.py @@ -1,42 +1,42 @@ # WIP - -from coati.trainer.strategies import Strategy -from coati.trainer.strategies import NaiveStrategy -from coati.models.base import Actor, RewardModel, Critic +import os +import random +from functools import partial import numpy as np import torch +from coati.models.base import Actor, Critic, RewardModel +from coati.trainer.strategies import NaiveStrategy, Strategy from torch._C._distributed_rpc import _is_current_rpc_agent_set import colossalai -from colossalai.pipeline.pipeline_process_group import ppg -from colossalai.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine from colossalai.fx import ColoTracer from colossalai.fx.passes.adding_split_node_pass import balanced_split_pass, split_with_split_nodes_pass from colossalai.pipeline.middleware.adaptor import get_fx_topology - - -import os -from functools import partial -import random +from colossalai.pipeline.pipeline_process_group import ppg +from colossalai.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine rpc_is_initialized = _is_current_rpc_agent_set + class PipelineModel(torch.nn.Module): ''' - Actor has 2 kinds of jobs: forward and generate. + Actor has 2 kinds of jobs: forward and generate. 
better to just pipelinize the inner model ''' - def __init__(self, - model: torch.nn.Module, - stage_num: int, - num_microbatches: int, - data_kwargs = None, - ): + + def __init__( + self, + model: torch.nn.Module, + stage_num: int, + num_microbatches: int, + data_kwargs=None, + ): super().__init__() + # create partition module - def create_partition_module(pp_rank:int, stage_num: int, model, data_kwargs): + def create_partition_module(pp_rank: int, stage_num: int, model, data_kwargs): model.eval() tracer = ColoTracer() meta_args = {k: v.to('meta') for k, v in data_kwargs.items()} @@ -49,10 +49,11 @@ def create_partition_module(pp_rank:int, stage_num: int, model, data_kwargs): if isinstance(submodule, torch.fx.GraphModule): setattr(submodule, '_topo', topo) return split_submodules[pp_rank + 1] - + def partition(model, data_kwargs: dict, pp_rank: int, chunk: int, stage_num: int): partition = create_partition_module(pp_rank, stage_num, model, data_kwargs) return partition + self.inference_engine = OneFOneBPipelineEngine( partition_fn=partial(partition, model, data_kwargs), stage_num=stage_num, @@ -60,38 +61,33 @@ def partition(model, data_kwargs: dict, pp_rank: int, chunk: int, stage_num: int device='cuda', ) - def forward(self, - **model_inputs): + def forward(self, **model_inputs): return self.inference_engine.forward_backward(**model_inputs, forward_only=True) - class PPStrategy(NaiveStrategy): """ Strategy for Pipeline inference (inference only!) - + master node only """ - def __init__( - self, - seed: int = 42 - ): + + def __init__(self, seed: int = 42): self.seed = seed super().__init__() - - + def setup_distributed(self) -> None: colossalai.launch_from_torch({}, seed=self.seed) - ppg.set_global_info(rank = int(os.environ['RANK']), + ppg.set_global_info(rank=int(os.environ['RANK']), world_size=int(os.environ['WORLD_SIZE']), dp_degree=1, tp_degree=1, num_worker_threads=128, device="cuda") - + def model_init_context(self): return super().model_init_context() - + def setup_model(self, model: torch.nn.Module) -> torch.nn.Module: if isinstance(model, Actor) or \ isinstance(model, RewardModel) or \ @@ -102,4 +98,3 @@ def set_seed(self, seed: int) -> None: random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) - diff --git a/applications/Chat/coati/ray/src/__init__.py b/applications/Chat/coati/ray/src/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/applications/Chat/coati/ray/src/utils.py b/applications/Chat/coati/ray/utils.py similarity index 100% rename from applications/Chat/coati/ray/src/utils.py rename to applications/Chat/coati/ray/utils.py diff --git a/applications/Chat/coati/ray/example/1m1t.py b/applications/Chat/examples/ray/1m1t.py similarity index 97% rename from applications/Chat/coati/ray/example/1m1t.py rename to applications/Chat/examples/ray/1m1t.py index 4ad724c1e354..8c291abb1f8b 100644 --- a/applications/Chat/coati/ray/example/1m1t.py +++ b/applications/Chat/examples/ray/1m1t.py @@ -7,8 +7,8 @@ import ray import torch from coati.experience_maker import NaiveExperienceMaker -from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder from coati.trainer import PPOTrainer from coati.trainer.callbacks.performance_evaluator import ( ExperienceMakerPerformanceEvaluator, diff --git a/applications/Chat/coati/ray/example/1m1t.sh 
b/applications/Chat/examples/ray/1m1t.sh similarity index 100% rename from applications/Chat/coati/ray/example/1m1t.sh rename to applications/Chat/examples/ray/1m1t.sh diff --git a/applications/Chat/coati/ray/example/1m1t_quantize.py b/applications/Chat/examples/ray/1m1t_quantize.py similarity index 97% rename from applications/Chat/coati/ray/example/1m1t_quantize.py rename to applications/Chat/examples/ray/1m1t_quantize.py index dc9c9bf9a1f3..cc54bd1905c6 100644 --- a/applications/Chat/coati/ray/example/1m1t_quantize.py +++ b/applications/Chat/examples/ray/1m1t_quantize.py @@ -5,8 +5,8 @@ import pandas as pd import ray import torch -from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder from transformers import AutoTokenizer, BloomTokenizerFast from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer diff --git a/applications/Chat/examples/ray/1m2t.py b/applications/Chat/examples/ray/1m2t.py new file mode 100644 index 000000000000..1a35beb6221a --- /dev/null +++ b/applications/Chat/examples/ray/1m2t.py @@ -0,0 +1,203 @@ +import argparse +import os +import socket +from copy import deepcopy + +import pandas as pd +import ray +import torch +from coati.experience_maker import NaiveExperienceMaker +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder +from coati.trainer import PPOTrainer +from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy +from torch.optim import Adam +from transformers import AutoTokenizer, BloomTokenizerFast +from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer + +from colossalai.nn.optimizer import HybridAdam + + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainer_1 = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '2', + 'master_port': trainer_port, + 'master_addr': master_addr + } + env_info_trainer_2 = { + 'local_rank': '0', + 'rank': '1', + 'world_size': '2', + 'master_port': trainer_port, + 'master_addr': master_addr + } + # maker_env_info + maker_port = str(get_free_port()) + env_info_maker_1 = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '2', + 'master_port': maker_port, + 'master_addr': master_addr + } + print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1]) + ray.init(dashboard_port=1145) + # configure tokenizer + if args.model == 'gpt2': + tokenizer = GPT2Tokenizer.from_pretrained('gpt2') + tokenizer.pad_token = tokenizer.eos_token + elif args.model == 'bloom': + tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) + tokenizer.pad_token = tokenizer.eos_token + elif args.model == 'opt': + tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") + else: + raise ValueError(f'Unsupported model "{args.model}"') + + # configure Trainer + trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1"], + 
strategy=args.trainer_strategy, + model=args.model, + env_info=env_info_trainer_1, + pretrained=args.pretrain, + lora_rank=args.lora_rank, + train_batch_size=args.train_batch_size, + buffer_limit=16, + experience_batch_size=args.experience_batch_size, + max_epochs=args.max_epochs, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1"], + strategy=args.trainer_strategy, + model=args.model, + env_info=env_info_trainer_2, + pretrained=args.pretrain, + lora_rank=args.lora_rank, + train_batch_size=args.train_batch_size, + buffer_limit=16, + experience_batch_size=args.experience_batch_size, + max_epochs=args.max_epochs, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + # configure Experience Maker + experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + detached_trainer_name_list=["trainer1", "trainer2"], + strategy=args.maker_strategy, + env_info=env_info_maker_1, + experience_batch_size=args.experience_batch_size, + kl_coef=0.1, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + # trainer send its actor and critic to experience holders. + # TODO: balance duty + ray.get(trainer_1_ref.initialize_remote_makers.remote()) + + # configure sampler + dataset = pd.read_csv(args.prompt_path)['prompt'] + + def tokenize_fn(texts): + # MUST padding to max length to ensure inputs of all ranks have the same length + # Different length may lead to hang when using gemini, as different generation steps + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ + args.max_epochs * 2 + 3 # +3 for fault tolerance + maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) + + ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref]) + # save model checkpoint after fitting + trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) + trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) + # save optimizer checkpoint on all ranks + if args.need_optim_ckpt: + trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), + only_rank0=False) + trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), + only_rank0=False) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('prompt_path') + 
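
A quick sanity check of the experience budget computed above: the single maker feeds two trainers, hence the factor of 2, and three extra batches are produced as fault-tolerance slack. Evaluated at the script's default CLI values:

```python
# num_exp_per_maker from 1m2t.py, evaluated at the default arguments
num_episodes, max_timesteps, update_timesteps, max_epochs = 10, 10, 10, 5

# updates per trainer = episodes * timesteps // update interval = 10
# batches consumed    = updates * epochs * 2 trainers          = 100
num_exp_per_maker = num_episodes * max_timesteps // update_timesteps * max_epochs * 2 + 3
print(num_exp_per_maker)    # 103
```
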
parser.add_argument('--trainer_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--maker_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') + parser.add_argument('--need_optim_ckpt', type=bool, default=False) + parser.add_argument('--num_episodes', type=int, default=10) + parser.add_argument('--max_timesteps', type=int, default=10) + parser.add_argument('--update_timesteps', type=int, default=10) + parser.add_argument('--max_epochs', type=int, default=5) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + main(args) diff --git a/applications/Chat/coati/ray/example/1m2t.sh b/applications/Chat/examples/ray/1m2t.sh similarity index 96% rename from applications/Chat/coati/ray/example/1m2t.sh rename to applications/Chat/examples/ray/1m2t.sh index 669f4141026c..9608526ea7e7 100644 --- a/applications/Chat/coati/ray/example/1m2t.sh +++ b/applications/Chat/examples/ray/1m2t.sh @@ -20,4 +20,4 @@ export RAY_NAMESPACE="admin" python 1m2t.py "/path/to/prompts.csv" --model gpt2 \ --maker_strategy naive --trainer_strategy ddp --lora_rank 2 \ --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 #--debug \ No newline at end of file + --max_epochs 10 #--debug diff --git a/applications/Chat/coati/ray/example/1mmt_dummy.py b/applications/Chat/examples/ray/1mmt_dummy.py similarity index 97% rename from applications/Chat/coati/ray/example/1mmt_dummy.py rename to applications/Chat/examples/ray/1mmt_dummy.py index c7619ea6940b..540f4243577d 100644 --- a/applications/Chat/coati/ray/example/1mmt_dummy.py +++ b/applications/Chat/examples/ray/1mmt_dummy.py @@ -5,9 +5,9 @@ import ray import torch -from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder -from coati.ray.src.utils import ( +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder +from coati.ray.utils import ( get_actor_from_args, get_critic_from_args, get_reward_model_from_args, diff --git a/applications/Chat/coati/ray/example/2m1t.py b/applications/Chat/examples/ray/2m1t.py similarity index 90% rename from applications/Chat/coati/ray/example/2m1t.py rename to applications/Chat/examples/ray/2m1t.py index b655de1ab1fa..bed6246ed0d7 100644 --- a/applications/Chat/coati/ray/example/2m1t.py +++ b/applications/Chat/examples/ray/2m1t.py @@ -1,26 +1,22 @@ import argparse +import os +import socket from copy import deepcopy import pandas as pd +import ray import torch +from coati.experience_maker import NaiveExperienceMaker +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder from coati.trainer import PPOTrainer - - -from coati.ray.src.experience_maker_holder import ExperienceMakerHolder -from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer - from coati.trainer.strategies import 
ColossalAIStrategy, DDPStrategy, NaiveStrategy -from coati.experience_maker import NaiveExperienceMaker from torch.optim import Adam from transformers import AutoTokenizer, BloomTokenizerFast from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer from colossalai.nn.optimizer import HybridAdam -import ray -import os -import socket - def main(args): # configure tokenizer @@ -46,7 +42,7 @@ def main(args): buffer_limit=16, experience_batch_size=args.experience_batch_size, max_epochs=args.max_epochs, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, @@ -62,7 +58,7 @@ def main(args): strategy=args.maker_strategy, experience_batch_size=args.experience_batch_size, kl_coef=0.1, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, @@ -71,13 +67,13 @@ def main(args): eos_token_id=tokenizer.eos_token_id, debug=args.debug, ) - + experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", num_gpus=1, max_concurrency=2).remote( detached_trainer_name_list=["trainer1"], strategy=args.maker_strategy, experience_batch_size=args.experience_batch_size, kl_coef=0.1, - #kwargs: + # kwargs: max_length=128, do_sample=True, temperature=1.0, @@ -99,20 +95,25 @@ def tokenize_fn(texts): batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) return {k: v.cuda() for k, v in batch.items()} - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, max_timesteps=args.max_timesteps, update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * args.max_epochs // 2 + 3 # +3 for fault tolerance + trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ + args.max_epochs // 2 + 3 # +3 for fault tolerance maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - + ray.get([trainer_done_ref, maker_1_done_ref, maker_2_done_ref]) # save model checkpoint after fitting trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), + trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), only_rank0=False) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('prompt_path') diff --git a/applications/Chat/coati/ray/example/2m1t.sh b/applications/Chat/examples/ray/2m1t.sh similarity index 100% rename from applications/Chat/coati/ray/example/2m1t.sh rename to applications/Chat/examples/ray/2m1t.sh diff --git a/applications/Chat/examples/ray/2m2t.py b/applications/Chat/examples/ray/2m2t.py new file mode 100644 index 000000000000..05440032ce9f --- /dev/null +++ b/applications/Chat/examples/ray/2m2t.py @@ -0,0 +1,230 @@ +import argparse +import os +import socket +from copy import deepcopy + +import pandas as pd +import ray +import torch +from coati.experience_maker import NaiveExperienceMaker +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder 
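
The `env_info_*` dicts these scripts build are a hand-rolled torch.distributed rendezvous (rank, world size, master address and port) passed into each Ray actor. The actual consumption happens inside the coati strategy classes; the sketch below is an assumption about what that wiring typically looks like, not the library's code:

```python
import os

import torch.distributed as dist


def setup_from_env_info(env_info: dict):
    """Hypothetical helper: join the process group described by an env_info dict."""
    # Export the rendezvous variables that the default env:// init method reads.
    os.environ["MASTER_ADDR"] = env_info["master_addr"]
    os.environ["MASTER_PORT"] = env_info["master_port"]
    os.environ["LOCAL_RANK"] = env_info["local_rank"]
    dist.init_process_group(backend="nccl",    # "gloo" for CPU-only smoke tests
                            rank=int(env_info["rank"]),
                            world_size=int(env_info["world_size"]))
```

This is also why trainers and makers draw separate ports from `get_free_port()`: the trainers form one process group and the makers another, each with its own rendezvous.
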
+from coati.trainer import PPOTrainer +from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy +from torch.optim import Adam +from transformers import AutoTokenizer, BloomTokenizerFast +from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer + +from colossalai.nn.optimizer import HybridAdam + + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainer_1 = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '2', + 'master_port': trainer_port, + 'master_addr': master_addr + } + env_info_trainer_2 = { + 'local_rank': '0', + 'rank': '1', + 'world_size': '2', + 'master_port': trainer_port, + 'master_addr': master_addr + } + # maker_env_info + maker_port = str(get_free_port()) + env_info_maker_1 = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '2', + 'master_port': maker_port, + 'master_addr': master_addr + } + env_info_maker_2 = { + 'local_rank': '0', + 'rank': '1', + 'world_size': '2', + 'master_port': maker_port, + 'master_addr': master_addr + } + print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1, env_info_maker_2]) + ray.init() + # configure tokenizer + if args.model == 'gpt2': + tokenizer = GPT2Tokenizer.from_pretrained('gpt2') + tokenizer.pad_token = tokenizer.eos_token + elif args.model == 'bloom': + tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) + tokenizer.pad_token = tokenizer.eos_token + elif args.model == 'opt': + tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") + else: + raise ValueError(f'Unsupported model "{args.model}"') + + # configure Trainer + trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1", "maker2"], + strategy=args.trainer_strategy, + model=args.model, + env_info=env_info_trainer_1, + pretrained=args.pretrain, + lora_rank=args.lora_rank, + train_batch_size=args.train_batch_size, + buffer_limit=16, + experience_batch_size=args.experience_batch_size, + max_epochs=args.max_epochs, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1", "maker2"], + strategy=args.trainer_strategy, + model=args.model, + env_info=env_info_trainer_2, + pretrained=args.pretrain, + lora_rank=args.lora_rank, + train_batch_size=args.train_batch_size, + buffer_limit=16, + experience_batch_size=args.experience_batch_size, + max_epochs=args.max_epochs, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + # configure Experience Maker + experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + detached_trainer_name_list=["trainer1", "trainer2"], + strategy=args.maker_strategy, + 
env_info=env_info_maker_1, + experience_batch_size=args.experience_batch_size, + kl_coef=0.1, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", + namespace=os.environ["RAY_NAMESPACE"], + num_gpus=1, + max_concurrency=2).remote( + detached_trainer_name_list=["trainer1", "trainer2"], + strategy=args.maker_strategy, + env_info=env_info_maker_2, + experience_batch_size=args.experience_batch_size, + kl_coef=0.1, + # kwargs: + max_length=128, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + debug=args.debug, + ) + + # trainer send its actor and critic to experience holders. + # TODO: balance duty + ray.get(trainer_1_ref.initialize_remote_makers.remote()) + + # configure sampler + dataset = pd.read_csv(args.prompt_path)['prompt'] + + def tokenize_fn(texts): + # MUST padding to max length to ensure inputs of all ranks have the same length + # Different length may lead to hang when using gemini, as different generation steps + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, + max_timesteps=args.max_timesteps, + update_timesteps=args.update_timesteps) + num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ + args.max_epochs + 3 # +3 for fault tolerance + maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) + maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) + + ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref, maker_2_done_ref]) + # save model checkpoint after fitting + trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) + trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) + # save optimizer checkpoint on all ranks + if args.need_optim_ckpt: + trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), + only_rank0=False) + trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % + (torch.cuda.current_device()), + only_rank0=False) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('prompt_path') + parser.add_argument('--trainer_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--maker_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') + parser.add_argument('--need_optim_ckpt', type=bool, default=False) + parser.add_argument('--num_episodes', type=int, default=10) + parser.add_argument('--max_timesteps', type=int, default=10) + parser.add_argument('--update_timesteps', type=int, default=10) + 
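
The comment inside `tokenize_fn` is load-bearing: under the Gemini strategy, ranks whose prompts tokenize to different lengths can take different numbers of generation steps and hang in collective communication, so every prompt is padded to a fixed 96 tokens. A standalone check of the fixed-shape guarantee, using the same gpt2 tokenizer the examples default to:

```python
from transformers import GPT2Tokenizer

tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token    # gpt2 has no pad token by default

batch = tokenizer(["a short prompt", "a noticeably longer prompt than the first one"],
                  return_tensors="pt",
                  max_length=96,
                  padding="max_length",      # pad to max_length, not to longest-in-batch
                  truncation=True)
print(batch["input_ids"].shape)              # torch.Size([2, 96]), regardless of inputs
```
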
parser.add_argument('--max_epochs', type=int, default=5) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + main(args) diff --git a/applications/Chat/coati/ray/example/2m2t.sh b/applications/Chat/examples/ray/2m2t.sh similarity index 96% rename from applications/Chat/coati/ray/example/2m2t.sh rename to applications/Chat/examples/ray/2m2t.sh index fb4024766c54..bd8ca84a58fb 100644 --- a/applications/Chat/coati/ray/example/2m2t.sh +++ b/applications/Chat/examples/ray/2m2t.sh @@ -20,4 +20,4 @@ export RAY_NAMESPACE="admin" python 2m2t.py "path/to/prompts.csv" \ --maker_strategy naive --trainer_strategy colossalai_zero2 --lora_rank 2 \ --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 --debug \ No newline at end of file + --max_epochs 10 --debug
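
Taken together, the renames in this patch flatten `coati.ray.src.*` into `coati.ray.*`, and the emptied `coati/ray/__init__.py` no longer re-exports `DetachedReplayBuffer` or `DetachedPPOTrainer`. Downstream code therefore has to import from the concrete modules; the migration in one snippet:

```python
# Before this patch (either form worked):
#   from coati.ray import DetachedPPOTrainer                          # __init__ re-export
#   from coati.ray.src.detached_trainer_ppo import DetachedPPOTrainer
# After this patch, import the flattened module paths directly:
from coati.ray.detached_replay_buffer import DetachedReplayBuffer
from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
from coati.ray.experience_maker_holder import ExperienceMakerHolder
from coati.ray.utils import get_actor_from_args
```
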