From bb43927a060ae9ac5b4758fff43bfce59b5497b3 Mon Sep 17 00:00:00 2001
From: csric
Date: Sun, 23 Apr 2023 19:00:43 +0800
Subject: [PATCH 1/9] prompt example

---
 .../Chat/coati/ray/experience_maker_holder.py |  10 +-
 .../Chat/coati/trainer/strategies/sampler.py  |  10 ++
 applications/Chat/examples/ray/1mmt_prompt.py | 161 ++++++++++++++++++
 3 files changed, 178 insertions(+), 3 deletions(-)
 create mode 100644 applications/Chat/examples/ray/1mmt_prompt.py

diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index ebeb58137370..284e4b7a7f2f 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -161,7 +161,11 @@ def _inference_step(self, batch) -> None:
             experience.to_device('cpu')
             self._send_items(experience)
 
-    def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1, num_steps: int = 0):
+    def workingloop(self,
+                    dataloader_fn: Callable[[], Iterable],
+                    tokenize_fn: Callable = lambda x: x,
+                    num_epochs: int = 1,
+                    num_steps: int = 0):
         """Working loop of the experience maker.
 
         Args:
@@ -180,12 +184,12 @@ def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1
                 except StopIteration:
                     it = iter(dataloader)
                     batch = next(it)
-                self._inference_step(batch)
+                self._inference_step(tokenize_fn(batch))
         else:
             with tqdm(total=num_epochs * len(dataloader), desc='ExperienceMaker', disable=not is_rank_0()) as pbar:
                 for _ in range(num_epochs):
                     for batch in dataloader:
-                        self._inference_step(batch)
+                        self._inference_step(tokenize_fn(batch))
                         pbar.update()
         self._on_finish()
 
diff --git a/applications/Chat/coati/trainer/strategies/sampler.py b/applications/Chat/coati/trainer/strategies/sampler.py
index d726fa640fa2..cf9c3303c868 100644
--- a/applications/Chat/coati/trainer/strategies/sampler.py
+++ b/applications/Chat/coati/trainer/strategies/sampler.py
@@ -27,6 +27,16 @@ def __init__(self, dataset, num_replicas: int, rank: int) -> None:
         assert len(indices) == self.num_samples
         self.indices = indices
 
     def sample(self, batch_size: int) -> list:
         sampled_indices = np.random.choice(self.indices, batch_size, replace=False)
         return [self.dataset[idx] for idx in sampled_indices]
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return self.sample(self.iter_batch_size)
+
+    def set_iter_batch_size(self, batch_size : int = 1):
+        self.iter_batch_size = batch_size
\ No newline at end of file
diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py
new file mode 100644
index 000000000000..fb0bc6d43644
--- /dev/null
+++ b/applications/Chat/examples/ray/1mmt_prompt.py
@@ -0,0 +1,161 @@
+import argparse
+import os
+import socket
+from functools import partial
+
+import pandas as pd
+import ray
+import torch
+from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
+from coati.ray.experience_maker_holder import ExperienceMakerHolder
+from coati.ray.utils import (
+    get_actor_from_args,
+    get_critic_from_args,
+    get_reward_model_from_args,
+    get_strategy_from_args,
+    get_tokenizer_from_args
+)
+
+from torch.utils.data import DataLoader
+
+def get_free_port():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(('', 0))
+        return s.getsockname()[1]
+
+
+def get_local_ip():
+    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+        s.connect(('8.8.8.8', 80))
+        return s.getsockname()[0]
+
+def main(args):
+    master_addr = str(get_local_ip())
+    # 
trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_trainers), + 'master_port': trainer_port, + 'master_addr': master_addr + } for rank in range(args.num_trainers)] + + # maker_env_info + maker_port = str(get_free_port()) + env_info_maker = { + 'local_rank': '0', + 'rank': '0', + 'world_size': '1', + 'master_port': maker_port, + 'master_addr': master_addr + } + + # configure tokenizer + tokenizer = get_tokenizer_from_args(args.model) + + def trainer_model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + return actor, critic + + # configure Trainer + trainer_refs = [ + DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote( + experience_maker_holder_name_list=["maker1"], + strategy_fn=partial(get_strategy_from_args, args.trainer_strategy), + model_fn=trainer_model_fn, + env_info=env_info_trainer, + train_batch_size=args.train_batch_size, + buffer_limit=16, + eval_performance=True, + debug=args.debug, + ) for i, env_info_trainer in enumerate(env_info_trainers) + ] + + def model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).half().cuda() + initial_model = get_actor_from_args(args.model, args.pretrain).half().cuda() + return actor, critic, reward_model, initial_model + + # configure Experience Maker + experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( + detached_trainer_name_list=[f'trainer{i}' for i in range(args.num_trainers)], + strategy_fn=partial(get_strategy_from_args, args.maker_strategy), + model_fn=model_fn, + env_info=env_info_maker, + experience_batch_size=args.experience_batch_size, + kl_coef=0.1, + debug=args.debug, + # sync_models_from_trainers=True, + # generation kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + use_cache=True, + ) + + + + # uncomment this function if sync_models_from_trainers is True + # ray.get([ + # trainer_ref.sync_models_to_remote_makers.remote() + # for trainer_ref in trainer_refs + # ]) + + wait_tasks = [] + + total_steps = args.experience_batch_size * args.experience_steps // (args.num_trainers * args.train_batch_size) + for trainer_ref in trainer_refs: + wait_tasks.append( + trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs)) + + + dataset_size = args.experience_batch_size * 4 + + def build_dataloader(): + dataset = pd.read_csv(args.prompt_path)['prompt'] + # csric: Strategy.setup_sampler, Strategy.setup_dataloader aren't utilized + sampler = get_strategy_from_args(args.maker_strategy).setup_sampler(dataset) + sampler.set_iter_batch_size(dataset_size) + return sampler + + def tokenize_fn(texts): + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + wait_tasks.append(experience_holder_ref.workingloop.remote(build_dataloader, + tokenize_fn, + num_steps=args.experience_steps)) + + ray.get(wait_tasks) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--prompt_path', type=str, default=None) 
+ parser.add_argument('--num_trainers', type=int, default=1) + parser.add_argument('--trainer_strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--maker_strategy', choices=['naive'], default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--critic_pretrain', type=str, default=None) + parser.add_argument('--experience_steps', type=int, default=4) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--train_epochs', type=int, default=1) + parser.add_argument('--update_steps', type=int, default=2) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + ray.init(namespace=os.environ["RAY_NAMESPACE"]) + main(args) \ No newline at end of file From 884a645b7232d0ec3c9a3f103f9cc4a9d9750576 Mon Sep 17 00:00:00 2001 From: csric Date: Mon, 24 Apr 2023 15:50:22 +0800 Subject: [PATCH 2/9] prompt load csv data --- .../Chat/coati/ray/experience_maker_holder.py | 10 +++------ applications/Chat/examples/ray/1mmt_prompt.py | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index 284e4b7a7f2f..ebeb58137370 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -161,11 +161,7 @@ def _inference_step(self, batch) -> None: experience.to_device('cpu') self._send_items(experience) - def workingloop(self, - dataloader_fn: Callable[[], Iterable], - tokenize_fn: Callable = lambda x: x, - num_epochs: int = 1, - num_steps: int = 0): + def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1, num_steps: int = 0): """Working loop of the experience maker. 
Args:
@@ -184,12 +180,12 @@ def workingloop(self,
                 except StopIteration:
                     it = iter(dataloader)
                     batch = next(it)
-                self._inference_step(tokenize_fn(batch))
+                self._inference_step(batch)
         else:
             with tqdm(total=num_epochs * len(dataloader), desc='ExperienceMaker', disable=not is_rank_0()) as pbar:
                 for _ in range(num_epochs):
                     for batch in dataloader:
-                        self._inference_step(tokenize_fn(batch))
+                        self._inference_step(batch)
                         pbar.update()
         self._on_finish()
 
diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py
index fb0bc6d43644..5baf96eaa508 100644
--- a/applications/Chat/examples/ray/1mmt_prompt.py
+++ b/applications/Chat/examples/ray/1mmt_prompt.py
@@ -118,20 +118,23 @@ def model_fn():
 
     dataset_size = args.experience_batch_size * 4
 
+    from torch.utils.data import DataLoader
     def build_dataloader():
+        def tokenize_fn(texts):
+            batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
+            return {k: v.cuda() for k, v in batch.items()}
+
         dataset = pd.read_csv(args.prompt_path)['prompt']
-        # csric: Strategy.setup_sampler, Strategy.setup_dataloader aren't utilized
-        sampler = get_strategy_from_args(args.maker_strategy).setup_sampler(dataset)
-        sampler.set_iter_batch_size(dataset_size)
-        return sampler
-
-    def tokenize_fn(texts):
-        batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
-        return {k: v.cuda() for k, v in batch.items()}
+        dataloader = DataLoader(dataset=dataset,
+                                batch_size=dataset_size,
+                                shuffle=True,
+                                collate_fn=tokenize_fn
+                                )
+        return dataloader
+
 
     wait_tasks.append(experience_holder_ref.workingloop.remote(build_dataloader,
-                                                               tokenize_fn,
                                                                num_steps=args.experience_steps))
 
     ray.get(wait_tasks)
 
From 603bd7eb8306ccbf75d7630ebb6798dd91477ca0 Mon Sep 17 00:00:00 2001
From: csric
Date: Mon, 24 Apr 2023 15:59:17 +0800
Subject: [PATCH 3/9] remove legacy try

---
 applications/Chat/coati/trainer/strategies/sampler.py | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/applications/Chat/coati/trainer/strategies/sampler.py b/applications/Chat/coati/trainer/strategies/sampler.py
index cf9c3303c868..65e199dbf029 100644
--- a/applications/Chat/coati/trainer/strategies/sampler.py
+++ b/applications/Chat/coati/trainer/strategies/sampler.py
@@ -31,12 +31,3 @@ def __init__(self, dataset, num_replicas: int, rank: int) -> None:
     def sample(self, batch_size: int) -> list:
         sampled_indices = np.random.choice(self.indices, batch_size, replace=False)
         return [self.dataset[idx] for idx in sampled_indices]
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return self.sample(self.iter_batch_size)
-
-    def set_iter_batch_size(self, batch_size : int = 1):
-        self.iter_batch_size = batch_size

From 99fbd4038937eca72b5cb0a80712ca54802244de Mon Sep 17 00:00:00 2001
From: csric
Date: Mon, 24 Apr 2023 17:44:18 +0800
Subject: [PATCH 4/9] maker models require_grad set to False

---
 .../Chat/coati/ray/experience_maker_holder.py |  6 ++++++
 applications/Chat/examples/ray/1mmt_dummy.py  | 10 +++++-----
 applications/Chat/examples/ray/1mmt_prompt.py |  8 ++++----
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index ebeb58137370..d74aceb40a9f 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -215,8 +215,12 @@ def update_experience_maker(self,
with torch.no_grad(): if new_actor_state_dict is not None: self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False) + print("actor", new_actor_state_dict.keys()) + print("local actor", self.experience_maker.actor.model.state_dict().keys()) if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) + print("critic", new_critic_state_dict.keys()) + print("local critic", self.experience_maker.critic.state_dict().keys()) # the lock must be released after both actor and critic being updated if chunk_end: @@ -227,6 +231,8 @@ def update_experience_maker(self, tracemalloc.stop() if fully_update: self._is_fully_initialized = True + print(self.experience_maker.actor.model.lm_head.weight) + print(self.experience_maker.critic.value_head.weight) def _on_make_experience_start(self) -> None: for callback in self.callbacks: diff --git a/applications/Chat/examples/ray/1mmt_dummy.py b/applications/Chat/examples/ray/1mmt_dummy.py index d293e6940fbe..095ca161ba74 100644 --- a/applications/Chat/examples/ray/1mmt_dummy.py +++ b/applications/Chat/examples/ray/1mmt_dummy.py @@ -60,17 +60,17 @@ def main(args): def model_fn(): actor_cfg = AutoConfig.from_pretrained(args.pretrain) critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain) - actor = get_actor_from_args(args.model, config=actor_cfg).half().cuda() - critic = get_critic_from_args(args.critic_model, config=critic_cfg).half().cuda() - reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).half().cuda() + actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() if args.initial_model_quant_ckpt is not None and args.model == 'llama': # quantize initial model with low_resource_init(), no_init_weights(): initial_model = get_actor_from_args(args.model, config=actor_cfg) initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, - args.quant_group_size).cuda() + args.quant_group_size).cuda().requires_grad_(False) else: - initial_model = get_actor_from_args(args.model, config=actor_cfg).half().cuda() + initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py index 5baf96eaa508..7808f4ce72c8 100644 --- a/applications/Chat/examples/ray/1mmt_prompt.py +++ b/applications/Chat/examples/ray/1mmt_prompt.py @@ -74,10 +74,10 @@ def trainer_model_fn(): ] def model_fn(): - actor = get_actor_from_args(args.model, args.pretrain).half().cuda() - critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() - reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).half().cuda() - initial_model = get_actor_from_args(args.model, args.pretrain).half().cuda() + actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + initial_model = 
get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker From 08108d23f94782268658bbd719b7f07791f31bcc Mon Sep 17 00:00:00 2001 From: csric Date: Tue, 25 Apr 2023 11:28:34 +0800 Subject: [PATCH 5/9] working on zero redundancy update --- applications/Chat/coati/ray/detached_trainer_ppo.py | 12 ++++++++---- .../Chat/coati/ray/experience_maker_holder.py | 5 +++-- applications/Chat/coati/ray/utils.py | 11 +++++++++++ applications/Chat/coati/trainer/strategies/naive.py | 10 ++++++++-- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py index b0630cd0b5ae..4c59e9f576e9 100644 --- a/applications/Chat/coati/ray/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/detached_trainer_ppo.py @@ -107,6 +107,10 @@ def __init__( @torch.no_grad() def _update_remote_makers(self, fully_update: bool = False, **config): # TODO: balance duties + self.actor.eval() + self.critic.eval() + if not fully_update: + config['requires_grad_only'] = True if is_rank_0(): self.update_target_holder_list(self.target_holder_name_list) # mark start, ensure order @@ -198,9 +202,9 @@ def _get_unwrapped_critic(self): return self.critic def _get_model_state_dict_shard(self, model: torch.nn.Module, **config): - try: - self.strategy.merge_lora_weight(model) - except AttributeError: - pass + # try: + # self.strategy.merge_lora_weight(model) + # except AttributeError: + # pass for state_dict in self.strategy.get_model_state_dict_shard(model, **config): yield state_dict_to(state_dict) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index d74aceb40a9f..f9627c1d5249 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -216,11 +216,12 @@ def update_experience_maker(self, if new_actor_state_dict is not None: self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False) print("actor", new_actor_state_dict.keys()) - print("local actor", self.experience_maker.actor.model.state_dict().keys()) + # print("local actor", self.experience_maker.actor.model.state_dict().keys()) if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) print("critic", new_critic_state_dict.keys()) - print("local critic", self.experience_maker.critic.state_dict().keys()) + # print("local critic", self.experience_maker.critic.state_dict().keys()) + # the lock must be released after both actor and critic being updated if chunk_end: diff --git a/applications/Chat/coati/ray/utils.py b/applications/Chat/coati/ray/utils.py index 6e62ba0b4841..7bd1aa5b6f47 100644 --- a/applications/Chat/coati/ray/utils.py +++ b/applications/Chat/coati/ray/utils.py @@ -126,3 +126,14 @@ def state_dict_to(state_dict: Dict[str, Any], def get_model_numel(model: nn.Module) -> int: numel = sum(p.numel() for p in model.parameters()) return numel + +def state_dict_filter_grad(state_dict: Dict[str, Any], model: nn.Module): + ''' + state_dict loses grad info. Original model needed. 
+ ''' + for name, parameter in model.named_parameters(): + if not parameter.requires_grad: + pass + +def get_grad_required_state_dict(model: nn.Module): + pass \ No newline at end of file diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index a22be1181fb8..5d9b70d9fc21 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -1,5 +1,5 @@ from typing import Any, Optional - +from collections import OrderedDict import torch import torch.nn as nn import torch.optim as optim @@ -75,7 +75,13 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No def get_model_state_dict_shard(self, model: nn.Module, **config): # TODO: implement sharding on naive strategy - state_dict = model.state_dict() + if 'requires_grad_only' in config and config['requires_grad_only'] == True: + state_dict = OrderedDict() + for name, parameter in model.named_parameters(): + if parameter.requires_grad: + state_dict[name] = parameter.detach() + else: + state_dict = model.state_dict() yield state_dict def merge_lora_weight(self, model: nn.Module): From 3266516d7d4092fd046e091bdcc05b6049239369 Mon Sep 17 00:00:00 2001 From: csric Date: Tue, 25 Apr 2023 15:20:07 +0800 Subject: [PATCH 6/9] mmmt_prompt example; naive strategy requires_grad state_dict & sharding; maker model requires_no_grad. --- .../Chat/coati/ray/detached_trainer_ppo.py | 15 +- .../Chat/coati/ray/experience_maker_holder.py | 6 - applications/Chat/coati/ray/utils.py | 12 +- .../coati/trainer/strategies/colossalai.py | 11 +- .../Chat/coati/trainer/strategies/naive.py | 33 ++- applications/Chat/examples/ray/1mmt_prompt.py | 25 ++- applications/Chat/examples/ray/mmmt_dummy.py | 10 +- applications/Chat/examples/ray/mmmt_prompt.py | 191 ++++++++++++++++++ 8 files changed, 257 insertions(+), 46 deletions(-) create mode 100644 applications/Chat/examples/ray/mmmt_prompt.py diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py index 536a50ab4cd3..50243d88f483 100644 --- a/applications/Chat/coati/ray/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/detached_trainer_ppo.py @@ -110,17 +110,14 @@ def __init__( @torch.no_grad() def _update_remote_makers(self, fully_update: bool = False, **config): # TODO: balance duties - self.actor.eval() - self.critic.eval() if not fully_update: config['requires_grad_only'] = True - if is_rank_0(): - self.update_target_holder_list(self.target_holder_name_list) - # mark start, ensure order - tasks = [] - for target_holder in self.target_holder_list: - tasks.append(target_holder.update_experience_maker.remote(chunk_start=True, fully_update=fully_update)) - ray.get(tasks) + self.update_target_holder_list() + # mark start, ensure order + tasks = [] + for target_holder in self.target_holder_list: + tasks.append(target_holder.update_experience_maker.remote(chunk_start=True, fully_update=fully_update)) + ray.get(tasks) # sending loop tasks = [] for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_model(self.actor), **config): diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index d811a0fb8b06..20ada291a9ae 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -184,12 +184,8 @@ def update_experience_maker(self, with torch.no_grad(): if 
new_actor_state_dict is not None: self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False) - print("actor", new_actor_state_dict.keys()) - # print("local actor", self.experience_maker.actor.model.state_dict().keys()) if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) - print("critic", new_critic_state_dict.keys()) - # print("local critic", self.experience_maker.critic.state_dict().keys()) # the lock must be released after both actor and critic being updated @@ -201,8 +197,6 @@ def update_experience_maker(self, tracemalloc.stop() if fully_update: self._is_fully_initialized = True - print(self.experience_maker.actor.model.lm_head.weight) - print(self.experience_maker.critic.value_head.weight) def _on_make_experience_start(self) -> None: for callback in self.callbacks: diff --git a/applications/Chat/coati/ray/utils.py b/applications/Chat/coati/ray/utils.py index 60ab44006c05..6cd7c564cc92 100644 --- a/applications/Chat/coati/ray/utils.py +++ b/applications/Chat/coati/ray/utils.py @@ -1,5 +1,6 @@ import os from typing import Any, Callable, Dict, List, Optional +from collections import OrderedDict import torch import torch.distributed as dist @@ -149,14 +150,3 @@ def get_receivers_per_sender(sender_idx: int, num_senders: int, num_receivers: i # a receiver may have more than one sender target_receivers.append(sender_idx % num_receivers) return target_receivers - -def state_dict_filter_grad(state_dict: Dict[str, Any], model: nn.Module): - ''' - state_dict loses grad info. Original model needed. - ''' - for name, parameter in model.named_parameters(): - if not parameter.requires_grad: - pass - -def get_grad_required_state_dict(model: nn.Module): - pass \ No newline at end of file diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index d39092d5e7ad..238408b9f676 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -214,9 +214,10 @@ def get_model_state_dict_shard(self, model: nn.Module, **config): if self.stage != 3: yield from super().get_model_state_dict_shard(model, **config) else: - unwrapped_model = self._unwrap_model(model) - for module in unwrapped_model.modules(): - if isinstance(module, LoraLinear): - module.merge_weights = True - module.eval() + # unwrapped_model = self._unwrap_model(model) + # for module in unwrapped_model.modules(): + # if isinstance(module, LoraLinear): + # module.merge_weights = True + # module.eval() + model: ZeroDDP = model yield from model.state_dict_shard(max_shard_size=1024, only_rank_0=False) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 4982bd6aa475..3b537fdde2d4 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -1,5 +1,6 @@ import os -from typing import Any, Optional +import sys +from typing import Any, Optional, Dict from collections import OrderedDict import torch import torch.distributed as dist @@ -14,6 +15,14 @@ from .base import Strategy +# TODO Move this to a util.py (Moving to ray.util introduces ringed import) +def get_grad_required_state_dict(model: nn.Module): + state_dict = OrderedDict() + for name, parameter in model.named_parameters(): + if parameter.requires_grad: + state_dict[name] = parameter.detach() + return state_dict + class 
NaiveStrategy(Strategy): """ @@ -82,13 +91,25 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No def get_model_state_dict_shard(self, model: nn.Module, **config): # TODO: implement sharding on naive strategy if 'requires_grad_only' in config and config['requires_grad_only'] == True: - state_dict = OrderedDict() - for name, parameter in model.named_parameters(): - if parameter.requires_grad: - state_dict[name] = parameter.detach() + state_dict = get_grad_required_state_dict(model) else: state_dict = model.state_dict() - yield state_dict + + if 'shard_size' in config: + shard_size = config['shard_size'] + accumulate_size = 0 + state_dict_shard = OrderedDict() + for name, param in state_dict.items(): + state_dict_shard[name] = param + accumulate_size += param.numel() * param.element_size() + if accumulate_size >= shard_size: + accumulate_size = 0 + yield state_dict_shard + state_dict_shard = OrderedDict() + if accumulate_size > 0: + yield state_dict_shard + else: + yield state_dict def merge_lora_weight(self, model: nn.Module): unwrapped_model = self._unwrap_model(model) diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py index 7808f4ce72c8..bd7224aae749 100644 --- a/applications/Chat/examples/ray/1mmt_prompt.py +++ b/applications/Chat/examples/ray/1mmt_prompt.py @@ -6,6 +6,7 @@ import pandas as pd import ray import torch +from coati.quant import llama_load_quant, low_resource_init from coati.ray.detached_trainer_ppo import DetachedPPOTrainer from coati.ray.experience_maker_holder import ExperienceMakerHolder from coati.ray.utils import ( @@ -17,6 +18,8 @@ ) from torch.utils.data import DataLoader +from transformers import AutoConfig +from transformers.modeling_utils import no_init_weights def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -77,7 +80,15 @@ def model_fn(): actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() - initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + if args.initial_model_quant_ckpt is not None and args.model == 'llama': + # quantize initial model + actor_cfg = AutoConfig.from_pretrained(args.pretrain) + with low_resource_init(), no_init_weights(): + initial_model = get_actor_from_args(args.model, config=actor_cfg) + initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, + args.quant_group_size).cuda().requires_grad_(False) + else: + initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker @@ -118,7 +129,6 @@ def model_fn(): dataset_size = args.experience_batch_size * 4 - from torch.utils.data import DataLoader def build_dataloader(): def tokenize_fn(texts): @@ -145,10 +155,14 @@ def tokenize_fn(texts): parser.add_argument('--prompt_path', type=str, default=None) parser.add_argument('--num_trainers', type=int, default=1) parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + choices=[ + 'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu', + 'colossalai_zero2_cpu' + ], default='naive') 
parser.add_argument('--maker_strategy', choices=['naive'], default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) parser.add_argument('--pretrain', type=str, default=None) parser.add_argument('--critic_pretrain', type=str, default=None) parser.add_argument('--experience_steps', type=int, default=4) @@ -158,6 +172,9 @@ def tokenize_fn(texts): parser.add_argument('--train_batch_size', type=int, default=8) parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) + parser.add_argument('--quant_bits', type=int, default=4) + parser.add_argument('--quant_group_size', type=int, default=128) parser.add_argument('--debug', action='store_true') args = parser.parse_args() ray.init(namespace=os.environ["RAY_NAMESPACE"]) diff --git a/applications/Chat/examples/ray/mmmt_dummy.py b/applications/Chat/examples/ray/mmmt_dummy.py index 767fe37030f6..082f4851777e 100644 --- a/applications/Chat/examples/ray/mmmt_dummy.py +++ b/applications/Chat/examples/ray/mmmt_dummy.py @@ -61,17 +61,17 @@ def main(args): def model_fn(): actor_cfg = AutoConfig.from_pretrained(args.pretrain) critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain) - actor = get_actor_from_args(args.model, config=actor_cfg).half().cuda() - critic = get_critic_from_args(args.critic_model, config=critic_cfg).half().cuda() - reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).half().cuda() + actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() if args.initial_model_quant_ckpt is not None and args.model == 'llama': # quantize initial model with low_resource_init(), no_init_weights(): initial_model = get_actor_from_args(args.model, config=actor_cfg) initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, - args.quant_group_size).cuda() + args.quant_group_size).cuda().requires_grad_(False) else: - initial_model = get_actor_from_args(args.model, config=actor_cfg).half().cuda() + initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker diff --git a/applications/Chat/examples/ray/mmmt_prompt.py b/applications/Chat/examples/ray/mmmt_prompt.py new file mode 100644 index 000000000000..d2398d451c7b --- /dev/null +++ b/applications/Chat/examples/ray/mmmt_prompt.py @@ -0,0 +1,191 @@ +import argparse +import os +import socket +from functools import partial + +import ray +import torch +import pandas as pd +from coati.quant import llama_load_quant, low_resource_init +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder +from coati.ray.utils import ( + get_actor_from_args, + get_critic_from_args, + get_receivers_per_sender, + get_reward_model_from_args, + get_strategy_from_args, +) +from torch.utils.data import DataLoader +from transformers import AutoConfig, AutoTokenizer 
+from transformers.modeling_utils import no_init_weights + + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_trainers), + 'master_port': trainer_port, + 'master_addr': master_addr + } for rank in range(args.num_trainers)] + + # maker_env_info + maker_port = str(get_free_port()) + env_info_makers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_makers), + 'master_port': maker_port, + 'master_addr': master_addr + } for rank in range(args.num_makers)] + + # configure tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.pretrain) + tokenizer.pad_token = tokenizer.eos_token + + def model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + if args.initial_model_quant_ckpt is not None and args.model == 'llama': + # quantize initial model + actor_cfg = AutoConfig.from_pretrained(args.pretrain) + with low_resource_init(), no_init_weights(): + initial_model = get_actor_from_args(args.model, config=actor_cfg) + initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, + args.quant_group_size).cuda().requires_grad_(False) + else: + initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + return actor, critic, reward_model, initial_model + + # configure Experience Maker + experience_holder_refs = [ + ExperienceMakerHolder.options(name=f"maker{i}", num_gpus=1, max_concurrency=2).remote( + detached_trainer_name_list=[ + f'trainer{x}' + for x in get_receivers_per_sender(i, args.num_makers, args.num_trainers, allow_idle_sender=False) + ], + strategy_fn=partial(get_strategy_from_args, args.maker_strategy), + model_fn=model_fn, + env_info=env_info_maker, + kl_coef=0.1, + debug=args.debug, + # sync_models_from_trainers=True, + # generation kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + use_cache=True, + ) + for i, env_info_maker in enumerate(env_info_makers) + ] + + def trainer_model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + return actor, critic + + # configure Trainer + trainer_refs = [ + DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote( + experience_maker_holder_name_list=[ + f"maker{x}" + for x in get_receivers_per_sender(i, args.num_trainers, args.num_makers, allow_idle_sender=True) + ], + strategy_fn=partial(get_strategy_from_args, args.trainer_strategy), + model_fn=trainer_model_fn, + env_info=env_info_trainer, + train_batch_size=args.train_batch_size, + buffer_limit=16, + eval_performance=True, + debug=args.debug, + ) + for i, env_info_trainer in enumerate(env_info_trainers) + ] + + dataset_size = 
args.experience_batch_size * 4 + + def build_dataloader(): + def tokenize_fn(texts): + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + dataset = pd.read_csv(args.prompt_path)['prompt'] + dataloader = DataLoader(dataset=dataset, + batch_size=dataset_size, + shuffle=True, + collate_fn=tokenize_fn + ) + return dataloader + + # uncomment this function if sync_models_from_trainers is True + # ray.get([ + # trainer_ref.sync_models_to_remote_makers.remote() + # for trainer_ref in trainer_refs + # ]) + + wait_tasks = [] + + for experience_holder_ref in experience_holder_refs: + wait_tasks.append( + experience_holder_ref.workingloop.remote(build_dataloader, + num_steps=args.experience_steps)) + + total_steps = args.experience_batch_size * args.experience_steps * \ + args.num_makers // (args.num_trainers * args.train_batch_size) + for trainer_ref in trainer_refs: + wait_tasks.append(trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs)) + + ray.get(wait_tasks) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--prompt_path', type=str, default=None) + parser.add_argument('--num_makers', type=int, default=1) + parser.add_argument('--num_trainers', type=int, default=1) + parser.add_argument('--trainer_strategy', + choices=[ + 'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu', + 'colossalai_zero2_cpu' + ], + default='naive') + parser.add_argument('--maker_strategy', choices=['naive'], default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--critic_pretrain', type=str, default=None) + parser.add_argument('--experience_steps', type=int, default=4) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--train_epochs', type=int, default=1) + parser.add_argument('--update_steps', type=int, default=2) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) + parser.add_argument('--quant_bits', type=int, default=4) + parser.add_argument('--quant_group_size', type=int, default=128) + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + ray.init(namespace=os.environ["RAY_NAMESPACE"]) + main(args) From 332fd3c040f9ae3df9ac9b9415639a25ba6d91bc Mon Sep 17 00:00:00 2001 From: csric Date: Tue, 25 Apr 2023 15:28:32 +0800 Subject: [PATCH 7/9] remove legacy examples --- applications/Chat/examples/ray/2m1t.py | 141 --------------- applications/Chat/examples/ray/2m1t.sh | 23 --- applications/Chat/examples/ray/2m2t.py | 230 ------------------------- applications/Chat/examples/ray/2m2t.sh | 23 --- 4 files changed, 417 deletions(-) delete mode 100644 applications/Chat/examples/ray/2m1t.py delete mode 100644 applications/Chat/examples/ray/2m1t.sh delete mode 100644 applications/Chat/examples/ray/2m2t.py delete mode 100644 applications/Chat/examples/ray/2m2t.sh diff --git a/applications/Chat/examples/ray/2m1t.py b/applications/Chat/examples/ray/2m1t.py deleted file mode 100644 index bed6246ed0d7..000000000000 --- 
a/applications/Chat/examples/ray/2m1t.py +++ /dev/null @@ -1,141 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def main(args): - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. 
- ray.get(trainer_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs // 2 + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_1_done_ref, maker_2_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/2m1t.sh b/applications/Chat/examples/ray/2m1t.sh deleted file mode 100644 index a207d4118d60..000000000000 --- a/applications/Chat/examples/ray/2m1t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 3 - -export RAY_NAMESPACE="admin" - -python 2m1t.py "/path/to/prompts.csv" \ - --trainer_strategy naive --maker_strategy naive --lora_rank 2 
--pretrain "facebook/opt-350m" --model 'opt' \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 # --debug diff --git a/applications/Chat/examples/ray/2m2t.py b/applications/Chat/examples/ray/2m2t.py deleted file mode 100644 index 05440032ce9f..000000000000 --- a/applications/Chat/examples/ray/2m2t.py +++ /dev/null @@ -1,230 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - env_info_trainer_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - env_info_maker_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1, env_info_maker_2]) - ray.init() - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - 
env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_2, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref, maker_2_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 
'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--maker_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt'])
-    parser.add_argument('--pretrain', type=str, default=None)
-    parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt')
-    parser.add_argument('--need_optim_ckpt', type=bool, default=False)
-    parser.add_argument('--num_episodes', type=int, default=10)
-    parser.add_argument('--max_timesteps', type=int, default=10)
-    parser.add_argument('--update_timesteps', type=int, default=10)
-    parser.add_argument('--max_epochs', type=int, default=5)
-    parser.add_argument('--train_batch_size', type=int, default=8)
-    parser.add_argument('--experience_batch_size', type=int, default=8)
-    parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank")
-
-    parser.add_argument('--debug', action='store_true')
-    args = parser.parse_args()
-    main(args)
diff --git a/applications/Chat/examples/ray/2m2t.sh b/applications/Chat/examples/ray/2m2t.sh
deleted file mode 100644
index bd8ca84a58fb..000000000000
--- a/applications/Chat/examples/ray/2m2t.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-set_n_least_used_CUDA_VISIBLE_DEVICES() {
-    local n=${1:-"9999"}
-    echo "GPU Memory Usage:"
-    local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \
-        | tail -n +2 \
-        | nl -v 0 \
-        | tee /dev/tty \
-        | sort -g -k 2 \
-        | awk '{print $1}' \
-        | head -n $n)
-    export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g')
-    echo "Now CUDA_VISIBLE_DEVICES is set to:"
-    echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES"
-}
-
-set_n_least_used_CUDA_VISIBLE_DEVICES 2
-
-export RAY_NAMESPACE="admin"
-
-python 2m2t.py "path/to/prompts.csv" \
-    --maker_strategy naive --trainer_strategy colossalai_zero2 --lora_rank 2 \
-    --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \
-    --max_epochs 10 --debug

From 9b2b77eeacdb915904968f5d2bca006813e5eee2 Mon Sep 17 00:00:00 2001
From: csric
Date: Tue, 25 Apr 2023 15:29:24 +0800
Subject: [PATCH 8/9] remove legacy examples

---
 applications/Chat/examples/ray/1m1t.py   | 166 --------------
 applications/Chat/examples/ray/1m1t.sh   |  23 --
 .../Chat/examples/ray/1m1t_quantize.py   | 156 --------------
 applications/Chat/examples/ray/1m2t.py   | 203 ------------------
 applications/Chat/examples/ray/1m2t.sh   |  23 --
 5 files changed, 571 deletions(-)
 delete mode 100644 applications/Chat/examples/ray/1m1t.py
 delete mode 100644 applications/Chat/examples/ray/1m1t.sh
 delete mode 100644 applications/Chat/examples/ray/1m1t_quantize.py
 delete mode 100644 applications/Chat/examples/ray/1m2t.py
 delete mode 100644 applications/Chat/examples/ray/1m2t.sh

diff --git a/applications/Chat/examples/ray/1m1t.py b/applications/Chat/examples/ray/1m1t.py
deleted file mode 100644
index 8c291abb1f8b..000000000000
--- a/applications/Chat/examples/ray/1m1t.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import argparse
-import os
-import socket
-from copy import deepcopy
-
-import pandas as pd
-import ray
-import torch
-from coati.experience_maker import NaiveExperienceMaker
-from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
-from coati.ray.experience_maker_holder import ExperienceMakerHolder
-from coati.trainer import PPOTrainer
-from coati.trainer.callbacks.performance_evaluator import (
-    ExperienceMakerPerformanceEvaluator,
-    TrainerPerformaceEvaluator,
-)
-from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy
-from torch.optim import Adam
-from transformers import AutoTokenizer, BloomTokenizerFast
-from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer
-
-from colossalai.nn.optimizer import HybridAdam
-
-
-def get_free_port():
-    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-        s.bind(('', 0))
-        return s.getsockname()[1]
-
-
-def get_local_ip():
-    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
-        s.connect(('8.8.8.8', 80))
-        return s.getsockname()[0]
-
-
-def main(args):
-    master_addr = str(get_local_ip())
-    # trainer_env_info
-    trainer_port = str(get_free_port())
-    env_info_trainer = {
-        'local_rank': '0',
-        'rank': '0',
-        'world_size': '1',
-        'master_port': trainer_port,
-        'master_addr': master_addr
-    }
-
-    # maker_env_info
-    maker_port = str(get_free_port())
-    env_info_maker = {
-        'local_rank': '0',
-        'rank': '0',
-        'world_size': '1',
-        'master_port': maker_port,
-        'master_addr': master_addr
-    }
-
-    # configure tokenizer
-    if args.model == 'gpt2':
-        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
-        tokenizer.pad_token = tokenizer.eos_token
-    elif args.model == 'bloom':
-        tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain)
-        tokenizer.pad_token = tokenizer.eos_token
-    elif args.model == 'opt':
-        tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")
-    else:
-        raise ValueError(f'Unsupported model "{args.model}"')
-
-    # configure Trainer
-    trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote(
-        experience_maker_holder_name_list=["maker1"],
-        strategy=args.trainer_strategy,
-        model=args.model,
-        env_info=env_info_trainer,
-        pretrained=args.pretrain,
-        lora_rank=args.lora_rank,
-        train_batch_size=args.train_batch_size,
-        buffer_limit=16,
-        experience_batch_size=args.experience_batch_size,
-        max_epochs=args.max_epochs,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        eval_performance=True,
-        debug=args.debug,
-    )
-
-    # configure Experience Maker
-    experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote(
-        detached_trainer_name_list=["trainer1"],
-        strategy=args.maker_strategy,
-        env_info=env_info_maker,
-        experience_batch_size=args.experience_batch_size,
-        kl_coef=0.1,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        eval_performance=True,
-        debug=args.debug,
-    )
-
-    # trainer send its actor and critic to experience holders.
-    ray.get(trainer_ref.initialize_remote_makers.remote())
-
-    # configure sampler
-    dataset = pd.read_csv(args.prompt_path)['prompt']
-
-    def tokenize_fn(texts):
-        # MUST padding to max length to ensure inputs of all ranks have the same length
-        # Different length may lead to hang when using gemini, as different generation steps
-        batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
-        return {k: v.cuda() for k, v in batch.items()}
-
-    trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes,
-                                              max_timesteps=args.max_timesteps,
-                                              update_timesteps=args.update_timesteps)
-    num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \
-        args.max_epochs + 3    # +3 for fault tolerance
-    maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker)
-
-    ray.get([trainer_done_ref, maker_done_ref])
-
-    # save model checkpoint after fitting
-    trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True)
-    # save optimizer checkpoint on all ranks
-    if args.need_optim_ckpt:
-        trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' %
-                                                     (torch.cuda.current_device()),
-                                                     only_rank0=False)
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('prompt_path')
-    parser.add_argument('--trainer_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--maker_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt'])
-    parser.add_argument('--pretrain', type=str, default=None)
-    parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt')
-    parser.add_argument('--need_optim_ckpt', type=bool, default=False)
-    parser.add_argument('--num_episodes', type=int, default=10)
-    parser.add_argument('--max_timesteps', type=int, default=10)
-    parser.add_argument('--update_timesteps', type=int, default=10)
-    parser.add_argument('--max_epochs', type=int, default=5)
-    parser.add_argument('--train_batch_size', type=int, default=8)
-    parser.add_argument('--experience_batch_size', type=int, default=8)
-    parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank")
-
-    parser.add_argument('--debug', action='store_true')
-    args = parser.parse_args()
-    ray.init(namespace=os.environ["RAY_NAMESPACE"])
-    main(args)
diff --git a/applications/Chat/examples/ray/1m1t.sh b/applications/Chat/examples/ray/1m1t.sh
deleted file mode 100644
index f7c5054c800e..000000000000
--- a/applications/Chat/examples/ray/1m1t.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-set_n_least_used_CUDA_VISIBLE_DEVICES() {
-    local n=${1:-"9999"}
-    echo "GPU Memory Usage:"
-    local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \
-        | tail -n +2 \
-        | nl -v 0 \
-        | tee /dev/tty \
-        | sort -g -k 2 \
-        | awk '{print $1}' \
-        | head -n $n)
-    export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g')
-    echo "Now CUDA_VISIBLE_DEVICES is set to:"
-    echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES"
-}
-
-set_n_least_used_CUDA_VISIBLE_DEVICES 2
-
-export RAY_NAMESPACE="admin"
-
-python 1m1t.py "/path/to/prompts.csv" \
-    --trainer_strategy colossalai_zero2 --maker_strategy naive --lora_rank 2 --pretrain "facebook/opt-350m" --model 'opt' \
-    --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \
-    --max_epochs 10 --debug
diff --git a/applications/Chat/examples/ray/1m1t_quantize.py b/applications/Chat/examples/ray/1m1t_quantize.py
deleted file mode 100644
index cc54bd1905c6..000000000000
--- a/applications/Chat/examples/ray/1m1t_quantize.py
+++ /dev/null
@@ -1,156 +0,0 @@
-import argparse
-import os
-import socket
-
-import pandas as pd
-import ray
-import torch
-from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
-from coati.ray.experience_maker_holder import ExperienceMakerHolder
-from transformers import AutoTokenizer, BloomTokenizerFast
-from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer
-
-
-def get_free_port():
-    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-        s.bind(('', 0))
-        return s.getsockname()[1]
-
-
-def get_local_ip():
-    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
-        s.connect(('8.8.8.8', 80))
-        return s.getsockname()[0]
-
-
-def main(args):
-    master_addr = str(get_local_ip())
-    # trainer_env_info
-    trainer_port = str(get_free_port())
-    env_info_trainer = {
-        'local_rank': '0',
-        'rank': '0',
-        'world_size': '1',
-        'master_port': trainer_port,
-        'master_addr': master_addr
-    }
-
-    # maker_env_info
-    maker_port = str(get_free_port())
-    env_info_maker = {
-        'local_rank': '0',
-        'rank': '0',
-        'world_size': '1',
-        'master_port': maker_port,
-        'master_addr': master_addr
-    }
-
-    # configure tokenizer
-    if args.model == 'gpt2':
-        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
-        tokenizer.pad_token = tokenizer.eos_token
-    elif args.model == 'bloom':
-        tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain)
-        tokenizer.pad_token = tokenizer.eos_token
-    elif args.model == 'opt':
-        tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")
-    else:
-        raise ValueError(f'Unsupported model "{args.model}"')
-
-    # configure Trainer
-    trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote(
-        experience_maker_holder_name_list=["maker1"],
-        strategy=args.trainer_strategy,
-        model=args.model,
-        env_info=env_info_trainer,
-        pretrained=args.pretrain,
-        lora_rank=args.lora_rank,
-        train_batch_size=args.train_batch_size,
-        buffer_limit=16,
-        experience_batch_size=args.experience_batch_size,
-        max_epochs=args.max_epochs,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        debug=args.debug,
-        eval_performance=True,
-    )
-
-    # configure Experience Maker
-    experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote(
-        detached_trainer_name_list=["trainer1"],
-        strategy=args.maker_strategy,
-        env_info=env_info_maker,
-        experience_batch_size=args.experience_batch_size,
-        kl_coef=0.1,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        debug=args.debug,
-        eval_performance=True,
-    )
-
-    # a 'jump wire' to set quantized initial_model and reward_model
-
-    # trainer send its actor and critic to experience holders.
-    # ray.get(trainer_ref.initialize_remote_makers.remote())
-    # configure sampler
-    dataset = pd.read_csv(args.prompt_path)['prompt']
-
-    def tokenize_fn(texts):
-        # MUST padding to max length to ensure inputs of all ranks have the same length
-        # Different length may lead to hang when using gemini, as different generation steps
-        batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
-        return {k: v.cuda() for k, v in batch.items()}
-
-    trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes,
-                                              max_timesteps=args.max_timesteps,
-                                              update_timesteps=args.update_timesteps)
-    num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \
-        args.max_epochs + 3    # +3 for fault tolerance
-    maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker)
-
-    ray.get([trainer_done_ref, maker_done_ref])
-
-    # save model checkpoint after fitting
-    trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True)
-    # save optimizer checkpoint on all ranks
-    if args.need_optim_ckpt:
-        trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' %
-                                                     (torch.cuda.current_device()),
-                                                     only_rank0=False)
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('prompt_path')
-    parser.add_argument('--trainer_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--maker_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama', 'roberta'])
-    parser.add_argument('--pretrain', type=str, default=None)
-    parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt')
-    parser.add_argument('--need_optim_ckpt', type=bool, default=False)
-    parser.add_argument('--num_episodes', type=int, default=10)
-    parser.add_argument('--max_timesteps', type=int, default=10)
-    parser.add_argument('--update_timesteps', type=int, default=10)
-    parser.add_argument('--max_epochs', type=int, default=5)
-    parser.add_argument('--train_batch_size', type=int, default=8)
-    parser.add_argument('--experience_batch_size', type=int, default=8)
-    parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank")
-
-    parser.add_argument('--debug', action='store_true')
-    args = parser.parse_args()
-    ray.init(namespace=os.environ["RAY_NAMESPACE"])
-    main(args)
diff --git a/applications/Chat/examples/ray/1m2t.py b/applications/Chat/examples/ray/1m2t.py
deleted file mode 100644
index 1a35beb6221a..000000000000
--- a/applications/Chat/examples/ray/1m2t.py
+++ /dev/null
@@ -1,203 +0,0 @@
-import argparse
-import os
-import socket
-from copy import deepcopy
-
-import pandas as pd
-import ray
-import torch
-from coati.experience_maker import NaiveExperienceMaker
-from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
-from coati.ray.experience_maker_holder import ExperienceMakerHolder
-from coati.trainer import PPOTrainer
-from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy
-from torch.optim import Adam
-from transformers import AutoTokenizer, BloomTokenizerFast
-from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer
-
-from colossalai.nn.optimizer import HybridAdam
-
-
-def get_free_port():
-    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-        s.bind(('', 0))
-        return s.getsockname()[1]
-
-
-def get_local_ip():
-    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
-        s.connect(('8.8.8.8', 80))
-        return s.getsockname()[0]
-
-
-def main(args):
-    master_addr = str(get_local_ip())
-    # trainer_env_info
-    trainer_port = str(get_free_port())
-    env_info_trainer_1 = {
-        'local_rank': '0',
-        'rank': '0',
-        'world_size': '2',
-        'master_port': trainer_port,
-        'master_addr': master_addr
-    }
-    env_info_trainer_2 = {
-        'local_rank': '0',
-        'rank': '1',
-        'world_size': '2',
-        'master_port': trainer_port,
-        'master_addr': master_addr
-    }
-    # maker_env_info
-    maker_port = str(get_free_port())
-    env_info_maker_1 = {
-        'local_rank': '0',
-        'rank': '0',
-        'world_size': '2',
-        'master_port': maker_port,
-        'master_addr': master_addr
-    }
-    print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1])
-    ray.init(dashboard_port=1145)
-    # configure tokenizer
-    if args.model == 'gpt2':
-        tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
-        tokenizer.pad_token = tokenizer.eos_token
-    elif args.model == 'bloom':
-        tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain)
-        tokenizer.pad_token = tokenizer.eos_token
-    elif args.model == 'opt':
-        tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")
-    else:
-        raise ValueError(f'Unsupported model "{args.model}"')
-
-    # configure Trainer
-    trainer_1_ref = DetachedPPOTrainer.options(name="trainer1",
-                                               namespace=os.environ["RAY_NAMESPACE"],
-                                               num_gpus=1,
-                                               max_concurrency=2).remote(
-        experience_maker_holder_name_list=["maker1"],
-        strategy=args.trainer_strategy,
-        model=args.model,
-        env_info=env_info_trainer_1,
-        pretrained=args.pretrain,
-        lora_rank=args.lora_rank,
-        train_batch_size=args.train_batch_size,
-        buffer_limit=16,
-        experience_batch_size=args.experience_batch_size,
-        max_epochs=args.max_epochs,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        debug=args.debug,
-    )
-
-    trainer_2_ref = DetachedPPOTrainer.options(name="trainer2",
-                                               namespace=os.environ["RAY_NAMESPACE"],
-                                               num_gpus=1,
-                                               max_concurrency=2).remote(
-        experience_maker_holder_name_list=["maker1"],
-        strategy=args.trainer_strategy,
-        model=args.model,
-        env_info=env_info_trainer_2,
-        pretrained=args.pretrain,
-        lora_rank=args.lora_rank,
-        train_batch_size=args.train_batch_size,
-        buffer_limit=16,
-        experience_batch_size=args.experience_batch_size,
-        max_epochs=args.max_epochs,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        debug=args.debug,
-    )
-
-    # configure Experience Maker
-    experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1",
-                                                            namespace=os.environ["RAY_NAMESPACE"],
-                                                            num_gpus=1,
-                                                            max_concurrency=2).remote(
-        detached_trainer_name_list=["trainer1", "trainer2"],
-        strategy=args.maker_strategy,
-        env_info=env_info_maker_1,
-        experience_batch_size=args.experience_batch_size,
-        kl_coef=0.1,
-        # kwargs:
-        max_length=128,
-        do_sample=True,
-        temperature=1.0,
-        top_k=50,
-        pad_token_id=tokenizer.pad_token_id,
-        eos_token_id=tokenizer.eos_token_id,
-        debug=args.debug,
-    )
-
-    # trainer send its actor and critic to experience holders.
-    # TODO: balance duty
-    ray.get(trainer_1_ref.initialize_remote_makers.remote())
-
-    # configure sampler
-    dataset = pd.read_csv(args.prompt_path)['prompt']
-
-    def tokenize_fn(texts):
-        # MUST padding to max length to ensure inputs of all ranks have the same length
-        # Different length may lead to hang when using gemini, as different generation steps
-        batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
-        return {k: v.cuda() for k, v in batch.items()}
-
-    trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes,
-                                                  max_timesteps=args.max_timesteps,
-                                                  update_timesteps=args.update_timesteps)
-    trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes,
-                                                  max_timesteps=args.max_timesteps,
-                                                  update_timesteps=args.update_timesteps)
-    num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \
-        args.max_epochs * 2 + 3    # +3 for fault tolerance
-    maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker)
-
-    ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref])
-    # save model checkpoint after fitting
-    trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True)
-    trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True)
-    # save optimizer checkpoint on all ranks
-    if args.need_optim_ckpt:
-        trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' %
                                                        (torch.cuda.current_device()),
-                                                       only_rank0=False)
-        trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' %
-                                                       (torch.cuda.current_device()),
-                                                       only_rank0=False)
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('prompt_path')
-    parser.add_argument('--trainer_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--maker_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
-                        default='naive')
-    parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt'])
-    parser.add_argument('--pretrain', type=str, default=None)
-    parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt')
-    parser.add_argument('--need_optim_ckpt', type=bool, default=False)
-    parser.add_argument('--num_episodes', type=int, default=10)
-    parser.add_argument('--max_timesteps', type=int, default=10)
-    parser.add_argument('--update_timesteps', type=int, default=10)
-    parser.add_argument('--max_epochs', type=int, default=5)
-    parser.add_argument('--train_batch_size', type=int, default=8)
-    parser.add_argument('--experience_batch_size', type=int, default=8)
-    parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank")
-
-    parser.add_argument('--debug', action='store_true')
-    args = parser.parse_args()
-    main(args)
diff --git a/applications/Chat/examples/ray/1m2t.sh b/applications/Chat/examples/ray/1m2t.sh
deleted file mode 100644
index 9608526ea7e7..000000000000
--- a/applications/Chat/examples/ray/1m2t.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-set_n_least_used_CUDA_VISIBLE_DEVICES() {
-    local n=${1:-"9999"}
-    echo "GPU Memory Usage:"
-    local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \
-        | tail -n +2 \
-        | nl -v 0 \
-        | tee /dev/tty \
-        | sort -g -k 2 \
-        | awk '{print $1}' \
-        | head -n $n)
-    export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g')
-    echo "Now CUDA_VISIBLE_DEVICES is set to:"
-    echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES"
-}
-
-set_n_least_used_CUDA_VISIBLE_DEVICES 2
-
-export RAY_NAMESPACE="admin"
-
-python 1m2t.py "/path/to/prompts.csv" --model gpt2 \
-    --maker_strategy naive --trainer_strategy ddp --lora_rank 2 \
-    --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \
-    --max_epochs 10 #--debug

From 13fa0383430f0c1d935e0133f80d6726aa11b87d Mon Sep 17 00:00:00 2001
From: csric
Date: Wed, 26 Apr 2023 11:01:04 +0800
Subject: [PATCH 9/9] remove replay buffer tp state. bad design

---
 .../Chat/coati/ray/detached_replay_buffer.py | 23 +++----------------
 1 file changed, 3 insertions(+), 20 deletions(-)

diff --git a/applications/Chat/coati/ray/detached_replay_buffer.py b/applications/Chat/coati/ray/detached_replay_buffer.py
index 257b0b072493..6a4f5a6d67c2 100644
--- a/applications/Chat/coati/ray/detached_replay_buffer.py
+++ b/applications/Chat/coati/ray/detached_replay_buffer.py
@@ -26,20 +26,12 @@ class DetachedReplayBuffer:
         cpu_offload: Whether to offload experience to cpu when sampling. Defaults to True.
     '''
 
-    def __init__(self, sample_batch_size: int, tp_world_size: int = 1, limit: int = 0) -> None:
+    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
         self.sample_batch_size = sample_batch_size
         self.limit = limit
         self.items = Queue(self.limit, actor_options={"num_cpus": 1})
         self.batch_collector: List[BufferItem] = []
-        '''
-        Workers in the same tp group share this buffer and need same sample for one step.
-        Therefore a held_sample should be returned tp_world_size times before it could be dropped.
-        worker_state records wheter a worker got the held_sample
-        '''
-        self.tp_world_size = tp_world_size
-        self.worker_state = [False] * self.tp_world_size
-        self.held_sample = None
-        self._worker_state_lock = Lock()
+
 
     @torch.no_grad()
     def append(self, experience: Experience) -> None:
@@ -70,16 +62,7 @@ def clear(self) -> None:
 
     @torch.no_grad()
     def sample(self, worker_rank=0, to_device="cpu") -> Experience:
-        self._worker_state_lock.acquire()
-        if not any(self.worker_state):
-            self.held_sample = self._sample_and_erase()
-        self.worker_state[worker_rank] = True
-        if all(self.worker_state):
-            self.worker_state = [False] * self.tp_world_size
-            ret = self.held_sample
-        else:
-            ret = copy.deepcopy(self.held_sample)
-        self._worker_state_lock.release()
+        ret = self._sample_and_erase()
         ret.to_device(to_device)
         return ret