From 3e5f432c33413f5ea3943e9383573b6409ef128e Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 13 Jun 2023 15:59:39 +0800 Subject: [PATCH 01/25] refactor: adapt boost API in base and naive strategies --- .../Chat/coati/trainer/strategies/base.py | 75 ++++++++++++------- .../Chat/coati/trainer/strategies/naive.py | 42 +++-------- 2 files changed, 59 insertions(+), 58 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 06f81f21ab26..5a26189e08bc 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -3,12 +3,16 @@ from typing import Any, List, Optional, Tuple, Union import torch +import torch.distributed as dist import torch.nn as nn from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader from transformers.tokenization_utils_base import PreTrainedTokenizerBase +from colossalai.booster import Booster +from colossalai.booster.plugin import Plugin + from .sampler import DistributedSampler ModelOptimPair = Tuple[nn.Module, Optimizer] @@ -20,29 +24,35 @@ class Strategy(ABC): Base class for training strategies. """ - def __init__(self) -> None: + def __init__(self, plugin: Optional[Plugin] = None) -> None: super().__init__() self.setup_distributed() + self.booster = Booster(plugin=plugin) - @abstractmethod def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, **kwargs) -> None: - pass + self.booster.backward(loss, optimizer) - @abstractmethod def optimizer_step(self, optimizer: Optimizer, **kwargs) -> None: - pass + optimizer.step() @abstractmethod def setup_distributed(self) -> None: pass - @abstractmethod def setup_model(self, model: nn.Module) -> nn.Module: - pass + raise NotImplementedError() - @abstractmethod def setup_optimizer(self, optimizer: Optimizer, model: nn.Module) -> Optimizer: - pass + raise NotImplementedError() + + def setup_model_optimizer(self, + model: nn.Module, + optimizer: Optimizer + ) -> Tuple[nn.Module, Optimizer]: + # NOTE: Booster.boost must take in both model and optimizer + model, optimizer, *_ = self.booster.boost(model=model, + optimizer=optimizer) + return model, optimizer @abstractmethod def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: @@ -72,12 +82,10 @@ def prepare( for arg in models_or_model_optim_pairs: if isinstance(arg, tuple): assert len(arg) == 2, f'Expect (model, optimizer) pair, got a tuple with size "{len(arg)}"' - model, optimizer = arg - model = self.setup_model(model) - optimizer = self.setup_optimizer(optimizer, model) + model, optimizer = self.setup_model_optimizer(*arg) rets.append((model, optimizer)) elif isinstance(arg, nn.Module): - rets.append(self.setup_model(model)) + rets.append(self.setup_model(arg)) else: raise RuntimeError(f'Expect model or (model, optimizer) pair, got {type(arg)}') @@ -97,23 +105,34 @@ def unwrap_model(model: nn.Module) -> nn.Module: """ return model - @abstractmethod - def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: - pass - - @abstractmethod - def load_model(self, model: nn.Module, path: str, map_location: Any = None, strict: bool = True) -> None: - pass - - @abstractmethod - def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: - pass - - @abstractmethod - def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = None) -> None: - pass + def save_model(self, + model: nn.Module, + path: str, + only_rank0: bool = True, + **kwargs + ) -> None: + if only_rank0 and dist.get_rank() != 0: + return + self.booster.save_model(model, path, **kwargs) + + def load_model(self, model: nn.Module, path: str, strict: bool = True) -> None: + self.booster.load_model(model, path, strict) + + def save_optimizer(self, + optimizer: Optimizer, + path: str, + only_rank0: bool = False, + **kwargs + ) -> None: + if only_rank0 and dist.get_rank() != 0: + return + self.booster.save_optimizer(optimizer, path, **kwargs) + + def load_optimizer(self, optimizer: Optimizer, path: str) -> None: + self.booster.load_optimizer(optimizer, path) def setup_sampler(self, dataset) -> DistributedSampler: + # FIXME(cwher): this is only invoked in train_on_ray, not tested after adapt Boost API. return DistributedSampler(dataset, 1, 0) @abstractmethod diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 202c480e06d9..4ae5e44940d5 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -1,17 +1,12 @@ import os -import sys from collections import OrderedDict -from typing import Any, Dict, Optional +from typing import Optional import torch import torch.distributed as dist import torch.nn as nn -import torch.optim as optim from coati.models.base import get_base_model from coati.replay_buffer import ReplayBuffer -from coati.models.base import RewardModel -from coati.models.lora import LoraLinear -from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader from transformers.modeling_utils import PreTrainedModel @@ -34,19 +29,21 @@ class NaiveStrategy(Strategy): Strategy for single GPU. No parallelism is used. """ - def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: optim.Optimizer, **kwargs) -> None: - loss.backward() - - def optimizer_step(self, optimizer: optim.Optimizer, **kwargs) -> None: - optimizer.step() + def __init__(self) -> None: + self.plugin = None + super().__init__(self.plugin) def setup_distributed(self) -> None: self._try_init_dist(force=False) + def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, **kwargs) -> None: + # HACK: torch.Adam isn't working properly within booster + loss.backward() + def setup_model(self, model: nn.Module) -> nn.Module: return model - def setup_optimizer(self, optimizer: optim.Optimizer, model: nn.Module) -> optim.Optimizer: + def setup_optimizer(self, optimizer: Optimizer, model: nn.Module) -> Optimizer: return optimizer def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: @@ -57,30 +54,15 @@ def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False pin_memory=pin_memory, collate_fn=replay_buffer.collate_fn) - def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: - state_dict = model.state_dict() - torch.save(state_dict, path) - - def load_model(self, model: nn.Module, path: str, map_location: Any = None, strict: bool = True) -> None: - unwrapped_model = self.unwrap_model(model) - state_dict = torch.load(path, map_location=map_location) - unwrapped_model.load_state_dict(state_dict, strict=strict) - - def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: - torch.save(optimizer.state_dict(), path) - - def load_optimizer(self, optimizer: Optimizer, path: str, map_location: Any = None) -> None: - state_dict = torch.load(path, map_location=map_location) - optimizer.load_state_dict(state_dict) - def save_pretrained(self, model: nn.Module, path: str, only_rank0: bool = True, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: unwrapped_model = self.unwrap_model(model) - assert isinstance(unwrapped_model, PreTrainedModel) - unwrapped_model.save_pretrained(path) + hf_model = get_base_model(unwrapped_model) + assert isinstance(hf_model, PreTrainedModel) + hf_model.save_pretrained(path) if tokenizer is not None: tokenizer.save_pretrained(path) From ab3a855b6819953357cf35ee2b57e7b8a2d3896a Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 13 Jun 2023 16:10:19 +0800 Subject: [PATCH 02/25] fix: initialize plugin after setup_distributed --- applications/Chat/coati/trainer/strategies/base.py | 8 +++++--- applications/Chat/coati/trainer/strategies/naive.py | 4 ---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 5a26189e08bc..9674e48e3316 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from contextlib import nullcontext -from typing import Any, List, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Tuple, Union import torch import torch.distributed as dist @@ -24,10 +24,12 @@ class Strategy(ABC): Base class for training strategies. """ - def __init__(self, plugin: Optional[Plugin] = None) -> None: + def __init__(self, plugin_initializer: Callable[..., Optional[Plugin]] = lambda: None) -> None: super().__init__() self.setup_distributed() - self.booster = Booster(plugin=plugin) + # NOTE: plugin should be initialized after distributed setup + self.plugin = plugin_initializer() + self.booster = Booster(plugin=self.plugin) def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, **kwargs) -> None: self.booster.backward(loss, optimizer) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 4ae5e44940d5..2f7120955a2d 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -29,10 +29,6 @@ class NaiveStrategy(Strategy): Strategy for single GPU. No parallelism is used. """ - def __init__(self) -> None: - self.plugin = None - super().__init__(self.plugin) - def setup_distributed(self) -> None: self._try_init_dist(force=False) From 3ca33eff373b52358cef30a1b99096175113a6ca Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 13 Jun 2023 16:35:25 +0800 Subject: [PATCH 03/25] fix: fix save_pretrained fn --- applications/Chat/coati/trainer/strategies/naive.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 2f7120955a2d..7a7fae9949b9 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -5,7 +5,6 @@ import torch import torch.distributed as dist import torch.nn as nn -from coati.models.base import get_base_model from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer from torch.utils.data import DataLoader @@ -56,9 +55,8 @@ def save_pretrained(self, only_rank0: bool = True, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: unwrapped_model = self.unwrap_model(model) - hf_model = get_base_model(unwrapped_model) - assert isinstance(hf_model, PreTrainedModel) - hf_model.save_pretrained(path) + assert isinstance(unwrapped_model, PreTrainedModel) + unwrapped_model.save_pretrained(path) if tokenizer is not None: tokenizer.save_pretrained(path) From 40cebd3510d08ac881f145e86b4304999a5fbcf8 Mon Sep 17 00:00:00 2001 From: CWHer Date: Tue, 13 Jun 2023 17:23:52 +0800 Subject: [PATCH 04/25] refactor: adapt boost API in DDPStrategy --- .../Chat/coati/trainer/strategies/ddp.py | 49 ++++++------------- 1 file changed, 16 insertions(+), 33 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index 713d7b90c6f0..3ed7eef62193 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -1,4 +1,3 @@ -import os import random from typing import Optional @@ -8,10 +7,12 @@ import torch.nn as nn from coati.replay_buffer import ReplayBuffer from torch.nn.parallel import DistributedDataParallel as DDP -from torch.optim import Optimizer from torch.utils.data import DataLoader from transformers.tokenization_utils_base import PreTrainedTokenizerBase +from colossalai.booster.plugin import TorchDDPPlugin # This is a wrapper of torch.DDP +from colossalai.booster.plugin.torch_ddp_plugin import TorchDDPModel + from .naive import NaiveStrategy from .sampler import DistributedSampler @@ -23,7 +24,7 @@ class DDPStrategy(NaiveStrategy): def __init__(self, seed: int = 42) -> None: self.seed = seed - super().__init__() + super().__init__(plugin_initializer=TorchDDPPlugin) def setup_distributed(self) -> None: self._try_init_dist(force=True) @@ -34,43 +35,25 @@ def set_seed(self, seed: int) -> None: np.random.seed(seed) torch.manual_seed(seed) - def setup_model(self, model: nn.Module) -> nn.Module: - device = torch.cuda.current_device() - return DDP(model, device_ids=[device]) - def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: # DDP only mode, replay buffers on each rank are different. - # sampler = DistributedSampler(replay_buffer, - # num_replicas=dist.get_world_size(), - # rank=dist.get_rank(), - # shuffle=True, - # seed=self.seed, - # drop_last=True) - return DataLoader( - replay_buffer, - batch_size=replay_buffer.sample_batch_size, - # sampler=sampler, - shuffle=True, - drop_last=True, - pin_memory=pin_memory, - collate_fn=replay_buffer.collate_fn) - - def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: - if only_rank0 and dist.get_rank() != 0: - return - super().save_model(model, path, only_rank0) - - def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: - if only_rank0 and dist.get_rank() != 0: - return - super().save_optimizer(optimizer, path, only_rank0) + assert isinstance(self.plugin, TorchDDPPlugin), "self.plugin isn't initialized properly." + return self.plugin.prepare_dataloader(replay_buffer, + batch_size=replay_buffer.sample_batch_size, + shuffle=True, + drop_last=True, + pin_memory=pin_memory, + collate_fn=replay_buffer.collate_fn) def setup_sampler(self, dataset) -> DistributedSampler: + # FIXME(cwher): this is only invoked in train_on_ray, not tested after adapt Boost API. return DistributedSampler(dataset, dist.get_world_size(), dist.get_rank()) def unwrap_model(self, model: nn.Module) -> nn.Module: - assert isinstance(model, DDP) - return model.module + assert isinstance(model, TorchDDPModel), "model is not wrapped by TorchDDPModel." + ddp_model = model.module + assert isinstance(ddp_model, DDP), "TorchDDPModel should wrap a DDP model." + return ddp_model.module def save_pretrained(self, model: nn.Module, From 46f1a0563aba7412af4c9e979ddd7ded291eab52 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 13:45:34 +0800 Subject: [PATCH 05/25] to: add _post_init check --- .../Chat/coati/trainer/strategies/base.py | 5 +++++ .../Chat/coati/trainer/strategies/colossalai.py | 15 +++------------ applications/Chat/coati/trainer/strategies/ddp.py | 11 +++++++++-- .../Chat/coati/trainer/strategies/naive.py | 10 ++++------ 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 9674e48e3316..864e18a67f68 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -30,6 +30,11 @@ def __init__(self, plugin_initializer: Callable[..., Optional[Plugin]] = lambda: # NOTE: plugin should be initialized after distributed setup self.plugin = plugin_initializer() self.booster = Booster(plugin=self.plugin) + self._post_init() + + @abstractmethod + def _post_init(self) -> None: + pass def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, **kwargs) -> None: self.booster.backward(loss, optimizer) diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index cfdab2806a25..ed638c3e956a 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -104,18 +104,9 @@ def __init__( if stage == 3: self.zero_optim_config = dict(gpu_margin_mem_ratio=gpu_margin_mem_ratio) else: - self.zero_optim_config = dict(reduce_bucket_size=reduce_bucket_size, - overlap_communication=overlap_communication, - cpu_offload=(placement_policy == 'cpu')) - self.optim_kwargs = dict(initial_scale=initial_scale, - growth_factor=growth_factor, - backoff_factor=backoff_factor, - growth_interval=growth_interval, - hysteresis=hysteresis, - min_scale=min_scale, - max_scale=max_scale, - max_norm=max_norm, - norm_type=norm_type) + def _post_init(self) -> None: + assert isinstance(self.plugin, (LowLevelZeroPlugin, GeminiPlugin)), \ + f'{type(self).__name__}\'s plugin is not initialized properly.' def setup_distributed(self) -> None: colossalai.launch_from_torch({}, seed=self.seed) diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index 3ed7eef62193..dda335ff8147 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -22,9 +22,16 @@ class DDPStrategy(NaiveStrategy): Strategy for distributed training using torch.distributed. """ - def __init__(self, seed: int = 42) -> None: + def __init__(self, + seed: int = 42, + plugin_initializer: Callable = TorchDDPPlugin + ) -> None: self.seed = seed - super().__init__(plugin_initializer=TorchDDPPlugin) + super().__init__(plugin_initializer) + + def _post_init(self) -> None: + assert isinstance(self.plugin, TorchDDPPlugin), \ + f'{type(self).__name__}\'s plugin is not initialized properly.' def setup_distributed(self) -> None: self._try_init_dist(force=True) diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 7a7fae9949b9..3196d2f00938 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -28,6 +28,10 @@ class NaiveStrategy(Strategy): Strategy for single GPU. No parallelism is used. """ + def _post_init(self) -> None: + assert self.plugin is None, \ + f'{type(self).__name__}\'s plugin is not initialized properly.' + def setup_distributed(self) -> None: self._try_init_dist(force=False) @@ -35,12 +39,6 @@ def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, * # HACK: torch.Adam isn't working properly within booster loss.backward() - def setup_model(self, model: nn.Module) -> nn.Module: - return model - - def setup_optimizer(self, optimizer: Optimizer, model: nn.Module) -> Optimizer: - return optimizer - def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: return DataLoader(replay_buffer, batch_size=replay_buffer.sample_batch_size, From bf1353119f62a98ff496aab8779c3580dde026df Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 15:11:33 +0800 Subject: [PATCH 06/25] to: fix ddp backward, modify ddp dataloader and unwrap --- applications/Chat/coati/trainer/strategies/ddp.py | 12 ++++++------ applications/Chat/coati/trainer/strategies/naive.py | 4 +++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index dda335ff8147..3ed49b229540 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -1,5 +1,5 @@ import random -from typing import Optional +from typing import Callable, Optional import numpy as np import torch @@ -7,6 +7,7 @@ import torch.nn as nn from coati.replay_buffer import ReplayBuffer from torch.nn.parallel import DistributedDataParallel as DDP +from torch.optim import Optimizer from torch.utils.data import DataLoader from transformers.tokenization_utils_base import PreTrainedTokenizerBase @@ -42,9 +43,10 @@ def set_seed(self, seed: int) -> None: np.random.seed(seed) torch.manual_seed(seed) + def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, **kwargs) -> None: + self.booster.backward(loss, optimizer) + def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: - # DDP only mode, replay buffers on each rank are different. - assert isinstance(self.plugin, TorchDDPPlugin), "self.plugin isn't initialized properly." return self.plugin.prepare_dataloader(replay_buffer, batch_size=replay_buffer.sample_batch_size, shuffle=True, @@ -58,9 +60,7 @@ def setup_sampler(self, dataset) -> DistributedSampler: def unwrap_model(self, model: nn.Module) -> nn.Module: assert isinstance(model, TorchDDPModel), "model is not wrapped by TorchDDPModel." - ddp_model = model.module - assert isinstance(ddp_model, DDP), "TorchDDPModel should wrap a DDP model." - return ddp_model.module + return model.unwrap() def save_pretrained(self, model: nn.Module, diff --git a/applications/Chat/coati/trainer/strategies/naive.py b/applications/Chat/coati/trainer/strategies/naive.py index 3196d2f00938..d121237a68ea 100644 --- a/applications/Chat/coati/trainer/strategies/naive.py +++ b/applications/Chat/coati/trainer/strategies/naive.py @@ -36,7 +36,9 @@ def setup_distributed(self) -> None: self._try_init_dist(force=False) def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: Optimizer, **kwargs) -> None: - # HACK: torch.Adam isn't working properly within booster + # HACK: self.booster.backward(loss, optimizer) can't work if plugin is None, + # it would run `optimizer.backward(loss)`, which is not compatible with torch.optim.Optimizer + assert self.plugin is None, "DO NOT call this method if plugin is not None" loss.backward() def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: From 76a70347a2e3a7bb247916197c732c0e2ef34518 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 15:19:22 +0800 Subject: [PATCH 07/25] feat: adapt boost API in ColossalAIStrategy --- .../coati/trainer/strategies/colossalai.py | 137 +++++++++--------- 1 file changed, 70 insertions(+), 67 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index ed638c3e956a..0895e0e57e98 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -1,24 +1,23 @@ +import functools import warnings -from typing import Optional, Union +from typing import Optional import torch import torch.distributed as dist import torch.nn as nn -import torch.optim as optim -from torch.optim import Optimizer from transformers.tokenization_utils_base import PreTrainedTokenizerBase import colossalai -from colossalai.logging import get_dist_logger -from colossalai.nn.optimizer import CPUAdam, HybridAdam +from colossalai.booster.plugin import GeminiPlugin, LowLevelZeroPlugin +from colossalai.booster.plugin.gemini_plugin import GeminiModel +from colossalai.booster.plugin.low_level_zero_plugin import LowLevelZeroModel from colossalai.tensor import ProcessGroup, ShardSpec from colossalai.utils import get_current_device -from colossalai.zero import ColoInitContext, ZeroDDP, zero_model_wrapper, zero_optim_wrapper +from colossalai.zero import ColoInitContext +from colossalai.zero.gemini.gemini_ddp import GeminiDDP from .ddp import DDPStrategy -logger = get_dist_logger(__name__) - class ColossalAIStrategy(DDPStrategy): """ @@ -62,7 +61,6 @@ def __init__( placement_policy: str = 'cuda', pin_memory: bool = True, # only for stage 3 force_outputs_fp32: bool = False, # only for stage 3 - scatter_after_inference: bool = False, # only for stage 3 search_range_mb: int = 32, # only for stage 3 hidden_dim: Optional[int] = None, # only for stage 3 min_chunk_size_mb: float = 32, # only for stage 3 @@ -78,32 +76,69 @@ def __init__( max_scale: float = 2**32, max_norm: float = 0.0, norm_type: float = 2.0) -> None: - super().__init__(seed) + + assert stage in (1, 2, 3), f'Unsupported stage "{stage}"' assert placement_policy in ('cpu', 'cuda'), f'Unsupported placement policy "{placement_policy}"' assert precision in ('fp32', 'fp16'), f'Unsupported precision "{precision}"' - self.stage = stage + # TODO(ver217): support shard_init when using from_pretrained() if shard_init: warnings.warn( - f'Shard init is not supported model.from_pretrained() yet. Please load weights after strategy.prepare()' + f'Shard init is not supported model.from_pretrained() yet. ' + 'Please load weights after strategy.prepare()' ) if stage == 3 and precision == 'fp32': warnings.warn(f'Stage 3 only supports fp16. Precision is set to fp16.') precision = 'fp16' self.precision = precision self.shard_init = shard_init - self.gemini_config = dict(device=get_current_device(), - placement_policy=placement_policy, - pin_memory=pin_memory, - force_outputs_fp32=force_outputs_fp32, - strict_ddp_mode=shard_init, - search_range_mb=search_range_mb, - hidden_dim=hidden_dim, - min_chunk_size_mb=min_chunk_size_mb, - scatter_after_inference=scatter_after_inference) + + optim_kwargs = dict( + initial_scale=initial_scale, + growth_factor=growth_factor, + backoff_factor=backoff_factor, + growth_interval=growth_interval, + hysteresis=hysteresis, + min_scale=min_scale, + max_scale=max_scale, + max_norm=max_norm, + norm_type=norm_type + ) + if stage == 3: - self.zero_optim_config = dict(gpu_margin_mem_ratio=gpu_margin_mem_ratio) + plugin_initializer = functools.partial( + GeminiPlugin, + # gemini_config + device=get_current_device(), + placement_policy=placement_policy, + precision=precision, + pin_memory=pin_memory, + force_outputs_fp32=force_outputs_fp32, + strict_ddp_mode=shard_init, + search_range_mb=search_range_mb, + hidden_dim=hidden_dim, + min_chunk_size_mb=min_chunk_size_mb, + # zero_optim_config + gpu_margin_mem_ratio=gpu_margin_mem_ratio, + # optim_config + **optim_kwargs + ) else: + plugin_initializer = functools.partial( + LowLevelZeroPlugin, + # zero_config + stage=stage, + precision=precision, + # zero_optim_config + reduce_bucket_size_in_m=reduce_bucket_size, + overlap_communication=overlap_communication, + cpu_offload=(placement_policy == 'cpu'), + # optim_config + **optim_kwargs + ) + + super().__init__(seed, plugin_initializer) + def _post_init(self) -> None: assert isinstance(self.plugin, (LowLevelZeroPlugin, GeminiPlugin)), \ f'{type(self).__name__}\'s plugin is not initialized properly.' @@ -112,7 +147,7 @@ def setup_distributed(self) -> None: colossalai.launch_from_torch({}, seed=self.seed) def model_init_context(self): - if self.stage == 3: + if isinstance(self.plugin, GeminiPlugin): world_size = dist.get_world_size() shard_pg = ProcessGroup(tp_degree=world_size) if self.shard_init else None default_dist_spec = ShardSpec([-1], [world_size]) if self.shard_init else None @@ -122,61 +157,29 @@ def model_init_context(self): default_dist_spec=default_dist_spec) return super().model_init_context() - def setup_model(self, model: nn.Module) -> nn.Module: - - model = zero_model_wrapper(model, zero_stage=self.stage, gemini_config=self.gemini_config) - - if self.stage != 3 and self.precision == 'fp16': - model = model.half().cuda() - return model - - def setup_optimizer(self, optimizer: optim.Optimizer, model: nn.Module) -> optim.Optimizer: - assert isinstance(optimizer, (CPUAdam, HybridAdam)), f'Unsupported optimizer {type(optimizer)}' - return zero_optim_wrapper(model, optimizer, optim_config=self.zero_optim_config, **self.optim_kwargs) - - def backward(self, loss: torch.Tensor, model: nn.Module, optimizer: optim.Optimizer, **kwargs) -> None: - optimizer.backward(loss) - - def optimizer_step(self, optimizer: optim.Optimizer, **kwargs) -> None: - optimizer.step() - - def save_model(self, model: nn.Module, path: str, only_rank0: bool = True) -> None: - if only_rank0 and dist.get_rank() != 0 and self.stage != 3: - return - if self.stage == 3: - assert isinstance(model, ZeroDDP) - # for stage 3, state_dict() method should be called on every rank - state_dict = model.state_dict(only_rank_0=only_rank0) - else: - # only_rank0 is false or rank == 0 - state_dict = model.state_dict() - if only_rank0 and dist.get_rank() != 0: - return - torch.save(state_dict, path) - - def save_optimizer(self, optimizer: Optimizer, path: str, only_rank0: bool = False) -> None: - if only_rank0: - raise RuntimeError( - f'Optimizer states are sharded when using ColossalAIStrategy. Only rank0 is not supported.') - torch.save(optimizer.state_dict(), path) - def unwrap_model(self, model: nn.Module) -> nn.Module: - if self.stage == 3: - assert isinstance(model, ZeroDDP) + if isinstance(self.plugin, GeminiPlugin): + assert isinstance(model, GeminiModel) + ddp_model = model.unwrap() + assert isinstance(ddp_model, GeminiDDP) + return ddp_model.module + elif isinstance(self.plugin, LowLevelZeroPlugin): + assert isinstance(model, LowLevelZeroModel) return model.module - return model + else: + raise RuntimeError(f'Unsupported plugin {type(self.plugin)}') def save_pretrained(self, model: nn.Module, path: str, only_rank0: bool = True, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None: - if self.stage == 3: + if isinstance(self.plugin, GeminiPlugin): raise RuntimeError('ColossalAI strategy with stage-3 does not support save_pretrained() now') super().save_pretrained(model, path, only_rank0, tokenizer) def get_model_state_dict_shard(self, model: nn.Module, **config): - if self.stage != 3: + if not isinstance(self.plugin, GeminiPlugin): yield from super().get_model_state_dict_shard(model, **config) else: # unwrapped_model = self._unwrap_model(model) @@ -184,5 +187,5 @@ def get_model_state_dict_shard(self, model: nn.Module, **config): # if isinstance(module, LoraLinear): # module.merge_weights = True # module.eval() - assert isinstance(model, ZeroDDP) + assert isinstance(model, LowLevelZeroModel) yield from model.state_dict_shard(max_shard_size=1024, only_rank_0=False) From 6414f7fd695a605981cbfe2da89d86d1cc33d72b Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 17:23:34 +0800 Subject: [PATCH 08/25] fix: call setup_distributed before use get_current_device --- applications/Chat/coati/trainer/strategies/base.py | 5 +++-- applications/Chat/coati/trainer/strategies/colossalai.py | 8 +++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 864e18a67f68..947fbc034049 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -26,8 +26,9 @@ class Strategy(ABC): def __init__(self, plugin_initializer: Callable[..., Optional[Plugin]] = lambda: None) -> None: super().__init__() - self.setup_distributed() - # NOTE: plugin should be initialized after distributed setup + if not dist.is_initialized(): + # NOTE: dist must be initialized before Booster + self.setup_distributed() self.plugin = plugin_initializer() self.booster = Booster(plugin=self.plugin) self._post_init() diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index 0895e0e57e98..823fea1ad9cd 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -93,6 +93,9 @@ def __init__( self.precision = precision self.shard_init = shard_init + # NOTE: dist should be initialized before calling get_current_device() + self.setup_distributed(seed) + optim_kwargs = dict( initial_scale=initial_scale, growth_factor=growth_factor, @@ -104,7 +107,6 @@ def __init__( max_norm=max_norm, norm_type=norm_type ) - if stage == 3: plugin_initializer = functools.partial( GeminiPlugin, @@ -143,8 +145,8 @@ def _post_init(self) -> None: assert isinstance(self.plugin, (LowLevelZeroPlugin, GeminiPlugin)), \ f'{type(self).__name__}\'s plugin is not initialized properly.' - def setup_distributed(self) -> None: - colossalai.launch_from_torch({}, seed=self.seed) + def setup_distributed(self, seed: int) -> None: + colossalai.launch_from_torch({}, seed=seed) def model_init_context(self): if isinstance(self.plugin, GeminiPlugin): From 476cb72c1dca428558f3208f554e63e9d1862525 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 17:58:55 +0800 Subject: [PATCH 09/25] fix: fix save_model and save_optimizer --- applications/Chat/coati/trainer/strategies/base.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 947fbc034049..55bb561b2e3d 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -119,9 +119,7 @@ def save_model(self, only_rank0: bool = True, **kwargs ) -> None: - if only_rank0 and dist.get_rank() != 0: - return - self.booster.save_model(model, path, **kwargs) + self.booster.save_model(model, path, shard=not only_rank0, **kwargs) def load_model(self, model: nn.Module, path: str, strict: bool = True) -> None: self.booster.load_model(model, path, strict) @@ -132,9 +130,7 @@ def save_optimizer(self, only_rank0: bool = False, **kwargs ) -> None: - if only_rank0 and dist.get_rank() != 0: - return - self.booster.save_optimizer(optimizer, path, **kwargs) + self.booster.save_optimizer(optimizer, path, shard=not only_rank0, **kwargs) def load_optimizer(self, optimizer: Optimizer, path: str) -> None: self.booster.load_optimizer(optimizer, path) From e1a86f176132371ebee3e1b597cce66655f0bb26 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 18:00:09 +0800 Subject: [PATCH 10/25] test: remove save_sharded_optimizer test --- applications/Chat/tests/test_checkpoint.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/applications/Chat/tests/test_checkpoint.py b/applications/Chat/tests/test_checkpoint.py index d93a5c94d8ea..cfa39e44b476 100644 --- a/applications/Chat/tests/test_checkpoint.py +++ b/applications/Chat/tests/test_checkpoint.py @@ -60,10 +60,15 @@ def run_step(): rank0_dirname = rank0_dirname[0] model_path = os.path.join(rank0_dirname, 'model.pt') - optim_path = os.path.join(rank0_dirname, f'optim-r{dist.get_rank()}.pt') - strategy.save_model(actor, model_path, only_rank0=True) - strategy.save_optimizer(actor_optim, optim_path, only_rank0=False) + + optim_path = os.path.join(rank0_dirname, f'optim.pt') + strategy.save_optimizer(actor_optim, optim_path, only_rank0=True) + + # FIXME(cwher): Sharded optimizer checkpoint is not supported yet. + # at "ColossalAI/colossalai/checkpoint_io/general_checkpoint_io.py", line 62 + # optim_path = os.path.join(rank0_dirname, f'optim-r{dist.get_rank()}.pt') + # strategy.save_optimizer(actor_optim, optim_path, only_rank0=False) dist.barrier() From 6b435c360d7967aed7698fecda9b5b9eb0a62cb6 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 18:06:34 +0800 Subject: [PATCH 11/25] style: apply formatter --- applications/Chat/examples/test_ci.sh | 139 +++++++++++++------------- 1 file changed, 71 insertions(+), 68 deletions(-) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index ac3a9b507864..c4be3356e45b 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -28,107 +28,110 @@ wandb init -m offline # train sft torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'bigscience/bloom-560m' \ - --model 'bloom' --strategy colossalai_zero2 --lora_rank 4\ - --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ - --save_path ${BASE}/output + --model 'bloom' --strategy colossalai_zero2 --lora_rank 4 \ + --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ + --save_path ${BASE}/output rm -rf ${BASE}/output torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'gpt2' \ - --model 'gpt2' --strategy colossalai_zero2 \ - --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ - --save_path ${BASE}/output + --model 'gpt2' --strategy colossalai_zero2 \ + --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ + --save_path ${BASE}/output rm -rf ${BASE}/output torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'facebook/opt-350m' \ - --model 'opt' --strategy colossalai_zero2 --lora_rank 4\ - --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ - --save_path ${BASE}/output + --model 'opt' --strategy colossalai_zero2 --lora_rank 4 \ + --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ + --save_path ${BASE}/output rm -rf ${BASE}/output torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'gpt2' \ - --model 'gpt2' --strategy ddp --lora_rank 4\ - --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ - --save_path ${BASE}/output + --model 'gpt2' --strategy ddp --lora_rank 4 \ + --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ + --save_path ${BASE}/output -#torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'facebook/opt-350m' \ -# --model 'opt' --strategy naive \ -# --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ -# --save_path ${BASE}/output +# torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'facebook/opt-350m' \ +# --model 'opt' --strategy naive \ +# --dataset $SFT_DATASET --max_datasets_size 512 --max_epochs 1 \ +# --save_path ${BASE}/output rm -rf ${BASE}/output # train rm torchrun --standalone --nproc_per_node=2 ${BASE}/train_reward_model.py \ - --pretrain 'facebook/opt-350m' --model 'opt' \ - --strategy colossalai_zero2 --loss_fn 'log_sig'\ - --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ - --test True --lora_rank 0 \ - --save_path ${BASE}/rm_ckpt_opt.pt + --pretrain 'facebook/opt-350m' --model 'opt' \ + --strategy colossalai_zero2 --loss_fn 'log_sig' \ + --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ + --test True --lora_rank 0 \ + --save_path ${BASE}/rm_ckpt_opt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_reward_model.py \ - --pretrain 'gpt2' --model 'gpt2' \ - --strategy colossalai_zero2 --loss_fn 'log_exp' \ - --dataset 'Dahoas/rm-static' \ - --test True --lora_rank 0 \ - --save_path ${BASE}/rm_ckpt_gpt.pt + --pretrain 'gpt2' --model 'gpt2' \ + --strategy colossalai_zero2 --loss_fn 'log_exp' \ + --dataset 'Dahoas/rm-static' \ + --test True --lora_rank 0 \ + --save_path ${BASE}/rm_ckpt_gpt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_reward_model.py \ - --pretrain 'gpt2' --model 'gpt2' \ - --strategy ddp --loss_fn 'log_exp' \ - --dataset 'Dahoas/rm-static' \ - --test True --lora_rank 4 \ - --save_path ${BASE}/rm_ckpt.pt + --pretrain 'gpt2' --model 'gpt2' \ + --strategy ddp --loss_fn 'log_exp' \ + --dataset 'Dahoas/rm-static' \ + --test True --lora_rank 4 \ + --save_path ${BASE}/rm_ckpt.pt rm -rf ${BASE}/rm_ckpt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_reward_model.py \ - --pretrain 'bigscience/bloom-560m' --model 'bloom' \ - --strategy colossalai_zero2 --loss_fn 'log_sig' \ - --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ - --test True --lora_rank 4 \ - --save_path ${BASE}/rm_ckpt.pt + --pretrain 'bigscience/bloom-560m' --model 'bloom' \ + --strategy colossalai_zero2 --loss_fn 'log_sig' \ + --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ + --test True --lora_rank 4 \ + --save_path ${BASE}/rm_ckpt.pt rm -rf ${BASE}/rm_ckpt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_reward_model.py \ - --pretrain 'microsoft/deberta-v3-large' --model 'deberta' \ - --strategy colossalai_zero2 --loss_fn 'log_sig' \ - --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ - --test True --lora_rank 4 \ - --save_path ${BASE}/rm_ckpt.pt + --pretrain 'microsoft/deberta-v3-large' --model 'deberta' \ + --strategy colossalai_zero2 --loss_fn 'log_sig' \ + --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ + --test True --lora_rank 4 \ + --save_path ${BASE}/rm_ckpt.pt rm -rf ${BASE}/rm_ckpt.pt torchrun --standalone --nproc_per_node=2 ${BASE}/train_reward_model.py \ - --pretrain 'roberta-base' --model 'roberta' \ - --strategy colossalai_zero2 --loss_fn 'log_exp'\ - --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base'\ - --test True --lora_rank 4 \ - --save_path ${BASE}/rm_ckpt.pt + --pretrain 'roberta-base' --model 'roberta' \ + --strategy colossalai_zero2 --loss_fn 'log_exp' \ + --dataset 'Anthropic/hh-rlhf' --subset 'harmless-base' \ + --test True --lora_rank 4 \ + --save_path ${BASE}/rm_ckpt.pt rm -rf ${BASE}/rm_ckpt.pt -torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_zero2 --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ - --pretrain 'facebook/opt-350m' --model opt \ - --rm_pretrain 'facebook/opt-350m' \ - --rm_path ${BASE}/rm_ckpt_opt.pt \ - --save_path ${BASE}/actor_checkpoint_prompts.pt +torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ + --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ + --strategy colossalai_zero2 --num_episodes 1 --max_timesteps 2 \ + --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --pretrain 'facebook/opt-350m' --model opt \ + --rm_pretrain 'facebook/opt-350m' \ + --rm_path ${BASE}/rm_ckpt_opt.pt \ + --save_path ${BASE}/actor_checkpoint_prompts.pt rm -rf ${BASE}/rm_ckpt_opt.pt -torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_zero2 --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ - --pretrain 'gpt2' --model gpt2 \ - --rm_pretrain 'gpt2' \ - --rm_path ${BASE}/rm_ckpt_gpt.pt \ - --save_path ${BASE}/actor_checkpoint_prompts.pt - -torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ - --pretrain 'gpt2' --model gpt2 \ - --rm_pretrain 'gpt2' \ - --rm_path ${BASE}/rm_ckpt_gpt.pt \ - --save_path ${BASE}/actor_checkpoint_prompts.pt +torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ + --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ + --strategy colossalai_zero2 --num_episodes 1 --max_timesteps 2 \ + --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --pretrain 'gpt2' --model gpt2 \ + --rm_pretrain 'gpt2' \ + --rm_path ${BASE}/rm_ckpt_gpt.pt \ + --save_path ${BASE}/actor_checkpoint_prompts.pt + +torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ + --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ + --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ + --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --pretrain 'gpt2' --model gpt2 \ + --rm_pretrain 'gpt2' \ + --rm_path ${BASE}/rm_ckpt_gpt.pt \ + --save_path ${BASE}/actor_checkpoint_prompts.pt rm -rf ${BASE}/rm_ckpt_gpt.pt rm -rf ${BASE}/actor_checkpoint_prompts.pt From 8e97777b71787663680994d07f2c991eba003a1d Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 18:27:16 +0800 Subject: [PATCH 12/25] fix: fix stage check and add comments --- applications/Chat/benchmarks/benchmark_opt_lora_dummy.py | 6 ++++-- applications/Chat/coati/trainer/ppo.py | 2 ++ applications/Chat/coati/trainer/sft.py | 6 ++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py b/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py index 7a47624f74d8..dea7ebc60a8b 100644 --- a/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py +++ b/applications/Chat/benchmarks/benchmark_opt_lora_dummy.py @@ -19,8 +19,10 @@ def get_model_numel(model: nn.Module, strategy: Strategy) -> int: numel = sum(p.numel() for p in model.parameters()) - if isinstance(strategy, ColossalAIStrategy) and strategy.stage == 3 and strategy.shard_init: - numel *= dist.get_world_size() + if isinstance(strategy, ColossalAIStrategy): + from colossalai.booster.plugin import GeminiPlugin + if isinstance(strategy.plugin, GeminiPlugin) and strategy.shard_init: + numel *= dist.get_world_size() return numel diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index e2e44e62533e..8427d3e0acfd 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -105,6 +105,8 @@ def _make_experience(self, inputs: Union[Tensor, Dict[str, Tensor]]) -> Experien def _learn(self): # replay buffer may be empty at first, we should rebuild at each training if not self.sample_replay_buffer: + # HACK(cwher): according to the design of boost API, dataloader should also be boosted, + # but it is impractical to adapt this pattern in RL training. Thus, I left dataloader unboosted. dataloader = self.strategy.setup_dataloader(self.replay_buffer, self.dataloader_pin_memory) if self.sample_replay_buffer: pbar = tqdm(range(self.max_epochs), desc='Train epoch', disable=not is_rank_0()) diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index 63fde53956cc..e7f1d9111c7e 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -44,8 +44,10 @@ def __init__( accumulation_steps: int = 8, callbacks: List[Callback] = [], ) -> None: - if accumulation_steps > 1 and isinstance(strategy, ColossalAIStrategy) and strategy.stage == 3: - raise ValueError("Accumulation steps are not supported in stage 3 of ColossalAI") + if accumulation_steps > 1 and isinstance(strategy, ColossalAIStrategy): + from colossalai.booster.plugin import GeminiPlugin + assert not isinstance(strategy.plugin, GeminiPlugin), \ + "Accumulation steps are not supported in stage 3 of ColossalAI" super().__init__(strategy, max_epochs, callbacks=callbacks) self.train_dataloader = train_dataloader self.eval_dataloader = eval_dataloader From 4fbae167daff71c4f67f71d4dece4fe902a54c23 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 18:42:07 +0800 Subject: [PATCH 13/25] feat: allow dict type arg in strategy.prepare --- .../Chat/coati/trainer/strategies/base.py | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 55bb561b2e3d..e34414a40a0c 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from contextlib import nullcontext -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Union import torch import torch.distributed as dist @@ -15,8 +15,7 @@ from .sampler import DistributedSampler -ModelOptimPair = Tuple[nn.Module, Optimizer] -ModelOrModelOptimPair = Union[nn.Module, ModelOptimPair] +_BoostArgSpec = Union[nn.Module, Tuple[nn.Module, Optimizer], Dict] class Strategy(ABC): @@ -47,21 +46,6 @@ def optimizer_step(self, optimizer: Optimizer, **kwargs) -> None: def setup_distributed(self) -> None: pass - def setup_model(self, model: nn.Module) -> nn.Module: - raise NotImplementedError() - - def setup_optimizer(self, optimizer: Optimizer, model: nn.Module) -> Optimizer: - raise NotImplementedError() - - def setup_model_optimizer(self, - model: nn.Module, - optimizer: Optimizer - ) -> Tuple[nn.Module, Optimizer]: - # NOTE: Booster.boost must take in both model and optimizer - model, optimizer, *_ = self.booster.boost(model=model, - optimizer=optimizer) - return model, optimizer - @abstractmethod def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False) -> DataLoader: pass @@ -69,12 +53,12 @@ def setup_dataloader(self, replay_buffer: ReplayBuffer, pin_memory: bool = False def model_init_context(self): return nullcontext() - def prepare( - self, *models_or_model_optim_pairs: ModelOrModelOptimPair - ) -> Union[List[ModelOrModelOptimPair], ModelOrModelOptimPair]: - """Prepare models or model-optimizer-pairs based on each strategy. + def prepare(self, *boost_args: _BoostArgSpec) -> Union[List[_BoostArgSpec], _BoostArgSpec]: + """Prepare [model | (model, optimizer) | Dict] based on each strategy. Example:: + >>> # e.g., include lr_scheduler + >>> result_dict = strategy.prepare(dict(model=model, lr_scheduler=lr_scheduler)) >>> # when fine-tuning actor and critic >>> (actor, actor_optim), (critic, critic_optim), reward_model, initial_model = strategy.prepare((actor, actor_optim), (critic, critic_optim), reward_model, initial_model) >>> # or when training reward model @@ -83,23 +67,39 @@ def prepare( >>> actor, critic = strategy.prepare(actor, critic) Returns: - Union[List[ModelOrModelOptimPair], ModelOrModelOptimPair]: Models or model-optimizer-pairs in the original order. + Union[List[_BoostArgSpec], _BoostArgSpec]: [model | (model, optimizer) | Dict] in the original order. """ rets = [] - for arg in models_or_model_optim_pairs: - if isinstance(arg, tuple): - assert len(arg) == 2, f'Expect (model, optimizer) pair, got a tuple with size "{len(arg)}"' - model, optimizer = self.setup_model_optimizer(*arg) + for arg in boost_args: + if isinstance(arg, nn.Module): + # HACK: directly use plugin's method to wrap model, as booster.boost must take in both model and optimizer + # TODO(cwher): support inference only case + raise NotImplementedError() + elif isinstance(arg, tuple): + try: + model, optimizer = arg + except ValueError: + raise RuntimeError(f'Expect (model, optimizer) pair, got a tuple with size "{len(arg)}"') + model, optimizer, *_ = self.booster.boost(model=model, + optimizer=optimizer) rets.append((model, optimizer)) - elif isinstance(arg, nn.Module): - rets.append(self.setup_model(arg)) + elif isinstance(arg, Dict): + model, optimizer, criterion, dataloader, lr_scheduler = self.booster.boost(**arg) + boost_result = dict(model=model, + optimizer=optimizer, + criterion=criterion, + dataloader=dataloader, + lr_scheduler=lr_scheduler) + # remove None values + for key in boost_result.keys(): + if key not in arg: + boost_result.pop(key) + rets.append(boost_result) else: - raise RuntimeError(f'Expect model or (model, optimizer) pair, got {type(arg)}') + raise RuntimeError(f'Type {type(arg)} is not supported') - if len(rets) == 1: - return rets[0] - return rets + return rets[0] if len(rets) == 1 else rets @staticmethod def unwrap_model(model: nn.Module) -> nn.Module: From 4333407aea3d4d8f585cd10e591b9484d70c6ba5 Mon Sep 17 00:00:00 2001 From: CWHer Date: Wed, 14 Jun 2023 18:47:50 +0800 Subject: [PATCH 14/25] to: temporarily remove lr_scheduler for testing --- applications/Chat/coati/trainer/rm.py | 5 +++-- applications/Chat/coati/trainer/sft.py | 13 +++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index cdae5108ab00..c0a550e04bf6 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ -53,7 +53,8 @@ def __init__( self.model = model self.loss_fn = loss_fn self.optimizer = optim - self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100) + # TODO(cwher): + # self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100) def eval_acc(self, dataloader): dist = 0 @@ -103,7 +104,7 @@ def fit(self): self.optimizer.zero_grad() cnt += 1 if cnt == 100: - self.scheduler.step() + # self.scheduler.step() dist, acc = self.eval_acc(self.valid_dataloader) cnt = 0 if is_rank_0(): diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index e7f1d9111c7e..4003c4456ae4 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -58,10 +58,11 @@ def __init__( num_update_steps_per_epoch = len(train_dataloader) // self.accumulation_steps max_steps = math.ceil(self.max_epochs * num_update_steps_per_epoch) - self.scheduler = get_scheduler("cosine", - self.optimizer, - num_warmup_steps=math.ceil(max_steps * 0.03), - num_training_steps=max_steps) + # TODO(cwher): + # self.scheduler = get_scheduler("cosine", + # self.optimizer, + # num_warmup_steps=math.ceil(max_steps * 0.03), + # num_training_steps=max_steps) def fit(self, logger, use_wandb: bool = False): if use_wandb: @@ -97,11 +98,11 @@ def fit(self, logger, use_wandb: bool = False): if (batch_id + 1) % self.accumulation_steps == 0: self.strategy.optimizer_step(self.optimizer) self.optimizer.zero_grad() - self.scheduler.step() + # self.scheduler.step() if is_rank_0() and use_wandb: wandb.log({ "loss": total_loss / self.accumulation_steps, - "lr": self.scheduler.get_last_lr()[0], + # "lr": self.scheduler.get_last_lr()[0], "epoch": epoch, "batch_id": batch_id }) From 82499ec0f5727d8e98c38ef3b5a848e60304ec17 Mon Sep 17 00:00:00 2001 From: CWHer Date: Thu, 15 Jun 2023 10:17:24 +0800 Subject: [PATCH 15/25] style: simplify init of ColossalAIStrategy --- applications/Chat/coati/trainer/strategies/base.py | 6 ++---- .../Chat/coati/trainer/strategies/colossalai.py | 14 +++++--------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index e34414a40a0c..603cb1fbf2a9 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -3,7 +3,6 @@ from typing import Callable, Dict, List, Optional, Tuple, Union import torch -import torch.distributed as dist import torch.nn as nn from coati.replay_buffer import ReplayBuffer from torch.optim import Optimizer @@ -25,9 +24,8 @@ class Strategy(ABC): def __init__(self, plugin_initializer: Callable[..., Optional[Plugin]] = lambda: None) -> None: super().__init__() - if not dist.is_initialized(): - # NOTE: dist must be initialized before Booster - self.setup_distributed() + # NOTE: dist must be initialized before Booster + self.setup_distributed() self.plugin = plugin_initializer() self.booster = Booster(plugin=self.plugin) self._post_init() diff --git a/applications/Chat/coati/trainer/strategies/colossalai.py b/applications/Chat/coati/trainer/strategies/colossalai.py index 823fea1ad9cd..8c9b8ac0334b 100644 --- a/applications/Chat/coati/trainer/strategies/colossalai.py +++ b/applications/Chat/coati/trainer/strategies/colossalai.py @@ -93,9 +93,6 @@ def __init__( self.precision = precision self.shard_init = shard_init - # NOTE: dist should be initialized before calling get_current_device() - self.setup_distributed(seed) - optim_kwargs = dict( initial_scale=initial_scale, growth_factor=growth_factor, @@ -107,9 +104,9 @@ def __init__( max_norm=max_norm, norm_type=norm_type ) + # NOTE: dist should be initialized before calling get_current_device() if stage == 3: - plugin_initializer = functools.partial( - GeminiPlugin, + plugin_initializer = lambda: GeminiPlugin( # gemini_config device=get_current_device(), placement_policy=placement_policy, @@ -126,8 +123,7 @@ def __init__( **optim_kwargs ) else: - plugin_initializer = functools.partial( - LowLevelZeroPlugin, + plugin_initializer = lambda: LowLevelZeroPlugin( # zero_config stage=stage, precision=precision, @@ -145,8 +141,8 @@ def _post_init(self) -> None: assert isinstance(self.plugin, (LowLevelZeroPlugin, GeminiPlugin)), \ f'{type(self).__name__}\'s plugin is not initialized properly.' - def setup_distributed(self, seed: int) -> None: - colossalai.launch_from_torch({}, seed=seed) + def setup_distributed(self) -> None: + colossalai.launch_from_torch({}, seed=self.seed) def model_init_context(self): if isinstance(self.plugin, GeminiPlugin): From af5dd55ae2774a753f8f9089c629fed4f1cf9eaa Mon Sep 17 00:00:00 2001 From: CWHer Date: Thu, 15 Jun 2023 10:49:18 +0800 Subject: [PATCH 16/25] fix: fix lr_scheduler in sft and rm --- applications/Chat/coati/trainer/rm.py | 23 ++++++++++--------- applications/Chat/coati/trainer/sft.py | 19 +++++---------- .../Chat/coati/trainer/strategies/base.py | 7 +++--- .../Chat/examples/train_reward_model.py | 10 +++++++- applications/Chat/examples/train_sft.py | 16 ++++++++++++- 5 files changed, 46 insertions(+), 29 deletions(-) diff --git a/applications/Chat/coati/trainer/rm.py b/applications/Chat/coati/trainer/rm.py index c0a550e04bf6..316eded7ea5d 100644 --- a/applications/Chat/coati/trainer/rm.py +++ b/applications/Chat/coati/trainer/rm.py @@ -1,13 +1,12 @@ from datetime import datetime -from typing import List, Optional +from typing import Callable, List import pandas as pd import torch -import torch.distributed as dist -from torch.optim import Optimizer, lr_scheduler -from torch.utils.data import DataLoader, Dataset, DistributedSampler +from torch.optim import Optimizer +from torch.optim.lr_scheduler import _LRScheduler +from torch.utils.data import DataLoader from tqdm import tqdm -from transformers.tokenization_utils_base import PreTrainedTokenizerBase from .base import Trainer from .callbacks import Callback @@ -22,7 +21,8 @@ class RewardModelTrainer(Trainer): Args: model (torch.nn.Module): the model to train strategy (Strategy): the strategy to use for training - optim(Optimizer): the optimizer to use for training + optim (Optimizer): the optimizer to use for training + lr_scheduler (_LRScheduler): the lr scheduler to use for training loss_fn (callable): the loss function to use for training train_dataloader (DataLoader): the dataloader to use for training valid_dataloader (DataLoader): the dataloader to use for validation @@ -37,7 +37,8 @@ def __init__( model, strategy: Strategy, optim: Optimizer, - loss_fn, + lr_scheduler: _LRScheduler, + loss_fn: Callable, train_dataloader: DataLoader, valid_dataloader: DataLoader, eval_dataloader: DataLoader, @@ -53,8 +54,7 @@ def __init__( self.model = model self.loss_fn = loss_fn self.optimizer = optim - # TODO(cwher): - # self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100) + self.scheduler = lr_scheduler def eval_acc(self, dataloader): dist = 0 @@ -104,7 +104,7 @@ def fit(self): self.optimizer.zero_grad() cnt += 1 if cnt == 100: - # self.scheduler.step() + self.scheduler.step() dist, acc = self.eval_acc(self.valid_dataloader) cnt = 0 if is_rank_0(): @@ -117,7 +117,8 @@ def fit(self): # eval dist, acc = self.eval_acc(self.eval_dataloader) if is_rank_0(): - log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]], columns=['step', 'loss', 'dist', 'acc']) + log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]], + columns=['step', 'loss', 'dist', 'acc']) log.to_csv('log.csv', mode='a', header=False, index=False) epoch_bar.update() step_bar.set_postfix({'dist': dist, 'acc': acc}) diff --git a/applications/Chat/coati/trainer/sft.py b/applications/Chat/coati/trainer/sft.py index 4003c4456ae4..da223f1f33ff 100644 --- a/applications/Chat/coati/trainer/sft.py +++ b/applications/Chat/coati/trainer/sft.py @@ -1,15 +1,13 @@ -import math import time -from typing import List, Optional +from typing import List import torch import torch.distributed as dist import wandb from torch.optim import Optimizer +from torch.optim.lr_scheduler import _LRScheduler from torch.utils.data import DataLoader from tqdm import tqdm -from transformers.tokenization_utils_base import PreTrainedTokenizerBase -from transformers.trainer import get_scheduler from .base import Trainer from .callbacks import Callback @@ -38,6 +36,7 @@ def __init__( model, strategy: Strategy, optim: Optimizer, + lr_scheduler: _LRScheduler, train_dataloader: DataLoader, eval_dataloader: DataLoader = None, max_epochs: int = 2, @@ -55,14 +54,8 @@ def __init__( self.optimizer = optim self.accumulation_steps = accumulation_steps - num_update_steps_per_epoch = len(train_dataloader) // self.accumulation_steps - max_steps = math.ceil(self.max_epochs * num_update_steps_per_epoch) - # TODO(cwher): - # self.scheduler = get_scheduler("cosine", - # self.optimizer, - # num_warmup_steps=math.ceil(max_steps * 0.03), - # num_training_steps=max_steps) + self.scheduler = lr_scheduler def fit(self, logger, use_wandb: bool = False): if use_wandb: @@ -98,11 +91,11 @@ def fit(self, logger, use_wandb: bool = False): if (batch_id + 1) % self.accumulation_steps == 0: self.strategy.optimizer_step(self.optimizer) self.optimizer.zero_grad() - # self.scheduler.step() + self.scheduler.step() if is_rank_0() and use_wandb: wandb.log({ "loss": total_loss / self.accumulation_steps, - # "lr": self.scheduler.get_last_lr()[0], + "lr": self.scheduler.get_last_lr()[0], "epoch": epoch, "batch_id": batch_id }) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 603cb1fbf2a9..939c3be01af1 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -90,9 +90,10 @@ def prepare(self, *boost_args: _BoostArgSpec) -> Union[List[_BoostArgSpec], _Boo dataloader=dataloader, lr_scheduler=lr_scheduler) # remove None values - for key in boost_result.keys(): - if key not in arg: - boost_result.pop(key) + boost_result = { + key: value + for key, value in boost_result.items() if value is not None + } rets.append(boost_result) else: raise RuntimeError(f'Type {type(arg)} is not supported') diff --git a/applications/Chat/examples/train_reward_model.py b/applications/Chat/examples/train_reward_model.py index 48b12336fa67..2df3bc391b9b 100644 --- a/applications/Chat/examples/train_reward_model.py +++ b/applications/Chat/examples/train_reward_model.py @@ -18,6 +18,7 @@ from coati.utils import prepare_llama_tokenizer_and_embedding from datasets import load_dataset from torch.optim import Adam +from torch.optim.lr_scheduler import CosineAnnealingLR from torch.utils.data import DataLoader from torch.utils.data.distributed import DistributedSampler from transformers import AutoTokenizer, BloomTokenizerFast, DebertaV2Tokenizer, LlamaTokenizer, RobertaTokenizer @@ -165,10 +166,17 @@ def train(args): batch_size=args.batch_size, pin_memory=True) - (model, optim) = strategy.prepare((model, optim)) + lr_scheduler = CosineAnnealingLR(optim, train_dataloader.__len__() // 100) + strategy_dict = strategy.prepare( + dict(model=model, optimizer=optim, lr_scheduler=lr_scheduler) + ) + model = strategy_dict['model'] + optim = strategy_dict['optimizer'] + lr_scheduler = strategy_dict['lr_scheduler'] trainer = RewardModelTrainer(model=model, strategy=strategy, optim=optim, + lr_scheduler=lr_scheduler, loss_fn=loss_fn, train_dataloader=train_dataloader, valid_dataloader=valid_dataloader, diff --git a/applications/Chat/examples/train_sft.py b/applications/Chat/examples/train_sft.py index 7fcd026fb538..717eb95311fb 100644 --- a/applications/Chat/examples/train_sft.py +++ b/applications/Chat/examples/train_sft.py @@ -1,4 +1,5 @@ import argparse +import math import os import loralib as lora @@ -19,6 +20,7 @@ from transformers.models.gpt2.tokenization_gpt2 import GPT2Tokenizer from transformers.models.opt.configuration_opt import OPTConfig from transformers.models.opt.modeling_opt import OPTForCausalLM +from transformers.trainer import get_scheduler from colossalai.logging import get_dist_logger from colossalai.nn.optimizer import HybridAdam @@ -152,10 +154,22 @@ def train(args): else: eval_dataloader = None - (model, optim) = strategy.prepare((model, optim)) + num_update_steps_per_epoch = len(train_dataloader) // args.accumulation_steps + max_steps = math.ceil(args.max_epochs * num_update_steps_per_epoch) + lr_scheduler = get_scheduler("cosine", + optim, + num_warmup_steps=math.ceil(max_steps * 0.03), + num_training_steps=max_steps) + strategy_dict = strategy.prepare( + dict(model=model, optimizer=optim, lr_scheduler=lr_scheduler) + ) + model = strategy_dict['model'] + optim = strategy_dict['optimizer'] + lr_scheduler = strategy_dict['lr_scheduler'] trainer = SFTTrainer(model=model, strategy=strategy, optim=optim, + lr_scheduler=lr_scheduler, train_dataloader=train_dataloader, eval_dataloader=eval_dataloader, max_epochs=args.max_epochs, From 7cccb8f95cef93e9446a1bf076a876f1b34e8c3a Mon Sep 17 00:00:00 2001 From: CWHer Date: Thu, 15 Jun 2023 10:58:53 +0800 Subject: [PATCH 17/25] style: modify comments --- applications/Chat/coati/trainer/strategies/base.py | 1 + applications/Chat/coati/trainer/strategies/ddp.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 939c3be01af1..8a6b877c3788 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -53,6 +53,7 @@ def model_init_context(self): def prepare(self, *boost_args: _BoostArgSpec) -> Union[List[_BoostArgSpec], _BoostArgSpec]: """Prepare [model | (model, optimizer) | Dict] based on each strategy. + NOTE: the keys of Dict must be a subset of `self.booster.boost`'s arguments. Example:: >>> # e.g., include lr_scheduler diff --git a/applications/Chat/coati/trainer/strategies/ddp.py b/applications/Chat/coati/trainer/strategies/ddp.py index 3ed49b229540..42867645290c 100644 --- a/applications/Chat/coati/trainer/strategies/ddp.py +++ b/applications/Chat/coati/trainer/strategies/ddp.py @@ -6,12 +6,11 @@ import torch.distributed as dist import torch.nn as nn from coati.replay_buffer import ReplayBuffer -from torch.nn.parallel import DistributedDataParallel as DDP from torch.optim import Optimizer from torch.utils.data import DataLoader from transformers.tokenization_utils_base import PreTrainedTokenizerBase -from colossalai.booster.plugin import TorchDDPPlugin # This is a wrapper of torch.DDP +from colossalai.booster.plugin import TorchDDPPlugin from colossalai.booster.plugin.torch_ddp_plugin import TorchDDPModel from .naive import NaiveStrategy From 6632896d63a6652a5c40452d6da8bad240558d0a Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 16 Jun 2023 10:19:49 +0800 Subject: [PATCH 18/25] test: add train_prompts tests --- applications/Chat/examples/test_ci.sh | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index c4be3356e45b..fd659662f6dc 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -136,5 +136,29 @@ rm -rf ${BASE}/rm_ckpt_gpt.pt rm -rf ${BASE}/actor_checkpoint_prompts.pt +# FIXME: This is a hack to skip tests that are not working (tested at commit b3ab7fbabf) +# - gpt2-ddp: RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation +# - llama-*: Repository Not Found for url: https://huggingface.co/{...}/resolve/main/tokenizer.model. +# - roberta-*: RuntimeError: CUDA error: CUBLAS_STATUS_NOT_INITIALIZED when calling `cublasCreate(handle)` +SKIPPED_TESTS=( + "gpt2-ddp" + "llama-naive" "llama-ddp" "llama-colossalai_gemini" "llama-colossalai_zero2" + "roberta-naive" "roberta-ddp" "roberta-colossalai_gemini" "roberta-colossalai_zero2" +) + +for model in 'gpt2' 'bloom' 'opt' 'llama' 'roberta'; do + for strategy in 'naive' 'ddp' 'colossalai_gemini' 'colossalai_zero2'; do + if [[ " ${SKIPPED_TESTS[@]} " =~ " ${model}-${strategy} " ]]; then + echo "[Test]: Skipped $model-$strategy" + continue + fi + torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ + --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ + --strategy $strategy --model $model \ + --num_episodes 1 --max_timesteps 2 \ + --update_timesteps 2 --max_epochs 1 --train_batch_size 2 + done +done + # 3080 doesn't support P2P, skip this test # cd ${BASE}/ray && bash test_ci.sh && cd ${BASE} From f598bf87da6f4873395200910eb835523ea3f1d0 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 16 Jun 2023 17:10:07 +0800 Subject: [PATCH 19/25] fix: fix inference only case and use in train_prompts --- applications/Chat/coati/trainer/strategies/base.py | 5 ++--- applications/Chat/examples/train_prompts.py | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/applications/Chat/coati/trainer/strategies/base.py b/applications/Chat/coati/trainer/strategies/base.py index 8a6b877c3788..80bc3272872e 100644 --- a/applications/Chat/coati/trainer/strategies/base.py +++ b/applications/Chat/coati/trainer/strategies/base.py @@ -72,9 +72,8 @@ def prepare(self, *boost_args: _BoostArgSpec) -> Union[List[_BoostArgSpec], _Boo rets = [] for arg in boost_args: if isinstance(arg, nn.Module): - # HACK: directly use plugin's method to wrap model, as booster.boost must take in both model and optimizer - # TODO(cwher): support inference only case - raise NotImplementedError() + model, *_ = self.booster.boost(arg) + rets.append(model) elif isinstance(arg, tuple): try: model, optimizer = arg diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index 134f21f80ef1..cd26a36f5de7 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -1,6 +1,5 @@ import argparse -import pandas as pd import torch import torch.distributed as dist from coati.dataset import DataCollatorForSupervisedDataset, PromptDataset, SupervisedDataset @@ -51,7 +50,7 @@ def main(args): else: raise ValueError(f'Unsupported actor model "{args.model}"') - if args.rm_model == None: + if args.rm_model is None: rm_model_name = args.model else: rm_model_name = args.rm_model @@ -163,7 +162,8 @@ def main(args): batch_size=args.ptx_batch_size, collate_fn=data_collator) - (actor, actor_optim), (critic, critic_optim) = strategy.prepare((actor, actor_optim), (critic, critic_optim)) + (actor, actor_optim), (critic, critic_optim), reward_model, initial_model = \ + strategy.prepare((actor, actor_optim), (critic, critic_optim), reward_model, initial_model) # configure trainer trainer = PPOTrainer( From 92b466427be57d5e827ecf5018903dedbedcbd85 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 16 Jun 2023 17:11:52 +0800 Subject: [PATCH 20/25] test: skip failed tests in ci --- applications/Chat/examples/test_ci.sh | 68 ++++++++++++++------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index fd659662f6dc..067ecf599ec5 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -26,6 +26,33 @@ pip install -r ${BASE}/requirements.txt wandb init -m offline +# FIXME: This is a hack to skip tests that are not working (tested at commit b3ab7fbabf) +# - gpt2-ddp: RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation +# - llama-*: Repository Not Found for url: https://huggingface.co/{...}/resolve/main/tokenizer.model. +# - roberta-*: RuntimeError: CUDA error: CUBLAS_STATUS_NOT_INITIALIZED when calling `cublasCreate(handle)` +# - *-colossalai_gemini: model.to('cpu') failed +SKIPPED_TESTS=( + "gpt2-ddp" + "llama-naive" "llama-ddp" "llama-colossalai_gemini" "llama-colossalai_zero2" + "roberta-naive" "roberta-ddp" "roberta-colossalai_gemini" "roberta-colossalai_zero2" + "gpt2-colossalai_gemini" "opt-colossalai_gemini" +) + +# These tests are quick and do not have any dependencies +for model in 'gpt2' 'bloom' 'opt' 'llama' 'roberta'; do + for strategy in 'naive' 'ddp' 'colossalai_gemini' 'colossalai_zero2'; do + if [[ " ${SKIPPED_TESTS[@]} " =~ " ${model}-${strategy} " ]]; then + echo "[Test]: Skipped $model-$strategy" + continue + fi + torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ + --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ + --strategy $strategy --model $model \ + --num_episodes 1 --max_timesteps 2 \ + --update_timesteps 2 --max_epochs 1 --train_batch_size 2 + done +done + # train sft torchrun --standalone --nproc_per_node=4 ${BASE}/train_sft.py --pretrain 'bigscience/bloom-560m' \ --model 'bloom' --strategy colossalai_zero2 --lora_rank 4 \ @@ -124,41 +151,18 @@ torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ --rm_path ${BASE}/rm_ckpt_gpt.pt \ --save_path ${BASE}/actor_checkpoint_prompts.pt -torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ - --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ - --pretrain 'gpt2' --model gpt2 \ - --rm_pretrain 'gpt2' \ - --rm_path ${BASE}/rm_ckpt_gpt.pt \ - --save_path ${BASE}/actor_checkpoint_prompts.pt +# FIXME: this test would fail after commit 18fc116724 +# torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ +# --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ +# --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ +# --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ +# --pretrain 'gpt2' --model gpt2 \ +# --rm_pretrain 'gpt2' \ +# --rm_path ${BASE}/rm_ckpt_gpt.pt \ +# --save_path ${BASE}/actor_checkpoint_prompts.pt rm -rf ${BASE}/rm_ckpt_gpt.pt rm -rf ${BASE}/actor_checkpoint_prompts.pt -# FIXME: This is a hack to skip tests that are not working (tested at commit b3ab7fbabf) -# - gpt2-ddp: RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation -# - llama-*: Repository Not Found for url: https://huggingface.co/{...}/resolve/main/tokenizer.model. -# - roberta-*: RuntimeError: CUDA error: CUBLAS_STATUS_NOT_INITIALIZED when calling `cublasCreate(handle)` -SKIPPED_TESTS=( - "gpt2-ddp" - "llama-naive" "llama-ddp" "llama-colossalai_gemini" "llama-colossalai_zero2" - "roberta-naive" "roberta-ddp" "roberta-colossalai_gemini" "roberta-colossalai_zero2" -) - -for model in 'gpt2' 'bloom' 'opt' 'llama' 'roberta'; do - for strategy in 'naive' 'ddp' 'colossalai_gemini' 'colossalai_zero2'; do - if [[ " ${SKIPPED_TESTS[@]} " =~ " ${model}-${strategy} " ]]; then - echo "[Test]: Skipped $model-$strategy" - continue - fi - torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ - --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ - --strategy $strategy --model $model \ - --num_episodes 1 --max_timesteps 2 \ - --update_timesteps 2 --max_epochs 1 --train_batch_size 2 - done -done - # 3080 doesn't support P2P, skip this test # cd ${BASE}/ray && bash test_ci.sh && cd ${BASE} From 3376450024e8b5aae71ab3d6afa5faf08c212d1b Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 16 Jun 2023 17:36:46 +0800 Subject: [PATCH 21/25] style: fix CodeFactor check --- applications/Chat/examples/test_ci.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index 067ecf599ec5..46f70a0ca32f 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -41,7 +41,7 @@ SKIPPED_TESTS=( # These tests are quick and do not have any dependencies for model in 'gpt2' 'bloom' 'opt' 'llama' 'roberta'; do for strategy in 'naive' 'ddp' 'colossalai_gemini' 'colossalai_zero2'; do - if [[ " ${SKIPPED_TESTS[@]} " =~ " ${model}-${strategy} " ]]; then + if [[ " ${SKIPPED_TESTS[*]} " =~ " ${model}-${strategy} " ]]; then echo "[Test]: Skipped $model-$strategy" continue fi From a9fb675371b12578280cc4cd5f3e726e3ff6cd43 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 16 Jun 2023 18:11:40 +0800 Subject: [PATCH 22/25] fix: do not use model.to('cpu') with GeminiPlugin --- applications/Chat/coati/trainer/ppo.py | 7 ++++++- applications/Chat/examples/train_prompts.py | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/applications/Chat/coati/trainer/ppo.py b/applications/Chat/coati/trainer/ppo.py index 8427d3e0acfd..cfb18e2ae483 100644 --- a/applications/Chat/coati/trainer/ppo.py +++ b/applications/Chat/coati/trainer/ppo.py @@ -17,7 +17,7 @@ from .base import Trainer from .callbacks import Callback -from .strategies import Strategy +from .strategies import ColossalAIStrategy, Strategy from .utils import is_rank_0, to_device @@ -71,6 +71,11 @@ def __init__(self, offload_inference_models: bool = True, callbacks: List[Callback] = [], **generate_kwargs) -> None: + if isinstance(strategy, ColossalAIStrategy): + from colossalai.booster.plugin import GeminiPlugin + assert not (isinstance(strategy.plugin, GeminiPlugin) and offload_inference_models), \ + "GeminiPlugin is not compatible with manual model.to('cpu')" + experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef) replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index cd26a36f5de7..c0a06b4a3fff 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -185,6 +185,7 @@ def main(args): top_k=50, pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, + offload_inference_models=args.strategy != 'colossalai_gemini' ) trainer.fit(prompt_dataloader=prompt_dataloader, From 7849182f62ed65e6976c5569b09ff2a0ce358b35 Mon Sep 17 00:00:00 2001 From: CWHer Date: Fri, 16 Jun 2023 18:13:19 +0800 Subject: [PATCH 23/25] test: enable colossalai_gemini tests --- applications/Chat/examples/test_ci.sh | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index 46f70a0ca32f..2310f295e3d4 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -30,12 +30,10 @@ wandb init -m offline # - gpt2-ddp: RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation # - llama-*: Repository Not Found for url: https://huggingface.co/{...}/resolve/main/tokenizer.model. # - roberta-*: RuntimeError: CUDA error: CUBLAS_STATUS_NOT_INITIALIZED when calling `cublasCreate(handle)` -# - *-colossalai_gemini: model.to('cpu') failed SKIPPED_TESTS=( "gpt2-ddp" "llama-naive" "llama-ddp" "llama-colossalai_gemini" "llama-colossalai_zero2" "roberta-naive" "roberta-ddp" "roberta-colossalai_gemini" "roberta-colossalai_zero2" - "gpt2-colossalai_gemini" "opt-colossalai_gemini" ) # These tests are quick and do not have any dependencies @@ -151,15 +149,14 @@ torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ --rm_path ${BASE}/rm_ckpt_gpt.pt \ --save_path ${BASE}/actor_checkpoint_prompts.pt -# FIXME: this test would fail after commit 18fc116724 -# torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ -# --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ -# --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ -# --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ -# --pretrain 'gpt2' --model gpt2 \ -# --rm_pretrain 'gpt2' \ -# --rm_path ${BASE}/rm_ckpt_gpt.pt \ -# --save_path ${BASE}/actor_checkpoint_prompts.pt +torchrun --standalone --nproc_per_node=2 ${BASE}/train_prompts.py \ + --prompt_dataset $PROMPT_PATH --pretrain_dataset $PRETRAIN_DATASET \ + --strategy colossalai_gemini --num_episodes 1 --max_timesteps 2 \ + --update_timesteps 2 --max_epochs 1 --train_batch_size 2 \ + --pretrain 'gpt2' --model gpt2 \ + --rm_pretrain 'gpt2' \ + --rm_path ${BASE}/rm_ckpt_gpt.pt \ + --save_path ${BASE}/actor_checkpoint_prompts.pt rm -rf ${BASE}/rm_ckpt_gpt.pt rm -rf ${BASE}/actor_checkpoint_prompts.pt From 6b54b042712131a1e8173f5811e03c50c29024da Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 10:28:05 +0800 Subject: [PATCH 24/25] test: set CUDA_VISIBLE_DEVICES in ci --- applications/Chat/examples/test_ci.sh | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/applications/Chat/examples/test_ci.sh b/applications/Chat/examples/test_ci.sh index 2310f295e3d4..85728e95820c 100755 --- a/applications/Chat/examples/test_ci.sh +++ b/applications/Chat/examples/test_ci.sh @@ -1,5 +1,22 @@ #!/usr/bin/env bash +set_n_least_used_CUDA_VISIBLE_DEVICES() { + local n=${1:-"9999"} + echo "GPU Memory Usage:" + local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv | + tail -n +2 | + nl -v 0 | + tee /dev/tty | + sort -g -k 2 | + awk '{print $1}' | + head -n $n) + export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') + echo "Now CUDA_VISIBLE_DEVICES is set to:" + echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" +} + +set_n_least_used_CUDA_VISIBLE_DEVICES 4 + set -xue if [ -z "$SFT_DATASET" ]; then From af2ff25a53d8dddd057234f5fcfbaad0b9e41b67 Mon Sep 17 00:00:00 2001 From: CWHer Date: Sun, 25 Jun 2023 17:00:56 +0800 Subject: [PATCH 25/25] docs: add note --- applications/Chat/examples/train_prompts.py | 1 + 1 file changed, 1 insertion(+) diff --git a/applications/Chat/examples/train_prompts.py b/applications/Chat/examples/train_prompts.py index c0a06b4a3fff..2a47dda637bb 100644 --- a/applications/Chat/examples/train_prompts.py +++ b/applications/Chat/examples/train_prompts.py @@ -162,6 +162,7 @@ def main(args): batch_size=args.ptx_batch_size, collate_fn=data_collator) + # NOTE: For small models like opt-1.3b, reward model and initial model are not required to be parallelized. (actor, actor_optim), (critic, critic_optim), reward_model, initial_model = \ strategy.prepare((actor, actor_optim), (critic, critic_optim), reward_model, initial_model)