From bb43927a060ae9ac5b4758fff43bfce59b5497b3 Mon Sep 17 00:00:00 2001 From: csric Date: Sun, 23 Apr 2023 19:00:43 +0800 Subject: [PATCH 1/3] prompt example --- .../Chat/coati/ray/experience_maker_holder.py | 10 +- .../Chat/coati/trainer/strategies/sampler.py | 10 ++ applications/Chat/examples/ray/1mmt_prompt.py | 161 ++++++++++++++++++ 3 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 applications/Chat/examples/ray/1mmt_prompt.py diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index ebeb58137370..284e4b7a7f2f 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -161,7 +161,11 @@ def _inference_step(self, batch) -> None: experience.to_device('cpu') self._send_items(experience) - def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1, num_steps: int = 0): + def workingloop(self, + dataloader_fn: Callable[[], Iterable], + tokenize_fn: Callable = lambda x: x, + num_epochs: int = 1, + num_steps: int = 0): """Working loop of the experience maker. Args: @@ -180,12 +184,12 @@ def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1 except StopIteration: it = iter(dataloader) batch = next(it) - self._inference_step(batch) + self._inference_step(tokenize_fn(batch)) else: with tqdm(total=num_epochs * len(dataloader), desc='ExperienceMaker', disable=not is_rank_0()) as pbar: for _ in range(num_epochs): for batch in dataloader: - self._inference_step(batch) + self._inference_step(tokenize_fn(batch)) pbar.update() self._on_finish() diff --git a/applications/Chat/coati/trainer/strategies/sampler.py b/applications/Chat/coati/trainer/strategies/sampler.py index d726fa640fa2..cf9c3303c868 100644 --- a/applications/Chat/coati/trainer/strategies/sampler.py +++ b/applications/Chat/coati/trainer/strategies/sampler.py @@ -27,6 +27,16 @@ def __init__(self, dataset, num_replicas: int, rank: int) -> None: assert len(indices) == self.num_samples self.indices = indices + def sample(self, batch_size: int) -> list: sampled_indices = np.random.choice(self.indices, batch_size, replace=False) return [self.dataset[idx] for idx in sampled_indices] + + def __iter__(self): + return self + + def __next__(self): + return self.sample(self.iter_batch_size) + + def set_iter_batch_size(self, batch_size : int = 1): + self.iter_batch_size = batch_size \ No newline at end of file diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py new file mode 100644 index 000000000000..fb0bc6d43644 --- /dev/null +++ b/applications/Chat/examples/ray/1mmt_prompt.py @@ -0,0 +1,161 @@ +import argparse +import os +import socket +from functools import partial + +import pandas as pd +import ray +import torch +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder +from coati.ray.utils import ( + get_actor_from_args, + get_critic_from_args, + get_reward_model_from_args, + get_strategy_from_args, + get_tokenizer_from_args +) + +from torch.utils.data import DataLoader + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_trainers), + 'master_port': trainer_port, + 'master_addr': master_addr + } for rank in range(args.num_trainers)] + + # maker_env_info + maker_port = str(get_free_port()) + env_info_maker = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': maker_port, + 'master_addr': master_addr + } + + # configure tokenizer + tokenizer = get_tokenizer_from_args(args.model) + + def trainer_model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + return actor, critic + + # configure Trainer + trainer_refs = [ + DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1"], + strategy_fn=partial(get_strategy_from_args, args.trainer_strategy), + model_fn=trainer_model_fn, + env_info=env_info_trainer, + train_batch_size=args.train_batch_size, + buffer_limit=16, + eval_performance=True, + debug=args.debug, + ) for i, env_info_trainer in enumerate(env_info_trainers) + ] + + def model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).half().cuda() + initial_model = get_actor_from_args(args.model, args.pretrain).half().cuda() + return actor, critic, reward_model, initial_model + + # configure Experience Maker + experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( + detached_trainer_name_list=[f'trainer{i}' for i in range(args.num_trainers)], + strategy_fn=partial(get_strategy_from_args, args.maker_strategy), + model_fn=model_fn, + env_info=env_info_maker, + experience_batch_size=args.experience_batch_size, + kl_coef=0.1, + debug=args.debug, + # sync_models_from_trainers=True, + # generation kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + use_cache=True, + ) + + + + # uncomment this function if sync_models_from_trainers is True + # ray.get([ + # trainer_ref.sync_models_to_remote_makers.remote() + # for trainer_ref in trainer_refs + # ]) + + wait_tasks = [] + + total_steps = args.experience_batch_size * args.experience_steps // (args.num_trainers * args.train_batch_size) + for trainer_ref in trainer_refs: + wait_tasks.append( + trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs)) + + + dataset_size = args.experience_batch_size * 4 + + def build_dataloader(): + dataset = pd.read_csv(args.prompt_path)['prompt'] + # csric: Strategy.setup_sampler, Strategy.setup_dataloader aren't utilized + sampler = get_strategy_from_args(args.maker_strategy).setup_sampler(dataset) + sampler.set_iter_batch_size(dataset_size) + return sampler + + def tokenize_fn(texts): + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + wait_tasks.append(experience_holder_ref.workingloop.remote(build_dataloader, + tokenize_fn, + num_steps=args.experience_steps)) + + ray.get(wait_tasks) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--prompt_path', type=str, default=None) + parser.add_argument('--num_trainers', type=int, default=1) + parser.add_argument('--trainer_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--maker_strategy', choices=['naive'], default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--critic_pretrain', type=str, default=None) + parser.add_argument('--experience_steps', type=int, default=4) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--train_epochs', type=int, default=1) + parser.add_argument('--update_steps', type=int, default=2) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + ray.init(namespace=os.environ["RAY_NAMESPACE"]) + main(args) \ No newline at end of file From 884a645b7232d0ec3c9a3f103f9cc4a9d9750576 Mon Sep 17 00:00:00 2001 From: csric Date: Mon, 24 Apr 2023 15:50:22 +0800 Subject: [PATCH 2/3] prompt load csv data --- .../Chat/coati/ray/experience_maker_holder.py | 10 +++------ applications/Chat/examples/ray/1mmt_prompt.py | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index 284e4b7a7f2f..ebeb58137370 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -161,11 +161,7 @@ def _inference_step(self, batch) -> None: experience.to_device('cpu') self._send_items(experience) - def workingloop(self, - dataloader_fn: Callable[[], Iterable], - tokenize_fn: Callable = lambda x: x, - num_epochs: int = 1, - num_steps: int = 0): + def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1, num_steps: int = 0): """Working loop of the experience maker. Args: @@ -184,12 +180,12 @@ def workingloop(self, except StopIteration: it = iter(dataloader) batch = next(it) - self._inference_step(tokenize_fn(batch)) + self._inference_step(batch) else: with tqdm(total=num_epochs * len(dataloader), desc='ExperienceMaker', disable=not is_rank_0()) as pbar: for _ in range(num_epochs): for batch in dataloader: - self._inference_step(tokenize_fn(batch)) + self._inference_step(batch) pbar.update() self._on_finish() diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py index fb0bc6d43644..5baf96eaa508 100644 --- a/applications/Chat/examples/ray/1mmt_prompt.py +++ b/applications/Chat/examples/ray/1mmt_prompt.py @@ -118,20 +118,23 @@ def model_fn(): dataset_size = args.experience_batch_size * 4 + from torch.utils.data import DataLoader def build_dataloader(): + def tokenize_fn(texts): + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + dataset = pd.read_csv(args.prompt_path)['prompt'] - # csric: Strategy.setup_sampler, Strategy.setup_dataloader aren't utilized - sampler = get_strategy_from_args(args.maker_strategy).setup_sampler(dataset) - sampler.set_iter_batch_size(dataset_size) - return sampler - - def tokenize_fn(texts): - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} + dataloader = DataLoader(dataset=dataset, + batch_size=dataset_size, + shuffle=True, + collate_fn=tokenize_fn + ) + return dataloader + wait_tasks.append(experience_holder_ref.workingloop.remote(build_dataloader, - tokenize_fn, num_steps=args.experience_steps)) ray.get(wait_tasks) From 603bd7eb8306ccbf75d7630ebb6798dd91477ca0 Mon Sep 17 00:00:00 2001 From: csric Date: Mon, 24 Apr 2023 15:59:17 +0800 Subject: [PATCH 3/3] remove legacy try --- applications/Chat/coati/trainer/strategies/sampler.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/sampler.py b/applications/Chat/coati/trainer/strategies/sampler.py index cf9c3303c868..65e199dbf029 100644 --- a/applications/Chat/coati/trainer/strategies/sampler.py +++ b/applications/Chat/coati/trainer/strategies/sampler.py @@ -31,12 +31,3 @@ def __init__(self, dataset, num_replicas: int, rank: int) -> None: def sample(self, batch_size: int) -> list: sampled_indices = np.random.choice(self.indices, batch_size, replace=False) return [self.dataset[idx] for idx in sampled_indices] - - def __iter__(self): - return self - - def __next__(self): - return self.sample(self.iter_batch_size) - - def set_iter_batch_size(self, batch_size : int = 1): - self.iter_batch_size = batch_size \ No newline at end of file