Skip to content
Merged

k #60

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,18 @@ I will provide the details of each workflow below.

| Workflow Name | File name | Description |
| ---------------------- | -------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| `Build on PR` | `build_on_pr.yml` | This workflow is triggered when a PR changes essential files. It will run all the unit tests in the repository with 4 GPUs. |
| `Build on PR` | `build_on_pr.yml` | This workflow is triggered when a PR changes essential files and a branch is created/deleted. It will run all the unit tests in the repository with 4 GPUs. |
| `Build on Schedule` | `build_on_schedule.yml` | This workflow will run the unit tests everyday with 8 GPUs. The result is sent to Lark. |
| `Report test coverage` | `report_test_coverage.yml` | This PR will put up a comment to report the test coverage results when `Build` is done. |

To reduce the average time of the unit test on PR, `Build on PR` workflow manages testmon cache.

1. When creating a new branch, it copies `cache/main/.testmondata*` to `cache/<branch>/`.
2. When creating a new PR or change the base branch of a PR, it copies `cache/<base_ref>/.testmondata*` to `cache/_pull/<pr_number>/`.
3. When running unit tests for each PR, it restores testmon cache from `cache/_pull/<pr_number>/`. After the test, it stores the cache back to `cache/_pull/<pr_number>/`.
4. When a PR is closed, if it's merged, it copies `cache/_pull/<pr_number>/.testmondata*` to `cache/<base_ref>/`. Otherwise, it just removes `cache/_pull/<pr_number>`.
5. When a branch is deleted, it removes `cache/<ref>`.

### Example Test

| Workflow Name | File name | Description |
Expand Down
117 changes: 112 additions & 5 deletions .github/workflows/build_on_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Build on PR

on:
pull_request:
types: [synchronize, opened, reopened]
types: [synchronize, opened, reopened, ready_for_review, closed, edited]
branches:
- "main"
- "develop"
Expand All @@ -18,11 +18,63 @@ on:
- "!tests/**.md" # ignore doc change
- "pytest.ini" # test config change
- "setup.py" # install command change
create:
delete:

jobs:
prepare_cache:
name: Prepare testmon cache
if: |
github.event_name == 'create' &&
github.event.ref_type == 'branch' &&
github.event.repository.full_name == 'hpcaitech/ColossalAI'
runs-on: [self-hosted, gpu]
container:
image: hpcaitech/pytorch-cuda:1.12.0-11.3.0
options: --rm
timeout-minutes: 5
defaults:
run:
shell: bash
steps:
- name: Copy testmon cache
run: | # branch name may contain slash, we need to replace it with space
export REF_BRANCH=$(echo ${{ github.event.ref }} | sed "s/\// /")
if [ -d /github/home/testmon_cache/${MAIN_BRANCH} ]; then
[ ! -z "$(ls -A /github/home/testmon_cache/${MAIN_BRANCH})" ] && cp -p -r /github/home/testmon_cache/${MAIN_BRANCH} "/github/home/testmon_cache/${REF_BRANCH}"
fi
env:
MAIN_BRANCH: ${{ github.event.master_branch }}

prepare_cache_for_pr:
name: Prepare testmon cache for PR
if: |
github.event_name == 'pull_request' &&
(github.event.action == 'opened' || github.event.action == 'reopened' || (github.event.action == 'edited' && github.event.changes.base != null)) &&
github.event.pull_request.base.repo.full_name == 'hpcaitech/ColossalAI'
runs-on: [self-hosted, gpu]
container:
image: hpcaitech/pytorch-cuda:1.12.0-11.3.0
options: --rm
timeout-minutes: 5
defaults:
run:
shell: bash
steps:
- name: Copy testmon cache
run: | # branch name may contain slash, we need to replace it with space
export BASE=$(echo ${{ github.event.pull_request.base.ref }} | sed "s/\// /")
if [ -d "/github/home/testmon_cache/${BASE}" ]; then
[ ! -z "$(ls -A "/github/home/testmon_cache/${BASE}")" ] && mkdir -p /github/home/testmon_cache/_pull && cp -p -r "/github/home/testmon_cache/${BASE}" /github/home/testmon_cache/_pull/${PR_NUMBER}
fi
env:
PR_NUMBER: ${{ github.event.number }}

detect:
name: Detect file change
if: |
github.event_name == 'pull_request' &&
(github.event.action == 'synchronize' || github.event.action == 'opened' || github.event.action == 'reopened' || github.event.action == 'ready_for_review') &&
github.event.pull_request.draft == false &&
github.event.pull_request.base.repo.full_name == 'hpcaitech/ColossalAI'
outputs:
Expand Down Expand Up @@ -135,9 +187,11 @@ jobs:

- name: Restore Testmon Cache
run: |
if [ -d /github/home/testmon_cache ]; then
[ ! -z "$(ls -A /github/home/testmon_cache)" ] && cp -p -r /github/home/testmon_cache/.testmondata* /__w/ColossalAI/ColossalAI/
if [ -d /github/home/testmon_cache/_pull/${PR_NUMBER} ]; then
[ ! -z "$(ls -A /github/home/testmon_cache/_pull/${PR_NUMBER})" ] && cp -p -r /github/home/testmon_cache/_pull/${PR_NUMBER}/.testmondata* /__w/ColossalAI/ColossalAI/
fi
env:
PR_NUMBER: ${{ github.event.number }}

- name: Execute Unit Testing
run: |
Expand All @@ -149,8 +203,10 @@ jobs:

- name: Store Testmon Cache
run: |
[ -d /github/home/testmon_cache ] || mkdir /github/home/testmon_cache
cp -p -r /__w/ColossalAI/ColossalAI/.testmondata* /github/home/testmon_cache/
mkdir -p /github/home/testmon_cache/_pull/${PR_NUMBER}
cp -p -r /__w/ColossalAI/ColossalAI/.testmondata* /github/home/testmon_cache/_pull/${PR_NUMBER}/
env:
PR_NUMBER: ${{ github.event.number }}

- name: Collate artifact
env:
Expand Down Expand Up @@ -188,3 +244,54 @@ jobs:
with:
name: report
path: report/

store_cache:
name: Store testmon cache for PR
if: |
github.event_name == 'pull_request' &&
github.event.action == 'closed' &&
github.event.pull_request.base.repo.full_name == 'hpcaitech/ColossalAI'
runs-on: [self-hosted, gpu]
container:
image: hpcaitech/pytorch-cuda:1.12.0-11.3.0
options: --rm
timeout-minutes: 5
defaults:
run:
shell: bash
steps:
- name: Store testmon cache if possible
if: github.event.pull_request.merged == true
run: | # branch name may contain slash, we need to replace it with space
export BASE=$(echo ${{ github.event.pull_request.base.ref }} | sed "s/\// /")
if [ -d /github/home/testmon_cache/_pull/${PR_NUMBER} ]; then
[ ! -z "$(ls -A /github/home/testmon_cache/_pull/${PR_NUMBER})" ] && cp -p -r /github/home/testmon_cache/_pull/${PR_NUMBER}/.testmondata* "/github/home/testmon_cache/${BASE}/"
fi
env:
PR_NUMBER: ${{ github.event.pull_request.number }}

- name: Remove testmon cache
run: |
rm -rf /github/home/testmon_cache/_pull/${PR_NUMBER}
env:
PR_NUMBER: ${{ github.event.pull_request.number }}

remove_cache:
name: Remove testmon cache
if: |
github.event_name == 'delete' &&
github.event.ref_type == 'branch' &&
github.event.repository.full_name == 'hpcaitech/ColossalAI'
runs-on: [self-hosted, gpu]
container:
image: hpcaitech/pytorch-cuda:1.12.0-11.3.0
options: --rm
timeout-minutes: 5
defaults:
run:
shell: bash
steps:
- name: Remove testmon cache
run: | # branch name may contain slash, we need to replace it with space
export BASE=$(echo ${{ github.event.ref }} | sed "s/\// /")
rm -rf "/github/home/testmon_cache/${BASE}"
4 changes: 4 additions & 0 deletions .github/workflows/release_docker_after_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ jobs:
run: |
version=$(cat version.txt)
tag=hpcaitech/colossalai:$version
latest=hpcaitech/colossalai:latest
docker build --build-arg http_proxy=http://172.17.0.1:7890 --build-arg https_proxy=http://172.17.0.1:7890 --build-arg VERSION=v${version} -t $tag ./docker
docker tag $tag $latest
echo "tag=${tag}" >> $GITHUB_OUTPUT
echo "latest=${latest}" >> $GITHUB_OUTPUT

- name: Log in to Docker Hub
uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
Expand All @@ -36,6 +39,7 @@ jobs:
id: docker-push
run: |
docker push ${{ steps.build.outputs.tag }}
docker push ${{ steps.build.outputs.latest }}

notify:
name: Notify Lark via webhook
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run_chatgpt_examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
runs-on: [self-hosted, gpu]
container:
image: hpcaitech/pytorch-cuda:1.12.0-11.3.0
options: --gpus all --rm -v /data/scratch/github_actions/chat:/data/scratch/github_actions/chat
options: --gpus all --rm -v /data/scratch/github_actions/chat:/data/scratch/github_actions/chat --shm-size=10.24gb
timeout-minutes: 30
defaults:
run:
Expand Down
178 changes: 178 additions & 0 deletions applications/Chat/benchmarks/ray/1mmt_dummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import argparse
import os
import socket
from functools import partial

import ray
import torch
from coati.quant import llama_load_quant, low_resource_init
from coati.ray.detached_trainer_ppo import DetachedPPOTrainer
from coati.ray.experience_maker_holder import ExperienceMakerHolder
from coati.ray.utils import (
get_actor_from_args,
get_critic_from_args,
get_receivers_per_sender,
get_reward_model_from_args,
get_strategy_from_args,
)
from torch.utils.data import DataLoader
from transformers import AutoConfig, AutoTokenizer
from transformers.modeling_utils import no_init_weights


def get_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
return s.getsockname()[1]


def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('8.8.8.8', 80))
return s.getsockname()[0]


def main(args):
master_addr = str(get_local_ip())
# trainer_env_info
trainer_port = str(get_free_port())
env_info_trainers = [{
'local_rank': '0',
'rank': str(rank),
'world_size': str(args.num_trainers),
'master_port': trainer_port,
'master_addr': master_addr
} for rank in range(args.num_trainers)]

# maker_env_info
maker_port = str(get_free_port())
env_info_maker = {
'local_rank': '0',
'rank': '0',
'world_size': '1',
'master_port': maker_port,
'master_addr': master_addr
}

# configure tokenizer
tokenizer = AutoTokenizer.from_pretrained(args.pretrain)
tokenizer.pad_token = tokenizer.eos_token

def model_fn():
actor_cfg = AutoConfig.from_pretrained(args.pretrain)
critic_cfg = AutoConfig.from_pretrained(args.critic_pretrain)
actor = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda()
critic = get_critic_from_args(args.critic_model, config=critic_cfg).requires_grad_(False).half().cuda()
reward_model = get_reward_model_from_args(args.critic_model,
config=critic_cfg).requires_grad_(False).half().cuda()
if args.initial_model_quant_ckpt is not None and args.model == 'llama':
# quantize initial model
with low_resource_init(), no_init_weights():
initial_model = get_actor_from_args(args.model, config=actor_cfg)
initial_model.model = llama_load_quant(initial_model.model, args.initial_model_quant_ckpt, args.quant_bits,
args.quant_group_size).cuda().requires_grad_(False)
else:
initial_model = get_actor_from_args(args.model, config=actor_cfg).requires_grad_(False).half().cuda()
return actor, critic, reward_model, initial_model

# configure Experience Maker
experience_holder_ref = ExperienceMakerHolder.options(name="maker0", num_gpus=1, max_concurrency=2).remote(
detached_trainer_name_list=[f'trainer{i}' for i in range(args.num_trainers)],
strategy_fn=partial(get_strategy_from_args, args.maker_strategy),
model_fn=model_fn,
env_info=env_info_maker,
kl_coef=0.1,
debug=args.debug,
# sync_models_from_trainers=True,
# generation kwargs:
max_length=512,
do_sample=True,
temperature=1.0,
top_k=50,
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id,
eval_performance=True,
use_cache=True,
)

def trainer_model_fn():
actor = get_actor_from_args(args.model, config=AutoConfig.from_pretrained(args.pretrain)).half().cuda()
critic = get_critic_from_args(args.critic_model,
config=AutoConfig.from_pretrained(args.critic_pretrain)).half().cuda()
return actor, critic

# configure Trainer
trainer_refs = [
DetachedPPOTrainer.options(name=f"trainer{i}", num_gpus=1, max_concurrency=2).remote(
experience_maker_holder_name_list=[
f'maker{x}' for x in get_receivers_per_sender(i, args.num_trainers, 1, allow_idle_sender=True)
],
strategy_fn=partial(get_strategy_from_args, args.trainer_strategy),
model_fn=trainer_model_fn,
env_info=env_info_trainer,
train_batch_size=args.train_batch_size,
buffer_limit=16,
eval_performance=True,
debug=args.debug,
) for i, env_info_trainer in enumerate(env_info_trainers)
]

dataset_size = args.experience_batch_size * 4

def data_gen_fn():
input_ids = torch.randint(tokenizer.vocab_size, (256,), device=torch.cuda.current_device())
attn_mask = torch.ones_like(input_ids)
return {'input_ids': input_ids, 'attention_mask': attn_mask}

def build_dataloader(size):
dataset = [data_gen_fn() for _ in range(size)]
dataloader = DataLoader(dataset, batch_size=args.experience_batch_size)
return dataloader

# uncomment this function if sync_models_from_trainers is True
# ray.get([
# trainer_ref.sync_models_to_remote_makers.remote()
# for trainer_ref in trainer_refs
# ])

wait_tasks = []

wait_tasks.append(
experience_holder_ref.workingloop.remote(partial(build_dataloader, dataset_size),
num_steps=args.experience_steps))

total_steps = args.experience_batch_size * args.experience_steps // (args.num_trainers * args.train_batch_size)
for trainer_ref in trainer_refs:
wait_tasks.append(trainer_ref.fit.remote(total_steps, args.update_steps, args.train_epochs))

ray.get(wait_tasks)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--num_trainers', type=int, default=1)
parser.add_argument('--trainer_strategy',
choices=[
'naive', 'ddp', 'colossalai_gemini', 'colossalai_zero2', 'colossalai_gemini_cpu',
'colossalai_zero2_cpu'
],
default='naive')
parser.add_argument('--maker_strategy', choices=['naive'], default='naive')
parser.add_argument('--model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama'])
parser.add_argument('--critic_model', default='gpt2', choices=['gpt2', 'bloom', 'opt', 'llama'])
parser.add_argument('--pretrain', type=str, default=None)
parser.add_argument('--critic_pretrain', type=str, default=None)
parser.add_argument('--experience_steps', type=int, default=4)
parser.add_argument('--experience_batch_size', type=int, default=8)
parser.add_argument('--train_epochs', type=int, default=1)
parser.add_argument('--update_steps', type=int, default=2)
parser.add_argument('--train_batch_size', type=int, default=8)
parser.add_argument('--lora_rank', type=int, default=0, help="low-rank adaptation matrices rank")

parser.add_argument('--initial_model_quant_ckpt', type=str, default=None)
parser.add_argument('--quant_bits', type=int, default=4)
parser.add_argument('--quant_group_size', type=int, default=128)
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
ray.init(namespace=os.environ["RAY_NAMESPACE"], runtime_env={"env_vars": dict(os.environ)})
main(args)
Loading