From bb43927a060ae9ac5b4758fff43bfce59b5497b3 Mon Sep 17 00:00:00 2001
From: csric
Date: Sun, 23 Apr 2023 19:00:43 +0800
Subject: [PATCH 01/23] prompt example

---
 .../Chat/coati/ray/experience_maker_holder.py |  10 +-
 .../Chat/coati/trainer/strategies/sampler.py  |  10 ++
 applications/Chat/examples/ray/1mmt_prompt.py | 161 ++++++++++++++++++
 3 files changed, 178 insertions(+), 3 deletions(-)
 create mode 100644 applications/Chat/examples/ray/1mmt_prompt.py

diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index ebeb58137370..284e4b7a7f2f 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -161,7 +161,11 @@ def _inference_step(self, batch) -> None:
             experience.to_device('cpu')
             self._send_items(experience)
 
-    def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1, num_steps: int = 0):
+    def workingloop(self,
+                    dataloader_fn: Callable[[], Iterable],
+                    tokenize_fn: Callable = lambda x: x,
+                    num_epochs: int = 1,
+                    num_steps: int = 0):
         """Working loop of the experience maker.
 
         Args:
@@ -180,12 +184,12 @@ def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1
                 except StopIteration:
                     it = iter(dataloader)
                     batch = next(it)
-                self._inference_step(batch)
+                self._inference_step(tokenize_fn(batch))
         else:
             with tqdm(total=num_epochs * len(dataloader), desc='ExperienceMaker', disable=not is_rank_0()) as pbar:
                 for _ in range(num_epochs):
                     for batch in dataloader:
-                        self._inference_step(batch)
+                        self._inference_step(tokenize_fn(batch))
                         pbar.update()
         self._on_finish()
 
diff --git a/applications/Chat/coati/trainer/strategies/sampler.py b/applications/Chat/coati/trainer/strategies/sampler.py
index d726fa640fa2..cf9c3303c868 100644
--- a/applications/Chat/coati/trainer/strategies/sampler.py
+++ b/applications/Chat/coati/trainer/strategies/sampler.py
@@ -27,6 +27,16 @@ def __init__(self, dataset, num_replicas: int, rank: int) -> None:
         assert len(indices) == self.num_samples
         self.indices = indices
+
     def sample(self, batch_size: int) -> list:
         sampled_indices = np.random.choice(self.indices, batch_size, replace=False)
         return [self.dataset[idx] for idx in sampled_indices]
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return self.sample(self.iter_batch_size)
+
+    def set_iter_batch_size(self, batch_size: int = 1):
+        self.iter_batch_size = batch_size
\ No newline at end of file
diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py
new file mode 100644
index 000000000000..fb0bc6d43644
--- /dev/null
+++ b/applications/Chat/examples/ray/1mmt_prompt.py
@@ -0,0 +1,161 @@
+import argparse
+import os
+import socket
+from functools import partial
+
+import pandas as pd
+import ray
+import torch
+from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
+from coati.ray.experience_maker_holder import ExperienceMakerHolder
+from coati.ray.utils import (
+    get_actor_from_args,
+    get_critic_from_args,
+    get_reward_model_from_args,
+    get_strategy_from_args,
+    get_tokenizer_from_args
+)
+
+from torch.utils.data import DataLoader
+
+
+def get_free_port():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(('', 0))
+        return s.getsockname()[1]
+
+
+def get_local_ip():
+    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+        s.connect(('8.8.8.8', 80))
+        return s.getsockname()[0]
+
+
+def main(args):
+    master_addr = str(get_local_ip())
+    # trainer_env_info
+    trainer_port = str(get_free_port())
+    env_info_trainers = [{
+        'local_rank': '0',
+        'rank': str(rank),
+        'world_size': str(args.num_trainers),
+        'master_port': trainer_port,
+        'master_addr': master_addr
+    } for rank in range(args.num_trainers)]
+
+    # maker_env_info
+    maker_port = str(get_free_port())
+    env_info_maker = {
+        'local_rank': '0',
+        'rank': '0',
+        'world_size': '1',
+        'master_port': maker_port,
+        'master_addr': master_addr
+    }
+
+    # configure tokenizer
+    tokenizer = get_tokenizer_from_args(args.model)
+
+    def trainer_model_fn():
+        actor = get_actor_from_args(args.model, args.pretrain).half().cuda()
+        critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda()
+        return actor, critic
+
+    # configure Trainer
+    trainer_refs = [
+        DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote(
+            experience_maker_holder_name_list=["maker1"],
+            strategy_fn=partial(get_strategy_from_args, args.trainer_strategy),
+            model_fn=trainer_model_fn,
+            env_info=env_info_trainer,
+            train_batch_size=args.train_batch_size,
+            buffer_limit=16,
+            eval_performance=True,
+            debug=args.debug,
+        ) for i, env_info_trainer in enumerate(env_info_trainers)
+    ]
+
+    def model_fn():
+        actor = get_actor_from_args(args.model, args.pretrain).half().cuda()
+        critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda()
+        reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).half().cuda()
+        initial_model = get_actor_from_args(args.model, args.pretrain).half().cuda()
+        return actor, critic, reward_model, initial_model
+
+    # configure Experience Maker
+    experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote(
+        detached_trainer_name_list=[f'trainer{i}' for i in range(args.num_trainers)],
+        strategy_fn=partial(get_strategy_from_args, args.maker_strategy),
+        model_fn=model_fn,
+        env_info=env_info_maker,
+        experience_batch_size=args.experience_batch_size,
+        kl_coef=0.1,
+        debug=args.debug,
+        # sync_models_from_trainers=True,
+        # generation kwargs:
+        max_length=512,
+        do_sample=True,
+        temperature=1.0,
+        top_k=50,
+        pad_token_id=tokenizer.pad_token_id,
+        eos_token_id=tokenizer.eos_token_id,
+        eval_performance=True,
+        use_cache=True,
+    )
+
+
+
+    # uncomment this function if sync_models_from_trainers is True
+    # ray.get([
+    #     trainer_ref.sync_models_to_remote_makers.remote()
+    #     for trainer_ref in trainer_refs
+    # ])
+
+    wait_tasks = []
+
+    total_steps = args.experience_batch_size * args.experience_steps // (args.num_trainers * args.train_batch_size)
+    for trainer_ref in trainer_refs:
+        wait_tasks.append(
+            trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs))
+
+
+    dataset_size = args.experience_batch_size * 4
+
+    def build_dataloader():
+        dataset = pd.read_csv(args.prompt_path)['prompt']
+        # csric: Strategy.setup_sampler, Strategy.setup_dataloader aren't utilized
+        sampler = get_strategy_from_args(args.maker_strategy).setup_sampler(dataset)
+        sampler.set_iter_batch_size(dataset_size)
+        return sampler
+
+    def tokenize_fn(texts):
+        batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
+        return {k: v.cuda() for k, v in batch.items()}
+
+    wait_tasks.append(experience_holder_ref.workingloop.remote(build_dataloader,
+                                                               tokenize_fn,
+                                                               num_steps=args.experience_steps))
+
+    ray.get(wait_tasks)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--prompt_path', type=str, default=None)
+    parser.add_argument('--num_trainers', type=int, default=1)
+    parser.add_argument('--trainer_strategy',
+                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
+                        default='naive')
+    parser.add_argument('--maker_strategy', choices=['naive'], default='naive')
+    parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt'])
+    parser.add_argument('--pretrain', type=str, default=None)
+    parser.add_argument('--critic_pretrain', type=str, default=None)
+    parser.add_argument('--experience_steps', type=int, default=4)
+    parser.add_argument('--experience_batch_size', type=int, default=8)
+    parser.add_argument('--train_epochs', type=int, default=1)
+    parser.add_argument('--update_steps', type=int, default=2)
+    parser.add_argument('--train_batch_size', type=int, default=8)
+    parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank")
+
+    parser.add_argument('--debug', action='store_true')
+    args = parser.parse_args()
+    ray.init(namespace=os.environ["RAY_NAMESPACE"])
+    main(args)
\ No newline at end of file
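Patch 01 wires two pieces together: `workingloop` gains an optional `tokenize_fn` hook applied to every batch before `_inference_step`, and the strategy's sampler becomes an infinite iterator over random prompt batches, so the experience maker never exhausts its prompt source. The following is a minimal, self-contained sketch of that iterator pattern — illustrative only; `InfiniteSampler` and the toy `prompts` list are not names from the patch:

import numpy as np

class InfiniteSampler:
    """Yields endless random batches; __next__ never raises StopIteration,
    so the experience maker's working loop decides when to stop."""

    def __init__(self, dataset, batch_size: int = 1):
        self.dataset = dataset
        self.indices = np.arange(len(dataset))
        self.iter_batch_size = batch_size

    def sample(self, batch_size: int) -> list:
        sampled = np.random.choice(self.indices, batch_size, replace=False)
        return [self.dataset[i] for i in sampled]

    def set_iter_batch_size(self, batch_size: int = 1):
        self.iter_batch_size = batch_size

    def __iter__(self):
        return self

    def __next__(self):
        return self.sample(self.iter_batch_size)

prompts = [f"prompt {i}" for i in range(32)]
sampler = InfiniteSampler(prompts, batch_size=4)
first_batch = next(iter(sampler))    # a list of 4 randomly chosen prompts

Note that patch 02 below abandons this sampler-based loop in favour of a plain `torch.utils.data.DataLoader` with the tokenizer as `collate_fn`, and patch 03 then deletes the iterator methods again.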
From 884a645b7232d0ec3c9a3f103f9cc4a9d9750576 Mon Sep 17 00:00:00 2001
From: csric
Date: Mon, 24 Apr 2023 15:50:22 +0800
Subject: [PATCH 02/23] prompt load csv data

---
 .../Chat/coati/ray/experience_maker_holder.py | 10 +++------
 applications/Chat/examples/ray/1mmt_prompt.py | 21 +++++++++++--------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index 284e4b7a7f2f..ebeb58137370 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -161,11 +161,7 @@ def _inference_step(self, batch) -> None:
             experience.to_device('cpu')
             self._send_items(experience)
 
-    def workingloop(self,
-                    dataloader_fn: Callable[[], Iterable],
-                    tokenize_fn: Callable = lambda x: x,
-                    num_epochs: int = 1,
-                    num_steps: int = 0):
+    def workingloop(self, dataloader_fn: Callable[[], Iterable], num_epochs: int = 1, num_steps: int = 0):
         """Working loop of the experience maker.
 
         Args:
@@ -184,12 +180,12 @@ def workingloop(self,
                 except StopIteration:
                     it = iter(dataloader)
                     batch = next(it)
-                self._inference_step(tokenize_fn(batch))
+                self._inference_step(batch)
         else:
             with tqdm(total=num_epochs * len(dataloader), desc='ExperienceMaker', disable=not is_rank_0()) as pbar:
                 for _ in range(num_epochs):
                     for batch in dataloader:
-                        self._inference_step(tokenize_fn(batch))
+                        self._inference_step(batch)
                         pbar.update()
         self._on_finish()
 
diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py
index fb0bc6d43644..5baf96eaa508 100644
--- a/applications/Chat/examples/ray/1mmt_prompt.py
+++ b/applications/Chat/examples/ray/1mmt_prompt.py
@@ -118,20 +118,23 @@ def model_fn():
 
     dataset_size = args.experience_batch_size * 4
 
+    from torch.utils.data import DataLoader
     def build_dataloader():
+
+        def tokenize_fn(texts):
+            batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
+            return {k: v.cuda() for k, v in batch.items()}
+
         dataset = pd.read_csv(args.prompt_path)['prompt']
-        # csric: Strategy.setup_sampler, Strategy.setup_dataloader aren't utilized
-        sampler = get_strategy_from_args(args.maker_strategy).setup_sampler(dataset)
-        sampler.set_iter_batch_size(dataset_size)
-        return sampler
-
-    def tokenize_fn(texts):
-        batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True)
-        return {k: v.cuda() for k, v in batch.items()}
+        dataloader = DataLoader(dataset=dataset,
+                                batch_size=dataset_size,
+                                shuffle=True,
+                                collate_fn=tokenize_fn
+                                )
+        return dataloader
 
     wait_tasks.append(experience_holder_ref.workingloop.remote(build_dataloader,
-                                                               tokenize_fn,
                                                                num_steps=args.experience_steps))
 
     ray.get(wait_tasks)

From 603bd7eb8306ccbf75d7630ebb6798dd91477ca0 Mon Sep 17 00:00:00 2001
From: csric
Date: Mon, 24 Apr 2023 15:59:17 +0800
Subject: [PATCH 03/23] remove legacy try

---
 applications/Chat/coati/trainer/strategies/sampler.py | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/applications/Chat/coati/trainer/strategies/sampler.py b/applications/Chat/coati/trainer/strategies/sampler.py
index cf9c3303c868..65e199dbf029 100644
--- a/applications/Chat/coati/trainer/strategies/sampler.py
+++ b/applications/Chat/coati/trainer/strategies/sampler.py
@@ -31,12 +31,3 @@ def __init__(self, dataset, num_replicas: int, rank: int) -> None:
     def sample(self, batch_size: int) -> list:
         sampled_indices = np.random.choice(self.indices, batch_size, replace=False)
         return [self.dataset[idx] for idx in sampled_indices]
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return self.sample(self.iter_batch_size)
-
-    def set_iter_batch_size(self, batch_size: int = 1):
-        self.iter_batch_size = batch_size

From 99fbd4038937eca72b5cb0a80712ca54802244de Mon Sep 17 00:00:00 2001
From: csric
Date: Mon, 24 Apr 2023 17:44:18 +0800
Subject: [PATCH 04/23] maker models require_grad set to False

---
 .../Chat/coati/ray/experience_maker_holder.py |  6 ++++++
 applications/Chat/examples/ray/1mmt_dummy.py  | 10 +++++-----
 applications/Chat/examples/ray/1mmt_prompt.py |  8 ++++----
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index ebeb58137370..d74aceb40a9f 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -215,8 +215,12 @@ def update_experience_maker(self,
         with torch.no_grad():
             if new_actor_state_dict is not None:
                 self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False)
+                print("actor", new_actor_state_dict.keys())
+                print("local actor", self.experience_maker.actor.model.state_dict().keys())
             if new_critic_state_dict is not None:
                 self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False)
+                print("critic", new_critic_state_dict.keys())
+                print("local critic", self.experience_maker.critic.state_dict().keys())
 
         # the lock must be released after both actor and critic being updated
         if chunk_end:
@@ -227,6 +231,8 @@ def update_experience_maker(self,
             tracemalloc.stop()
         if fully_update:
             self._is_fully_initialized = True
+            print(self.experience_maker.actor.model.lm_head.weight)
+            print(self.experience_maker.critic.value_head.weight)
 
     def _on_make_experience_start(self) -> None:
         for callback in self.callbacks:
diff --git a/applications/Chat/examples/ray/1mmt_dummy.py b/applications/Chat/examples/ray/1mmt_dummy.py
index d293e6940fbe..095ca161ba74 100644
--- a/applications/Chat/examples/ray/1mmt_dummy.py
+++ b/applications/Chat/examples/ray/1mmt_dummy.py
@@ -60,17 +60,17 @@ def main(args):
     def model_fn():
         actor_cfg = AutoConfig.from_pretrained(args.pretrain)
         critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain)
-        actor = get_actor_from_args(args.model, config=actor_cfg).half().cuda()
-        critic = get_critic_from_args(args.critic_model, config=critic_cfg).half().cuda()
-        reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).half().cuda()
+        actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda()
+        critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda()
+        reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda()
         if args.initial_model_quant_ckpt is not None and args.model == 'llama':
             # quantize initial model
             with low_resource_init(), no_init_weights():
                 initial_model = get_actor_from_args(args.model, config=actor_cfg)
                 initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits,
-                                                       args.quant_group_size).cuda()
+                                                       args.quant_group_size).cuda().requires_grad_(False)
         else:
-            initial_model = get_actor_from_args(args.model, config=actor_cfg).half().cuda()
+            initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda()
         return actor, critic, reward_model, initial_model
 
     # configure Experience Maker
diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py
index 5baf96eaa508..7808f4ce72c8 100644
--- a/applications/Chat/examples/ray/1mmt_prompt.py
+++ b/applications/Chat/examples/ray/1mmt_prompt.py
@@ -74,10 +74,10 @@ def trainer_model_fn():
     ]
 
     def model_fn():
-        actor = get_actor_from_args(args.model, args.pretrain).half().cuda()
-        critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda()
-        reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).half().cuda()
-        initial_model = get_actor_from_args(args.model, args.pretrain).half().cuda()
+        actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda()
+        critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda()
+        reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda()
+        initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda()
         return actor, critic, reward_model, initial_model
 
     # configure Experience Maker
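Patch 04 freezes every maker-side model with `requires_grad_(False)` before casting and moving it to GPU. A small sketch of what that buys — a hedged illustration, not code from the patch; `nn.Linear` stands in for the actor/critic/reward models, which the patch additionally casts with `.half().cuda()`:

import torch
import torch.nn as nn

# Freeze a module in place: requires_grad_(False) flags every parameter as
# inference-only, so no autograd graph is built while generating experience,
# and (as patches 05/06 exploit) `p.requires_grad` becomes a cheap filter for
# which weights still need to be synchronized from the trainers.
model = nn.Linear(8, 8)                 # stand-in for a maker-side model
frozen = model.requires_grad_(False)    # in-place; returns the same module

assert all(not p.requires_grad for p in frozen.parameters())
out = frozen(torch.randn(2, 8))         # forward still works, no grad tracking
assert out.requires_grad is False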
From 08108d23f94782268658bbd719b7f07791f31bcc Mon Sep 17 00:00:00 2001
From: csric
Date: Tue, 25 Apr 2023 11:28:34 +0800
Subject: [PATCH 05/23] working on zero redundancy update

---
 applications/Chat/coati/ray/detached_trainer_ppo.py | 12 ++++++++----
 .../Chat/coati/ray/experience_maker_holder.py       |  5 +++--
 applications/Chat/coati/ray/utils.py                | 11 +++++++++++
 applications/Chat/coati/trainer/strategies/naive.py | 10 ++++++++--
 4 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py
index b0630cd0b5ae..4c59e9f576e9 100644
--- a/applications/Chat/coati/ray/detached_trainer_ppo.py
+++ b/applications/Chat/coati/ray/detached_trainer_ppo.py
@@ -107,6 +107,10 @@ def __init__(
     @torch.no_grad()
     def _update_remote_makers(self, fully_update: bool = False, **config):
         # TODO: balance duties
+        self.actor.eval()
+        self.critic.eval()
+        if not fully_update:
+            config['requires_grad_only'] = True
         if is_rank_0():
             self.update_target_holder_list(self.target_holder_name_list)
         # mark start, ensure order
         tasks = []
         for target_holder in self.target_holder_list:
             tasks.append(target_holder.update_experience_maker.remote(chunk_start=True, fully_update=fully_update))
         ray.get(tasks)
@@ -198,9 +202,9 @@ def _get_unwrapped_critic(self):
         return self.critic
 
     def _get_model_state_dict_shard(self, model: torch.nn.Module, **config):
-        try:
-            self.strategy.merge_lora_weight(model)
-        except AttributeError:
-            pass
+        # try:
+        #     self.strategy.merge_lora_weight(model)
+        # except AttributeError:
+        #     pass
         for state_dict in self.strategy.get_model_state_dict_shard(model, **config):
             yield state_dict_to(state_dict)
diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index d74aceb40a9f..f9627c1d5249 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -216,11 +216,12 @@ def update_experience_maker(self,
             if new_actor_state_dict is not None:
                 self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False)
                 print("actor", new_actor_state_dict.keys())
-                print("local actor", self.experience_maker.actor.model.state_dict().keys())
+                # print("local actor", self.experience_maker.actor.model.state_dict().keys())
             if new_critic_state_dict is not None:
                 self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False)
                 print("critic", new_critic_state_dict.keys())
-                print("local critic", self.experience_maker.critic.state_dict().keys())
+                # print("local critic", self.experience_maker.critic.state_dict().keys())
+
 
         # the lock must be released after both actor and critic being updated
         if chunk_end:
diff --git a/applications/Chat/coati/ray/utils.py b/applications/Chat/coati/ray/utils.py
index 6e62ba0b4841..7bd1aa5b6f47 100644
--- a/applications/Chat/coati/ray/utils.py
+++ b/applications/Chat/coati/ray/utils.py
@@ -126,3 +126,14 @@ def state_dict_to(state_dict: Dict[str, Any],
 def get_model_numel(model: nn.Module) -> int:
     numel = sum(p.numel() for p in model.parameters())
     return numel
+
+def state_dict_filter_grad(state_dict: Dict[str, Any], model: nn.Module):
+    '''
+    state_dict loses grad info. Original model needed.
+    '''
+    for name, parameter in model.named_parameters():
+        if not parameter.requires_grad:
+            pass
+
+def get_grad_required_state_dict(model: nn.Module):
+    pass
\ No newline at end of file
diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py
index a22be1181fb8..5d9b70d9fc21 100644
--- a/applications/Chat/coati/trainer/strategies/naive.py
+++ b/applications/Chat/coati/trainer/strategies/naive.py
@@ -1,5 +1,5 @@
 from typing import Any, Optional
-
+from collections import OrderedDict
 import torch
 import torch.nn as nn
 import torch.optim as optim
@@ -75,7 +75,13 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No
 
     def get_model_state_dict_shard(self, model: nn.Module, **config):
         # TODO: implement sharding on naive strategy
-        state_dict = model.state_dict()
+        if 'requires_grad_only' in config and config['requires_grad_only'] == True:
+            state_dict = OrderedDict()
+            for name, parameter in model.named_parameters():
+                if parameter.requires_grad:
+                    state_dict[name] = parameter.detach()
+        else:
+            state_dict = model.state_dict()
         yield state_dict
 
     def merge_lora_weight(self, model: nn.Module):

From 3266516d7d4092fd046e091bdcc05b6049239369 Mon Sep 17 00:00:00 2001
From: csric
Date: Tue, 25 Apr 2023 15:20:07 +0800
Subject: [PATCH 06/23] mmmt_prompt example; naive strategy requires_grad
 state_dict & sharding; maker model requires_no_grad.

---
 .../Chat/coati/ray/detached_trainer_ppo.py    |  15 +-
 .../Chat/coati/ray/experience_maker_holder.py |   6 -
 applications/Chat/coati/ray/utils.py          |  12 +-
 .../coati/trainer/strategies/colossalai.py    |  11 +-
 .../Chat/coati/trainer/strategies/naive.py    |  33 ++-
 applications/Chat/examples/ray/1mmt_prompt.py |  25 ++-
 applications/Chat/examples/ray/mmmt_dummy.py  |  10 +-
 applications/Chat/examples/ray/mmmt_prompt.py | 191 ++++++++++++++++++
 8 files changed, 257 insertions(+), 46 deletions(-)
 create mode 100644 applications/Chat/examples/ray/mmmt_prompt.py

diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py
index 536a50ab4cd3..50243d88f483 100644
--- a/applications/Chat/coati/ray/detached_trainer_ppo.py
+++ b/applications/Chat/coati/ray/detached_trainer_ppo.py
@@ -110,17 +110,14 @@ def __init__(
     @torch.no_grad()
     def _update_remote_makers(self, fully_update: bool = False, **config):
         # TODO: balance duties
-        self.actor.eval()
-        self.critic.eval()
         if not fully_update:
             config['requires_grad_only'] = True
-        if is_rank_0():
-            self.update_target_holder_list(self.target_holder_name_list)
-        # mark start, ensure order
-        tasks = []
-        for target_holder in self.target_holder_list:
-            tasks.append(target_holder.update_experience_maker.remote(chunk_start=True, fully_update=fully_update))
-        ray.get(tasks)
+        self.update_target_holder_list()
+        # mark start, ensure order
+        tasks = []
+        for target_holder in self.target_holder_list:
+            tasks.append(target_holder.update_experience_maker.remote(chunk_start=True, fully_update=fully_update))
+        ray.get(tasks)
         # sending loop
         tasks = []
         for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_model(self.actor), **config):
diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py
index d811a0fb8b06..20ada291a9ae 100644
--- a/applications/Chat/coati/ray/experience_maker_holder.py
+++ b/applications/Chat/coati/ray/experience_maker_holder.py
@@ -184,12 +184,8 @@ def update_experience_maker(self,
         with torch.no_grad():
             if new_actor_state_dict is not None:
                 self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False)
-                print("actor", new_actor_state_dict.keys())
-                # print("local actor", self.experience_maker.actor.model.state_dict().keys())
             if new_critic_state_dict is not None:
                 self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False)
-                print("critic", new_critic_state_dict.keys())
-                # print("local critic", self.experience_maker.critic.state_dict().keys())
 
         # the lock must be released after both actor and critic being updated
         if chunk_end:
@@ -201,8 +197,6 @@ def update_experience_maker(self,
             tracemalloc.stop()
         if fully_update:
             self._is_fully_initialized = True
-            print(self.experience_maker.actor.model.lm_head.weight)
-            print(self.experience_maker.critic.value_head.weight)
 
     def _on_make_experience_start(self) -> None:
         for callback in self.callbacks:
diff --git a/applications/Chat/coati/ray/utils.py b/applications/Chat/coati/ray/utils.py
index 60ab44006c05..6cd7c564cc92 100644
--- a/applications/Chat/coati/ray/utils.py
+++ b/applications/Chat/coati/ray/utils.py
@@ -1,5 +1,6 @@
 import os
 from typing import Any, Callable, Dict, List, Optional
+from collections import OrderedDict
 
 import torch
 import torch.distributed as dist
@@ -149,14 +150,3 @@ def get_receivers_per_sender(sender_idx: int, num_senders: int, num_receivers: i
             # a receiver may have more than one sender
             target_receivers.append(sender_idx % num_receivers)
     return target_receivers
-
-def state_dict_filter_grad(state_dict: Dict[str, Any], model: nn.Module):
-    '''
-    state_dict loses grad info. Original model needed.
-    '''
-    for name, parameter in model.named_parameters():
-        if not parameter.requires_grad:
-            pass
-
-def get_grad_required_state_dict(model: nn.Module):
-    pass
\ No newline at end of file
diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py
index d39092d5e7ad..238408b9f676 100644
--- a/applications/Chat/coati/trainer/strategies/colossalai.py
+++ b/applications/Chat/coati/trainer/strategies/colossalai.py
@@ -214,9 +214,10 @@ def get_model_state_dict_shard(self, model: nn.Module, **config):
         if self.stage != 3:
             yield from super().get_model_state_dict_shard(model, **config)
         else:
-            unwrapped_model = self._unwrap_model(model)
-            for module in unwrapped_model.modules():
-                if isinstance(module, LoraLinear):
-                    module.merge_weights = True
-                    module.eval()
+            # unwrapped_model = self._unwrap_model(model)
+            # for module in unwrapped_model.modules():
+            #     if isinstance(module, LoraLinear):
+            #         module.merge_weights = True
+            #         module.eval()
+            model: ZeroDDP = model
             yield from model.state_dict_shard(max_shard_size=1024, only_rank_0=False)
diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py
index 4982bd6aa475..3b537fdde2d4 100644
--- a/applications/Chat/coati/trainer/strategies/naive.py
+++ b/applications/Chat/coati/trainer/strategies/naive.py
@@ -1,5 +1,6 @@
 import os
-from typing import Any, Optional
+import sys
+from typing import Any, Optional, Dict
 from collections import OrderedDict
 import torch
 import torch.distributed as dist
@@ -14,6 +15,14 @@
 
 from .base import Strategy
 
+
+# TODO Move this to a util.py (Moving to ray.util introduces ringed import)
+def get_grad_required_state_dict(model: nn.Module):
+    state_dict = OrderedDict()
+    for name, parameter in model.named_parameters():
+        if parameter.requires_grad:
+            state_dict[name] = parameter.detach()
+    return state_dict
+
 
 class NaiveStrategy(Strategy):
     """
@@ -82,13 +91,25 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No
 
     def get_model_state_dict_shard(self, model: nn.Module, **config):
         # TODO: implement sharding on naive strategy
         if 'requires_grad_only' in config and config['requires_grad_only'] == True:
-            state_dict = OrderedDict()
-            for name, parameter in model.named_parameters():
-                if parameter.requires_grad:
-                    state_dict[name] = parameter.detach()
+            state_dict = get_grad_required_state_dict(model)
         else:
             state_dict = model.state_dict()
-        yield state_dict
+
+        if 'shard_size' in config:
+            shard_size = config['shard_size']
+            accumulate_size = 0
+            state_dict_shard = OrderedDict()
+            for name, param in state_dict.items():
+                state_dict_shard[name] = param
+                accumulate_size += param.numel() * param.element_size()
+                if accumulate_size >= shard_size:
+                    accumulate_size = 0
+                    yield state_dict_shard
+                    state_dict_shard = OrderedDict()
+            if accumulate_size > 0:
+                yield state_dict_shard
+        else:
+            yield state_dict
 
     def merge_lora_weight(self, model: nn.Module):
         unwrapped_model = self._unwrap_model(model)
diff --git a/applications/Chat/examples/ray/1mmt_prompt.py b/applications/Chat/examples/ray/1mmt_prompt.py
index 7808f4ce72c8..bd7224aae749 100644
--- a/applications/Chat/examples/ray/1mmt_prompt.py
+++ b/applications/Chat/examples/ray/1mmt_prompt.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import ray
 import torch
+from coati.quant import llama_load_quant, low_resource_init
 from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
 from coati.ray.experience_maker_holder import ExperienceMakerHolder
 from coati.ray.utils import (
@@ -17,6 +18,8 @@
 )
 
 from torch.utils.data import DataLoader
+from transformers import AutoConfig
+from transformers.modeling_utils import no_init_weights
 
 
 def get_free_port():
@@ -77,7 +80,15 @@ def model_fn():
         actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda()
         critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda()
         reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda()
-        initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda()
+        if args.initial_model_quant_ckpt is not None and args.model == 'llama':
+            # quantize initial model
+            actor_cfg = AutoConfig.from_pretrained(args.pretrain)
+            with low_resource_init(), no_init_weights():
+                initial_model = get_actor_from_args(args.model, config=actor_cfg)
+                initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits,
+                                                       args.quant_group_size).cuda().requires_grad_(False)
+        else:
+            initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda()
         return actor, critic, reward_model, initial_model
 
     # configure Experience Maker
@@ -118,7 +129,6 @@ def model_fn():
 
     dataset_size = args.experience_batch_size * 4
 
-    from torch.utils.data import DataLoader
 
     def build_dataloader():
@@ -145,10 +155,14 @@ def tokenize_fn(texts):
     parser.add_argument('--prompt_path', type=str, default=None)
     parser.add_argument('--num_trainers', type=int, default=1)
     parser.add_argument('--trainer_strategy',
-                        choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'],
+                        choices=[
+                            'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu',
+                            'colossalai_zero2_cpu'
+                        ],
                         default='naive')
parser.add_argument('--maker_strategy', choices=['naive'], default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) parser.add_argument('--pretrain', type=str, default=None) parser.add_argument('--critic_pretrain', type=str, default=None) parser.add_argument('--experience_steps', type=int, default=4) @@ -158,6 +172,9 @@ def tokenize_fn(texts): parser.add_argument('--train_batch_size', type=int, default=8) parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) + parser.add_argument('--quant_bits', type=int, default=4) + parser.add_argument('--quant_group_size', type=int, default=128) parser.add_argument('--debug', action='store_true') args = parser.parse_args() ray.init(namespace=os.environ["RAY_NAMESPACE"]) diff --git a/applications/Chat/examples/ray/mmmt_dummy.py b/applications/Chat/examples/ray/mmmt_dummy.py index 767fe37030f6..082f4851777e 100644 --- a/applications/Chat/examples/ray/mmmt_dummy.py +++ b/applications/Chat/examples/ray/mmmt_dummy.py @@ -61,17 +61,17 @@ def main(args): def model_fn(): actor_cfg = AutoConfig.from_pretrained(args.pretrain) critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain) - actor = get_actor_from_args(args.model, config=actor_cfg).half().cuda() - critic = get_critic_from_args(args.critic_model, config=critic_cfg).half().cuda() - reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).half().cuda() + actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda() if args.initial_model_quant_ckpt is not None and args.model == 'llama': # quantize initial model with low_resource_init(), no_init_weights(): initial_model = get_actor_from_args(args.model, config=actor_cfg) initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, - args.quant_group_size).cuda() + args.quant_group_size).cuda().requires_grad_(False) else: - initial_model = get_actor_from_args(args.model, config=actor_cfg).half().cuda() + initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda() return actor, critic, reward_model, initial_model # configure Experience Maker diff --git a/applications/Chat/examples/ray/mmmt_prompt.py b/applications/Chat/examples/ray/mmmt_prompt.py new file mode 100644 index 000000000000..d2398d451c7b --- /dev/null +++ b/applications/Chat/examples/ray/mmmt_prompt.py @@ -0,0 +1,191 @@ +import argparse +import os +import socket +from functools import partial + +import ray +import torch +import pandas as pd +from coati.quant import llama_load_quant, low_resource_init +from coati.ray.detached_trainer_ppo import DetachedPPOTrainer +from coati.ray.experience_maker_holder import ExperienceMakerHolder +from coati.ray.utils import ( + get_actor_from_args, + get_critic_from_args, + get_receivers_per_sender, + get_reward_model_from_args, + get_strategy_from_args, +) +from torch.utils.data import DataLoader +from transformers import AutoConfig, AutoTokenizer 
+from transformers.modeling_utils import no_init_weights + + +def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + +def get_local_ip(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] + + +def main(args): + master_addr = str(get_local_ip()) + # trainer_env_info + trainer_port = str(get_free_port()) + env_info_trainers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_trainers), + 'master_port': trainer_port, + 'master_addr': master_addr + } for rank in range(args.num_trainers)] + + # maker_env_info + maker_port = str(get_free_port()) + env_info_makers = [{ + 'local_rank': '0', + 'rank': str(rank), + 'world_size': str(args.num_makers), + 'master_port': maker_port, + 'master_addr': master_addr + } for rank in range(args.num_makers)] + + # configure tokenizer + tokenizer = AutoTokenizer.from_pretrained(args.pretrain) + tokenizer.pad_token = tokenizer.eos_token + + def model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + reward_model = get_reward_model_from_args(args.model, args.critic_pretrain).requires_grad_(False).half().cuda() + if args.initial_model_quant_ckpt is not None and args.model == 'llama': + # quantize initial model + actor_cfg = AutoConfig.from_pretrained(args.pretrain) + with low_resource_init(), no_init_weights(): + initial_model = get_actor_from_args(args.model, config=actor_cfg) + initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits, + args.quant_group_size).cuda().requires_grad_(False) + else: + initial_model = get_actor_from_args(args.model, args.pretrain).requires_grad_(False).half().cuda() + return actor, critic, reward_model, initial_model + + # configure Experience Maker + experience_holder_refs = [ + ExperienceMakerHolder.options(name=f"maker{i}", num_gpus=1, max_concurrency=2).remote( + detached_trainer_name_list=[ + f'trainer{x}' + for x in get_receivers_per_sender(i, args.num_makers, args.num_trainers, allow_idle_sender=False) + ], + strategy_fn=partial(get_strategy_from_args, args.maker_strategy), + model_fn=model_fn, + env_info=env_info_maker, + kl_coef=0.1, + debug=args.debug, + # sync_models_from_trainers=True, + # generation kwargs: + max_length=512, + do_sample=True, + temperature=1.0, + top_k=50, + pad_token_id=tokenizer.pad_token_id, + eos_token_id=tokenizer.eos_token_id, + eval_performance=True, + use_cache=True, + ) + for i, env_info_maker in enumerate(env_info_makers) + ] + + def trainer_model_fn(): + actor = get_actor_from_args(args.model, args.pretrain).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + return actor, critic + + # configure Trainer + trainer_refs = [ + DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote( + experience_maker_holder_name_list=[ + f"maker{x}" + for x in get_receivers_per_sender(i, args.num_trainers, args.num_makers, allow_idle_sender=True) + ], + strategy_fn=partial(get_strategy_from_args, args.trainer_strategy), + model_fn=trainer_model_fn, + env_info=env_info_trainer, + train_batch_size=args.train_batch_size, + buffer_limit=16, + eval_performance=True, + debug=args.debug, + ) + for i, env_info_trainer in enumerate(env_info_trainers) + ] + + dataset_size = 
args.experience_batch_size * 4 + + def build_dataloader(): + def tokenize_fn(texts): + batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + dataset = pd.read_csv(args.prompt_path)['prompt'] + dataloader = DataLoader(dataset=dataset, + batch_size=dataset_size, + shuffle=True, + collate_fn=tokenize_fn + ) + return dataloader + + # uncomment this function if sync_models_from_trainers is True + # ray.get([ + # trainer_ref.sync_models_to_remote_makers.remote() + # for trainer_ref in trainer_refs + # ]) + + wait_tasks = [] + + for experience_holder_ref in experience_holder_refs: + wait_tasks.append( + experience_holder_ref.workingloop.remote(build_dataloader, + num_steps=args.experience_steps)) + + total_steps = args.experience_batch_size * args.experience_steps * \ + args.num_makers // (args.num_trainers * args.train_batch_size) + for trainer_ref in trainer_refs: + wait_tasks.append(trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs)) + + ray.get(wait_tasks) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--prompt_path', type=str, default=None) + parser.add_argument('--num_makers', type=int, default=1) + parser.add_argument('--num_trainers', type=int, default=1) + parser.add_argument('--trainer_strategy', + choices=[ + 'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu', + 'colossalai_zero2_cpu' + ], + default='naive') + parser.add_argument('--maker_strategy', choices=['naive'], default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) + parser.add_argument('--pretrain', type=str, default=None) + parser.add_argument('--critic_pretrain', type=str, default=None) + parser.add_argument('--experience_steps', type=int, default=4) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--train_epochs', type=int, default=1) + parser.add_argument('--update_steps', type=int, default=2) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") + + parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) + parser.add_argument('--quant_bits', type=int, default=4) + parser.add_argument('--quant_group_size', type=int, default=128) + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + ray.init(namespace=os.environ["RAY_NAMESPACE"]) + main(args) From 332fd3c040f9ae3df9ac9b9415639a25ba6d91bc Mon Sep 17 00:00:00 2001 From: csric Date: Tue, 25 Apr 2023 15:28:32 +0800 Subject: [PATCH 07/23] remove legacy examples --- applications/Chat/examples/ray/2m1t.py | 141 --------------- applications/Chat/examples/ray/2m1t.sh | 23 --- applications/Chat/examples/ray/2m2t.py | 230 ------------------------- applications/Chat/examples/ray/2m2t.sh | 23 --- 4 files changed, 417 deletions(-) delete mode 100644 applications/Chat/examples/ray/2m1t.py delete mode 100644 applications/Chat/examples/ray/2m1t.sh delete mode 100644 applications/Chat/examples/ray/2m2t.py delete mode 100644 applications/Chat/examples/ray/2m2t.sh diff --git a/applications/Chat/examples/ray/2m1t.py b/applications/Chat/examples/ray/2m1t.py deleted file mode 100644 index bed6246ed0d7..000000000000 --- 
a/applications/Chat/examples/ray/2m1t.py +++ /dev/null @@ -1,141 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def main(args): - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. 
- ray.get(trainer_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs // 2 + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_1_done_ref, maker_2_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/2m1t.sh b/applications/Chat/examples/ray/2m1t.sh deleted file mode 100644 index a207d4118d60..000000000000 --- a/applications/Chat/examples/ray/2m1t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 3 - -export RAY_NAMESPACE="admin" - -python 2m1t.py "/path/to/prompts.csv" \ - --trainer_strategy naive --maker_strategy naive --lora_rank 2 
--pretrain "facebook/opt-350m" --model 'opt' \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 # --debug diff --git a/applications/Chat/examples/ray/2m2t.py b/applications/Chat/examples/ray/2m2t.py deleted file mode 100644 index 05440032ce9f..000000000000 --- a/applications/Chat/examples/ray/2m2t.py +++ /dev/null @@ -1,230 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - env_info_trainer_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - env_info_maker_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1, env_info_maker_2]) - ray.init() - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1", "maker2"], - strategy=args.trainer_strategy, - model=args.model, - 
env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - experience_holder_2_ref = ExperienceMakerHolder.options(name="maker2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_2, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. - # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - maker_2_done_ref = experience_holder_2_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref, maker_2_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 
'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - main(args) diff --git a/applications/Chat/examples/ray/2m2t.sh b/applications/Chat/examples/ray/2m2t.sh deleted file mode 100644 index bd8ca84a58fb..000000000000 --- a/applications/Chat/examples/ray/2m2t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 2 - -export RAY_NAMESPACE="admin" - -python 2m2t.py "path/to/prompts.csv" \ - --maker_strategy naive --trainer_strategy colossalai_zero2 --lora_rank 2 \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 --debug From 9b2b77eeacdb915904968f5d2bca006813e5eee2 Mon Sep 17 00:00:00 2001 From: csric Date: Tue, 25 Apr 2023 15:29:24 +0800 Subject: [PATCH 08/23] remove legacy examples --- applications/Chat/examples/ray/1m1t.py | 166 -------------- applications/Chat/examples/ray/1m1t.sh | 23 -- .../Chat/examples/ray/1m1t_quantize.py | 156 -------------- applications/Chat/examples/ray/1m2t.py | 203 ------------------ applications/Chat/examples/ray/1m2t.sh | 23 -- 5 files changed, 571 deletions(-) delete mode 100644 applications/Chat/examples/ray/1m1t.py delete mode 100644 applications/Chat/examples/ray/1m1t.sh delete mode 100644 applications/Chat/examples/ray/1m1t_quantize.py delete mode 100644 applications/Chat/examples/ray/1m2t.py delete mode 100644 applications/Chat/examples/ray/1m2t.sh diff --git a/applications/Chat/examples/ray/1m1t.py b/applications/Chat/examples/ray/1m1t.py deleted file mode 100644 index 8c291abb1f8b..000000000000 --- a/applications/Chat/examples/ray/1m1t.py +++ /dev/null @@ -1,166 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.callbacks.performance_evaluator import ( - ExperienceMakerPerformanceEvaluator, - TrainerPerformaceEvaluator, -) -from coati.trainer.strategies 
import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': trainer_port, - 'master_addr': master_addr - } - - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': maker_port, - 'master_addr': master_addr - } - - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - eval_performance=True, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - env_info=env_info_maker, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - eval_performance=True, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. 
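Every script in this series wires trainers and experience makers together as Ray named actors created with `.options(name=..., num_gpus=1, max_concurrency=2)` and looked up by name inside a shared namespace (the shell scripts export `RAY_NAMESPACE="admin"`). A minimal, self-contained sketch of that pattern; the `Holder` class and the reuse of the `"maker1"` name are illustrative stand-ins, not coati APIs:

```
import ray

ray.init(namespace="admin")    # matches `export RAY_NAMESPACE="admin"` in the shell scripts


@ray.remote(max_concurrency=2)    # allow two method calls to run concurrently per actor
class Holder:

    def ping(self):
        return "pong"


# A named actor can be retrieved from any process in the same namespace,
# which is how trainers and makers find each other via the names passed in
# `detached_trainer_name_list` / `experience_maker_holder_name_list`.
holder = Holder.options(name="maker1").remote()
same_holder = ray.get_actor("maker1")
print(ray.get(same_holder.ping.remote()))    # pong
ray.shutdown()
```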
- ray.get(trainer_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/1m1t.sh b/applications/Chat/examples/ray/1m1t.sh deleted file mode 100644 index f7c5054c800e..000000000000 --- a/applications/Chat/examples/ray/1m1t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 2 - -export RAY_NAMESPACE="admin" - -python 1m1t.py "/path/to/prompts.csv" \ - --trainer_strategy colossalai_zero2 --maker_strategy naive --lora_rank 2 --pretrain "facebook/opt-350m" --model 'opt' \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 
--debug diff --git a/applications/Chat/examples/ray/1m1t_quantize.py b/applications/Chat/examples/ray/1m1t_quantize.py deleted file mode 100644 index cc54bd1905c6..000000000000 --- a/applications/Chat/examples/ray/1m1t_quantize.py +++ /dev/null @@ -1,156 +0,0 @@ -import argparse -import os -import socket - -import pandas as pd -import ray -import torch -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': trainer_port, - 'master_addr': master_addr - } - - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '1', - 'master_port': maker_port, - 'master_addr': master_addr - } - - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_ref = DetachedPPOTrainer.options(name="trainer1", num_gpus=1, max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - eval_performance=True, - ) - - # configure Experience Maker - experience_holder_ref = ExperienceMakerHolder.options(name="maker1", num_gpus=1, max_concurrency=2).remote( - detached_trainer_name_list=["trainer1"], - strategy=args.maker_strategy, - env_info=env_info_maker, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - eval_performance=True, - ) - - # a 'jump wire' to set quantized initial_model and reward_model - - # trainer send its actor and critic to experience holders. 
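The `num_exp_per_maker` budget that each of these legacy scripts hands to `workingloop(..., times=...)` is easy to misread. Below is a worked version using the scripts' own argparse defaults; the interpretation in the comments is my reading of the formula, not an authoritative derivation:

```
# Defaults from the deleted scripts' argument parsers.
num_episodes = 10
max_timesteps = 10
update_timesteps = 10
max_epochs = 5

# fit() triggers one PPO update every `update_timesteps` timesteps, and each
# update runs `max_epochs` passes over freshly made experience, so the maker
# must keep roughly one experience batch per pass in flight.
updates = num_episodes * max_timesteps // update_timesteps    # 10
num_exp_per_maker = updates * max_epochs + 3                  # 53; "+3 for fault tolerance"
print(num_exp_per_maker)
```

The 1m2t variant further on scales the same product by its two trainers (`* 2`) before adding the spare batches.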
- # ray.get(trainer_ref.initialize_remote_makers.remote()) - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_done_ref = trainer_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs + 3 # +3 for fault tolerance - maker_done_ref = experience_holder_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_done_ref, maker_done_ref]) - - # save model checkpoint after fitting - trainer_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama', 'roberta']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - ray.init(namespace=os.environ["RAY_NAMESPACE"]) - main(args) diff --git a/applications/Chat/examples/ray/1m2t.py b/applications/Chat/examples/ray/1m2t.py deleted file mode 100644 index 1a35beb6221a..000000000000 --- a/applications/Chat/examples/ray/1m2t.py +++ /dev/null @@ -1,203 +0,0 @@ -import argparse -import os -import socket -from copy import deepcopy - -import pandas as pd -import ray -import torch -from coati.experience_maker import NaiveExperienceMaker -from coati.ray.detached_trainer_ppo import DetachedPPOTrainer -from coati.ray.experience_maker_holder import ExperienceMakerHolder -from coati.trainer import PPOTrainer -from coati.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy -from torch.optim import Adam -from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer - -from colossalai.nn.optimizer import HybridAdam - - -def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - 
return s.getsockname()[1] - - -def get_local_ip(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 80)) - return s.getsockname()[0] - - -def main(args): - master_addr = str(get_local_ip()) - # trainer_env_info - trainer_port = str(get_free_port()) - env_info_trainer_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - env_info_trainer_2 = { - 'local_rank': '0', - 'rank': '1', - 'world_size': '2', - 'master_port': trainer_port, - 'master_addr': master_addr - } - # maker_env_info - maker_port = str(get_free_port()) - env_info_maker_1 = { - 'local_rank': '0', - 'rank': '0', - 'world_size': '2', - 'master_port': maker_port, - 'master_addr': master_addr - } - print([env_info_trainer_1, env_info_trainer_2, env_info_maker_1]) - ray.init(dashboard_port=1145) - # configure tokenizer - if args.model == 'gpt2': - tokenizer = GPT2Tokenizer.from_pretrained('gpt2') - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'bloom': - tokenizer = BloomTokenizerFast.from_pretrained(args.pretrain) - tokenizer.pad_token = tokenizer.eos_token - elif args.model == 'opt': - tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") - else: - raise ValueError(f'Unsupported model "{args.model}"') - - # configure Trainer - trainer_1_ref = DetachedPPOTrainer.options(name="trainer1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_1, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - trainer_2_ref = DetachedPPOTrainer.options(name="trainer2", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - experience_maker_holder_name_list=["maker1"], - strategy=args.trainer_strategy, - model=args.model, - env_info=env_info_trainer_2, - pretrained=args.pretrain, - lora_rank=args.lora_rank, - train_batch_size=args.train_batch_size, - buffer_limit=16, - experience_batch_size=args.experience_batch_size, - max_epochs=args.max_epochs, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # configure Experience Maker - experience_holder_1_ref = ExperienceMakerHolder.options(name="maker1", - namespace=os.environ["RAY_NAMESPACE"], - num_gpus=1, - max_concurrency=2).remote( - detached_trainer_name_list=["trainer1", "trainer2"], - strategy=args.maker_strategy, - env_info=env_info_maker_1, - experience_batch_size=args.experience_batch_size, - kl_coef=0.1, - # kwargs: - max_length=128, - do_sample=True, - temperature=1.0, - top_k=50, - pad_token_id=tokenizer.pad_token_id, - eos_token_id=tokenizer.eos_token_id, - debug=args.debug, - ) - - # trainer send its actor and critic to experience holders. 
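The `tokenize_fn` repeated in each of these scripts pads every prompt to a fixed 96 tokens rather than to the longest sample in the batch. A runnable sketch of the effect, assuming the stock gpt2 tokenizer the scripts default to:

```
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token    # gpt2 ships without a pad token

prompts = ["short prompt", "a noticeably longer prompt " * 4]
batch = tokenizer(prompts,
                  return_tensors='pt',
                  max_length=96,
                  padding='max_length',    # pad to the fixed budget, not to the longest sample
                  truncation=True)

# Every rank sees shape [batch, 96], so generation runs the same number of
# steps everywhere and gemini's collective ops cannot desynchronize and hang.
print(batch['input_ids'].shape)    # torch.Size([2, 96])
```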
- # TODO: balance duty - ray.get(trainer_1_ref.initialize_remote_makers.remote()) - - # configure sampler - dataset = pd.read_csv(args.prompt_path)['prompt'] - - def tokenize_fn(texts): - # MUST padding to max length to ensure inputs of all ranks have the same length - # Different length may lead to hang when using gemini, as different generation steps - batch = tokenizer(texts, return_tensors='pt', max_length=96, padding='max_length', truncation=True) - return {k: v.cuda() for k, v in batch.items()} - - trainer_1_done_ref = trainer_1_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - trainer_2_done_ref = trainer_2_ref.fit.remote(num_episodes=args.num_episodes, - max_timesteps=args.max_timesteps, - update_timesteps=args.update_timesteps) - num_exp_per_maker = args.num_episodes * args.max_timesteps // args.update_timesteps * \ - args.max_epochs * 2 + 3 # +3 for fault tolerance - maker_1_done_ref = experience_holder_1_ref.workingloop.remote(dataset, tokenize_fn, times=num_exp_per_maker) - - ray.get([trainer_1_done_ref, trainer_2_done_ref, maker_1_done_ref]) - # save model checkpoint after fitting - trainer_1_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - trainer_2_ref.strategy_save_actor.remote(args.save_path, only_rank0=True) - # save optimizer checkpoint on all ranks - if args.need_optim_ckpt: - trainer_1_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - trainer_2_ref.strategy_save_actor_optim.remote('actor_optim_checkpoint_prompts_%d.pt' % - (torch.cuda.current_device()), - only_rank0=False) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('prompt_path') - parser.add_argument('--trainer_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--maker_strategy', - choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], - default='naive') - parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) - parser.add_argument('--pretrain', type=str, default=None) - parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') - parser.add_argument('--need_optim_ckpt', type=bool, default=False) - parser.add_argument('--num_episodes', type=int, default=10) - parser.add_argument('--max_timesteps', type=int, default=10) - parser.add_argument('--update_timesteps', type=int, default=10) - parser.add_argument('--max_epochs', type=int, default=5) - parser.add_argument('--train_batch_size', type=int, default=8) - parser.add_argument('--experience_batch_size', type=int, default=8) - parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") - - parser.add_argument('--debug', action='store_true') - args = parser.parse_args() - main(args) diff --git a/applications/Chat/examples/ray/1m2t.sh b/applications/Chat/examples/ray/1m2t.sh deleted file mode 100644 index 9608526ea7e7..000000000000 --- a/applications/Chat/examples/ray/1m2t.sh +++ /dev/null @@ -1,23 +0,0 @@ -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv \ - | tail -n +2 \ - | nl -v 0 \ - | tee /dev/tty \ - | sort -g -k 2 \ - | awk '{print $1}' \ - | head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES 
is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 2 - -export RAY_NAMESPACE="admin" - -python 1m2t.py "/path/to/prompts.csv" --model gpt2 \ - --maker_strategy naive --trainer_strategy ddp --lora_rank 2 \ - --num_episodes 10 --max_timesteps 10 --update_timesteps 10 \ - --max_epochs 10 #--debug From 13fa0383430f0c1d935e0133f80d6726aa11b87d Mon Sep 17 00:00:00 2001 From: csric Date: Wed, 26 Apr 2023 11:01:04 +0800 Subject: [PATCH 09/23] remove replay buffer tp state. bad design --- .../Chat/coati/ray/detached_replay_buffer.py | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/applications/Chat/coati/ray/detached_replay_buffer.py b/applications/Chat/coati/ray/detached_replay_buffer.py index 257b0b072493..6a4f5a6d67c2 100644 --- a/applications/Chat/coati/ray/detached_replay_buffer.py +++ b/applications/Chat/coati/ray/detached_replay_buffer.py @@ -26,20 +26,12 @@ class DetachedReplayBuffer: cpu_offload: Whether to offload experience to cpu when sampling. Defaults to True. ''' - def __init__(self, sample_batch_size: int, tp_world_size: int = 1, limit: int = 0) -> None: + def __init__(self, sample_batch_size: int, limit: int = 0) -> None: self.sample_batch_size = sample_batch_size self.limit = limit self.items = Queue(self.limit, actor_options={"num_cpus": 1}) self.batch_collector: List[BufferItem] = [] - ''' - Workers in the same tp group share this buffer and need same sample for one step. - Therefore a held_sample should be returned tp_world_size times before it could be dropped. - worker_state records wheter a worker got the held_sample - ''' - self.tp_world_size = tp_world_size - self.worker_state = [False] * self.tp_world_size - self.held_sample = None - self._worker_state_lock = Lock() + @torch.no_grad() def append(self, experience: Experience) -> None: @@ -70,16 +62,7 @@ def clear(self) -> None: @torch.no_grad() def sample(self, worker_rank=0, to_device="cpu") -> Experience: - self._worker_state_lock.acquire() - if not any(self.worker_state): - self.held_sample = self._sample_and_erase() - self.worker_state[worker_rank] = True - if all(self.worker_state): - self.worker_state = [False] * self.tp_world_size - ret = self.held_sample - else: - ret = copy.deepcopy(self.held_sample) - self._worker_state_lock.release() + ret = self._sample_and_erase() ret.to_device(to_device) return ret From cf5c2d12a8b0b117e097bdcf94b001e7436509e6 Mon Sep 17 00:00:00 2001 From: csric Date: Thu, 27 Apr 2023 11:32:54 +0800 Subject: [PATCH 10/23] opt benchmark --- applications/Chat/examples/ray/.gitignore | 1 + applications/Chat/examples/ray/mmmt_prompt.py | 8 +++--- applications/Chat/examples/ray/run.sh | 28 +++++++++++++++++++ 3 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 applications/Chat/examples/ray/.gitignore create mode 100644 applications/Chat/examples/ray/run.sh diff --git a/applications/Chat/examples/ray/.gitignore b/applications/Chat/examples/ray/.gitignore new file mode 100644 index 000000000000..4cf8dd15619e --- /dev/null +++ b/applications/Chat/examples/ray/.gitignore @@ -0,0 +1 @@ +logs/* \ No newline at end of file diff --git a/applications/Chat/examples/ray/mmmt_prompt.py b/applications/Chat/examples/ray/mmmt_prompt.py index d2398d451c7b..84d7bf5d92bf 100644 --- a/applications/Chat/examples/ray/mmmt_prompt.py +++ b/applications/Chat/examples/ray/mmmt_prompt.py @@ -175,11 +175,11 @@ def tokenize_fn(texts): parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 
'opt', 'llama']) parser.add_argument('--pretrain', type=str, default=None) parser.add_argument('--critic_pretrain', type=str, default=None) - parser.add_argument('--experience_steps', type=int, default=4) - parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--experience_steps', type=int, default=4) # + parser.add_argument('--experience_batch_size', type=int, default=8) # _ * _ parser.add_argument('--train_epochs', type=int, default=1) - parser.add_argument('--update_steps', type=int, default=2) - parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--update_steps', type=int, default=2) # + parser.add_argument('--train_batch_size', type=int, default=8) # parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") parser.add_argument('--initial_model_quant_ckpt', type=str, default=None) diff --git a/applications/Chat/examples/ray/run.sh b/applications/Chat/examples/ray/run.sh new file mode 100644 index 000000000000..e4b8720a3b71 --- /dev/null +++ b/applications/Chat/examples/ray/run.sh @@ -0,0 +1,28 @@ +for pretrain in "facebook/opt-350m" "facebook/opt-1.3b" "facebook/opt-2.7b" "facebook/opt-6.7b" "facebook/opt-13b" +do + for experience_steps in 8 16 + do + for experience_batch_size in 4 8 16 + do + for update_steps in 4 16 + do + for train_batch_size in 4 8 16 + do + nohup python mmmt_prompt.py \ + --prompt_path /home/lccsr/data3/awesome-chatgpt-prompts/prompts.csv \ + --trainer_strategy colossalai_gemini --maker_strategy naive \ + --model 'opt' \ + --critic_pretrain "facebook/opt-125m" \ + --pretrain $pretrain \ + --num_trainers 4 \ + --num_makers 4 \ + --experience_steps $experience_steps \ + --experience_batch_size $experience_batch_size \ + --update_steps $update_steps \ + --train_batch_size $train_batch_size \ + --debug > logs/output_4_4_pretrain_${pretrain##*/}_experience_steps_${experience_steps}_experience_batch_size_${experience_batch_size}_update_steps_${update_steps}_train_batch_size_${train_batch_size}.txt 2>&1 + done + done + done + done +done \ No newline at end of file From 94783c57d7ef69721e7a8d23f72e716dbce4de64 Mon Sep 17 00:00:00 2001 From: csric Date: Thu, 27 Apr 2023 14:29:07 +0800 Subject: [PATCH 11/23] better script --- applications/Chat/examples/ray/benchmark.sh | 37 +++++++++++++++++++++ applications/Chat/examples/ray/run.sh | 28 ---------------- 2 files changed, 37 insertions(+), 28 deletions(-) create mode 100644 applications/Chat/examples/ray/benchmark.sh delete mode 100644 applications/Chat/examples/ray/run.sh diff --git a/applications/Chat/examples/ray/benchmark.sh b/applications/Chat/examples/ray/benchmark.sh new file mode 100644 index 000000000000..2693931f6e37 --- /dev/null +++ b/applications/Chat/examples/ray/benchmark.sh @@ -0,0 +1,37 @@ +# "facebook/opt-2.7b" +PROMPT_PATH=/home/lccsr/data3/awesome-chatgpt-prompts/prompts.csv + +num_trainers=4 +num_makers=4 + +for pretrain in "facebook/opt-1.3b" "facebook/opt-6.7b" "facebook/opt-13b" +do + + for experience_batch_size in 16 32 64 + do + for train_batch_size in 16 32 64 + do + for update_steps in 8 32 128 + do + # set a big enough experience_steps for twice maker-update + experience_steps=$((2*num_trainers*train_batch_size*update_steps/num_makers/experience_batch_size)) + + echo running: ${num_trainers}_${num_makers}_pretrain_${pretrain##*/}_experience_batch_size_${experience_batch_size}_train_batch_size_${train_batch_size}_update_steps_${update_steps}_experience_steps_${experience_steps} + + nohup 
python mmmt_prompt.py \ + --prompt_path $PROMPT_PATH \ + --trainer_strategy colossalai_gemini --maker_strategy naive \ + --model 'opt' \ + --pretrain $pretrain \ + --critic_pretrain "facebook/opt-350m" \ + --num_trainers $num_trainers \ + --num_makers $num_makers \ + --experience_steps $experience_steps \ + --experience_batch_size $experience_batch_size \ + --update_steps $update_steps \ + --train_batch_size $train_batch_size \ + --debug > logs/output_${num_trainers}_${num_makers}_pretrain_${pretrain##*/}_experience_batch_size_${experience_batch_size}_train_batch_size_${train_batch_size}_update_steps_${update_steps}_experience_steps_${experience_steps}.txt 2>&1 + done + done + done +done \ No newline at end of file diff --git a/applications/Chat/examples/ray/run.sh b/applications/Chat/examples/ray/run.sh deleted file mode 100644 index e4b8720a3b71..000000000000 --- a/applications/Chat/examples/ray/run.sh +++ /dev/null @@ -1,28 +0,0 @@ -for pretrain in "facebook/opt-350m" "facebook/opt-1.3b" "facebook/opt-2.7b" "facebook/opt-6.7b" "facebook/opt-13b" -do - for experience_steps in 8 16 - do - for experience_batch_size in 4 8 16 - do - for update_steps in 4 16 - do - for train_batch_size in 4 8 16 - do - nohup python mmmt_prompt.py \ - --prompt_path /home/lccsr/data3/awesome-chatgpt-prompts/prompts.csv \ - --trainer_strategy colossalai_gemini --maker_strategy naive \ - --model 'opt' \ - --critic_pretrain "facebook/opt-125m" \ - --pretrain $pretrain \ - --num_trainers 4 \ - --num_makers 4 \ - --experience_steps $experience_steps \ - --experience_batch_size $experience_batch_size \ - --update_steps $update_steps \ - --train_batch_size $train_batch_size \ - --debug > logs/output_4_4_pretrain_${pretrain##*/}_experience_steps_${experience_steps}_experience_batch_size_${experience_batch_size}_update_steps_${update_steps}_train_batch_size_${train_batch_size}.txt 2>&1 - done - done - done - done -done \ No newline at end of file From bfe6a69e33eb18cff8cd0cd67a5d2bfad486075c Mon Sep 17 00:00:00 2001 From: csric Date: Thu, 27 Apr 2023 15:20:38 +0800 Subject: [PATCH 12/23] nothing --- applications/Chat/examples/ray/benchmark.sh | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/applications/Chat/examples/ray/benchmark.sh b/applications/Chat/examples/ray/benchmark.sh index 2693931f6e37..3852684007b7 100644 --- a/applications/Chat/examples/ray/benchmark.sh +++ b/applications/Chat/examples/ray/benchmark.sh @@ -1,9 +1,10 @@ -# "facebook/opt-2.7b" + PROMPT_PATH=/home/lccsr/data3/awesome-chatgpt-prompts/prompts.csv num_trainers=4 num_makers=4 +# "facebook/opt-2.7b" for pretrain in "facebook/opt-1.3b" "facebook/opt-6.7b" "facebook/opt-13b" do @@ -16,7 +17,8 @@ do # set a big enough experience_steps for twice maker-update experience_steps=$((2*num_trainers*train_batch_size*update_steps/num_makers/experience_batch_size)) - echo running: ${num_trainers}_${num_makers}_pretrain_${pretrain##*/}_experience_batch_size_${experience_batch_size}_train_batch_size_${train_batch_size}_update_steps_${update_steps}_experience_steps_${experience_steps} + config_string=${num_trainers}_${num_makers}_pretrain_${pretrain##*/}_experience_batch_size_${experience_batch_size}_train_batch_size_${train_batch_size}_update_steps_${update_steps}_experience_steps_${experience_steps} + echo running: ${config_string} nohup python mmmt_prompt.py \ --prompt_path $PROMPT_PATH \ @@ -30,7 +32,7 @@ do --experience_batch_size $experience_batch_size \ --update_steps $update_steps \ --train_batch_size $train_batch_size \ - 
--debug > logs/output_${num_trainers}_${num_makers}_pretrain_${pretrain##*/}_experience_batch_size_${experience_batch_size}_train_batch_size_${train_batch_size}_update_steps_${update_steps}_experience_steps_${experience_steps}.txt 2>&1 + --debug > logs/output_${config_string}.txt 2>&1 done done done From 1c98e937fe0bb369cf8246679a957ed8849537b0 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 16:17:27 +0800 Subject: [PATCH 13/23] [chat] strategy refactor unwrap model --- .../Chat/coati/models/base/__init__.py | 11 ++++++- applications/Chat/coati/trainer/ppo.py | 2 +- .../Chat/coati/trainer/strategies/base.py | 30 +++++++------------ .../coati/trainer/strategies/colossalai.py | 18 +++++------ .../Chat/coati/trainer/strategies/ddp.py | 9 +++--- .../Chat/coati/trainer/strategies/naive.py | 6 ++-- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/applications/Chat/coati/models/base/__init__.py b/applications/Chat/coati/models/base/__init__.py index 86f403556904..73ae891bae88 100644 --- a/applications/Chat/coati/models/base/__init__.py +++ b/applications/Chat/coati/models/base/__init__.py @@ -1,5 +1,14 @@ +import torch.nn as nn + from .actor import Actor from .critic import Critic from .reward_model import RewardModel -__all__ = ['Actor', 'Critic', 'RewardModel'] + +def get_base_model(model: nn.Module) -> nn.Module: + if isinstance(model, Actor): + return model.get_base_model() + return model + + +__all__ = ['Actor', 'Critic', 'RewardModel', 'get_base_model'] diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index f9ab4a556359..ead39e4919ae 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -207,7 +207,7 @@ def save_model(self, def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None: - origin_model = strategy._unwrap_actor(actor) + origin_model = strategy.unwrap_model(actor) new_kwargs = {**generate_kwargs} # use huggingface models method directly if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'): diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index a1647a4ca525..a0392f63b3bb 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -2,10 +2,9 @@ from contextlib import nullcontext from typing import Any, List, Optional, Tuple, Union -import numpy as np import torch import torch.nn as nn -from coati.models.base import Actor, Critic, RewardModel +from coati.models.base import Actor, get_base_model from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader @@ -72,8 +71,8 @@ def prepare( def prepare_model(model: nn.Module): if isinstance(model, Actor): - return Actor(self.setup_model(self._unwrap_model(model))) - return self.setup_model(self._unwrap_model(model)) + return Actor(self.setup_model(model.get_base_model())) + return self.setup_model(model) rets = [] for arg in models_or_model_optim_pairs: @@ -81,7 +80,7 @@ def prepare_model(model: nn.Module): assert len(arg) == 2, f'Expect (model, optimizer) pair, got a tuple with size "{len(arg)}"' model, optimizer = arg model = prepare_model(model) - optimizer = self.setup_optimizer(optimizer, self._unwrap_model(model)) + optimizer = self.setup_optimizer(optimizer, get_base_model(model)) rets.append((model, optimizer)) elif isinstance(arg, 
nn.Module): rets.append(prepare_model(arg)) @@ -93,24 +92,17 @@ def prepare_model(model: nn.Module): return rets @staticmethod - def _unwrap_model(model: nn.Module) -> nn.Module: - """Useful for saving state dict. As actor is wrapped by Actor class again in `prepare()`, we should unwrap it before saving. + def unwrap_model(model: nn.Module) -> nn.Module: + """Get the unwrapped model from a wrapped model. Useful for getting original huggingface model. + For Actor, it will unwrap `actor.model`. Args: - model (nn.Module): an actor or a critic - """ - if isinstance(model, Actor): - return model.model - return model - - @staticmethod - def _unwrap_actor(actor: Actor) -> nn.Module: - """Get `actor.model` from a wrapped (by `prepare()`) actor. Useful for getting original huggingface model. + model (nn.Module): the model to unwrap - Args: - actor (Actor): a wrapped actor + Returns: + nn.Module: the original model (usually a huggingface model) """ - return Strategy._unwrap_model(actor) + return get_base_model(model) @abstractmethod def save_model(self, diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index 2fb0b8a188f0..a7b03384eb74 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -5,7 +5,7 @@ import torch.distributed as dist import torch.nn as nn import torch.optim as optim -from coati.models.base import Actor, RewardModel +from coati.models.base import Actor, RewardModel, get_base_model from coati.models.lora import LoraLinear from torch.optim import Optimizer from transformers.modeling_utils import PreTrainedModel @@ -154,13 +154,6 @@ def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: optim.Optimi def optimizer_step(self, optimizer: optim.Optimizer, **kwargs) -> None: optimizer.step() - @staticmethod - def _unwrap_actor(actor: Actor) -> nn.Module: - model: Union[nn.Module, ZeroDDP] = Strategy._unwrap_actor(actor) - if isinstance(model, ZeroDDP): - return model.module - return model - def save_model(self, model: nn.Module, path: str, @@ -169,7 +162,7 @@ def save_model(self, if only_rank0 and dist.get_rank() != 0: return None - unwrapped_model = self._unwrap_model(model) + base_model = get_base_model(model) # TODO : better way to get torch model from gemini model # to get torch model from gemini model @@ -198,3 +191,10 @@ def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = Fal raise RuntimeError( f'Optimizer states are sharded when using ColossalAIStrategy. 
Only rank0 is not supported.') torch.save(optimizer.state_dict(), path) + + def unwrap_model(self, model: nn.Module) -> nn.Module: + base_model: Union[nn.Module, ZeroDDP] = get_base_model(model) + if self.stage == 3: + assert isinstance(base_model, ZeroDDP) + return base_model.module + return base_model diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index c4c632c67d79..f179a5ee9a11 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -68,11 +68,6 @@ def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False pin_memory=pin_memory, collate_fn=replay_buffer.collate_fn) - @staticmethod - def _unwrap_actor(actor: Actor) -> nn.Module: - model: DDP = Strategy._unwrap_actor(actor) - return model.module - def save_model(self, model: nn.Module, path: str, @@ -104,3 +99,7 @@ def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = Fal def setup_sampler(self, dataset) -> DistributedSampler: return DistributedSampler(dataset, dist.get_world_size(), dist.get_rank()) + + def unwrap_model(self, model: nn.Module) -> nn.Module: + base_model: DDP = super().unwrap_model(model) + return base_model.module diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 7a325eca0fe6..de073ae141b4 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -3,7 +3,7 @@ import torch import torch.nn as nn import torch.optim as optim -from coati.models.base import RewardModel +from coati.models.base import RewardModel, get_base_model from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader @@ -58,9 +58,9 @@ def save_model(self, torch.save(state_dict, path) def load_model(self, model: nn.Module, path: str, map_location: Any = None, strict: bool = True) -> None: - unwrapped_model = self._unwrap_model(model) + base_model = get_base_model(model) state_dict = torch.load(path, map_location=map_location) - unwrapped_model.load_state_dict(state_dict, strict=strict) + base_model.load_state_dict(state_dict, strict=strict) def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: torch.save(optimizer.state_dict(), path) From f5a0821ef1f20154fa8df1b76e0b8dd0d35c0140 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 16:44:34 +0800 Subject: [PATCH 14/23] [chat] strategy refactor save model --- .../Chat/coati/trainer/strategies/base.py | 14 +++-- .../coati/trainer/strategies/colossalai.py | 58 ++++++++----------- .../Chat/coati/trainer/strategies/ddp.py | 37 ++++-------- .../Chat/coati/trainer/strategies/naive.py | 34 +++++------ 4 files changed, 61 insertions(+), 82 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index a0392f63b3bb..b1452869179e 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -105,11 +105,7 @@ def unwrap_model(model: nn.Module) -> nn.Module: return get_base_model(model) @abstractmethod - def save_model(self, - model: nn.Module, - path: str, - only_rank0: bool = False, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: + def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: pass @abstractmethod @@ -126,3 +122,11 
@@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No def setup_sampler(self, dataset) -> DistributedSampler: return DistributedSampler(dataset, 1, 0) + + @abstractmethod + def save_pretrained(self, + model: nn.Module, + path: str, + only_rank0: bool = True, + tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: + pass diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index a7b03384eb74..f9b333458dec 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -5,10 +5,8 @@ import torch.distributed as dist import torch.nn as nn import torch.optim as optim -from coati.models.base import Actor, RewardModel, get_base_model -from coati.models.lora import LoraLinear +from coati.models.base import get_base_model from torch.optim import Optimizer -from transformers.modeling_utils import PreTrainedModel from transformers.tokenization_utils_base import PreTrainedTokenizerBase import colossalai @@ -17,9 +15,7 @@ from colossalai.tensor import ProcessGroup, ShardSpec from colossalai.utils import get_current_device from colossalai.zero import ColoInitContext, ZeroDDP, zero_model_wrapper, zero_optim_wrapper -from colossalai.zero.gemini.utils import get_static_torch_model -from .base import Strategy from .ddp import DDPStrategy logger = get_dist_logger(__name__) @@ -141,7 +137,7 @@ def setup_model(self, model: nn.Module) -> nn.Module: model = zero_model_wrapper(model, zero_stage=self.stage, gemini_config=self.gemini_config) if self.stage != 3 and self.precision == 'fp16': - model = model.half() + model = model.half().cuda() return model def setup_optimizer(self, optimizer: optim.Optimizer, model: nn.Module) -> optim.Optimizer: @@ -154,37 +150,20 @@ def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: optim.Optimi def optimizer_step(self, optimizer: optim.Optimizer, **kwargs) -> None: optimizer.step() - def save_model(self, - model: nn.Module, - path: str, - only_rank0: bool = True, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: - - if only_rank0 and dist.get_rank() != 0: - return None + def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: + if only_rank0 and dist.get_rank() != 0 and self.stage != 3: + return base_model = get_base_model(model) - # TODO : better way to get torch model from gemini model - # to get torch model from gemini model - - if isinstance(unwrapped_model, RewardModel): - state_dict = unwrapped_model.state_dict() - if only_rank0 and dist.get_rank() != 0: - return - torch.save(state_dict, path) + if self.stage == 3: + assert isinstance(base_model, ZeroDDP) + # for stage 3, state_dict() method should be called on every rank + state_dict = base_model.state_dict(only_rank_0=only_rank0) else: - try: - logger.info(f'Saving model to {path}', ranks=[0]) - unwrapped_model.save_pretrained(path) - logger.info(f'Model saved to {path} Successfully', ranks=[0]) - if tokenizer is not None: - logger.info(f'Saving tokenizer to {path}', ranks=[0]) - tokenizer.save_pretrained(path) - logger.info(f'Tokenizer saved to {path} Successfully', ranks=[0]) - except AttributeError: - state_dict = unwrapped_model.state_dict() - if only_rank0 and dist.get_rank() != 0: - return - torch.save(state_dict, path) + # only_rank0 is false or rank == 0 + state_dict = base_model.state_dict() + if only_rank0 and dist.get_rank() != 0: + return + torch.save(state_dict, path) 
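Note the guard above: when `stage == 3` no rank returns early, because the patch makes `state_dict(only_rank_0=only_rank0)` a collective call that every rank must join. A short sketch of the calling convention this implies, where `save_actor` is a hypothetical helper and not part of the patch:

```
def save_actor(strategy, actor, path):
    # Under ZeRO stage 3 the weights are sharded across ranks, so gathering
    # the state dict needs every rank to reach this line; only_rank0=True
    # just means rank 0 alone writes the file. Wrapping this call in an
    # `if rank == 0:` guard would leave the other ranks out of the gather.
    strategy.save_model(actor, path, only_rank0=True)
```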
def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: if only_rank0: @@ -198,3 +177,12 @@ def unwrap_model(self, model: nn.Module) -> nn.Module: assert isinstance(base_model, ZeroDDP) return base_model.module return base_model + + def save_pretrained(self, + model: nn.Module, + path: str, + only_rank0: bool = True, + tokenizer: PreTrainedTokenizerBase | None = None) -> None: + if self.stage == 3: + raise RuntimeError('ColossalAI strategy with stage-3 does not support save_pretrained() now') + super().save_pretrained(model, path, only_rank0, tokenizer) diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index f179a5ee9a11..3f22bdfc1003 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -1,19 +1,16 @@ import os import random -from typing import Optional import numpy as np import torch import torch.distributed as dist import torch.nn as nn -from coati.models.base import Actor, RewardModel from coati.replay_buffer import ReplayBuffer from torch.nn.parallel import DistributedDataParallel as DDP from torch.optim import Optimizer from torch.utils.data import DataLoader from transformers.tokenization_utils_base import PreTrainedTokenizerBase -from .base import Strategy from .naive import NaiveStrategy from .sampler import DistributedSampler @@ -68,29 +65,10 @@ def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False pin_memory=pin_memory, collate_fn=replay_buffer.collate_fn) - def save_model(self, - model: nn.Module, - path: str, - only_rank0: bool = False, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: + def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: if only_rank0 and dist.get_rank() != 0: - return None - - if isinstance(model, RewardModel): - state_dict = model.state_dict() - if only_rank0 and dist.get_rank() != 0: - return - torch.save(state_dict, path) - else: - try: - model.save_pretrained(path) - if tokenizer is not None: - tokenizer.save_pretrained(path) - except AttributeError: - state_dict = model.state_dict() - if only_rank0 and dist.get_rank() != 0: - return - torch.save(state_dict, path) + return + super().save_model(model, path, only_rank0) def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: if only_rank0 and dist.get_rank() != 0: @@ -103,3 +81,12 @@ def setup_sampler(self, dataset) -> DistributedSampler: def unwrap_model(self, model: nn.Module) -> nn.Module: base_model: DDP = super().unwrap_model(model) return base_model.module + + def save_pretrained(self, + model: nn.Module, + path: str, + only_rank0: bool = True, + tokenizer: PreTrainedTokenizerBase | None = None) -> None: + if only_rank0 and dist.get_rank() != 0: + return + super().save_pretrained(model, path, only_rank0, tokenizer) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index de073ae141b4..a391d258f92c 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -3,10 +3,11 @@ import torch import torch.nn as nn import torch.optim as optim -from coati.models.base import RewardModel, get_base_model +from coati.models.base import get_base_model from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader +from transformers.modeling_utils import 
PreTrainedModel from transformers.tokenization_utils_base import PreTrainedTokenizerBase from .base import Strategy @@ -40,22 +41,10 @@ def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False pin_memory=pin_memory, collate_fn=replay_buffer.collate_fn) - def save_model(self, - model: nn.Module, - path: str, - only_rank0: bool = False, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: - if isinstance(model, RewardModel): - state_dict = model.state_dict() - torch.save(state_dict, path) - else: - try: - model.save_pretrained(path) - if tokenizer is not None: - tokenizer.save_pretrained(path) - except AttributeError: - state_dict = model.state_dict() - torch.save(state_dict, path) + def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: + base_model = get_base_model(model) + state_dict = base_model.state_dict() + torch.save(state_dict, path) def load_model(self, model: nn.Module, path: str, map_location: Any = None, strict: bool = True) -> None: base_model = get_base_model(model) @@ -68,3 +57,14 @@ def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = Fal def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = None) -> None: state_dict = torch.load(path, map_location=map_location) optimizer.load_state_dict(state_dict) + + def save_pretrained(self, + model: nn.Module, + path: str, + only_rank0: bool = True, + tokenizer: PreTrainedTokenizerBase | None = None) -> None: + unwrapped_model = self.unwrap_model(model) + assert isinstance(unwrapped_model, PreTrainedModel) + unwrapped_model.save_pretrained(path) + if tokenizer is not None: + tokenizer.save_pretrained(path) From 47b47eb32abe291a9050865a3fea333daf0546e1 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 16:55:15 +0800 Subject: [PATCH 15/23] [chat] add docstr --- applications/Chat/coati/models/base/__init__.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/applications/Chat/coati/models/base/__init__.py b/applications/Chat/coati/models/base/__init__.py index 73ae891bae88..fe4152f2b760 100644 --- a/applications/Chat/coati/models/base/__init__.py +++ b/applications/Chat/coati/models/base/__init__.py @@ -6,6 +6,16 @@ def get_base_model(model: nn.Module) -> nn.Module: + """Get the base model of our wrapper classes. + For Actor, it's base model is ``actor.model`` and it's usually a ``transformers.PreTrainedModel``. + For Critic and RewardModel, it's base model is itself. 
+ + Args: + model (nn.Module): model to get base model from + + Returns: + nn.Module: the base model + """ if isinstance(model, Actor): return model.get_base_model() return model From 1e5603400a9abea36241d88b4558b56986f552ac Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 16:56:44 +0800 Subject: [PATCH 16/23] [chat] refactor trainer save model --- applications/Chat/coati/trainer/ppo.py | 7 ++----- applications/Chat/coati/trainer/rm.py | 11 ++++------- applications/Chat/coati/trainer/sft.py | 7 ++----- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index ead39e4919ae..4c5a73dfcbed 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -199,11 +199,8 @@ def training_step(self, experience: Experience) -> Dict[str, float]: return {'reward': experience.reward.mean().item()} - def save_model(self, - path: str, - only_rank0: bool = False, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: - self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer) + def save_model(self, path: str, only_rank0: bool = False) -> None: + self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0) def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None: diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index ed6720abc2af..ec3ef3f1056f 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Optional, List +from typing import List, Optional import pandas as pd import torch @@ -9,8 +9,8 @@ from tqdm import tqdm from transformers.tokenization_utils_base import PreTrainedTokenizerBase -from .callbacks import Callback from .base import Trainer +from .callbacks import Callback from .strategies import Strategy from .utils import is_rank_0 @@ -124,8 +124,5 @@ def fit(self): step_bar.set_postfix({'dist': dist, 'acc': acc}) step_bar.close() - def save_model(self, - path: str, - only_rank0: bool = False, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: - self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer) + def save_model(self, path: str, only_rank0: bool = False) -> None: + self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0) diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index 64a330eebeb8..73cce3410519 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -134,8 +134,5 @@ def fit(self, logger, use_wandb: bool = False): # epoch_bar.update() - def save_model(self, - path: str, - only_rank0: bool = False, - tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: - self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0, tokenizer=tokenizer) + def save_model(self, path: str, only_rank0: bool = False) -> None: + self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0) From cfce710d0f146b843344cd8aeabc01f02ad19e78 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 16:59:22 +0800 Subject: [PATCH 17/23] [chat] fix strategy typing --- applications/Chat/coati/trainer/strategies/colossalai.py | 2 +- applications/Chat/coati/trainer/strategies/ddp.py | 3 ++- 
applications/Chat/coati/trainer/strategies/naive.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index f9b333458dec..8aa302c77eee 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -182,7 +182,7 @@ def save_pretrained(self, model: nn.Module, path: str, only_rank0: bool = True, - tokenizer: PreTrainedTokenizerBase | None = None) -> None: + tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: if self.stage == 3: raise RuntimeError('ColossalAI strategy with stage-3 does not support save_pretrained() now') super().save_pretrained(model, path, only_rank0, tokenizer) diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index 3f22bdfc1003..7910b57878f8 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -1,5 +1,6 @@ import os import random +from typing import Optional import numpy as np import torch @@ -86,7 +87,7 @@ def save_pretrained(self, model: nn.Module, path: str, only_rank0: bool = True, - tokenizer: PreTrainedTokenizerBase | None = None) -> None: + tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: if only_rank0 and dist.get_rank() != 0: return super().save_pretrained(model, path, only_rank0, tokenizer) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index a391d258f92c..4d94026ce932 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -62,7 +62,7 @@ def save_pretrained(self, model: nn.Module, path: str, only_rank0: bool = True, - tokenizer: PreTrainedTokenizerBase | None = None) -> None: + tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: unwrapped_model = self.unwrap_model(model) assert isinstance(unwrapped_model, PreTrainedModel) unwrapped_model.save_pretrained(path) From 0c251238f5cf88ab684a26b9f60d2a7f0b3519a1 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 17:29:48 +0800 Subject: [PATCH 18/23] [chat] refactor trainer save model --- applications/Chat/coati/trainer/ppo.py | 3 -- applications/Chat/coati/trainer/rm.py | 9 ++--- applications/Chat/coati/trainer/sft.py | 7 ++-- applications/Chat/examples/train_prompts.py | 2 +- .../Chat/examples/train_reward_model.py | 35 ++++++++++++++----- applications/Chat/examples/train_sft.py | 3 +- 6 files changed, 33 insertions(+), 26 deletions(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 4c5a73dfcbed..fe5ae48d9c2f 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -199,9 +199,6 @@ def training_step(self, experience: Experience) -> Dict[str, float]: return {'reward': experience.reward.mean().item()} - def save_model(self, path: str, only_rank0: bool = False) -> None: - self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0) - def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None: origin_model = strategy.unwrap_model(actor) diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index ec3ef3f1056f..cdae5108ab00 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ 
-41,20 +41,18 @@ def __init__( train_dataloader: DataLoader, valid_dataloader: DataLoader, eval_dataloader: DataLoader, - batch_size: int = 1, max_epochs: int = 1, callbacks: List[Callback] = [], ) -> None: super().__init__(strategy, max_epochs, callbacks=callbacks) - train_sampler = None self.train_dataloader = train_dataloader self.valid_dataloader = valid_dataloader self.eval_dataloader = eval_dataloader - self.model = strategy.setup_model(model) + self.model = model self.loss_fn = loss_fn - self.optimizer = strategy.setup_optimizer(optim, self.model) + self.optimizer = optim self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100) def eval_acc(self, dataloader): @@ -123,6 +121,3 @@ def fit(self): epoch_bar.update() step_bar.set_postfix({'dist': dist, 'acc': acc}) step_bar.close() - - def save_model(self, path: str, only_rank0: bool = False) -> None: - self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0) diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index 73cce3410519..0c09f4151a99 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -49,8 +49,8 @@ def __init__( super().__init__(strategy, max_epochs, callbacks=callbacks) self.train_dataloader = train_dataloader self.eval_dataloader = eval_dataloader - - (self.model, self.optimizer) = strategy.prepare((model, optim)) + self.model = model + self.optimizer = optim self.accimulation_steps = accimulation_steps num_update_steps_per_epoch = len(train_dataloader) // self.accimulation_steps @@ -133,6 +133,3 @@ def fit(self, logger, use_wandb: bool = False): logger.info(f'Eval Epoch {epoch}/{self.max_epochs} loss {loss_mean}') # epoch_bar.update() - - def save_model(self, path: str, only_rank0: bool = False) -> None: - self.strategy.save_model(model=self.model, path=path, only_rank0=only_rank0) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index 292caa1b36b1..f4563630aad6 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -194,7 +194,7 @@ def main(args): update_timesteps=args.update_timesteps) # save model checkpoint after fitting - trainer.save_model(args.save_path, only_rank0=True, tokenizer=tokenizer) + strategy.save_model(actor, args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks if args.need_optim_ckpt: strategy.save_optimizer(actor_optim, diff --git a/applications/Chat/examples/train_reward_model.py b/applications/Chat/examples/train_reward_model.py index 6a788a891ca6..5198c98dbd15 100644 --- a/applications/Chat/examples/train_reward_model.py +++ b/applications/Chat/examples/train_reward_model.py @@ -124,11 +124,23 @@ def train(args): raise ValueError(f'Unsupported dataset "{args.dataset}"') if dist.is_initialized() and dist.get_world_size() > 1: - train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True, rank=dist.get_rank(), + train_sampler = DistributedSampler(train_dataset, + shuffle=True, + seed=42, + drop_last=True, + rank=dist.get_rank(), num_replicas=dist.get_world_size()) - valid_sampler = DistributedSampler(valid_dataset, shuffle=True, seed=42, drop_last=True, rank=dist.get_rank(), + valid_sampler = DistributedSampler(valid_dataset, + shuffle=True, + seed=42, + drop_last=True, + rank=dist.get_rank(), num_replicas=dist.get_world_size()) - eval_sampler = DistributedSampler(eval_dataset, shuffle=True, 
seed=42, drop_last=True, rank=dist.get_rank(), + eval_sampler = DistributedSampler(eval_dataset, + shuffle=True, + seed=42, + drop_last=True, + rank=dist.get_rank(), num_replicas=dist.get_world_size()) else: train_sampler = None @@ -141,13 +153,19 @@ def train(args): batch_size=args.batch_size, pin_memory=True) - valid_dataloader = DataLoader(valid_dataset, shuffle=(valid_sampler is None), + valid_dataloader = DataLoader(valid_dataset, + shuffle=(valid_sampler is None), sampler=valid_sampler, - batch_size=args.batch_size, pin_memory=True) + batch_size=args.batch_size, + pin_memory=True) - eval_dataloader = DataLoader(eval_dataset, shuffle=(eval_sampler is None), - sampler=eval_sampler, batch_size=args.batch_size, pin_memory=True) + eval_dataloader = DataLoader(eval_dataset, + shuffle=(eval_sampler is None), + sampler=eval_sampler, + batch_size=args.batch_size, + pin_memory=True) + (model, optim) = strategy.prepare((model, optim)) trainer = RewardModelTrainer(model=model, strategy=strategy, optim=optim, @@ -155,12 +173,11 @@ def train(args): train_dataloader=train_dataloader, valid_dataloader=valid_dataloader, eval_dataloader=eval_dataloader, - batch_size=args.batch_size, max_epochs=args.max_epochs) trainer.fit() # save model checkpoint after fitting on only rank0 - trainer.save_model(path=args.save_path, only_rank0=True, tokenizer=tokenizer) + strategy.save_model(model, args.save_path, only_rank0=True) # save optimizer checkpoint on all ranks if args.need_optim_ckpt: strategy.save_optimizer(trainer.optimizer, diff --git a/applications/Chat/examples/train_sft.py b/applications/Chat/examples/train_sft.py index eecf4d89dec7..b35d228dc593 100644 --- a/applications/Chat/examples/train_sft.py +++ b/applications/Chat/examples/train_sft.py @@ -152,6 +152,7 @@ def train(args): else: eval_dataloader = None + (model, optim) = strategy.prepare((model, optim)) trainer = SFTTrainer(model=model, strategy=strategy, optim=optim, @@ -163,7 +164,7 @@ def train(args): trainer.fit(logger=logger, use_wandb=args.use_wandb) # save model checkpoint after fitting on only rank0 - trainer.save_model(path=args.save_path, only_rank0=True, tokenizer=tokenizer) + strategy.save_pretrained(model, path=args.save_path, only_rank0=True, tokenizer=tokenizer) # save optimizer checkpoint on all ranks if args.need_optim_ckpt: strategy.save_optimizer(trainer.optimizer, From 53a3b90a7cf0df7568a8517f6362308f2a424bb8 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 17:43:06 +0800 Subject: [PATCH 19/23] [chat] update readme --- applications/Chat/README.md | 12 ++++++++++-- applications/Chat/examples/README.md | 27 +++------------------------ 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/applications/Chat/README.md b/applications/Chat/README.md index dea562c4d2ad..9441a733a5cc 100644 --- a/applications/Chat/README.md +++ b/applications/Chat/README.md @@ -243,6 +243,7 @@ from coati.trainer import SFTTrainer model = LlamaLM(pretrained=args.pretrain) tokenizer = AutoTokenizer.from_pretrained(args.pretrain) +(model, optim) = strategy.prepare((model, optim)) trainer = SFTTrainer(model=model, strategy=strategy, optim=optim, @@ -254,7 +255,11 @@ trainer = SFTTrainer(model=model, ) trainer.fit() -trainer.save_model(path=args.save_path, only_rank0=True, tokenizer=tokenizer) +# this saves in pytorch format +strategy.save_model(model, args.save_path, only_rank0=True) + +# this saves in HF format. 
ColossalAI strategy with stage-3 doesn't support this method +strategy.save_pretrained(model, args.save_path, only_rank0=True, tokenizer=tokenizer) ``` @@ -263,7 +268,7 @@ trainer.save_model(path=args.save_path, only_rank0=True, tokenizer=tokenizer) Here are some examples that can allow you to train a 7B model on a single or multiple consumer-grade GPUs. -If you only have a single 24G GPU, you can use the following script. `batch_size` and `lora_rank` are the most important parameters to successfully train the model. +If you only have a single 24G GPU, you can use the following script. `batch_size`, `lora_rank` and `grad_checkpoint` are the most important parameters to successfully train the model. ``` torchrun --standalone --nproc_per_node=1 train_sft.py \ --pretrain "/path/to/LLaMa-7B/" \ @@ -278,6 +283,7 @@ torchrun --standalone --nproc_per_node=1 train_sft.py \ --max_datasets_size 512 \ --max_epochs 1 \ --lora_rank 16 \ + --grad_checkpoint ``` `colossalai_gemini` strategy can enable a single 24G GPU to train the whole model without using LoRA if you have sufficient CPU memory. You can use the following script. @@ -294,6 +300,7 @@ torchrun --standalone --nproc_per_node=1 train_sft.py \ --lr 2e-5 \ --max_datasets_size 512 \ --max_epochs 1 \ + --grad_checkpoint ``` If you have 4x32 GB GPUs, you can even train the whole 7B model using our `colossalai_zero2_cpu` strategy! The script is given as follows. @@ -310,6 +317,7 @@ torchrun --standalone --nproc_per_node=4 train_sft.py \ --lr 2e-5 \ --max_datasets_size 512 \ --max_epochs 1 \ + --grad_checkpoint ``` diff --git a/applications/Chat/examples/README.md b/applications/Chat/examples/README.md index 0083dc37227f..e3880c7e4c0c 100644 --- a/applications/Chat/examples/README.md +++ b/applications/Chat/examples/README.md @@ -66,6 +66,7 @@ torchrun --standalone --nproc_per_node=4 train_sft.py \ --lr 2e-5 \ --max_datasets_size 512 \ --max_epochs 1 \ + --grad_checkpoint ``` ### Arg List - --strategy: the strategy using for training, choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], default='naive' @@ -78,6 +79,7 @@ torchrun --standalone --nproc_per_node=4 train_sft.py \ - --batch_size: batch size while training, type=int, default=4 - --lora_rank: low-rank adaptation matrices rank, type=int, default=0 - --log_interval: how many steps to log, type=int, default=100 +- --grad_checkpoint: enable gradient checkpointing, type=bool, default=False ## Stage2 - Training reward model @@ -152,7 +154,7 @@ torchrun --standalone --nproc_per_node=4 train_prompts.py \ --rm_path /your/rm/model/path ``` -Prompt dataset: the instruction dataset mentioned in the above figure which includes the instructions, e.g. you can use [seed_prompts_ch.jsonl](https://github.com/XueFuzhao/InstructionWild/blob/main/data/seed_prompts_ch.jsonl) or [seed_prompts_en.jsonl](https://github.com/XueFuzhao/InstructionWild/blob/main/data/seed_prompts_en.jsonl) in InstructionWild. +Prompt dataset: the instruction dataset mentioned in the above figure which includes the instructions, e.g. you can use [seed_prompts_ch.jsonl](https://github.com/XueFuzhao/InstructionWild/blob/main/data/seed_prompts_ch.jsonl) or [seed_prompts_en.jsonl](https://github.com/XueFuzhao/InstructionWild/blob/main/data/seed_prompts_en.jsonl) in InstructionWild. Pretrain dataset: the pretrain dataset including the instruction and corresponding response, e.g. you can use the [InstructWild Data](https://github.com/XueFuzhao/InstructionWild/tree/main/data) in stage 1 supervised instructs tuning. 
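Before launching stage 3, it can help to eyeball the prompt file first. Here is a minimal sketch for peeking at an InstructionWild-style jsonl; the column names are assumptions, not guaranteed by this repo, so check the actual file before wiring it into `train_prompts.py`:

```
import pandas as pd

# Load a jsonl prompt file; each line is one JSON record.
prompts = pd.read_json('seed_prompts_en.jsonl', lines=True)
# Column names here are an assumption -- print them to find the prompt field.
print(prompts.columns.tolist())
print(prompts.head())
```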
### Arg List @@ -254,29 +256,6 @@ class CoatiActor(Actor): super().__init__(model, lora_rank, lora_train_bias) ``` -### LM model - -``` -from ..base import LM -from transformers.models.coati import CoatiModel - -class GPTLM(LM): - - def __init__(self, - pretrained: Optional[str] = None, - checkpoint: bool = False, - lora_rank: int = 0, - lora_train_bias: str = 'none') -> None: - if pretrained is not None: - model = CoatiModel.from_pretrained(pretrained) - else: - model = build_model() # load your own model if it is not support in transformers - - super().__init__(model, lora_rank, lora_train_bias) - - def forward(self, input_ids, attention_mask=None, labels=None, **kwargs): - return self.model(input_ids, attention_mask=attention_mask, labels=labels, **kwargs) -``` ### Reward model ``` from ..base import RewardModel From b762e156b835ffa5bf780a482a6bcc0fa98e7866 Mon Sep 17 00:00:00 2001 From: ver217 Date: Thu, 27 Apr 2023 17:56:13 +0800 Subject: [PATCH 20/23] [chat] fix unit test --- applications/Chat/tests/test_checkpoint.py | 1 - 1 file changed, 1 deletion(-) diff --git a/applications/Chat/tests/test_checkpoint.py b/applications/Chat/tests/test_checkpoint.py index 29617f205c46..4c05a3431699 100644 --- a/applications/Chat/tests/test_checkpoint.py +++ b/applications/Chat/tests/test_checkpoint.py @@ -82,7 +82,6 @@ def run_dist(rank, world_size, port, strategy): run_test_checkpoint(strategy) -@pytest.mark.skip('temporarily skip until refactor strategy unwrap') @pytest.mark.dist @pytest.mark.parametrize('world_size', [2]) @pytest.mark.parametrize('strategy', ['ddp', 'colossalai_zero2', 'colossalai_gemini']) From eafca7780ea71e14464253f73288ce26429127de Mon Sep 17 00:00:00 2001 From: csric Date: Thu, 27 Apr 2023 18:10:22 +0800 Subject: [PATCH 21/23] working on lora reconstruction --- .../Chat/coati/ray/detached_trainer_ppo.py | 8 ++- .../Chat/coati/ray/experience_maker_holder.py | 2 + applications/Chat/coati/ray/utils.py | 50 ++++++++++++++----- .../Chat/coati/trainer/strategies/base.py | 4 ++ applications/Chat/examples/ray/mmmt_prompt.py | 4 +- 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py index d30158019d65..e703a17610f6 100644 --- a/applications/Chat/coati/ray/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/detached_trainer_ppo.py @@ -120,7 +120,13 @@ def _update_remote_makers(self, fully_update: bool = False, **config): ray.get(tasks) # sending loop tasks = [] - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_model(self.actor), **config): + + # patch fix here. TODO: refactor _unwrap_xxx. 
+ if isinstance(self.strategy, DDPStrategy): + unwrapped_actor = self.strategy._unwrap_actor(self.actor) + else: + unwrapped_actor = self.strategy._unwrap_model(self.actor) + for state_dict_shard in self._get_model_state_dict_shard(unwrapped_actor, **config): for target_holder in self.target_holder_list: tasks.append( target_holder.update_experience_maker.remote(new_actor_state_dict=state_dict_shard, fully_update=fully_update)) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index 573771ad6258..1febeb9896e9 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -189,6 +189,8 @@ def update_experience_maker(self, with torch.no_grad(): if new_actor_state_dict is not None: self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False) + print(new_actor_state_dict.keys()) + print(self.experience_maker.actor.model.state_dict().keys()) if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) diff --git a/applications/Chat/coati/ray/utils.py b/applications/Chat/coati/ray/utils.py index 6cd7c564cc92..48f33e70c632 100644 --- a/applications/Chat/coati/ray/utils.py +++ b/applications/Chat/coati/ray/utils.py @@ -120,18 +120,6 @@ def set_dist_env(env_info: Dict[str, str]): os.environ['MASTER_ADDR'] = env_info['master_addr'] -def state_dict_to(state_dict: Dict[str, Any], - dtype: torch.dtype = torch.float16, - device: torch.device = torch.device('cpu')): - ''' - keep state_dict intact - ''' - new_state_dict = {} - for k, v in state_dict.items(): - new_state_dict[k] = v.to(dtype=dtype, device=device) - return new_state_dict - - def get_model_numel(model: nn.Module) -> int: numel = sum(p.numel() for p in model.parameters()) return numel @@ -150,3 +138,41 @@ def get_receivers_per_sender(sender_idx: int, num_senders: int, num_receivers: i # a receiver may have more than one sender target_receivers.append(sender_idx % num_receivers) return target_receivers + + +def state_dict_to(state_dict: Dict[str, Any], + dtype: torch.dtype = torch.float16, + device: torch.device = torch.device('cpu')): + ''' + keep state_dict intact + ''' + new_state_dict = OrderedDict() + for k, v in state_dict.items(): + new_state_dict[k] = v.to(dtype=dtype, device=device) + return new_state_dict + + +def state_dict_filter_lora(state_dict: Dict[str, Any], keep_non_lora = False): + ''' + if keep_non_lora, also return non_lora state_dict + ''' + state_dict_lora = OrderedDict() + state_dict_non_lora = OrderedDict() + for k, v in state_dict.items(): + if 'lora_A' in k or 'lora_B' in k: + state_dict_lora[k] = v + elif keep_non_lora: + state_dict_non_lora[k] = v + if keep_non_lora: + return state_dict_lora, state_dict_non_lora + else: + return state_dict_lora + + +def state_dict_lora_reconstruct(state_dict_lora: Dict[str, Any]): + ''' + xxx.lora_A, xxx.lora_B -->> xxx.weight + ''' + state_dict_reconstruct = OrderedDict() + \ No newline at end of file diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index e0232fbc64ad..981557526693 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -139,3 +139,7 @@ def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = No def setup_sampler(self, dataset) -> DistributedSampler: return DistributedSampler(dataset, 1, 0) + + @abstractmethod + def 
get_model_state_dict_shard(self, model: nn.Module, **config): + pass \ No newline at end of file diff --git a/applications/Chat/examples/ray/mmmt_prompt.py b/applications/Chat/examples/ray/mmmt_prompt.py index 84d7bf5d92bf..fdfa5fa18758 100644 --- a/applications/Chat/examples/ray/mmmt_prompt.py +++ b/applications/Chat/examples/ray/mmmt_prompt.py @@ -101,8 +101,8 @@ def model_fn(): ] def trainer_model_fn(): - actor = get_actor_from_args(args.model, args.pretrain).half().cuda() - critic = get_critic_from_args(args.model, args.critic_pretrain).half().cuda() + actor = get_actor_from_args(args.model, args.pretrain, lora_rank=args.lora_rank).half().cuda() + critic = get_critic_from_args(args.model, args.critic_pretrain, lora_rank=args.lora_rank).half().cuda() return actor, critic # configure Trainer From ed2dd61cc82219a37f95c07809db2444291ac417 Mon Sep 17 00:00:00 2001 From: csric Date: Thu, 27 Apr 2023 18:43:01 +0800 Subject: [PATCH 22/23] state_dict sending adapts to new unwrap function --- .../Chat/coati/ray/detached_trainer_ppo.py | 31 ++----------------- .../Chat/coati/ray/experience_maker_holder.py | 6 ++-- .../coati/trainer/strategies/colossalai.py | 4 +-- .../Chat/coati/trainer/strategies/naive.py | 3 +- 4 files changed, 10 insertions(+), 34 deletions(-) diff --git a/applications/Chat/coati/ray/detached_trainer_ppo.py b/applications/Chat/coati/ray/detached_trainer_ppo.py index e703a17610f6..d3dfc6e93a46 100644 --- a/applications/Chat/coati/ray/detached_trainer_ppo.py +++ b/applications/Chat/coati/ray/detached_trainer_ppo.py @@ -121,18 +121,13 @@ def _update_remote_makers(self, fully_update: bool = False, **config): # sending loop tasks = [] - # patch fix here. TODO: refactor _unwrap_xxx. - if isinstance(self.strategy, DDPStrategy): - unwrapped_actor = self.strategy._unwrap_actor(self.actor) - else: - unwrapped_actor = self.strategy._unwrap_model(self.actor) - for state_dict_shard in self._get_model_state_dict_shard(unwrapped_actor, **config): + for state_dict_shard in self._get_model_state_dict_shard(self.actor, **config): for target_holder in self.target_holder_list: tasks.append( target_holder.update_experience_maker.remote(new_actor_state_dict=state_dict_shard, fully_update=fully_update)) # sending loop - for state_dict_shard in self._get_model_state_dict_shard(self.strategy._unwrap_critic(self.critic), **config): + for state_dict_shard in self._get_model_state_dict_shard(self.critic, **config): for target_holder in self.target_holder_list: tasks.append( target_holder.update_experience_maker.remote(new_critic_state_dict=state_dict_shard, @@ -182,28 +177,6 @@ def strategy_save_actor_optim(self, path: str, only_rank0: bool = False) -> None def strategy_save_critic_optim(self, path: str, only_rank0: bool = False) -> None: self.strategy.save_optimizer(self.critic_optim, path, only_rank0) - def _get_unwrapped_actor(self): - if False: - pass - elif isinstance(self.strategy, ColossalAIStrategy): - ret = Actor(self.strategy._unwrap_model(self.actor)) - return ret - elif isinstance(self.strategy, DDPStrategy): - return Actor(self.strategy._unwrap_actor(self.actor)) - elif isinstance(self.strategy, NaiveStrategy): - return self.actor - - def _get_unwrapped_critic(self): - if False: - pass - elif isinstance(self.strategy, ColossalAIStrategy): - ret = self.strategy._unwrap_model(self.critic) - return ret - elif isinstance(self.strategy, DDPStrategy): - return self.critic.module - elif isinstance(self.strategy, NaiveStrategy): - return self.critic - def _get_model_state_dict_shard(self, model: 
torch.nn.Module, **config): # try: # self.strategy.merge_lora_weight(model) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index 1febeb9896e9..cbb247224046 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -189,10 +189,12 @@ def update_experience_maker(self, with torch.no_grad(): if new_actor_state_dict is not None: self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False) - print(new_actor_state_dict.keys()) - print(self.experience_maker.actor.model.state_dict().keys()) + # print("new actor", new_actor_state_dict.keys()) + # print("local actor", self.experience_maker.actor.model.state_dict().keys()) if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) + # print("new critic", new_critic_state_dict.keys()) + # print("local critic", self.experience_maker.critic.state_dict().keys()) # the lock must be released after both actor and critic being updated diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index 320fbd6471e5..bc7010036aa7 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -196,5 +196,5 @@ def get_model_state_dict_shard(self, model: nn.Module, **config): # if isinstance(module, LoraLinear): # module.merge_weights = True # module.eval() - model: ZeroDDP = model - yield from model.state_dict_shard(max_shard_size=1024, only_rank_0=False) + base_model: ZeroDDP = get_base_model(model) + yield from base_model.state_dict_shard(max_shard_size=1024, only_rank_0=False) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index f8b3b3c1d62c..638098c93045 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -8,7 +8,7 @@ import torch.optim as optim from coati.models.base import get_base_model from coati.replay_buffer import ReplayBuffer -from coati.models.base import LM, RewardModel +from coati.models.base import RewardModel from coati.models.lora import LoraLinear from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer @@ -85,6 +85,7 @@ def save_pretrained(self, def get_model_state_dict_shard(self, model: nn.Module, **config): # TODO: implement sharding on naive strategy + model = self.unwrap_model(model) if 'requires_grad_only' in config and config['requires_grad_only'] == True: state_dict = get_grad_required_state_dict(model) else: From b79af56c3d854bb8292d87ba97d5d5509ae1829b Mon Sep 17 00:00:00 2001 From: csric Date: Thu, 27 Apr 2023 19:22:44 +0800 Subject: [PATCH 23/23] remove comments --- applications/Chat/coati/ray/experience_maker_holder.py | 4 ---- applications/Chat/coati/trainer/strategies/ddp.py | 1 - applications/Chat/examples/ray/mmmt_prompt.py | 8 ++++---- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/applications/Chat/coati/ray/experience_maker_holder.py b/applications/Chat/coati/ray/experience_maker_holder.py index cbb247224046..573771ad6258 100644 --- a/applications/Chat/coati/ray/experience_maker_holder.py +++ b/applications/Chat/coati/ray/experience_maker_holder.py @@ -189,12 +189,8 @@ def update_experience_maker(self, with torch.no_grad(): if new_actor_state_dict is not None: 
self.experience_maker.actor.model.load_state_dict(new_actor_state_dict, strict=False) - # print("new actor", new_actor_state_dict.keys()) - # print("local actor", self.experience_maker.actor.model.state_dict().keys()) if new_critic_state_dict is not None: self.experience_maker.critic.load_state_dict(new_critic_state_dict, strict=False) - # print("new critic", new_critic_state_dict.keys()) - # print("local critic", self.experience_maker.critic.state_dict().keys()) # the lock must be released after both actor and critic being updated diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index 1cfb21a540b0..a1fecb36373f 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -1,7 +1,6 @@ import os import random from typing import Optional -from typing import Optional import numpy as np import torch diff --git a/applications/Chat/examples/ray/mmmt_prompt.py b/applications/Chat/examples/ray/mmmt_prompt.py index fdfa5fa18758..6f43d8950758 100644 --- a/applications/Chat/examples/ray/mmmt_prompt.py +++ b/applications/Chat/examples/ray/mmmt_prompt.py @@ -175,11 +175,11 @@ def tokenize_fn(texts): parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama']) parser.add_argument('--pretrain', type=str, default=None) parser.add_argument('--critic_pretrain', type=str, default=None) - parser.add_argument('--experience_steps', type=int, default=4) # - parser.add_argument('--experience_batch_size', type=int, default=8) # _ * _ + parser.add_argument('--experience_steps', type=int, default=4) + parser.add_argument('--experience_batch_size', type=int, default=8) parser.add_argument('--train_epochs', type=int, default=1) - parser.add_argument('--update_steps', type=int, default=2) # - parser.add_argument('--train_batch_size', type=int, default=8) # + parser.add_argument('--update_steps', type=int, default=2) + parser.add_argument('--train_batch_size', type=int, default=8) parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank") parser.add_argument('--initial_model_quant_ckpt', type=str, default=None)
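The `state_dict_lora_reconstruct` helper introduced in PATCH 21 is still a stub at the end of this series. Below is a sketch of what the reconstruction might compute, assuming the standard LoRA update `delta_W = scaling * (B @ A)` and the `xxx.lora_A` / `xxx.lora_B` key layout that `state_dict_filter_lora` produces; the scaling factor (`lora_alpha / r`) lives on the module rather than in the state dict, so it is taken as a parameter here by assumption:

```
from collections import OrderedDict
from typing import Any, Dict


def reconstruct_lora_weights(state_dict_lora: Dict[str, Any], scaling: float = 1.0):
    '''Sketch: fold each xxx.lora_A / xxx.lora_B pair into an xxx.weight delta.

    delta_W = scaling * (B @ A) is the standard LoRA weight update; the true
    scaling is not recoverable from the state dict alone, so it is passed in.
    '''
    reconstructed = OrderedDict()
    for key, lora_a in state_dict_lora.items():
        if not key.endswith('lora_A'):
            continue
        prefix = key[:-len('lora_A')]    # e.g. 'transformer.h.0.attn.'
        lora_b = state_dict_lora[prefix + 'lora_B']
        # B is (out_features, r) and A is (r, in_features), so B @ A matches the weight shape
        reconstructed[prefix + 'weight'] = scaling * (lora_b @ lora_a)
    return reconstructed
```

A receiving holder could then add each delta onto the matching base weight before calling `load_state_dict`, mirroring what `merge_weights` does inside `LoraLinear`.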