From 6bcd5b1ded1153c5559dcca19a17f8fceef3ba7a Mon Sep 17 00:00:00 2001 From: jiangwen Date: Wed, 29 Mar 2023 10:53:40 +0800 Subject: [PATCH 01/13] [feat][chatgpt]train prompts on ray example --- .../ChatGPT/examples/train_prompts_on_ray.py | 730 ++++++++++++++++++ ...in_prompts_on_ray_job_submission_script.py | 11 + 2 files changed, 741 insertions(+) create mode 100644 applications/ChatGPT/examples/train_prompts_on_ray.py create mode 100644 applications/ChatGPT/examples/train_prompts_on_ray_job_submission_script.py diff --git a/applications/ChatGPT/examples/train_prompts_on_ray.py b/applications/ChatGPT/examples/train_prompts_on_ray.py new file mode 100644 index 000000000000..fe06446ef741 --- /dev/null +++ b/applications/ChatGPT/examples/train_prompts_on_ray.py @@ -0,0 +1,730 @@ +import argparse +import logging +import os +import socket +from copy import deepcopy + +import ray +import torch +from chatgpt.experience_maker.base import Experience +from chatgpt.models.base import RewardModel +from chatgpt.models.bloom import BLOOMActor, BLOOMCritic +from chatgpt.models.gpt import GPTActor, GPTCritic +from chatgpt.models.loss import PolicyLoss, ValueLoss +from chatgpt.models.opt import OPTActor, OPTCritic +from chatgpt.models.utils import compute_reward +from chatgpt.replay_buffer import NaiveReplayBuffer +from chatgpt.trainer.strategies import ColossalAIStrategy, DDPStrategy, NaiveStrategy +from ray.util.placement_group import placement_group +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy +from torch.optim import Adam +from transformers import AutoTokenizer, BloomTokenizerFast +from transformers.models.gpt2.configuration_gpt2 import GPT2Config +from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer + +from colossalai.nn.optimizer import HybridAdam + + +class ExperienceCompositionRefs: + + def __init__(self, sequences_attention_mask_action_mask_ref, action_log_probs_ref, base_action_log_probs_ref, + value_ref, r_ref) -> None: + self.sequences_attention_mask_action_mask_ref = sequences_attention_mask_action_mask_ref + self.action_log_probs_ref = action_log_probs_ref + self.base_action_log_probs_ref = base_action_log_probs_ref + self.value_ref = value_ref + self.r_ref = r_ref + + +class ExperienceMaker: + + def __init__(self, kl_coef) -> None: + self.kl_coef = kl_coef + + @torch.no_grad() + def make_experience(self, experiment_computation_refs: ExperienceCompositionRefs): + sequences, attention_mask, action_mask = ray.get( + experiment_computation_refs.sequences_attention_mask_action_mask_ref) + action_log_probs = ray.get(experiment_computation_refs.action_log_probs_ref) + base_action_log_probs = ray.get(experiment_computation_refs.base_action_log_probs_ref) + r = ray.get(experiment_computation_refs.r_ref) + reward = compute_reward(r, self.kl_coef, action_log_probs, base_action_log_probs, action_mask=action_mask) + value = ray.get(experiment_computation_refs.value_ref) + advantage = reward - value + if advantage.ndim == 1: + advantage = advantage.unsqueeze(-1) + experience = Experience(sequences, action_log_probs, value, reward, advantage, attention_mask, action_mask) + return experience + + +class BasePPOActor: + + def __init__(self, world_size, rank, local_rank, master_addr, master_port, kl_coef: float = 0.1): + logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S') + self._model = None + self._world_size = world_size + self._rank = rank + self._local_rank = local_rank + 
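# master_addr/master_port may be None for rank 0; fall back to this node's IP and a free port so torch.distributed can bootstrap +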
self._master_addr = master_addr if master_addr else self._get_current_node_ip() + self._master_port = master_port if master_port else self._get_free_port() + os.environ["MASTER_ADDR"] = self._master_addr + os.environ["MASTER_PORT"] = str(self._master_port) + os.environ["WORLD_SIZE"] = str(self._world_size) + os.environ["RANK"] = str(self._rank) + os.environ["LOCAL_RANK"] = str(self._local_rank) + self._experience_maker = ExperienceMaker(kl_coef) + + def _init_strategy(self, strategy): + # configure strategy + if strategy == 'naive': + self._strategy = NaiveStrategy() + elif strategy == 'ddp': + self._strategy = DDPStrategy() + elif strategy == 'colossalai_gemini': + self._strategy = ColossalAIStrategy(stage=3, placement_policy='cuda', initial_scale=2**5) + elif strategy == 'colossalai_zero2': + self._strategy = ColossalAIStrategy(stage=2, placement_policy='cuda') + else: + raise ValueError(f'Unsupported strategy "{strategy}"') + + def _init_optimizer(self): + if isinstance(self._strategy, ColossalAIStrategy): + self._optimizer = HybridAdam(self._model.parameters(), lr=5e-6) + else: + self._optimizer = Adam(self._model.parameters(), lr=5e-6) + + def _prepare_model_with_strategy(self, has_optimizer: bool): + if has_optimizer: + self._init_optimizer() + (self._model, self._optimizer) = self._strategy.prepare((self._model, self._optimizer)) + else: + self._model = self._strategy.prepare(self._model) + + @staticmethod + def _get_gpt_config(model_name: str): + model_map = { + 's': GPT2Config(), + 'm': GPT2Config(n_embd=1024, n_layer=24, n_head=16), + 'l': GPT2Config(n_embd=1280, n_layer=36, n_head=20), + 'xl': GPT2Config(n_embd=1600, n_layer=48, n_head=25), + '2b': GPT2Config(n_embd=2048, n_layer=40, n_head=16), + '4b': GPT2Config(n_embd=2304, n_layer=64, n_head=16), + '6b': GPT2Config(n_embd=4096, n_layer=30, n_head=16), + '8b': GPT2Config(n_embd=4096, n_layer=40, n_head=16), + '10b': GPT2Config(n_embd=4096, n_layer=50, n_head=16), + '12b': GPT2Config(n_embd=4096, n_layer=60, n_head=16), + '15b': GPT2Config(n_embd=4096, n_layer=78, n_head=16), + '18b': GPT2Config(n_embd=4096, n_layer=90, n_head=16), + '20b': GPT2Config(n_embd=8192, n_layer=25, n_head=16), + '24b': GPT2Config(n_embd=8192, n_layer=30, n_head=16), + '28b': GPT2Config(n_embd=8192, n_layer=35, n_head=16), + '32b': GPT2Config(n_embd=8192, n_layer=40, n_head=16), + '36b': GPT2Config(n_embd=8192, n_layer=45, n_head=16), + '40b': GPT2Config(n_embd=8192, n_layer=50, n_head=16), + '175b': GPT2Config(n_positions=2048, n_embd=12288, n_layer=96, n_head=96), + } + try: + return model_map[model_name] + except KeyError: + raise ValueError(f'Unknown model "{model_name}"') + + @staticmethod + def _get_current_node_ip(): + return ray._private.services.get_node_ip_address() + + @staticmethod + def _get_free_port(): + with socket.socket() as sock: + sock.bind(('', 0)) + return sock.getsockname()[1] + + def get_master_addr_port(self): + return self._master_addr, self._master_port + + def init_model_from_pretrained(self, strategy, model, pretrained_model_name_or_path, has_optimizer=False): + self._init_strategy(strategy) + self._load_model_from_pretrained(model, pretrained_model_name_or_path) + self._prepare_model_with_strategy(has_optimizer) + + def init_model_from_model_name(self, strategy, model, model_name, has_optimizer=False): + self._init_strategy(strategy) + self._load_model_from_model_name(model, model_name) + self._prepare_model_with_strategy(has_optimizer) + + def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + 
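"""Role-specific hook: each concrete role (actor, critic, reward and initial model) builds its model here.""" +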
raise NotImplementedError() + + def _load_model_from_model_name(self, model, model_config): + raise NotImplementedError() + + def eval(self): + self._model.eval() + + def make_experience(self, experience_computation_ref: ExperienceCompositionRefs): + return self._experience_maker.make_experience(experience_computation_ref) + + +class TrainablePPOActor(BasePPOActor): + + def __init__(self, + world_size, + rank, + local_rank, + master_addr, + master_port, + kl_coef: float = 0.1, + train_batch_size: int = 1, + buffer_limit: int = 0, + buffer_cpu_offload: bool = True, + dataloader_pin_memory: bool = True): + super().__init__(world_size, rank, local_rank, master_addr, master_port, kl_coef) + self._replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) + self._dataloader_pin_memory = dataloader_pin_memory + + def init_model_from_pretrained(self, strategy, model, pretrained_model_name_or_path): + super().init_model_from_pretrained(strategy, model, pretrained_model_name_or_path, True) + + def init_model_from_model_name(self, strategy, model, pretrained_model_name_or_path): + super().init_model_from_model_name(strategy, model, pretrained_model_name_or_path, True) + + def _train(self): + self._model.train() + + def _training_step(self, experience: Experience): + pass + + def learn_on_experiences(self, experience_refs): + experiences = ray.get(experience_refs) + device = torch.cuda.current_device() + self._train() + for exp in experiences: + exp.to_device(device) + self._training_step(exp) + self.eval() + + +@ray.remote(num_gpus=1) +class RayPPOActor(TrainablePPOActor): + + def __init__(self, + world_size, + rank, + local_rank, + master_addr, + master_port, + kl_coef: float = 0.1, + train_batch_size: int = 1, + buffer_limit: int = 0, + buffer_cpu_offload: bool = True, + dataloader_pin_memory: bool = True, + eps_clip: float = 0.2): + super().__init__(world_size, rank, local_rank, master_addr, master_port, kl_coef, train_batch_size, + buffer_limit, buffer_cpu_offload, dataloader_pin_memory) + self._actor_loss_fn = PolicyLoss(eps_clip) + + def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + with self._strategy.model_init_context(): + if model == 'gpt2': + self._model = GPTActor(pretrained=os.path.abspath("gpt2")).to(torch.cuda.current_device()) + elif model == 'bloom': + self._model = BLOOMActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'opt': + self._model = OPTActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + + def _load_model_from_model_name(self, model, model_name): + if model == 'gpt2': + model_config = self._get_gpt_config(model_name) + with self._strategy.model_init_context(): + self._model = GPTActor(config=model_config).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + + def load_tokenizer_from_pretrained(self, model, pretrained): + if model == 'gpt2': + self._model_tokenizer = GPT2Tokenizer.from_pretrained(os.path.abspath('gpt2')) + self._model_tokenizer.pad_token = self._model_tokenizer.eos_token + elif model == 'bloom': + self._model_tokenizer = BloomTokenizerFast.from_pretrained(pretrained) + self._model_tokenizer.pad_token = self._model_tokenizer.eos_token + elif model == 'opt': + self._model_tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") + else: + raise ValueError(f'Unsupported model "{model}"') + + def set_generate_kwargs(self, 
generate_kwargs: dict): + self._generate_kwargs = {} + self._generate_kwargs['pad_token_id'] = self._model_tokenizer.pad_token_id + self._generate_kwargs['eos_token_id'] = self._model_tokenizer.eos_token_id + self._generate_kwargs.update(generate_kwargs) + self._generate_kwargs.update(self._get_generate_function_kwargs(generate_kwargs)) + + def _get_generate_function_kwargs(self, generate_kwargs: dict): + origin_model = self._strategy._unwrap_actor(self._model) + _to_update_kwargs = {} + # use huggingface models method directly + if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'): + _to_update_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation + + def update_model_kwargs_fn(outputs: dict, **model_kwargs) -> dict: + if "past_key_values" in outputs: + model_kwargs["past"] = outputs["past_key_values"] + else: + model_kwargs["past"] = None + + # update token_type_ids with last value + if "token_type_ids" in model_kwargs: + token_type_ids = model_kwargs["token_type_ids"] + model_kwargs["token_type_ids"] = torch.cat([token_type_ids, token_type_ids[:, -1].unsqueeze(-1)], + dim=-1) + + # update attention mask + if "attention_mask" in model_kwargs: + attention_mask = model_kwargs["attention_mask"] + model_kwargs["attention_mask"] = torch.cat( + [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1) + + return model_kwargs + + if 'update_model_kwargs_fn' not in generate_kwargs: + _to_update_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fn + return _to_update_kwargs + + def load_csv_prompt_file_from_url(self, url): + import pandas as pd + prompts = pd.read_csv(url)['prompt'] + + # Set tokenize function for training + def _tokenize_fn(texts): + batch = self._model_tokenizer(texts, return_tensors='pt', max_length=96, padding=True, truncation=True) + return {k: v.cuda() for k, v in batch.items()} + + self._tokenize_function = _tokenize_fn + self._sampler = self._strategy.setup_sampler(prompts) + + def _generate(self, input_ids, **generate_kwargs): + return self._model.generate(input_ids, return_action_mask=True, **generate_kwargs) + + @torch.no_grad() + def calculate_action_log_probs(self, sequence_attention_action_mask): + sequences, attention_mask, action_mask = sequence_attention_action_mask + return self._model.forward(sequences, action_mask.size(1), attention_mask) + + def _training_step(self, experience): + num_actions = experience.action_mask.size(1) + action_log_probs = self._model(experience.sequences, num_actions, attention_mask=experience.attention_mask) + actor_loss = self._actor_loss_fn(action_log_probs, + experience.action_log_probs, + experience.advantages, + action_mask=experience.action_mask) + self._strategy.backward(actor_loss, self._model, self._optimizer) + self._strategy.optimizer_step(self._optimizer) + self._optimizer.zero_grad() + logging.info("actor_loss: {}".format(actor_loss)) + + def _sample_prompts(self, experience_batch_size) -> list: + return self._sampler.sample(experience_batch_size) + + def sample_prompts_and_make_sequence(self, experience_batch_size): + input_ids = self._tokenize_function(self._sample_prompts(experience_batch_size)) + if isinstance(input_ids, dict): + return self._generate(**input_ids, **self._generate_kwargs) + else: + return self._generate(input_ids, **self._generate_kwargs) + + def save_checkpoint(self, save_path, should_save_optimizer: bool): + if self._rank == 0: + # save model checkpoint only on rank 0 + 
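# NOTE: assumes strategy.save_model(..., only_rank0=True) gathers any sharded weights to rank 0 before writing +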
self._strategy.save_model(self._model, save_path, only_rank0=True) + # save optimizer checkpoint on all ranks + if should_save_optimizer: + self._strategy.save_optimizer(self._optimizer, + 'actor_optim_checkpoint_prompts_%d.pt' % (torch.cuda.current_device()), + only_rank0=False) + + def generate_answer(self, prompt, max_length=30, num_return_sequences=5): + encoded_input = self._model_tokenizer(prompt, return_tensors='pt') + input_ids = {k: v.cuda() for k, v in encoded_input.items()} + sequence, _ = self._model.generate(**input_ids, + max_length=max_length, + return_action_mask=False, + num_return_sequences=num_return_sequences) + token_list = list(sequence.data[0]) + output = " ".join([self._model_tokenizer.decode(token) for token in token_list]) + return output + + +@ray.remote(num_gpus=1) +class RayPPOCritic(TrainablePPOActor): + + def __init__(self, + world_size, + rank, + local_rank, + master_addr, + master_port, + kl_coef: float = 0.1, + train_batch_size: int = 1, + buffer_limit: int = 0, + buffer_cpu_offload: bool = True, + dataloader_pin_memory: bool = True, + value_clip: float = 0.4): + super().__init__(world_size, rank, local_rank, master_addr, master_port, kl_coef, train_batch_size, + buffer_limit, buffer_cpu_offload, dataloader_pin_memory) + self._critic_loss_fn = ValueLoss(value_clip) + + def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + with self._strategy.model_init_context(): + if model == 'gpt2': + self._model = GPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'bloom': + self._model = BLOOMCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'opt': + self._model = OPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + + def _load_model_from_model_name(self, model, model_name): + if model == 'gpt2': + model_config = self._get_gpt_config(model_name) + with self._strategy.model_init_context(): + self._model = GPTCritic(config=model_config).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + + def _training_step(self, experience): + values = self._model(experience.sequences, + action_mask=experience.action_mask, + attention_mask=experience.attention_mask) + critic_loss = self._critic_loss_fn(values, + experience.values, + experience.reward, + action_mask=experience.action_mask) + self._strategy.backward(critic_loss, self._model, self._optimizer) + self._strategy.optimizer_step(self._optimizer) + self._optimizer.zero_grad() + logging.info("critic_loss: {}".format(critic_loss)) + + @torch.no_grad() + def calculate_value(self, sequence_attention_action_mask): + sequences, attention_mask, action_mask = sequence_attention_action_mask + return self._model(sequences, action_mask, attention_mask) + + +@ray.remote(num_gpus=1) +class RayPPORewardModel(BasePPOActor): + + def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + with self._strategy.model_init_context(): + if model == 'gpt2': + critic = GPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'bloom': + critic = BLOOMCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'opt': + critic = OPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + self._model = 
RewardModel(deepcopy(critic.model), + deepcopy(critic.value_head)).to(torch.cuda.current_device()) + + def _load_model_from_model_name(self, model, model_name): + if model == 'gpt2': + model_config = self._get_gpt_config(model_name) + with self._strategy.model_init_context(): + critic = GPTCritic(config=model_config).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + self._model = RewardModel(deepcopy(critic.model), deepcopy(critic.value_head)).to(torch.cuda.current_device()) + + @torch.no_grad() + def calculate_r(self, sequence_attention_action_mask): + sequences, attention_mask, _ = sequence_attention_action_mask + return self._model(sequences, attention_mask) + + +@ray.remote(num_gpus=1) +class RayPPOInitialModel(BasePPOActor): + + def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + with self._strategy.model_init_context(): + if model == 'gpt2': + self._model = GPTActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'bloom': + self._model = BLOOMActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + elif model == 'opt': + self._model = OPTActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + + def _load_model_from_model_name(self, model, model_name): + if model == 'gpt2': + model_config = self._get_gpt_config(model_name) + with self._strategy.model_init_context(): + self._model = GPTActor(config=model_config).to(torch.cuda.current_device()) + else: + raise ValueError(f'Unsupported model "{model}"') + + @torch.no_grad() + def calculate_base_action_log_probs(self, sequence_attention_action_mask): + sequences, attention_mask, action_mask = sequence_attention_action_mask + return self._model(sequences, action_mask.size(1), attention_mask) + + def load_model_from_model_config(self, model): + model_config = self._get_gpt_config(model) + with self._strategy.model_init_context(): + self._model = GPTActor(config=model_config).cuda() + return True + + +class ModelRayActorGroup: + + def __init__(self, num_nodes, num_gpus_per_node, actor_type) -> None: + self._num_nodes = num_nodes + self._num_gpus_per_node = num_gpus_per_node + self._actor_type = actor_type + + def initiate_actors(self): + world_size = self._num_nodes * self._num_gpus_per_node + pg = None + if self._num_gpus_per_node >= 1: + bundles = [{ + "GPU": self._num_gpus_per_node, + "CPU": 1 * self._num_gpus_per_node + } for _ in range(self._num_nodes)] + pg = placement_group(bundles, strategy="STRICT_SPREAD") + ray.get(pg.ready()) + print("PG is ready") + if pg: + master_actor = self._actor_type.options(scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, placement_group_bundle_index=0)).remote(world_size, 0, 0, None, None) + else: + master_actor = self._actor_type.options(num_gpus=1).remote(world_size, 0, 0, None, None) + self._actor_handlers = [master_actor] + if world_size > 1: + master_addr, master_port = ray.get(master_actor.get_master_addr_port.remote()) + for rank in range(1, world_size): + local_rank = rank % self._num_gpus_per_node + if pg: + worker_actor = self._actor_type.options(scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, placement_group_bundle_index=rank // self._num_gpus_per_node)).remote( + world_size, rank, local_rank, master_addr, master_port) + else: + worker_actor = self._actor_type.options(num_gpus=1).remote(world_size, rank, local_rank, + 
master_addr, master_port) + self._actor_handlers.append(worker_actor) + + def async_init_model_from_pretrained(self, strategy: str, model: str, pretrained_model_name_or_path: str): + return [ + actor.init_model_from_pretrained.remote(strategy, model, pretrained_model_name_or_path) + for actor in self._actor_handlers + ] + + def async_init_model_from_model_name(self, strategy: str, model: str, model_name: str): + return [actor.init_model_from_model_name.remote(strategy, model, model_name) for actor in self._actor_handlers] + + +class TrainableModelRayActorGroup(ModelRayActorGroup): + + def __init__(self, num_nodes, num_gpus_per_node, actor_type) -> None: + super().__init__(num_nodes, num_gpus_per_node, actor_type) + + def async_learn_on_experiences(self, experience_refs): + num_actors = len(self._actor_handlers) + learn_result_refs = [] + for i in range(num_actors): + exp_refs_batch = experience_refs[i::num_actors] + learn_result_refs.append(self._actor_handlers[i].learn_on_experiences.remote(exp_refs_batch)) + return learn_result_refs + + +class GPTActorRayActorGroup(TrainableModelRayActorGroup): + + def __init__(self, num_nodes, num_gpus_per_node) -> None: + super().__init__(num_nodes, num_gpus_per_node, RayPPOActor) + self._world_size = num_nodes * num_gpus_per_node + self._next_actor_index_used_in_inference = 0 + + def async_load_tokenizer_from_pretrained(self, model, pretrained) -> list: + return [actor.load_tokenizer_from_pretrained.remote(model, pretrained) for actor in self._actor_handlers] + + def load_csv_prompts_data_from_url(self, csv_url): + ray.get([actor.load_csv_prompt_file_from_url.remote(csv_url) for actor in self._actor_handlers]) + + def set_generate_kwargs(self, kwargs: dict): + return ray.get([actor.set_generate_kwargs.remote(kwargs) for actor in self._actor_handlers]) + + def async_sample_prompts_and_make_sequence(self, experience_batch_size): + return [actor.sample_prompts_and_make_sequence.remote(experience_batch_size) for actor in self._actor_handlers] + + def async_calculate_action_log_probs(self, sequences_attention_mask_action_mask_refs): + num_actors = len(self._actor_handlers) + action_log_probs_refs = [] + for i in range(len(sequences_attention_mask_action_mask_refs)): + action_log_probs_ref = self._actor_handlers[i % num_actors].calculate_action_log_probs.remote( + sequences_attention_mask_action_mask_refs[i]) + action_log_probs_refs.append(action_log_probs_ref) + return action_log_probs_refs + + def save_checkpoint(self, save_path, should_save_optimizer): + ray.get([actor.save_checkpoint.remote(save_path, should_save_optimizer) for actor in self._actor_handlers]) + + def generate_answer(self, prompt, max_length=30, num_return_sequences=1): + refs = [ + actor.generate_answer.remote(prompt, max_length, num_return_sequences) for actor in self._actor_handlers + ] + answers = ray.get(refs[0]) + self._next_actor_index_used_in_inference = (self._next_actor_index_used_in_inference + 1) % self._world_size + return answers + + +class GPTCriticRayActorGroup(TrainableModelRayActorGroup): + + def __init__(self, num_nodes, num_gpus_per_node) -> None: + super().__init__(num_nodes, num_gpus_per_node, RayPPOCritic) + + def async_calculate_value(self, sequences_attention_mask_action_mask_refs): + num_actors = len(self._actor_handlers) + value_refs = [] + for i in range(len(sequences_attention_mask_action_mask_refs)): + value_ref = self._actor_handlers[i % num_actors].calculate_value.remote( + sequences_attention_mask_action_mask_refs[i]) + value_refs.append(value_ref) + 
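# Keep these as unresolved object refs; main() threads them into ExperienceCompositionRefs and they are only fetched inside make_experience +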
return value_refs + + +class GPTInitialRayActorGroup(ModelRayActorGroup): + + def __init__(self, num_nodes, num_gpus_per_node) -> None: + super().__init__(num_nodes, num_gpus_per_node, RayPPOInitialModel) + + def async_calculate_base_action_log_probs(self, sequences_attention_mask_action_mask_refs): + num_actors = len(self._actor_handlers) + base_action_log_probs_refs = [] + for i in range(len(sequences_attention_mask_action_mask_refs)): + base_action_log_probs_ref = self._actor_handlers[i % num_actors].calculate_base_action_log_probs.remote( + sequences_attention_mask_action_mask_refs[i]) + base_action_log_probs_refs.append(base_action_log_probs_ref) + return base_action_log_probs_refs + + +class GPTRewardRayActorGroup(ModelRayActorGroup): + + def __init__(self, num_nodes, num_gpus_per_node) -> None: + super().__init__(num_nodes, num_gpus_per_node, RayPPORewardModel) + + def async_calculate_r(self, sequences_attention_mask_action_mask_refs): + num_actors = len(self._actor_handlers) + r_refs = [] + for i in range(len(sequences_attention_mask_action_mask_refs)): + r_ref = self._actor_handlers[i % num_actors].calculate_r.remote( + sequences_attention_mask_action_mask_refs[i]) + r_refs.append(r_ref) + return r_refs + + +def main(args): + logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', + level=logging.INFO, + datefmt='%Y-%m-%d %H:%M:%S') + logging.info("Start creating actors") + # Initialize 4 models (actor, critic, initial_model and reward_model) + actor_group = GPTActorRayActorGroup(num_nodes=args.num_actor_nodes, num_gpus_per_node=args.num_gpus_per_node) + actor_group.initiate_actors() + critic_group = GPTCriticRayActorGroup(num_nodes=args.num_critic_nodes, num_gpus_per_node=args.num_gpus_per_node) + critic_group.initiate_actors() + initial_group = GPTInitialRayActorGroup(num_nodes=args.num_initial_nodes, num_gpus_per_node=args.num_gpus_per_node) + initial_group.initiate_actors() + reward_group = GPTRewardRayActorGroup(num_nodes=args.num_reward_nodes, num_gpus_per_node=args.num_gpus_per_node) + reward_group.initiate_actors() + logging.info("Actors created") + + # Load model + ray.get( + actor_group.async_init_model_from_pretrained(args.strategy, args.model, args.pretrain) + + critic_group.async_init_model_from_model_name(args.strategy, args.model, args.model_name) + + initial_group.async_init_model_from_pretrained(args.strategy, args.model, args.pretrain) + + reward_group.async_init_model_from_model_name(args.strategy, args.model, args.model_name)) + ray.get(actor_group.async_load_tokenizer_from_pretrained(args.model, args.pretrain)) + + # Prepare actors for training + actor_group.load_csv_prompts_data_from_url(args.prompt_csv_url) + generate_kwargs = {'max_length': 128, 'do_sample': True, 'temperature': 1.0, 'top_k': 50} + actor_group.set_generate_kwargs(generate_kwargs) + logging.info("Models prepared for training") + + # Training parameter + num_episodes = args.num_episodes + max_timesteps = args.max_timesteps + update_timesteps = args.update_timesteps + experience_batch_size = args.experience_batch_size + # Start training + logging.info("Training start") + ray.get([ray_actor.eval.remote() for ray_actor in actor_group._actor_handlers]) + all_ray_actors = actor_group._actor_handlers + critic_group._actor_handlers + \ + initial_group._actor_handlers + reward_group._actor_handlers + num_ray_actors = len(all_ray_actors) + ray.get([ray_actor.eval.remote() for ray_actor in all_ray_actors]) + # Used as a queue to coordinate experience making + experience_composition_refs = 
[] + time = 0 + for episode in range(num_episodes): + logging.info("episode {} started".format(episode)) + for _ in range(max_timesteps): + time += 1 + sequences_attention_mask_action_mask_refs = actor_group.async_sample_prompts_and_make_sequence( + experience_batch_size) + base_action_log_probs_refs = initial_group.async_calculate_base_action_log_probs( + sequences_attention_mask_action_mask_refs) + values_refs = critic_group.async_calculate_value(sequences_attention_mask_action_mask_refs) + r_refs = reward_group.async_calculate_r(sequences_attention_mask_action_mask_refs) + action_log_probs_refs = actor_group.async_calculate_action_log_probs( + sequences_attention_mask_action_mask_refs) + experience_composition_refs.extend([ + ExperienceCompositionRefs(sequences_attention_mask_action_mask_refs[i], action_log_probs_refs[i], + base_action_log_probs_refs[i], values_refs[i], r_refs[i]) + for i in range(len(sequences_attention_mask_action_mask_refs)) + ]) + if time % update_timesteps == 0: + experience_refs = [] + # calculate experiences + for i in range(len(experience_composition_refs)): + exp_composition_ref = experience_composition_refs[i] + selected_ray_actor = all_ray_actors[i % num_ray_actors] + experience_refs.append(selected_ray_actor.make_experience.remote(exp_composition_ref)) + # backward + ray.get( + actor_group.async_learn_on_experiences(experience_refs) + + critic_group.async_learn_on_experiences(experience_refs)) + # clear refs queue + experience_composition_refs.clear() + logging.info("Training finished") + actor_group.save_checkpoint(args.save_path, args.need_optim_ckpt) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--prompt_csv_url', type=str) + parser.add_argument('--strategy', + choices=['naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2'], + default='naive') + parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt']) + parser.add_argument('--pretrain', type=str, default='gpt2') + parser.add_argument('--model_name', type=str, help='gpt2 model name represented in pretrain model', default='s') + parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt') + parser.add_argument('--need_optim_ckpt', type=bool, default=False) + parser.add_argument('--num_episodes', type=int, default=10) + parser.add_argument('--max_timesteps', type=int, default=10) + parser.add_argument('--update_timesteps', type=int, default=10) + parser.add_argument('--train_batch_size', type=int, default=8) + parser.add_argument('--experience_batch_size', type=int, default=8) + parser.add_argument('--num_actor_nodes', type=int, help='num of nodes to use to host actor model', default=1) + parser.add_argument('--num_critic_nodes', type=int, help='num of nodes to use to host critic model', default=1) + parser.add_argument('--num_initial_nodes', type=int, help='num of nodes to use to host initial model', default=1) + parser.add_argument('--num_reward_nodes', type=int, help='num of nodes to use to host reward model', default=1) + parser.add_argument('--num_gpus_per_node', type=int, help='num of gpus on a ray node', default=1) + args = parser.parse_args() + ray.init() + main(args) diff --git a/applications/ChatGPT/examples/train_prompts_on_ray_job_submission_script.py b/applications/ChatGPT/examples/train_prompts_on_ray_job_submission_script.py new file mode 100644 index 000000000000..90d971af688b --- /dev/null +++ b/applications/ChatGPT/examples/train_prompts_on_ray_job_submission_script.py @@ -0,0 +1,11 @@ +from 
ray.job_submission import JobSubmissionClient + +client = JobSubmissionClient("http://127.0.0.1:8265") +client.submit_job( + entrypoint= + "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv" + + " --pretrain gpt2-large --model_name l", + runtime_env={ + "working_dir": "../", + "pip": ["torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4"] + }) From 12b42667aeaadc066ad2575892de42c6ccf73e96 Mon Sep 17 00:00:00 2001 From: jiangwen Date: Thu, 30 Mar 2023 12:56:10 +0800 Subject: [PATCH 02/13] [fix]simplify code --- .../ChatGPT/examples/train_prompts_on_ray.py | 462 ++++++------------ 1 file changed, 139 insertions(+), 323 deletions(-) diff --git a/applications/ChatGPT/examples/train_prompts_on_ray.py b/applications/ChatGPT/examples/train_prompts_on_ray.py index fe06446ef741..a51d68707d17 100644 --- a/applications/ChatGPT/examples/train_prompts_on_ray.py +++ b/applications/ChatGPT/examples/train_prompts_on_ray.py @@ -3,6 +3,7 @@ import os import socket from copy import deepcopy +from typing import Type import ray import torch @@ -10,6 +11,7 @@ from chatgpt.models.base import RewardModel from chatgpt.models.bloom import BLOOMActor, BLOOMCritic from chatgpt.models.gpt import GPTActor, GPTCritic +from chatgpt.models.lora import LoRAModule from chatgpt.models.loss import PolicyLoss, ValueLoss from chatgpt.models.opt import OPTActor, OPTCritic from chatgpt.models.utils import compute_reward @@ -19,7 +21,6 @@ from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy from torch.optim import Adam from transformers import AutoTokenizer, BloomTokenizerFast -from transformers.models.gpt2.configuration_gpt2 import GPT2Config from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer from colossalai.nn.optimizer import HybridAdam @@ -57,9 +58,9 @@ def make_experience(self, experiment_computation_refs: ExperienceCompositionRefs return experience -class BasePPOActor: +class DistributedTorchRayActor: - def __init__(self, world_size, rank, local_rank, master_addr, master_port, kl_coef: float = 0.1): + def __init__(self, world_size, rank, local_rank, master_addr, master_port): logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') @@ -74,9 +75,30 @@ def __init__(self, world_size, rank, local_rank, master_addr, master_port, kl_co os.environ["WORLD_SIZE"] = str(self._world_size) os.environ["RANK"] = str(self._rank) os.environ["LOCAL_RANK"] = str(self._local_rank) + + @staticmethod + def _get_current_node_ip(): + return ray._private.services.get_node_ip_address() + + @staticmethod + def _get_free_port(): + with socket.socket() as sock: + sock.bind(('', 0)) + return sock.getsockname()[1] + + def get_master_addr_port(self): + return self._master_addr, self._master_port + + +class BasePPORole(DistributedTorchRayActor): + + def add_experience_maker(self, kl_coef: float = 0.1): self._experience_maker = ExperienceMaker(kl_coef) - def _init_strategy(self, strategy): + def make_experience(self, experience_computation_ref: ExperienceCompositionRefs): + return self._experience_maker.make_experience(experience_computation_ref) + + def _init_strategy(self, strategy: str): # configure strategy if strategy == 'naive': self._strategy = NaiveStrategy() @@ -102,98 +124,33 @@ def _prepare_model_with_strategy(self, has_optimizer: bool): else: self._model = self._strategy.prepare(self._model) - 
@staticmethod - def _get_gpt_config(model_name: str): - model_map = { - 's': GPT2Config(), - 'm': GPT2Config(n_embd=1024, n_layer=24, n_head=16), - 'l': GPT2Config(n_embd=1280, n_layer=36, n_head=20), - 'xl': GPT2Config(n_embd=1600, n_layer=48, n_head=25), - '2b': GPT2Config(n_embd=2048, n_layer=40, n_head=16), - '4b': GPT2Config(n_embd=2304, n_layer=64, n_head=16), - '6b': GPT2Config(n_embd=4096, n_layer=30, n_head=16), - '8b': GPT2Config(n_embd=4096, n_layer=40, n_head=16), - '10b': GPT2Config(n_embd=4096, n_layer=50, n_head=16), - '12b': GPT2Config(n_embd=4096, n_layer=60, n_head=16), - '15b': GPT2Config(n_embd=4096, n_layer=78, n_head=16), - '18b': GPT2Config(n_embd=4096, n_layer=90, n_head=16), - '20b': GPT2Config(n_embd=8192, n_layer=25, n_head=16), - '24b': GPT2Config(n_embd=8192, n_layer=30, n_head=16), - '28b': GPT2Config(n_embd=8192, n_layer=35, n_head=16), - '32b': GPT2Config(n_embd=8192, n_layer=40, n_head=16), - '36b': GPT2Config(n_embd=8192, n_layer=45, n_head=16), - '40b': GPT2Config(n_embd=8192, n_layer=50, n_head=16), - '175b': GPT2Config(n_positions=2048, n_embd=12288, n_layer=96, n_head=96), - } - try: - return model_map[model_name] - except KeyError: - raise ValueError(f'Unknown model "{model_name}"') - - @staticmethod - def _get_current_node_ip(): - return ray._private.services.get_node_ip_address() - - @staticmethod - def _get_free_port(): - with socket.socket() as sock: - sock.bind(('', 0)) - return sock.getsockname()[1] - - def get_master_addr_port(self): - return self._master_addr, self._master_port - - def init_model_from_pretrained(self, strategy, model, pretrained_model_name_or_path, has_optimizer=False): - self._init_strategy(strategy) - self._load_model_from_pretrained(model, pretrained_model_name_or_path) - self._prepare_model_with_strategy(has_optimizer) + def _load_model_from_pretrained(self, model_class: Type[LoRAModule], pretrain: str): + raise NotImplementedError() - def init_model_from_model_name(self, strategy, model, model_name, has_optimizer=False): + def init_model_from_pretrained(self, + strategy: str, + model_class: Type[LoRAModule], + pretrain: str, + has_optimizer=False): self._init_strategy(strategy) - self._load_model_from_model_name(model, model_name) + self._load_model_from_pretrained(model_class, pretrain) self._prepare_model_with_strategy(has_optimizer) - def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): - raise NotImplementedError() - - def _load_model_from_model_name(self, model, model_config): - raise NotImplementedError() - def eval(self): self._model.eval() - def make_experience(self, experience_computation_ref: ExperienceCompositionRefs): - return self._experience_maker.make_experience(experience_computation_ref) - - -class TrainablePPOActor(BasePPOActor): - - def __init__(self, - world_size, - rank, - local_rank, - master_addr, - master_port, - kl_coef: float = 0.1, - train_batch_size: int = 1, - buffer_limit: int = 0, - buffer_cpu_offload: bool = True, - dataloader_pin_memory: bool = True): - super().__init__(world_size, rank, local_rank, master_addr, master_port, kl_coef) - self._replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) - self._dataloader_pin_memory = dataloader_pin_memory - def init_model_from_pretrained(self, strategy, model, pretrained_model_name_or_path): - super().init_model_from_pretrained(strategy, model, pretrained_model_name_or_path, True) +class TrainablePPORole(BasePPORole): - def init_model_from_model_name(self, strategy, model, 
pretrained_model_name_or_path): - super().init_model_from_model_name(strategy, model, pretrained_model_name_or_path, True) + def _load_model_from_pretrained(self, model_class, pretrain): + with self._strategy.model_init_context(): + self._model = model_class(pretrain).to(torch.cuda.current_device()) def _train(self): self._model.train() def _training_step(self, experience: Experience): - pass + raise NotImplementedError() def learn_on_experiences(self, experience_refs): experiences = ray.get(experience_refs) @@ -206,108 +163,52 @@ def learn_on_experiences(self, experience_refs): @ray.remote(num_gpus=1) -class RayPPOActor(TrainablePPOActor): - - def __init__(self, - world_size, - rank, - local_rank, - master_addr, - master_port, - kl_coef: float = 0.1, - train_batch_size: int = 1, - buffer_limit: int = 0, - buffer_cpu_offload: bool = True, - dataloader_pin_memory: bool = True, - eps_clip: float = 0.2): - super().__init__(world_size, rank, local_rank, master_addr, master_port, kl_coef, train_batch_size, - buffer_limit, buffer_cpu_offload, dataloader_pin_memory) - self._actor_loss_fn = PolicyLoss(eps_clip) +class RayPPOActor(TrainablePPORole): - def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): - with self._strategy.model_init_context(): - if model == 'gpt2': - self._model = GPTActor(pretrained=os.path.abspath("gpt2")).to(torch.cuda.current_device()) - elif model == 'bloom': - self._model = BLOOMActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'opt': - self._model = OPTActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') - - def _load_model_from_model_name(self, model, model_name): - if model == 'gpt2': - model_config = self._get_gpt_config(model_name) - with self._strategy.model_init_context(): - self._model = GPTActor(config=model_config).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') + def set_loss_function(self, eps_clip: float = 0.2): + self._actor_loss_fn = PolicyLoss(eps_clip) - def load_tokenizer_from_pretrained(self, model, pretrained): - if model == 'gpt2': - self._model_tokenizer = GPT2Tokenizer.from_pretrained(os.path.abspath('gpt2')) + def load_tokenizer_from_pretrained(self, model_type: str, pretrained): + if model_type == 'gpt2': + self._model_tokenizer = GPT2Tokenizer.from_pretrained(pretrained) self._model_tokenizer.pad_token = self._model_tokenizer.eos_token - elif model == 'bloom': + elif model_type == 'bloom': self._model_tokenizer = BloomTokenizerFast.from_pretrained(pretrained) self._model_tokenizer.pad_token = self._model_tokenizer.eos_token - elif model == 'opt': - self._model_tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m") + elif model_type == 'opt': + self._model_tokenizer = AutoTokenizer.from_pretrained(pretrained) else: - raise ValueError(f'Unsupported model "{model}"') + raise ValueError(f'Unsupported model "{model_type}"') - def set_generate_kwargs(self, generate_kwargs: dict): - self._generate_kwargs = {} - self._generate_kwargs['pad_token_id'] = self._model_tokenizer.pad_token_id - self._generate_kwargs['eos_token_id'] = self._model_tokenizer.eos_token_id - self._generate_kwargs.update(generate_kwargs) - self._generate_kwargs.update(self._get_generate_function_kwargs(generate_kwargs)) - - def _get_generate_function_kwargs(self, generate_kwargs: dict): - origin_model = self._strategy._unwrap_actor(self._model) - _to_update_kwargs = {} - # use 
huggingface models method directly - if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'): - _to_update_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation - - def update_model_kwargs_fn(outputs: dict, **model_kwargs) -> dict: - if "past_key_values" in outputs: - model_kwargs["past"] = outputs["past_key_values"] - else: - model_kwargs["past"] = None - - # update token_type_ids with last value - if "token_type_ids" in model_kwargs: - token_type_ids = model_kwargs["token_type_ids"] - model_kwargs["token_type_ids"] = torch.cat([token_type_ids, token_type_ids[:, -1].unsqueeze(-1)], - dim=-1) - - # update attention mask - if "attention_mask" in model_kwargs: - attention_mask = model_kwargs["attention_mask"] - model_kwargs["attention_mask"] = torch.cat( - [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1) - - return model_kwargs - - if 'update_model_kwargs_fn' not in generate_kwargs: - _to_update_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fn - return _to_update_kwargs - - def load_csv_prompt_file_from_url(self, url): - import pandas as pd - prompts = pd.read_csv(url)['prompt'] - - # Set tokenize function for training - def _tokenize_fn(texts): + # Set tokenize function for sequence generation + def _text_input_tokenize_fn(texts): batch = self._model_tokenizer(texts, return_tensors='pt', max_length=96, padding=True, truncation=True) return {k: v.cuda() for k, v in batch.items()} - self._tokenize_function = _tokenize_fn + self._sample_tokenize_function = _text_input_tokenize_fn + + def setup_generate_kwargs(self, generate_kwargs: dict): + from chatgpt.trainer.ppo import _set_default_generate_kwargs + self._generate_kwargs = _set_default_generate_kwargs(self._strategy, generate_kwargs, self._model) + self._generate_kwargs['pad_token_id'] = self._model_tokenizer.pad_token_id + self._generate_kwargs['eos_token_id'] = self._model_tokenizer.eos_token_id + + def load_csv_prompt_file_from_url_to_sampler(self, prompt_url): + import pandas as pd + prompts = pd.read_csv(prompt_url)['prompt'] self._sampler = self._strategy.setup_sampler(prompts) def _generate(self, input_ids, **generate_kwargs): return self._model.generate(input_ids, return_action_mask=True, **generate_kwargs) + def sample_prompts_and_make_sequence(self, experience_batch_size): + sampled_prompts = self._sampler.sample(experience_batch_size) + input_ids = self._sample_tokenize_function(sampled_prompts) + if isinstance(input_ids, dict): + return self._generate(**input_ids, **self._generate_kwargs) + else: + return self._generate(input_ids, **self._generate_kwargs) + @torch.no_grad() def calculate_action_log_probs(self, sequence_attention_action_mask): sequences, attention_mask, action_mask = sequence_attention_action_mask @@ -325,16 +226,6 @@ def _training_step(self, experience): self._optimizer.zero_grad() logging.info("actor_loss: {}".format(actor_loss)) - def _sample_prompts(self, experience_batch_size) -> list: - return self._sampler.sample(experience_batch_size) - - def sample_prompts_and_make_sequence(self, experience_batch_size): - input_ids = self._tokenize_function(self._sample_prompts(experience_batch_size)) - if isinstance(input_ids, dict): - return self._generate(**input_ids, **self._generate_kwargs) - else: - return self._generate(input_ids, **self._generate_kwargs) - def save_checkpoint(self, save_path, should_save_optimizer: bool): if self._rank == 0: # save model checkpoint only on rank 0 @@ -358,42 +249,10 @@ def 
generate_answer(self, prompt, max_length=30, num_return_sequences=5): @ray.remote(num_gpus=1) -class RayPPOCritic(TrainablePPOActor): - - def __init__(self, - world_size, - rank, - local_rank, - master_addr, - master_port, - kl_coef: float = 0.1, - train_batch_size: int = 1, - buffer_limit: int = 0, - buffer_cpu_offload: bool = True, - dataloader_pin_memory: bool = True, - value_clip: float = 0.4): - super().__init__(world_size, rank, local_rank, master_addr, master_port, kl_coef, train_batch_size, - buffer_limit, buffer_cpu_offload, dataloader_pin_memory) - self._critic_loss_fn = ValueLoss(value_clip) +class RayPPOCritic(TrainablePPORole): - def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): - with self._strategy.model_init_context(): - if model == 'gpt2': - self._model = GPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'bloom': - self._model = BLOOMCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'opt': - self._model = OPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') - - def _load_model_from_model_name(self, model, model_name): - if model == 'gpt2': - model_config = self._get_gpt_config(model_name) - with self._strategy.model_init_context(): - self._model = GPTCritic(config=model_config).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') + def set_loss_function(self, value_clip): + self._critic_loss_fn = ValueLoss(value_clip) def _training_step(self, experience): values = self._model(experience.sequences, @@ -415,30 +274,14 @@ def calculate_value(self, sequence_attention_action_mask): @ray.remote(num_gpus=1) -class RayPPORewardModel(BasePPOActor): +class RayPPORewardModel(BasePPORole): - def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + def _load_model_from_pretrained(self, model_class, pretrain): with self._strategy.model_init_context(): - if model == 'gpt2': - critic = GPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'bloom': - critic = BLOOMCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'opt': - critic = OPTCritic(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') + critic = model_class(pretrained=pretrain).to(torch.cuda.current_device()) self._model = RewardModel(deepcopy(critic.model), deepcopy(critic.value_head)).to(torch.cuda.current_device()) - def _load_model_from_model_name(self, model, model_name): - if model == 'gpt2': - model_config = self._get_gpt_config(model_name) - with self._strategy.model_init_context(): - critic = GPTCritic(config=model_config).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') - self._model = RewardModel(deepcopy(critic.model), deepcopy(critic.value_head)).to(torch.cuda.current_device()) - @torch.no_grad() def calculate_r(self, sequence_attention_action_mask): sequences, attention_mask, _ = sequence_attention_action_mask @@ -446,90 +289,66 @@ def calculate_r(self, sequence_attention_action_mask): @ray.remote(num_gpus=1) -class RayPPOInitialModel(BasePPOActor): +class RayPPOInitialModel(BasePPORole): - def _load_model_from_pretrained(self, model, pretrained_model_name_or_path): + def _load_model_from_pretrained(self, model_class, pretrain): with 
self._strategy.model_init_context(): - if model == 'gpt2': - self._model = GPTActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'bloom': - self._model = BLOOMActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - elif model == 'opt': - self._model = OPTActor(pretrained=pretrained_model_name_or_path).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') - - def _load_model_from_model_name(self, model, model_name): - if model == 'gpt2': - model_config = self._get_gpt_config(model_name) - with self._strategy.model_init_context(): - self._model = GPTActor(config=model_config).to(torch.cuda.current_device()) - else: - raise ValueError(f'Unsupported model "{model}"') + self._model = model_class(pretrain).to(torch.cuda.current_device()) @torch.no_grad() def calculate_base_action_log_probs(self, sequence_attention_action_mask): sequences, attention_mask, action_mask = sequence_attention_action_mask return self._model(sequences, action_mask.size(1), attention_mask) - def load_model_from_model_config(self, model): - model_config = self._get_gpt_config(model) - with self._strategy.model_init_context(): - self._model = GPTActor(config=model_config).cuda() - return True - -class ModelRayActorGroup: +class PPORayActorGroup: + """ + A group of ray actors + Functions start with 'async' should return list of object refs + """ - def __init__(self, num_nodes, num_gpus_per_node, actor_type) -> None: + def __init__(self, num_nodes, num_gpus_per_node, ray_actor_type: Type[BasePPORole]) -> None: self._num_nodes = num_nodes self._num_gpus_per_node = num_gpus_per_node - self._actor_type = actor_type + self.ray_actor_type = ray_actor_type + self._initiate_actors() - def initiate_actors(self): + def _initiate_actors(self): world_size = self._num_nodes * self._num_gpus_per_node + # Use placement group to lock resources for models of same type pg = None - if self._num_gpus_per_node >= 1: - bundles = [{ - "GPU": self._num_gpus_per_node, - "CPU": 1 * self._num_gpus_per_node - } for _ in range(self._num_nodes)] + if self._num_gpus_per_node > 1: + bundles = [{"GPU": self._num_gpus_per_node, "CPU": self._num_gpus_per_node} for _ in range(self._num_nodes)] pg = placement_group(bundles, strategy="STRICT_SPREAD") ray.get(pg.ready()) - print("PG is ready") if pg: - master_actor = self._actor_type.options(scheduling_strategy=PlacementGroupSchedulingStrategy( + master_actor = self.ray_actor_type.options(scheduling_strategy=PlacementGroupSchedulingStrategy( placement_group=pg, placement_group_bundle_index=0)).remote(world_size, 0, 0, None, None) else: - master_actor = self._actor_type.options(num_gpus=1).remote(world_size, 0, 0, None, None) + master_actor = self.ray_actor_type.options(num_gpus=1).remote(world_size, 0, 0, None, None) self._actor_handlers = [master_actor] + + # Create worker actors if world_size > 1: master_addr, master_port = ray.get(master_actor.get_master_addr_port.remote()) for rank in range(1, world_size): local_rank = rank % self._num_gpus_per_node if pg: - worker_actor = self._actor_type.options(scheduling_strategy=PlacementGroupSchedulingStrategy( + worker_actor = self.ray_actor_type.options(scheduling_strategy=PlacementGroupSchedulingStrategy( placement_group=pg, placement_group_bundle_index=rank // self._num_gpus_per_node)).remote( world_size, rank, local_rank, master_addr, master_port) else: - worker_actor = self._actor_type.options(num_gpus=1).remote(world_size, rank, local_rank, - master_addr, 
master_port) + worker_actor = self.ray_actor_type.options(num_gpus=1).remote(world_size, rank, local_rank, + master_addr, master_port) self._actor_handlers.append(worker_actor) - def async_init_model_from_pretrained(self, strategy: str, model: str, pretrained_model_name_or_path: str): + def async_init_model_from_pretrained(self, strategy: str, model_class: Type[LoRAModule], pretrain: str): return [ - actor.init_model_from_pretrained.remote(strategy, model, pretrained_model_name_or_path) - for actor in self._actor_handlers + actor.init_model_from_pretrained.remote(strategy, model_class, pretrain) for actor in self._actor_handlers ] - def async_init_model_from_model_name(self, strategy: str, model: str, model_name: str): - return [actor.init_model_from_model_name.remote(strategy, model, model_name) for actor in self._actor_handlers] - -class TrainableModelRayActorGroup(ModelRayActorGroup): - - def __init__(self, num_nodes, num_gpus_per_node, actor_type) -> None: - super().__init__(num_nodes, num_gpus_per_node, actor_type) +class TrainableModelRayActorGroup(PPORayActorGroup): def async_learn_on_experiences(self, experience_refs): num_actors = len(self._actor_handlers) @@ -540,21 +359,20 @@ def async_learn_on_experiences(self, experience_refs): return learn_result_refs -class GPTActorRayActorGroup(TrainableModelRayActorGroup): +class PPOActorRayActorGroup(TrainableModelRayActorGroup): def __init__(self, num_nodes, num_gpus_per_node) -> None: super().__init__(num_nodes, num_gpus_per_node, RayPPOActor) - self._world_size = num_nodes * num_gpus_per_node - self._next_actor_index_used_in_inference = 0 - - def async_load_tokenizer_from_pretrained(self, model, pretrained) -> list: - return [actor.load_tokenizer_from_pretrained.remote(model, pretrained) for actor in self._actor_handlers] - def load_csv_prompts_data_from_url(self, csv_url): - ray.get([actor.load_csv_prompt_file_from_url.remote(csv_url) for actor in self._actor_handlers]) + def async_prepare_for_sequence_generation(self, model: str, pretrain: str, generation_kwargs: dict): + refs = [] + for actor in self._actor_handlers: + refs.append(actor.load_tokenizer_from_pretrained.remote(model, pretrain)) + refs.append(actor.setup_generate_kwargs.remote(generation_kwargs)) + return refs - def set_generate_kwargs(self, kwargs: dict): - return ray.get([actor.set_generate_kwargs.remote(kwargs) for actor in self._actor_handlers]) + def load_csv_prompts_data_from_url_to_sampler(self, csv_url): + ray.get([actor.load_csv_prompts_data_from_url_to_sampler.remote(csv_url) for actor in self._actor_handlers]) def async_sample_prompts_and_make_sequence(self, experience_batch_size): return [actor.sample_prompts_and_make_sequence.remote(experience_batch_size) for actor in self._actor_handlers] @@ -571,16 +389,8 @@ def async_calculate_action_log_probs(self, sequences_attention_mask_action_mask_ def save_checkpoint(self, save_path, should_save_optimizer): ray.get([actor.save_checkpoint.remote(save_path, should_save_optimizer) for actor in self._actor_handlers]) - def generate_answer(self, prompt, max_length=30, num_return_sequences=1): - refs = [ - actor.generate_answer.remote(prompt, max_length, num_return_sequences) for actor in self._actor_handlers - ] - answers = ray.get(refs[0]) - self._next_actor_index_used_in_inference = (self._next_actor_index_used_in_inference + 1) % self._world_size - return answers - -class GPTCriticRayActorGroup(TrainableModelRayActorGroup): +class PPOCriticRayActorGroup(TrainableModelRayActorGroup): def __init__(self, num_nodes, 
num_gpus_per_node) -> None: super().__init__(num_nodes, num_gpus_per_node, RayPPOCritic) @@ -595,7 +405,7 @@ def async_calculate_value(self, sequences_attention_mask_action_mask_refs): return value_refs -class GPTInitialRayActorGroup(ModelRayActorGroup): +class PPOInitialRayActorGroup(PPORayActorGroup): def __init__(self, num_nodes, num_gpus_per_node) -> None: super().__init__(num_nodes, num_gpus_per_node, RayPPOInitialModel) @@ -610,7 +420,7 @@ def async_calculate_base_action_log_probs(self, sequences_attention_mask_action_ return base_action_log_probs_refs -class GPTRewardRayActorGroup(ModelRayActorGroup): +class PPORewardRayActorGroup(PPORayActorGroup): def __init__(self, num_nodes, num_gpus_per_node) -> None: super().__init__(num_nodes, num_gpus_per_node, RayPPORewardModel) @@ -629,32 +439,35 @@ def main(args): logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') + if args.model == 'gpt2': + actor_model_class, critic_model_class = GPTActor, GPTCritic + elif args.model == 'bloom': + actor_model_class, critic_model_class = BLOOMActor, BLOOMCritic + elif args.model == 'opt': + actor_model_class, critic_model_class = OPTActor, OPTCritic + else: + raise ValueError(f'Unsupported model "{args.model}"') + logging.info("Start creating actors") # Initialize 4 models (actor, critic, initial_model and reward_model) - actor_group = GPTActorRayActorGroup(num_nodes=args.num_actor_nodes, num_gpus_per_node=args.num_gpus_per_node) - actor_group.initiate_actors() - critic_group = GPTCriticRayActorGroup(num_nodes=args.num_critic_nodes, num_gpus_per_node=args.num_gpus_per_node) - critic_group.initiate_actors() - initial_group = GPTInitialRayActorGroup(num_nodes=args.num_initial_nodes, num_gpus_per_node=args.num_gpus_per_node) - initial_group.initiate_actors() - reward_group = GPTRewardRayActorGroup(num_nodes=args.num_reward_nodes, num_gpus_per_node=args.num_gpus_per_node) - reward_group.initiate_actors() + actor_group = PPOActorRayActorGroup(num_nodes=args.num_actor_nodes, num_gpus_per_node=args.num_gpus_per_node) + critic_group = PPOCriticRayActorGroup(num_nodes=args.num_critic_nodes, num_gpus_per_node=args.num_gpus_per_node) + initial_group = PPOInitialRayActorGroup(num_nodes=args.num_initial_nodes, num_gpus_per_node=args.num_gpus_per_node) + reward_group = PPORewardRayActorGroup(num_nodes=args.num_reward_nodes, num_gpus_per_node=args.num_gpus_per_node) logging.info("Actors created") - # Load model - ray.get( - actor_group.async_init_model_from_pretrained(args.strategy, args.model, args.pretrain) + - critic_group.async_init_model_from_model_name(args.strategy, args.model, args.model_name) + - initial_group.async_init_model_from_pretrained(args.strategy, args.model, args.pretrain) + - reward_group.async_init_model_from_model_name(args.strategy, args.model, args.model_name)) - ray.get(actor_group.async_load_tokenizer_from_pretrained(args.model, args.pretrain)) - - # Prepare actors for training - actor_group.load_csv_prompts_data_from_url(args.prompt_csv_url) + # Prepare model for training generate_kwargs = {'max_length': 128, 'do_sample': True, 'temperature': 1.0, 'top_k': 50} - actor_group.set_generate_kwargs(generate_kwargs) + ray.get( + actor_group.async_init_model_from_pretrained(args.strategy, actor_model_class, args.pretrain) + + critic_group.async_init_model_from_pretrained(args.strategy, critic_model_class, args.pretrain) + + initial_group.async_init_model_from_pretrained(args.strategy, actor_model_class, args.pretrain) +
From 1e7d24440d4e8e8aee99b37c56e4fe8474776bd9 Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 30 Mar 2023 13:21:41 +0800
Subject: [PATCH 03/13] [fix]remove deprecated parameter

---
 applications/Chat/examples/train_prompts_on_ray.py         | 1 -
 .../examples/train_prompts_on_ray_job_submission_script.py | 3 +--
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/applications/Chat/examples/train_prompts_on_ray.py b/applications/Chat/examples/train_prompts_on_ray.py
index 102a7c383e58..adff176be778 100644
--- a/applications/Chat/examples/train_prompts_on_ray.py
+++ b/applications/Chat/examples/train_prompts_on_ray.py
@@ -527,7 +527,6 @@ def main(args):
                         default='naive')
     parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt'])
     parser.add_argument('--pretrain', type=str, default='gpt2')
-    parser.add_argument('--model_name', type=str, help='gpt2 model name represented in pretrain model', default='s')
     parser.add_argument('--save_path', type=str, default='actor_checkpoint_prompts.pt')
     parser.add_argument('--need_optim_ckpt', type=bool, default=False)
     parser.add_argument('--num_episodes', type=int, default=10)
diff --git a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
index 90d971af688b..cb383bde1949 100644
--- a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
+++ b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
@@ -3,8 +3,7 @@
 client = JobSubmissionClient("http://127.0.0.1:8265")
 client.submit_job(
     entrypoint=
-    "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv"
-    + " --pretrain gpt2-large --model_name l",
+    "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
     runtime_env={
         "working_dir": "../",
         "pip": ["torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4"]
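Because the submission script fires and forgets, it can be useful to poll the job from the same client: `submit_job` returns a job id that works with `get_job_status` and `get_job_logs`. A sketch of that loop (the trivial `entrypoint` is a placeholder, not the training command):

```python
import time

from ray.job_submission import JobStatus, JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(entrypoint="echo hello")  # placeholder entrypoint

# Poll until the job reaches a terminal state, then dump its logs.
terminal = {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}
while client.get_job_status(job_id) not in terminal:
    time.sleep(5)
print(client.get_job_logs(job_id))
```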
From a376086e1a85e02b68e8a92c39ff58b4e2be9965 Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 30 Mar 2023 13:34:56 +0800
Subject: [PATCH 04/13] [fix]add dependencies

---
 .../train_prompts_on_ray_job_submission_script.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
index cb383bde1949..b9857ec0518e 100644
--- a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
+++ b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
@@ -5,6 +5,10 @@
     entrypoint=
     "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
     runtime_env={
-        "working_dir": "../",
-        "pip": ["torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4"]
+        "working_dir":
+            "../",
+        "pip": [
+            "torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4", "langchain",
+            "tokenizers", "fastapi", "sse_starlette", "wandb", "sentencepiece", "gpustat"
+        ]
     })
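The `pip` list in `runtime_env` is installed into an isolated environment on each node before the job's code runs, which is why the training dependencies are declared here instead of being assumed on the cluster image. A minimal sketch of the mechanism, using `emoji` as a throwaway dependency (the package choice is only illustrative):

```python
import ray

# Packages listed in runtime_env are installed on the worker side; the
# driver's local site-packages does not need to provide them.
ray.init(runtime_env={"pip": ["emoji"]})


@ray.remote
def probe():
    import emoji  # resolvable only because of the runtime_env above
    return emoji.emojize("Ray is :thumbs_up:")


print(ray.get(probe.remote()))
```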
From 624c2f03ff0820bbc3a42ba6724b6f8724d3065e Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 30 Mar 2023 15:27:51 +0800
Subject: [PATCH 05/13] [fix]method calling

---
 applications/Chat/examples/train_prompts_on_ray.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/applications/Chat/examples/train_prompts_on_ray.py b/applications/Chat/examples/train_prompts_on_ray.py
index adff176be778..79724e7f5e73 100644
--- a/applications/Chat/examples/train_prompts_on_ray.py
+++ b/applications/Chat/examples/train_prompts_on_ray.py
@@ -370,8 +370,8 @@ def async_prepare_for_sequence_generation(self, model: str, pretrain: str, gener
             refs.append(actor.setup_generate_kwargs.remote(generation_kwargs))
         return refs
 
-    def load_csv_prompts_data_from_url_to_sampler(self, csv_url):
-        ray.get([actor.load_csv_prompts_data_from_url_to_sampler.remote(csv_url) for actor in self._actor_handlers])
+    def load_csv_prompt_file_from_url_to_sampler(self, csv_url):
+        ray.get([actor.load_csv_prompt_file_from_url_to_sampler.remote(csv_url) for actor in self._actor_handlers])
 
     def async_sample_prompts_and_make_sequence(self, experience_batch_size):
         return [actor.sample_prompts_and_make_sequence.remote(experience_batch_size) for actor in self._actor_handlers]
@@ -466,7 +466,7 @@ def main(args):
     logging.info("Models prepared for training")
 
     # Load training data to actor group
-    actor_group.load_csv_prompts_data_from_url_to_sampler(args.prompt_csv_url)
+    actor_group.load_csv_prompt_file_from_url_to_sampler(args.prompt_csv_url)
     # Training parameter
     num_episodes = args.num_episodes
     max_timesteps = args.max_timesteps

From e67b4441e410974b00df7e7f3349b47d27cfe2c0 Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 30 Mar 2023 15:56:32 +0800
Subject: [PATCH 06/13] [fix]experience maker

---
 applications/Chat/examples/train_prompts_on_ray.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/applications/Chat/examples/train_prompts_on_ray.py b/applications/Chat/examples/train_prompts_on_ray.py
index 79724e7f5e73..d90503af0d7a 100644
--- a/applications/Chat/examples/train_prompts_on_ray.py
+++ b/applications/Chat/examples/train_prompts_on_ray.py
@@ -474,11 +474,12 @@ def main(args):
     experience_batch_size = args.experience_batch_size
     # Start training
     logging.info("Training start")
-    # Set all models to eval
+    # Set all models to eval and add experience maker
     all_ray_actors = actor_group._actor_handlers + critic_group._actor_handlers + \
         initial_group._actor_handlers + reward_group._actor_handlers
     num_ray_actors = len(all_ray_actors)
     ray.get([ray_actor.eval.remote() for ray_actor in all_ray_actors])
+    ray.get([ray_actor.add_experience_maker.remote() for ray_actor in all_ray_actors])
     # Used as a queue to coordinate experience making
     experience_composition_refs = []
     time = 0
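Patch 06 assumes each Ray role exposes an `add_experience_maker` method; its definition sits outside these hunks, but given the `ExperienceMaker` class earlier in the file it is presumably a thin per-process constructor along these lines (the method body and `kl_coef` default are assumptions, only the name comes from the patch):

```python
# Hypothetical shape of the method patch 06 starts calling. Each Ray actor
# lives in its own process, so the ExperienceMaker has to be constructed
# inside the actor rather than pickled over from the driver.
def add_experience_maker(self, kl_coef: float = 0.1):
    self._experience_maker = ExperienceMaker(kl_coef)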
From 2714e0878cebf6551a879294e0dcd04e5c20ed15 Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 30 Mar 2023 16:27:22 +0800
Subject: [PATCH 07/13] [fix]missing loss function

---
 applications/Chat/examples/train_prompts_on_ray.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/applications/Chat/examples/train_prompts_on_ray.py b/applications/Chat/examples/train_prompts_on_ray.py
index d90503af0d7a..4e10286e8359 100644
--- a/applications/Chat/examples/train_prompts_on_ray.py
+++ b/applications/Chat/examples/train_prompts_on_ray.py
@@ -164,7 +164,7 @@ def learn_on_experiences(self, experience_refs):
 @ray.remote(num_gpus=1)
 class RayPPOActor(TrainablePPORole):
 
-    def set_loss_function(self, eps_clip: float = 0.2):
+    def set_loss_function(self, eps_clip: float):
         self._actor_loss_fn = PolicyLoss(eps_clip)
 
     def load_tokenizer_from_pretrained(self, model_type: str, pretrained):
@@ -250,7 +250,7 @@ def generate_answer(self, prompt, max_length=30, num_return_sequences=5):
 @ray.remote(num_gpus=1)
 class RayPPOCritic(TrainablePPORole):
 
-    def set_loss_function(self, value_clip):
+    def set_loss_function(self, value_clip: float):
         self._critic_loss_fn = ValueLoss(value_clip)
 
     def _training_step(self, experience):
@@ -385,6 +385,9 @@ def async_calculate_action_log_probs(self, sequences_attention_mask_action_mask_
         action_log_probs_refs.append(action_log_probs_ref)
         return action_log_probs_refs
 
+    def set_loss_function(self, eps_clip: float = 0.2):
+        ray.get([actor.set_loss_function.remote(eps_clip) for actor in self._actor_handlers])
+
     def save_checkpoint(self, save_path, should_save_optimizer):
         ray.get([actor.save_checkpoint.remote(save_path, should_save_optimizer) for actor in self._actor_handlers])
 
@@ -403,6 +406,9 @@ def async_calculate_value(self, sequences_attention_mask_action_mask_refs):
         value_refs.append(value_ref)
         return value_refs
 
+    def set_loss_function(self, value_clip: float = 0.4):
+        ray.get([actor.set_loss_function.remote(value_clip) for actor in self._actor_handlers])
+
 
 class PPOInitialRayActorGroup(PPORayActorGroup):
 
@@ -465,8 +471,10 @@ def main(args):
         actor_group.async_prepare_for_sequence_generation(args.model, args.pretrain, generate_kwargs))
     logging.info("Models prepared for training")
 
-    # Load training data to actor group
+    # Prepare models for training
     actor_group.load_csv_prompt_file_from_url_to_sampler(args.prompt_csv_url)
+    actor_group.set_loss_function()
+    critic_group.set_loss_function()
     # Training parameter
     num_episodes = args.num_episodes
     max_timesteps = args.max_timesteps
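`PolicyLoss(eps_clip)` and `ValueLoss(value_clip)` wrap the standard PPO clipped objectives. For reference, the policy side is conventionally computed as below; this is a generic sketch of the formula, not the chatgpt package's exact implementation:

```python
import torch


def clipped_policy_loss(log_probs, old_log_probs, advantages, eps_clip=0.2):
    # ratio = pi_new(a|s) / pi_old(a|s), recovered from log-probabilities.
    ratio = (log_probs - old_log_probs).exp()
    surr1 = ratio * advantages
    # Clipping keeps a single update from moving the policy too far from the
    # policy that generated the experience.
    surr2 = ratio.clamp(1.0 - eps_clip, 1.0 + eps_clip) * advantages
    return -torch.min(surr1, surr2).mean()
```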
From a86c846683a4498915bf1d871065d33551a49d88 Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 30 Mar 2023 17:20:29 +0800
Subject: [PATCH 08/13] [fix]init optimizer

---
 applications/Chat/examples/train_prompts_on_ray.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/applications/Chat/examples/train_prompts_on_ray.py b/applications/Chat/examples/train_prompts_on_ray.py
index 4e10286e8359..20b7efa3bdaa 100644
--- a/applications/Chat/examples/train_prompts_on_ray.py
+++ b/applications/Chat/examples/train_prompts_on_ray.py
@@ -341,9 +341,11 @@ def _initiate_actors(self):
                                                                        master_addr, master_port)
         self._actor_handlers.append(worker_actor)
 
-    def async_init_model_from_pretrained(self, strategy: str, model_class: Type[LoRAModule], pretrain: str):
+    def async_init_model_from_pretrained(self, strategy: str, model_class: Type[LoRAModule], pretrain: str,
+                                         has_optimizer: bool):
         return [
-            actor.init_model_from_pretrained.remote(strategy, model_class, pretrain) for actor in self._actor_handlers
+            actor.init_model_from_pretrained.remote(strategy, model_class, pretrain, has_optimizer)
+            for actor in self._actor_handlers
         ]
@@ -464,10 +466,10 @@ def main(args):
     # Prepare model for training
     generate_kwargs = {'max_length': 128, 'do_sample': True, 'temperature': 1.0, 'top_k': 50}
     ray.get(
-        actor_group.async_init_model_from_pretrained(args.strategy, actor_model_class, args.pretrain) +
-        critic_group.async_init_model_from_pretrained(args.strategy, critic_model_class, args.pretrain) +
-        initial_group.async_init_model_from_pretrained(args.strategy, actor_model_class, args.pretrain) +
-        reward_group.async_init_model_from_pretrained(args.strategy, critic_model_class, args.pretrain) +
+        actor_group.async_init_model_from_pretrained(args.strategy, actor_model_class, args.pretrain, True) +
+        critic_group.async_init_model_from_pretrained(args.strategy, critic_model_class, args.pretrain, True) +
+        initial_group.async_init_model_from_pretrained(args.strategy, actor_model_class, args.pretrain, False) +
+        reward_group.async_init_model_from_pretrained(args.strategy, critic_model_class, args.pretrain, False) +
         actor_group.async_prepare_for_sequence_generation(args.model, args.pretrain, generate_kwargs))
     logging.info("Models prepared for training")
 

From 63484d628f1cfa8cc9cf8b78f8fd8c29d55095ad Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Fri, 31 Mar 2023 16:36:13 +0800
Subject: [PATCH 09/13] [feat]add usage comment

---
 .../Chat/examples/train_prompts_on_ray_job_submission_script.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
index b9857ec0518e..45b712e3cc63 100644
--- a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
+++ b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
@@ -12,3 +12,5 @@
         "tokenizers", "fastapi", "sse_starlette", "wandb", "sentencepiece", "gpustat"
     ]
     })
+
+# Use this script with 'python train_prompts_on_ray_job_submission_script.py' on your Ray cluster.

From b263e0a85a91860e5609771869b4ad313732a662 Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Mon, 3 Apr 2023 22:54:27 +0800
Subject: [PATCH 10/13] [fix]rename files

---
 ...in_prompts_on_ray_job_submission_script.py | 16 --------------
 .../Chat/experimental/ray/ray_job_script.py   | 22 +++++++++++++++++++
 .../ray}/train_prompts_on_ray.py              |  8 +++----
 3 files changed, 26 insertions(+), 20 deletions(-)
 delete mode 100644 applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
 create mode 100644 applications/Chat/experimental/ray/ray_job_script.py
 rename applications/Chat/{examples => experimental/ray}/train_prompts_on_ray.py (98%)

diff --git a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py b/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
deleted file mode 100644
index 45b712e3cc63..000000000000
--- a/applications/Chat/examples/train_prompts_on_ray_job_submission_script.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from ray.job_submission import JobSubmissionClient
-
-client = JobSubmissionClient("http://127.0.0.1:8265")
-client.submit_job(
-    entrypoint=
-    "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
-    runtime_env={
-        "working_dir":
-            "../",
-        "pip": [
-            "torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4", "langchain",
-            "tokenizers", "fastapi", "sse_starlette", "wandb", "sentencepiece", "gpustat"
-        ]
-    })
-
-# Use this script with 'python train_prompts_on_ray_job_submission_script.py' on your Ray cluster.
diff --git a/applications/Chat/experimental/ray/ray_job_script.py b/applications/Chat/experimental/ray/ray_job_script.py
new file mode 100644
index 000000000000..9f4c252c9aef
--- /dev/null
+++ b/applications/Chat/experimental/ray/ray_job_script.py
@@ -0,0 +1,22 @@
+import sys
+
+from ray.job_submission import JobSubmissionClient
+
+
+def main(api_server_endpoint="http://127.0.0.1:8265"):
+    client = JobSubmissionClient(api_server_endpoint)
+    client.submit_job(
+        entrypoint=
+        "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
+        runtime_env={
+            "working_dir":
+                "../../",
+            "pip": [
+                "torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4", "langchain",
+                "tokenizers", "fastapi", "sse_starlette", "wandb", "sentencepiece", "gpustat"
+            ]
+        })
+
+
+if __name__ == "__main__":
+    main(sys.argv[1])
diff --git a/applications/Chat/examples/train_prompts_on_ray.py b/applications/Chat/experimental/ray/train_prompts_on_ray.py
similarity index 98%
rename from applications/Chat/examples/train_prompts_on_ray.py
rename to applications/Chat/experimental/ray/train_prompts_on_ray.py
index 20b7efa3bdaa..289330ad8415 100644
--- a/applications/Chat/examples/train_prompts_on_ray.py
+++ b/applications/Chat/experimental/ray/train_prompts_on_ray.py
@@ -27,8 +27,8 @@
 
 class ExperienceCompositionRefs:
 
-    def __init__(self, sequences_attention_mask_action_mask_ref, action_log_probs_ref, base_action_log_probs_ref,
-                 value_ref, r_ref) -> None:
+    def __init__(self, sequences_attention_mask_action_mask_ref: ray.ObjectRef, action_log_probs_ref: ray.ObjectRef,
+                 base_action_log_probs_ref: ray.ObjectRef, value_ref: ray.ObjectRef, r_ref: ray.ObjectRef) -> None:
         self.sequences_attention_mask_action_mask_ref = sequences_attention_mask_action_mask_ref
         self.action_log_probs_ref = action_log_probs_ref
         self.base_action_log_probs_ref = base_action_log_probs_ref
@@ -515,8 +515,8 @@ def main(args):
             if time % update_timesteps == 0:
                 experience_refs = []
                 # calculate experiences
-                for i in range(len(experience_composition_refs)):
-                    exp_composition_ref = experience_composition_refs[i]
+                for i, experience_composition_ref in enumerate(experience_composition_refs):
+                    exp_composition_ref = experience_composition_ref
                     selected_ray_actor = all_ray_actors[i % num_ray_actors]
                     experience_refs.append(selected_ray_actor.make_experience.remote(exp_composition_ref))
                 # backward
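One rough edge in the new `ray_job_script.py`: `main(sys.argv[1])` raises `IndexError` when the endpoint argument is omitted, even though `main` declares a default. A hypothetical, more forgiving variant of the script's final two lines:

```python
# Drop-in replacement for the script's final two lines (hypothetical variant):
# fall back to main()'s documented default instead of raising IndexError when
# no endpoint is passed on the command line.
if __name__ == "__main__":
    main(sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:8265")
```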
From e4de9a78938ba6de4137b37ecdb5f4f939c3674c Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Mon, 3 Apr 2023 22:54:40 +0800
Subject: [PATCH 11/13] [fix]add readme

---
 applications/Chat/experimental/ray/README.md | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
 create mode 100644 applications/Chat/experimental/ray/README.md

diff --git a/applications/Chat/experimental/ray/README.md b/applications/Chat/experimental/ray/README.md
new file mode 100644
index 000000000000..faf42c2ba056
--- /dev/null
+++ b/applications/Chat/experimental/ray/README.md
@@ -0,0 +1,18 @@
+# ColossalAI on Ray
+## Abstract
+This is an experimental effort to run ColossalAI Chat training on Ray.
+## How to use?
+### 1. Set up a Ray cluster
+Please follow the official [Ray cluster setup instructions](https://docs.ray.io/en/latest/cluster/getting-started.html) to set up a cluster with GPU support. Record the cluster's API server endpoint; it should look similar to http://your.head.node.address:8265
+### 2. Clone the repo
+Clone this project:
+```shell
+git clone https://github.com/hpcaitech/ColossalAI.git
+```
+### 3. Submit the Ray job
+```shell
+cd applications/Chat/experimental/ray
+python ray_job_script.py http://your.head.node.address:8265
+```
+### 4. View your job on the Ray Dashboard
+Open your Ray cluster dashboard http://your.head.node.address:8265 to view your submitted training job.
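For a quick end-to-end check of the README flow without a multi-node cluster, a single-machine Ray instance is enough: `ray.init()` brings up a local cluster whose dashboard (and job API) listens on port 8265 by default. A sketch, assuming a recent Ray release where the returned context exposes `dashboard_url`:

```python
import ray

# Throwaway one-node cluster; its dashboard doubles as the job API server
# that ray_job_script.py expects at http://127.0.0.1:8265.
ctx = ray.init()
print(ctx.dashboard_url)  # e.g. "127.0.0.1:8265" (attribute name may vary by Ray version)
```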
From f200cebaed3fc944136823d3f579b48abff1c74a Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Mon, 3 Apr 2023 23:05:52 +0800
Subject: [PATCH 12/13] [fix]file path

---
 applications/Chat/experimental/ray/ray_job_script.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/applications/Chat/experimental/ray/ray_job_script.py b/applications/Chat/experimental/ray/ray_job_script.py
index 9f4c252c9aef..ee34c3681ca3 100644
--- a/applications/Chat/experimental/ray/ray_job_script.py
+++ b/applications/Chat/experimental/ray/ray_job_script.py
@@ -7,7 +7,7 @@ def main(api_server_endpoint="http://127.0.0.1:8265"):
     client = JobSubmissionClient(api_server_endpoint)
     client.submit_job(
         entrypoint=
-        "python examples/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
+        "python experimental/ray/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
         runtime_env={
             "working_dir":
                 "../../",
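This path fix works because the `entrypoint` command executes with `working_dir` as its current directory: Ray uploads `working_dir` to the cluster and launches the entrypoint from inside it, so script paths in `entrypoint` must be written relative to that directory, not to wherever the submission script itself runs. A minimal sketch of the rule (the layout and paths are hypothetical):

```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")
client.submit_job(
    # Resolved as <working_dir>/subdir/task.py on the cluster.
    entrypoint="python subdir/task.py",
    runtime_env={"working_dir": "./my_project"},  # hypothetical local path
)
```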
From 4241a7c0854f9c8710fb60a26a8c3de35b93234c Mon Sep 17 00:00:00 2001
From: jiangwen
Date: Thu, 13 Apr 2023 17:37:29 +0800
Subject: [PATCH 13/13] [fix]move directory

---
 applications/Chat/examples/community/README.md                       | 5 +++--
 .../Chat/{experimental => examples/community}/ray/README.md          | 3 +--
 .../Chat/{experimental => examples/community}/ray/ray_job_script.py  | 2 +-
 .../{experimental => examples/community}/ray/train_prompts_on_ray.py | 0
 4 files changed, 5 insertions(+), 5 deletions(-)
 rename applications/Chat/{experimental => examples/community}/ray/README.md (87%)
 rename applications/Chat/{experimental => examples/community}/ray/ray_job_script.py (95%)
 rename applications/Chat/{experimental => examples/community}/ray/train_prompts_on_ray.py (100%)

diff --git a/applications/Chat/examples/community/README.md b/applications/Chat/examples/community/README.md
index 905418892611..c9c645032288 100644
--- a/applications/Chat/examples/community/README.md
+++ b/applications/Chat/examples/community/README.md
@@ -1,6 +1,6 @@
 # Community Examples
 ---
-We are thrilled to announce the latest updates to ColossalChat, an open-source solution for cloning ChatGPT with a complete RLHF (Reinforcement Learning with Human Feedback) pipeline. 
+We are thrilled to announce the latest updates to ColossalChat, an open-source solution for cloning ChatGPT with a complete RLHF (Reinforcement Learning with Human Feedback) pipeline.
 
 As Colossal-AI undergoes major updates, we are actively maintaining ColossalChat to stay aligned with the project's progress. With the introduction of community-driven examples, we aim to create a collaborative platform for developers to contribute exotic features built on top of ColossalChat.
@@ -16,7 +16,8 @@ Community examples consist of both inference and training examples that have bee
 
 | Example              | Description                                              | Code Example                                                                                                       | Colab | Author                                            |
 |:---------------------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------|:------|--------------------------------------------------:|
 | Peft                 | Adding Peft support for SFT and Prompts model training   | [Huggingface Peft](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/peft)   | -     | [YY Lin](https://github.com/yynil)                |
+| Train prompts on Ray | A Ray-based implementation of the train-prompts example  | [Train prompts on Ray](https://github.com/hpcaitech/ColossalAI/tree/main/applications/Chat/examples/community/ray) | -     | [MisterLin1995](https://github.com/MisterLin1995) |
 |...|...|...|...|...|
 
 ### How to get involved
diff --git a/applications/Chat/experimental/ray/README.md b/applications/Chat/examples/community/ray/README.md
similarity index 87%
rename from applications/Chat/experimental/ray/README.md
rename to applications/Chat/examples/community/ray/README.md
index faf42c2ba056..64360bd73ddc 100644
--- a/applications/Chat/experimental/ray/README.md
+++ b/applications/Chat/examples/community/ray/README.md
@@ -11,8 +11,7 @@ git clone https://github.com/hpcaitech/ColossalAI.git
 ```
 ### 3. Submit the Ray job
 ```shell
-cd applications/Chat/experimental/ray
-python ray_job_script.py http://your.head.node.address:8265
+python applications/Chat/examples/community/ray/ray_job_script.py http://your.head.node.address:8265
 ```
 ### 4. View your job on the Ray Dashboard
 Open your Ray cluster dashboard http://your.head.node.address:8265 to view your submitted training job.
diff --git a/applications/Chat/experimental/ray/ray_job_script.py b/applications/Chat/examples/community/ray/ray_job_script.py
similarity index 95%
rename from applications/Chat/experimental/ray/ray_job_script.py
rename to applications/Chat/examples/community/ray/ray_job_script.py
index ee34c3681ca3..53f304d379fe 100644
--- a/applications/Chat/experimental/ray/ray_job_script.py
+++ b/applications/Chat/examples/community/ray/ray_job_script.py
@@ -10,7 +10,7 @@ def main(api_server_endpoint="http://127.0.0.1:8265"):
         "python experimental/ray/train_prompts_on_ray.py --strategy colossalai_zero2 --prompt_csv_url https://huggingface.co/datasets/fka/awesome-chatgpt-prompts/resolve/main/prompts.csv",
         runtime_env={
             "working_dir":
-                "../../",
+                "applications/Chat",
             "pip": [
                 "torch==1.13.1", "transformers>=4.20.1", "datasets", "loralib", "colossalai>=0.2.4", "langchain",
                 "tokenizers", "fastapi", "sse_starlette", "wandb", "sentencepiece", "gpustat"
diff --git a/applications/Chat/experimental/ray/train_prompts_on_ray.py b/applications/Chat/examples/community/ray/train_prompts_on_ray.py
similarity index 100%
rename from applications/Chat/experimental/ray/train_prompts_on_ray.py
rename to applications/Chat/examples/community/ray/train_prompts_on_ray.py