From 43b5e82af85b6b81e1c34d10f54c658d4bae314a Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Fri, 21 Mar 2025 02:11:25 -0700 Subject: [PATCH 1/9] feat: Use openmathinstruct2 training in grpo math example Signed-off-by: Parth Chadha --- examples/configs/grpo_math_1B.yaml | 4 +- examples/configs/grpo_math_8B.yaml | 2 +- examples/run_grpo_math.py | 94 +++++++++++++++---- examples/run_sft.py | 4 + nemo_reinforcer/algorithms/grpo.py | 7 +- .../data/hf_datasets/openmathinstruct2.py | 87 +++++++++++++++++ 6 files changed, 174 insertions(+), 24 deletions(-) create mode 100644 nemo_reinforcer/data/hf_datasets/openmathinstruct2.py diff --git a/examples/configs/grpo_math_1B.yaml b/examples/configs/grpo_math_1B.yaml index 684e24e7f5..b5da2974ed 100644 --- a/examples/configs/grpo_math_1B.yaml +++ b/examples/configs/grpo_math_1B.yaml @@ -58,8 +58,6 @@ data: max_input_seq_length: ${policy.max_total_sequence_length} # upper bound, real truncation occurs at vllm.max_model_len prompt_file: "examples/prompts/cot.txt" system_prompt_file: null - dataset_name: "datasets/Eurus-2-RL-Data/Eurus-2-RL-Data-math_train.jsonl" - val_dataset_name: "datasets/Eurus-2-RL-Data/Eurus-2-RL-Data-math_val.jsonl" env: math: @@ -77,4 +75,4 @@ logger: cluster: gpus_per_node: 1 - num_nodes: 1 \ No newline at end of file + num_nodes: 1 diff --git a/examples/configs/grpo_math_8B.yaml b/examples/configs/grpo_math_8B.yaml index 2415ee8cd8..3de0ff1668 100644 --- a/examples/configs/grpo_math_8B.yaml +++ b/examples/configs/grpo_math_8B.yaml @@ -35,4 +35,4 @@ policy: cluster: gpus_per_node: 8 - num_nodes: 1 \ No newline at end of file + num_nodes: 1 diff --git a/examples/run_grpo_math.py b/examples/run_grpo_math.py index 4e7732def1..9c382cef9a 100644 --- a/examples/run_grpo_math.py +++ b/examples/run_grpo_math.py @@ -32,6 +32,7 @@ from nemo_reinforcer.models.policy import PolicyConfig from nemo_reinforcer.data.datasets import AllTaskProcessedDataset, rl_collate_fn from 
nemo_reinforcer.environments.math_environment import MathEnvironment +from nemo_reinforcer.data.hf_datasets.openmathinstruct2 import OpenMathInstruct2Dataset def parse_args(): @@ -56,6 +57,58 @@ def parse_args(): # this processor expects the datum_dict to have a 'problem' key and an 'expected_answer' key +def openinstructmath2_data_processor( + datum_dict: Dict[str, Any], + task_data_spec: TaskDataSpec, + tokenizer, + max_seq_length: int, + idx: int, +) -> DatumSpec: + """Process a datum dictionary (directly loaded from data/hf_datasets/openmathinstruct2.py) into a DatumSpec for the Math Environment.""" + user_message = datum_dict["messages"] + problem = user_message[0]["content"] + extra_env_info = {"ground_truth": user_message[1]["content"]} + + template = task_data_spec.custom_template + message_log: LLMMessageLogType = [] + user_message = { + "role": "user", + "content": task_data_spec.prompt.format(problem), + } + message = tokenizer.apply_chat_template( + [user_message], + chat_template=template, + tokenize=False, + add_generation_prompt=True, + add_special_tokens=False, + ) + user_message["token_ids"] = tokenizer(message, return_tensors="pt")["input_ids"][0] + user_message["content"] = message + message_log.append(user_message) + + length = sum(len(m["token_ids"]) for m in message_log) + + loss_multiplier = 1.0 + if length > max_seq_length: + # make smaller and mask out + for message in message_log: + message["token_ids"] = message["token_ids"][ + : min(4, max_seq_length // len(message_log)) + ] + loss_multiplier = 0.0 + + output = { + "message_log": message_log, + "length": length, + "extra_env_info": extra_env_info, + "loss_multiplier": loss_multiplier, + "idx": idx, + "task_name": datum_dict["task_name"], + } + return output + + +# Example of a generic math data processor def math_data_processor( datum_dict: Dict[str, Any], task_data_spec: TaskDataSpec, @@ -128,36 +181,35 @@ def setup_data(data_config: DataConfig, policy_config: PolicyConfig, env_configs 
system_prompt_file=data_config["system_prompt_file"], ) - base_dataset = load_dataset("json", data_files=data_config["dataset_name"])["train"] + # Load OpenMathInstruct2Dataset using reinforcer datasets + print(f"Loading nvidia/OpenMathInstruct2Dataset for training and validation") + data = OpenMathInstruct2Dataset() + tokenizer = AutoTokenizer.from_pretrained(policy_config["model_name"]) - task_data_processors = defaultdict(lambda: (math_task_spec, math_data_processor)) - task_data_processors["math"] = (math_task_spec, math_data_processor) + task_data_processors = defaultdict( + lambda: (math_task_spec, openinstructmath2_data_processor) + ) + task_data_processors["math"] = (math_task_spec, openinstructmath2_data_processor) math_env = MathEnvironment.options( runtime_env={"py_executable": MathEnvironment.DEFAULT_PY_EXECUTABLE} ).remote(env_configs["math"]) dataset = AllTaskProcessedDataset( - base_dataset, + data.formatted_ds["train"], tokenizer, math_task_spec, task_data_processors, max_seq_length=data_config["max_input_seq_length"], ) - if "val_dataset_name" in data_config and data_config["val_dataset_name"]: - val_dataset = load_dataset("json", data_files=data_config["val_dataset_name"])[ - "train" - ] - val_dataset = AllTaskProcessedDataset( - val_dataset, - tokenizer, - math_task_spec, - task_data_processors, - max_seq_length=data_config["max_input_seq_length"], - ) - else: - val_dataset = None + val_dataset = AllTaskProcessedDataset( + data.formatted_ds["validation"], + tokenizer, + math_task_spec, + task_data_processors, + max_seq_length=data_config["max_input_seq_length"], + ) task_to_env = defaultdict(lambda: math_env) task_to_env["math"] = math_env @@ -170,7 +222,9 @@ def main(): args, overrides = parse_args() if not args.config: - args.config = os.path.join(os.path.dirname(__file__), "configs", "grpo_math_1B.yaml") + args.config = os.path.join( + os.path.dirname(__file__), "configs", "grpo_math_1B.yaml" + ) config = load_config(args.config) 
print(f"Loaded configuration from: {args.config}") @@ -189,6 +243,10 @@ def main(): # Get the next experiment directory with incremented ID config["logger"]["log_dir"] = get_next_experiment_dir(config["logger"]["log_dir"]) print(f"📊 Using log directory: {config['logger']['log_dir']}") + if config["checkpointing"]["enabled"]: + print( + f"📊 Using checkpoint directory: {config['checkpointing']['checkpoint_dir']}" + ) init_ray() diff --git a/examples/run_sft.py b/examples/run_sft.py index de0ef8c1ce..950938b4da 100644 --- a/examples/run_sft.py +++ b/examples/run_sft.py @@ -145,6 +145,10 @@ def main(): config["logger"]["log_dir"] = get_next_experiment_dir(config["logger"]["log_dir"]) print(f"📊 Using log directory: {config['logger']['log_dir']}") + if config["checkpointing"]["enabled"]: + print( + f"📊 Using checkpoint directory: {config['checkpointing']['checkpoint_dir']}" + ) init_ray() diff --git a/nemo_reinforcer/algorithms/grpo.py b/nemo_reinforcer/algorithms/grpo.py index 94b9f29a2b..7acfccd51b 100644 --- a/nemo_reinforcer/algorithms/grpo.py +++ b/nemo_reinforcer/algorithms/grpo.py @@ -180,7 +180,8 @@ def setup( # Load validation dataset if provided val_dataloader = None - if "val_dataset_name" in data_config and data_config["val_dataset_name"]: + # If validation is enabled, load the validation dataloader + if grpo_config["val_period"] > 0 or grpo_config["val_at_start"]: val_dataloader = StatefulDataLoader( val_dataset, batch_size=grpo_config["val_batch_size"], @@ -445,7 +446,9 @@ def grpo_train( # Run grpo training (single-turn) for batch in dataloader: - print(f"\n{'=' * 25} Step {step + 1}/{min(len(dataloader), master_config['grpo']['max_num_steps'])} {'=' * 25}") + print( + f"\n{'=' * 25} Step {step + 1}/{min(len(dataloader), master_config['grpo']['max_num_steps'])} {'=' * 25}" + ) with timer.time("total_step_time"): # Prepare batch diff --git a/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py
b/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py new file mode 100644 index 0000000000..331fe2bc88 --- /dev/null +++ b/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py @@ -0,0 +1,87 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional +from datasets import load_dataset +from nemo_reinforcer.data.hf_datasets.interfaces import HfDataset +from dataclasses import dataclass + + +def format_math(data): + return { + "messages": [ + { + "role": "user", + "content": data["problem"], + }, + { + "role": "assistant", + "content": data["expected_answer"], + }, + ], + # For v0.1 release, reinforcer datasets require a task_name key such that user can map a task processor per unique task. + "task_name": "math", + } + + +def prepare_openinstructmath2_dataset(split: str = "train_1M", seed=42, test_size=0.05): + """Load and split the OpenMathInstruct-2 dataset into train and validation sets using HF's train_test_split.""" + print( + f"WARNING: Dataset splitting with seed {seed} creates train/validation splits that may differ across machines or runs. For reproducible experiments, preprocess the dataset once and use the same seed across runs." 
+ ) + + # Load the original dataset + original_ds = load_dataset("nvidia/OpenMathInstruct-2", split=split) + + # Split into train and validation sets using HF's train_test_split + split_ds = original_ds.train_test_split(test_size=test_size, seed=seed) + + # Format the examples, removing original columns + train_formatted = split_ds["train"].map( + format_math, remove_columns=split_ds["train"].column_names + ) + val_formatted = split_ds["test"].map( + format_math, remove_columns=split_ds["test"].column_names + ) + + return { + "train": train_formatted, + "validation": val_formatted, + } + + +@dataclass +class OpenMathInstruct2Dataset(HfDataset): + def __init__( + self, split: str = "train_1M", seed: int = 42, test_size: float = 0.05 + ): + """Initialize the OpenMathInstruct2 dataset with train/validation split. + + Args: + seed: Random seed for reproducible splitting + test_size: Proportion of data to use for validation (0.0-1.0) + """ + # train, train_1M, train_2M, and train_5M are supported splits. + if split not in ["train", "train_1M", "train_2M", "train_5M"]: + raise ValueError( + f"Invalid split: {split}. Please use 'train', 'train_1M', 'train_2M', or 'train_5M'." 
+ ) + + self.formatted_ds = prepare_openinstructmath2_dataset( + split=split, seed=seed, test_size=test_size + ) + + super().__init__( + dataset_name="OpenMathInstruct-2", + ) From 240c48db65704d71c93c30866b0efc3c0d8857a9 Mon Sep 17 00:00:00 2001 From: Parth Chadha Date: Fri, 21 Mar 2025 11:11:35 -0700 Subject: [PATCH 2/9] Fix warning message Signed-off-by: Parth Chadha --- nemo_reinforcer/data/hf_datasets/openmathinstruct2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py b/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py index 331fe2bc88..c3c5126263 100644 --- a/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py +++ b/nemo_reinforcer/data/hf_datasets/openmathinstruct2.py @@ -38,7 +38,7 @@ def format_math(data): def prepare_openinstructmath2_dataset(split: str = "train_1M", seed=42, test_size=0.05): """Load and split the OpenMathInstruct-2 dataset into train and validation sets using HF's train_test_split.""" print( - f"WARNING: Dataset splitting with seed {seed} creates train/validation splits that may differ across machines or runs. For reproducible experiments, preprocess the dataset once and use the same seed across runs." + f"WARNING: For reproducible experiments, preprocess the dataset once and define your own HfDataset subclass that directly uses the preprocessed datasets." 
) # Load the original dataset From 79e90b036dcca65853682493f7cec209339c41b7 Mon Sep 17 00:00:00 2001 From: Terry Kong Date: Fri, 21 Mar 2025 11:22:02 -0700 Subject: [PATCH 3/9] feat: lots of fixes (#17) - flatten hyperparams for tb no longer errors for lists (was an issue for schedulers) - the submission script now overlaps the head on the first worker (no longer needs extra node just for head) - fixes the CI to handle weird permissions issues - added sphinx build and doctest to CI - added functional tests to CI - nuked an old example - added docs for functional tests - --no-container-mount-home - fix a unit tests that expected cuda to skip - allow running unit tests on slurm head node with no gpu - add a hermetic script to run functional tests Signed-off-by: Terry Kong Signed-off-by: Parth Chadha --- .github/workflows/_run_test.yml | 26 +- .github/workflows/cicd-main.yml | 51 ++- docker/Dockerfile | 41 +- docs/cluster.md | 4 +- docs/docker.md | 21 +- docs/testing.md | 60 ++- nemo_reinforcer/utils/logger.py | 32 +- ray.sub | 30 +- tests/functional/grpo.sh | 17 +- .../hello_world_fsdp_llama/__init__.py | 13 - .../hello_world_fsdp_llama/example.ipynb | 350 ------------------ .../functional/hello_world_fsdp_llama/main.py | 77 ---- .../hello_world_fsdp_llama/train.py | 329 ---------------- tests/functional/sft.sh | 14 +- tests/run_functional_in_docker.sh | 47 +++ tests/run_unit.sh | 20 +- tests/run_unit_in_docker.sh | 0 tests/unit/algorithms/test_loss_functions.py | 4 + 18 files changed, 307 insertions(+), 829 deletions(-) delete mode 100644 tests/functional/hello_world_fsdp_llama/__init__.py delete mode 100644 tests/functional/hello_world_fsdp_llama/example.ipynb delete mode 100644 tests/functional/hello_world_fsdp_llama/main.py delete mode 100644 tests/functional/hello_world_fsdp_llama/train.py create mode 100755 tests/run_functional_in_docker.sh mode change 100644 => 100755 tests/run_unit.sh mode change 100644 => 100755 tests/run_unit_in_docker.sh diff --git 
a/.github/workflows/_run_test.yml b/.github/workflows/_run_test.yml index faf1848cad..d75a6a7178 100644 --- a/.github/workflows/_run_test.yml +++ b/.github/workflows/_run_test.yml @@ -64,6 +64,24 @@ jobs: - name: Docker pull image run: | docker pull nemoci.azurecr.io/nemo_reinforcer_container:${{ github.run_id }} + + # NOTE: under certain circumstances, the checkout action cannot clean up the workspace properly, so + # this workaround is needed to ensure that the workspace is clean by removing all files created by root. + # + # The error observed looked like this from the checkout action: + # Run actions/checkout@v4 + # ... + # Deleting the contents of '/home/azureuser/actions-runner/_work/reinforcer/reinforcer' + # Error: File was unable to be removed Error: EACCES: permission denied, rmdir '/home/azureuser/actions-runner/_work/reinforcer/reinforcer/docs/_build/doctest' + - name: Forcefully clean up the repository + run: | + docker run --rm -u root \ + -v /home/azureuser/actions-runner/_work/reinforcer/reinforcer:/home/azureuser/actions-runner/_work/reinforcer/reinforcer \ + nemoci.azurecr.io/nemo_reinforcer_container:${{ github.run_id }} \ + bash -x -c "ls -lah /home/azureuser/actions-runner/_work/reinforcer/reinforcer && shopt -s dotglob && rm -rf /home/azureuser/actions-runner/_work/reinforcer/reinforcer/*" + + - name: Checkout repository + uses: actions/checkout@v4 - name: Start container run: | @@ -71,8 +89,9 @@ jobs: --env TRANSFORMERS_OFFLINE=0 \ --env HYDRA_FULL_ERROR=1 \ --env HF_HOME=/home/TestData/reinforcer/hf_home \ - --env REINFORCER_CI_DIR=/home/TestData/reinforcer \ - --env REINFORCER_REPO_DIR=/opt/NeMo-Reinforcer \ + --env REINFORCER_REPO_DIR=/opt/reinforcer \ + --volume $PWD:/opt/reinforcer \ + --volume /mnt/datadrive/TestData/reinforcer/datasets:/opt/reinforcer/datasets:ro \ --volume /mnt/datadrive/TestData/reinforcer/checkpoints:/home/TestData/reinforcer/checkpoints:ro \ --volume 
/mnt/datadrive/TestData/reinforcer/hf_home/hub:/home/TestData/reinforcer/hf_home/hub \ nemoci.azurecr.io/nemo_reinforcer_container:${{ github.run_id }} \ @@ -94,6 +113,9 @@ jobs: set -e cmd=$(cat <<"RUN_TEST_EOF" + # This is needed since we create virtualenvs in the workspace, so this allows it to be cleaned up if necessary + umask 000 + nvidia-smi # In case git commands need to be run inside Reinforcer diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index d79d0cf61f..cbb5e8132b 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -102,6 +102,20 @@ jobs: pre-commit install pre-commit run --all-files --show-diff-on-failure --color=always + sphinx-build: + name: Sphinx build + needs: [pre-flight] + runs-on: ubuntu-latest + if: ${{ needs.pre-flight.outputs.run_ci == 'true' }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: build docs + run: | + pip install uv + cd docs/ + uv run --extra docs sphinx-build . _build/html + build-container: if: ${{ needs.pre-flight.outputs.run_ci == 'true' }} needs: [pre-flight] @@ -115,6 +129,20 @@ jobs: MAX_JOBS=32 REINFORCER_COMMIT=${{ github.sha }} + sphinx-doctest: + name: Sphinx doctest + needs: [build-container, pre-flight] + uses: ./.github/workflows/_run_test.yml + if: ${{ needs.pre-flight.outputs.run_ci == 'true' }} + with: + RUNNER: self-hosted-azure + TIMEOUT: 10 + SCRIPT: | + cd ${REINFORCER_REPO_DIR}/docs + uv run --extra docs sphinx-build -b doctest . 
_build/doctest + secrets: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + unit-tests: name: Unit tests needs: [build-container, pre-flight] @@ -125,6 +153,27 @@ jobs: TIMEOUT: 10 SCRIPT: | cd ${REINFORCER_REPO_DIR} - uv run bash -x ./tests/run_unit.sh + uv run --extra test bash -x ./tests/run_unit.sh secrets: HF_TOKEN: ${{ secrets.HF_TOKEN }} + + functional-tests: + name: ${{ matrix.test_case }} + needs: [build-container, pre-flight] + uses: ./.github/workflows/_run_test.yml + if: ${{ needs.pre-flight.outputs.run_ci == 'true' }} + strategy: + matrix: + test_case: + - sft.sh + - grpo.sh + with: + # TODO: For now, allow these to fail since the checks are not robust. + OPTIONAL: true + RUNNER: self-hosted-azure + TIMEOUT: 8 + SCRIPT: | + cd ${REINFORCER_REPO_DIR} + uv run bash ./tests/functional/${{ matrix.test_case }} + secrets: + HF_TOKEN: ${{ secrets.HF_TOKEN }} \ No newline at end of file diff --git a/docker/Dockerfile b/docker/Dockerfile index 3ac272200e..0abb6ebd9e 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,18 +1,41 @@ ARG BASE_IMAGE=anyscale/ray:2.43.0-py312-cu125 -FROM ${BASE_IMAGE} +FROM ${BASE_IMAGE} AS base +# base is just ray + uv with minimal installs so it is a very lightweight container -WORKDIR /opt/NeMo-Reinforcer +# It is more convenient for users to run as root +USER root -RUN sudo apt-get update && sudo apt-get install -y jq +RUN apt-get update && sudo apt-get install -y jq +RUN pip install uv RUN echo "unset RAY_RUNTIME_ENV_HOOK" >> /home/ray/.bashrc -COPY pyproject.toml . 
+FROM base AS hermetic +# hermetic creates a virtual environment with the default dependencies pre-installed for convenience -RUN pip install uv && \ - uv venv -p python3.12 && \ - uv pip install -r pyproject.toml --extra dev --extra test +COPY --chown=ray --chmod=755 pyproject.toml /opt/reinforcer/pyproject.toml +RUN chmod 755 /home/ray/.cache +WORKDIR /opt/reinforcer +RUN uv venv .venv +# uv sync has a more reliable resolver than simple uv pip install which can fail +RUN uv sync --extra test --extra dev --extra docs --no-install-project -COPY . . +ENV VIRTUAL_ENV=/opt/reinforcer/.venv +ENV PATH="/opt/reinforcer/.venv/bin:$PATH" +# The ray images automatically activate the anaconda venv. We will +# comment this out of the .bashrc to give the same UX between docker +# and other clusters like slurm. +RUN <<"EOF" +cp ~/.bashrc ~/.bashrc.backup # backup existing .bashrc -RUN uv pip install -e . +# Comment out the conda initialize block +sed -i '/# >>> conda initialize >>>/,/# <<< conda initialize << None: def flatten_dict(d: Dict[str, Any], sep: str = ".") -> Dict[str, Any]: - """Flatten a nested dictionary.""" + """Flatten a nested dictionary. + + Handles nested dictionaries and lists by creating keys with separators. + For lists, the index is used as part of the key. 
+ + Args: + d: Dictionary to flatten + sep: Separator to use between nested keys + + Returns: + Flattened dictionary with compound keys + + Examples: + ```{doctest} + >>> flatten_dict({"a": 1, "b": {"c": 2}}) + {'a': 1, 'b.c': 2} + + >>> flatten_dict({"a": [1, 2], "b": {"c": [3, 4]}}) + {'a.0': 1, 'a.1': 2, 'b.c.0': 3, 'b.c.1': 4} + + >>> flatten_dict({"a": [{"b": 1}, {"c": 2}]}) + {'a.0.b': 1, 'a.1.c': 2} + ``` + """ result = {} def _flatten(d, parent_key=""): @@ -198,6 +221,13 @@ def _flatten(d, parent_key=""): if isinstance(value, dict): _flatten(value, new_key) + elif isinstance(value, list): + for i, item in enumerate(value): + list_key = f"{new_key}{sep}{i}" + if isinstance(item, dict): + _flatten(item, list_key) + else: + result[list_key] = item else: result[new_key] = value diff --git a/ray.sub b/ray.sub index 1d5e00db4b..0fab8a8fd9 100644 --- a/ray.sub +++ b/ray.sub @@ -58,12 +58,13 @@ port=41993 ip_head=$head_node_ip:$port # First we start the head of the ray cluster on one of the physical nodes -# In this case we are giving an entire physical node to the ray head node -# The ray head node is marked by including --head to the ray start command +# Set GPU/CPU resources to 0 to avoid scheduling on the head node head_cmd=$(cat <$SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh # No args launches on the head node -WORKER_NUM=\${1:-0} -if [[ \$WORKER_NUM -eq 0 ]]; then - srun --gres=gpu:8 -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" --jobid $SLURM_JOB_ID --pty bash +WORKER_NUM=\${1:-} +if [[ -z "\$WORKER_NUM" ]]; then + # Empty means we are on the head node + srun --no-container-mount-home --gpus=0 -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" --jobid $SLURM_JOB_ID --pty bash else nodes_array=($nodes) - srun --gres=gpu:8 -A $SLURM_JOB_ACCOUNT -p 
$SLURM_JOB_PARTITION --overlap --container-name=ray-worker-\$WORKER_NUM --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\${nodes_array[\$WORKER_NUM]}" --jobid $SLURM_JOB_ID --pty bash + srun --no-container-mount-home --gres=gpu:8 -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-worker-\$WORKER_NUM --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\${nodes_array[\$WORKER_NUM]}" --jobid $SLURM_JOB_ID --pty bash fi EOF chmod +x $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh diff --git a/tests/functional/grpo.sh b/tests/functional/grpo.sh index cfbc1bc712..16f7e7530d 100755 --- a/tests/functional/grpo.sh +++ b/tests/functional/grpo.sh @@ -2,6 +2,8 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) PROJECT_ROOT=$(realpath $SCRIPT_DIR/../..) +# Mark the current repo as safe, since wandb fetchs metadata about the repo +git config --global --add safe.directory $PROJECT_ROOT set -eou pipefail @@ -10,11 +12,12 @@ JSON_METRICS=$LOG_DIR/$(basename $0 .sh).json RUN_LOG=$LOG_DIR/$(basename $0 .sh).log export RAY_DEDUP_LOGS=0 export UV_CACHE_DIR=$PROJECT_ROOT/uv_cache +export PYTHONPATH=${PROJECT_ROOT}:${PYTHONPATH:-} mkdir -p $LOG_DIR cd $PROJECT_ROOT -uv run $PROJECT_ROOT/examples/run_grpo_math.py \ +python -u $PROJECT_ROOT/examples/run_grpo_math.py \ cluster.gpus_per_node=2 \ grpo.max_num_steps=10 \ logger.tensorboard_enabled=true \ @@ -25,12 +28,8 @@ uv run $PROJECT_ROOT/examples/run_grpo_math.py \ 2>&1 | tee $RUN_LOG cd $SCRIPT_DIR -uv run json_dump_tb_logs.py $LOG_DIR --output_path $JSON_METRICS - -uv run check_metrics.py $JSON_METRICS \ - 'data["timing/train/policy_refit"]["10"] < 3.0' \ - 'data["timing/train/total_step_time"]["10"] < 20.0' \ - 'data["timing/validation/generation"]["10"] < 3.0' \ - 'max(data["train/token_mult_prob_error"]) < 1.05' \ - 'data["validation/avg_length"]["10"] < 1024' \ +python json_dump_tb_logs.py $LOG_DIR --output_path $JSON_METRICS + +python check_metrics.py 
$JSON_METRICS \ + 'max(data["train/token_mult_prob_error"]) < 1.1' \ diff --git a/tests/functional/hello_world_fsdp_llama/__init__.py b/tests/functional/hello_world_fsdp_llama/__init__.py deleted file mode 100644 index 341a77c5bc..0000000000 --- a/tests/functional/hello_world_fsdp_llama/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/functional/hello_world_fsdp_llama/example.ipynb b/tests/functional/hello_world_fsdp_llama/example.ipynb deleted file mode 100644 index 4529efa34f..0000000000 --- a/tests/functional/hello_world_fsdp_llama/example.ipynb +++ /dev/null @@ -1,350 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com\n", - "Requirement already satisfied: ipywidgets in /usr/local/lib/python3.10/dist-packages (8.1.5)\n", - "Requirement already satisfied: comm>=0.1.3 in /usr/local/lib/python3.10/dist-packages (from ipywidgets) (0.2.2)\n", - "Requirement already satisfied: ipython>=6.1.0 in /usr/local/lib/python3.10/dist-packages (from ipywidgets) (8.28.0)\n", - "Requirement already satisfied: traitlets>=4.3.1 in /usr/local/lib/python3.10/dist-packages (from ipywidgets) (5.14.3)\n", - "Requirement already satisfied: widgetsnbextension~=4.0.12 in 
/usr/local/lib/python3.10/dist-packages (from ipywidgets) (4.0.13)\n", - "Requirement already satisfied: jupyterlab-widgets~=3.0.12 in /usr/local/lib/python3.10/dist-packages (from ipywidgets) (3.0.13)\n", - "Requirement already satisfied: decorator in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (5.1.1)\n", - "Requirement already satisfied: jedi>=0.16 in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (0.19.1)\n", - "Requirement already satisfied: matplotlib-inline in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (0.1.7)\n", - "Requirement already satisfied: prompt-toolkit<3.1.0,>=3.0.41 in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (3.0.48)\n", - "Requirement already satisfied: pygments>=2.4.0 in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (2.18.0)\n", - "Requirement already satisfied: stack-data in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (0.6.3)\n", - "Requirement already satisfied: exceptiongroup in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (1.2.2)\n", - "Requirement already satisfied: typing-extensions>=4.6 in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (4.12.2)\n", - "Requirement already satisfied: pexpect>4.3 in /usr/local/lib/python3.10/dist-packages (from ipython>=6.1.0->ipywidgets) (4.9.0)\n", - "Requirement already satisfied: parso<0.9.0,>=0.8.3 in /usr/local/lib/python3.10/dist-packages (from jedi>=0.16->ipython>=6.1.0->ipywidgets) (0.8.4)\n", - "Requirement already satisfied: ptyprocess>=0.5 in /usr/local/lib/python3.10/dist-packages (from pexpect>4.3->ipython>=6.1.0->ipywidgets) (0.7.0)\n", - "Requirement already satisfied: wcwidth in /usr/local/lib/python3.10/dist-packages (from prompt-toolkit<3.1.0,>=3.0.41->ipython>=6.1.0->ipywidgets) (0.2.13)\n", - "Requirement already satisfied: executing>=1.2.0 in 
/usr/local/lib/python3.10/dist-packages (from stack-data->ipython>=6.1.0->ipywidgets) (2.1.0)\n", - "Requirement already satisfied: asttokens>=2.1.0 in /usr/local/lib/python3.10/dist-packages (from stack-data->ipython>=6.1.0->ipywidgets) (2.4.1)\n", - "Requirement already satisfied: pure-eval in /usr/local/lib/python3.10/dist-packages (from stack-data->ipython>=6.1.0->ipywidgets) (0.2.3)\n", - "Requirement already satisfied: six>=1.12.0 in /usr/local/lib/python3.10/dist-packages (from asttokens>=2.1.0->stack-data->ipython>=6.1.0->ipywidgets) (1.16.0)\n", - "\u001b[33mWARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager, possibly rendering your system unusable.It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning.\u001b[0m\u001b[33m\n", - "\u001b[0m\n", - "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m24.2\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m25.0.1\u001b[0m\n", - "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n" - ] - } - ], - "source": [ - "!pip install -U ipywidgets" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "import ray" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "2025-03-02 23:13:34,875\tINFO worker.py:1654 -- Connecting to existing Ray cluster at address: 10.65.26.15:41993...\n", - "2025-03-02 23:13:34,883\tINFO worker.py:1832 -- Connected to Ray cluster. 
View the dashboard at \u001b[1m\u001b[32m127.0.0.1:8265 \u001b[39m\u001b[22m\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "d16239ac6cf44259a74cf9c02d97c0a6", - "version_major": 2, - "version_minor": 0 - }, - "text/html": [ - "
\n", - "
\n", - "
\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - "\n", - "
Python version:3.10.12
Ray version:2.43.0
Dashboard:http://127.0.0.1:8265
\n", - "\n", - "
\n", - "
\n" - ], - "text/plain": [ - "RayContext(dashboard_url='127.0.0.1:8265', python_version='3.10.12', ray_version='2.43.0', ray_commit='ecdcdc6a6e63dc4bcd6ea16aae256ce4d32a7e2c')" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ray.init()" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "16.0" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "avail_workers = ray.cluster_resources()[\"worker_units\"]\n", - "avail_workers" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [], - "source": [ - "from examples.hello_world_fsdp_llama import train" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "['AutoModelForCausalLM',\n", - " 'AutoTokenizer',\n", - " 'LlamaDecoderLayer',\n", - " 'ModelTrainer',\n", - " 'NASS',\n", - " 'NodeInfo',\n", - " 'RayClusterCoordinator',\n", - " 'Worker',\n", - " 'WorkerGroupResources',\n", - " '__builtins__',\n", - " '__cached__',\n", - " '__doc__',\n", - " '__file__',\n", - " '__loader__',\n", - " '__name__',\n", - " '__package__',\n", - " '__spec__',\n", - " 'dataclass',\n", - " 'dist',\n", - " 'fully_shard',\n", - " 'init_device_mesh',\n", - " 'json',\n", - " 'os',\n", - " 'random',\n", - " 'ray',\n", - " 'register_fsdp_forward_method',\n", - " 'time',\n", - " 'torch']" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "dir(train)" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Worker node info: worker_node_info=[NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), 
NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23'), NodeInfo(node_id='9faf44e0e79cf43aef2eec7b5003ba0ec197a5bf7c07c7e4706bde7f', node_rank=0, node_ip='10.65.26.23')]\n", - "Num physical nodes: self.num_physical_nodes=1\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[36m(ModelTrainer pid=52042, ip=10.65.26.23)\u001b[0m DEBUG: self.logical_gpu_id=2 (process_id, world_size, physical_node_id, physical_node_ip, master_addr, num_workers_per_node)=(1, 8, 0, '10.65.26.23', '10.65.26.23', 8)\n", - "\u001b[36m(ModelTrainer pid=52043, ip=10.65.26.23)\u001b[0m DEBUG: self.logical_gpu_id=4 (process_id, world_size, physical_node_id, physical_node_ip, master_addr, num_workers_per_node)=(6, 8, 0, '10.65.26.23', '10.65.26.23', 8)\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m [Rank 0] Loading model meta-llama/Llama-3.2-1B on CPU...\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m [Rank 0] Starting synthetic training...\n" - ] - } - ], - "source": [ - "worker_resources = train.WorkerGroupResources(\n", - " num_nodes=1, num_gpus_per_node=8, num_cpus_per_worker=16\n", - ")\n", - "coordinator = train.RayClusterCoordinator(train.ModelTrainer, worker_resources)\n", - "coordinator.initialize_workers()" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [ - 
{ - "data": { - "text/plain": [ - "16.0" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Workers after consuming 1 node should be 16 -> 8\n", - "ray.cluster_resources()[\"worker_units\"]" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m /usr/local/lib/python3.10/dist-packages/torch/autograd/graph.py:818: UserWarning: cuDNN SDPA backward got grad_output.strides() != output.strides(), attempting to materialize a grad_output with matching strides... (Triggered internally at /opt/pytorch/pytorch/aten/src/ATen/native/cudnn/MHA.cpp:670.)\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m return Variable._execution_engine.run_backward( # Calls into the C++ engine to run the backward pass\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 0, Loss: 30.59472656\n", - "\u001b[36m(ModelTrainer pid=52041, ip=10.65.26.23)\u001b[0m [Rank 2] Loading model meta-llama/Llama-3.2-1B on CPU...\u001b[32m [repeated 7x across cluster]\u001b[0m\n", - "\u001b[36m(ModelTrainer pid=52041, ip=10.65.26.23)\u001b[0m [Rank 2] Starting synthetic training...\u001b[32m [repeated 7x across cluster]\u001b[0m\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 1, Loss: 0.55084229\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 2, Loss: 3.00805664\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 3, Loss: 2.95410156\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 4, Loss: 2.15722656\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 5, Loss: 1.41015625\n", - "\u001b[36m(ModelTrainer pid=52039, 
ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 6, Loss: 0.75201416\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 7, Loss: 0.16345596\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 8, Loss: 0.07264709\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 0, Step: 9, Loss: 0.07691956\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 0, Loss: 0.03055668\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 1, Loss: 0.00250489\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 2, Loss: 0.00558114\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 3, Loss: 0.03055668\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 4, Loss: 0.05177402\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 5, Loss: 0.06747818\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 6, Loss: 0.06152725\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 7, Loss: 0.04326725\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 8, Loss: 0.02659702\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Epoch: 1, Step: 9, Loss: 0.01250291\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m [Rank 0] Testing loss is close to expected loss: 0.012502908706665039\n", - "\u001b[36m(ModelTrainer pid=52039, ip=10.65.26.23)\u001b[0m Yay! 
Loss was close :)\n" - ] - }, - { - "data": { - "text/plain": [ - "[None, None, None, None, None, None, None, None]" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "coordinator.run(hf_model_name=\"meta-llama/Llama-3.2-1B\")" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [], - "source": [ - "ray.shutdown()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.12" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/tests/functional/hello_world_fsdp_llama/main.py b/tests/functional/hello_world_fsdp_llama/main.py deleted file mode 100644 index df5b9bf1c9..0000000000 --- a/tests/functional/hello_world_fsdp_llama/main.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import time -from ray.job_submission import JobSubmissionClient, JobStatus - - -def main() -> None: - client = JobSubmissionClient("http://127.0.0.1:8265") - print("Connected to head!", flush=True) - - # HACK: for now just - def num_ray_nodes_available() -> int: - import ray - - ray.init() - num_gpus_per_node = 8 # hard coded - num_nodes_avail = ( - int(ray.cluster_resources()["worker_units"]) // num_gpus_per_node - ) - ray.shutdown() - return num_nodes_avail - - job_id = client.submit_job( - entrypoint="RAY_DEDUP_LOGS=0 python3 tests/functional/hello_world_fsdp_llama/train.py", - runtime_env={ - # TODO: disabling for now since it causes issues if my hf_home is in my working dir and ray - # wants to upload it to all workers. you get an error like this: - # 2025-03-02 11:16:48,187 WARNING packaging.py:417 -- File /workspace/hf_home/hub/models--meta-llama--Meta-Llama-3-8b/snapshots/8cde5ca8380496c9a6cc7ef3a8b46a0372a1d920/model-00001-of-00004.safetensors is very large (4746.15MiB). Consider adding this file to the 'excludes' list to skip uploading it: `ray.init(..., runtime_env={'excludes': ['/workspace/hf_home/hub/models--meta-llama--Meta-Llama-3-8b/snapshots/8cde5ca8380496c9a6cc7ef3a8b46a0372a1d920/model-00001-of-00004.safetensors']})` - # "working_dir": "./", - "driver_args": { - # Scope each "workergroup" - "trainer": { - "resources": { - # TODO: read this in from cli args eventually, but for now just use all available - "num_nodes": num_ray_nodes_available(), - "num_gpus_per_node": 8, - "num_cpus_per_worker": 16, - }, - "hf_model_name": "meta-llama/Llama-3.2-1B", - } - }, - "env_vars": { - # TODO: hardcoded, parametrize - "HF_HOME": "/workspace/hf_home", - }, - }, - ) - - print(f"Launched job: {job_id}", flush=True) - prev_logs = "" - while True: - status = client.get_job_status(job_id) - if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}: - if status in {JobStatus.STOPPED, JobStatus.FAILED}: - logs = client.get_job_logs(job_id) - 
print(logs, flush=True) - break - time.sleep(5) - if status == JobStatus.RUNNING: - logs = client.get_job_logs(job_id) - print(logs[len(prev_logs) :], flush=True) - prev_logs = logs - - -if __name__ == "__main__": - main() diff --git a/tests/functional/hello_world_fsdp_llama/train.py b/tests/functional/hello_world_fsdp_llama/train.py deleted file mode 100644 index 61c823f37f..0000000000 --- a/tests/functional/hello_world_fsdp_llama/train.py +++ /dev/null @@ -1,329 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import json -import ray -import os -import random -import torch -import torch.distributed as dist -from torch.distributed.device_mesh import init_device_mesh -from transformers import AutoModelForCausalLM, AutoTokenizer -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy as NASS -from dataclasses import dataclass -from torch.distributed.fsdp import FullyShardedDataParallel, MixedPrecision -from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy -from torch.distributed.fsdp import register_fsdp_forward_method - - -@dataclass -class WorkerGroupResources: - num_nodes: int - num_cpus_per_worker: int = 16 # 128 hyperthread / 8 gpu = 16 cpu/gpu - num_gpus_per_node: int = 8 # will always be true on slurm - - -@dataclass -class NodeInfo: - node_id: str - node_rank: int - node_ip: str - - -# Define the coordinator and worker -class RayClusterCoordinator: - def __init__( - self, worker_cls: type["ModelTrainer"], worker_resources: WorkerGroupResources - ) -> None: - self.worker_cls = worker_cls - self.worker_resources = worker_resources - self.num_workers = ( - worker_resources.num_nodes * worker_resources.num_gpus_per_node - ) - self.num_workers_per_node = worker_resources.num_gpus_per_node - - ray_available_workers = int(ray.cluster_resources()["worker_units"]) - assert self.num_workers // self.num_workers_per_node <= ray_available_workers, ( - f"Only {ray_available_workers} workers available, which is not enough to schedule {self.num_workers} workers with {self.num_workers_per_node} workers per node" - ) - - self.workers_initialized = False - - worker_node_info, self.num_physical_nodes = self._get_schedulable_worker_info() - print(f"Worker node info: {worker_node_info=}") - print(f"Num physical nodes: {self.num_physical_nodes=}") - # Assume there's one worker per GPU - self.workers = [ - worker_cls.options( - num_gpus=1, - num_cpus=worker_resources.num_cpus_per_worker, - resources={"worker_units": 1}, - # Use 
NodeAffinitySchedulingStrategy to ensure each worker is placed on a specific node - # node_id: Unique ID of the target node for this worker - # soft=False: Strictly enforce placement on the specified node (no fallback to other nodes) - scheduling_strategy=NASS( - node_id=worker_node_info[i].node_id, soft=False - ), - ).remote( - i, - self.num_workers, - worker_node_info[i].node_rank, - worker_node_info[i].node_ip, # TODO: probably can delete this - worker_node_info[ - 0 - ].node_ip, # Arbitrarily make the first worker's hots the master - self.num_workers_per_node, - ) - for i in range(self.num_workers) - ] - - def _get_schedulable_worker_info(self) -> tuple[list[NodeInfo], int]: - """Collects information about available worker nodes in the Ray cluster and prepares - scheduling information for worker actors. - - This method: - 1. Identifies all alive worker nodes with 'worker_units' resources - 2. Sorts them by NodeID for consistent allocation - 3. Calculates how many physical nodes are needed based on workers per node - 4. Verifies that enough nodes are available - 5. 
Creates a list of NodeInfo objects for each worker - - Returns: - tuple: (worker_node_info, num_nodes_required) - - worker_node_info: List of NodeInfo objects containing node_id, node_rank, and node_ip for each worker - - num_nodes_required: Number of physical nodes needed for all workers - - Raises: - AssertionError: If there aren't enough nodes available to schedule all workers - """ - # Get list of alive worker nodes sorted by NodeID for deterministic allocation - worker_node_info = [] - worker_nodes = sorted( - [ - node - for node in ray.nodes() - if (node["Alive"] and "worker_units" in node["Resources"]) - ], - key=lambda x: x["NodeID"], - ) - - # Calculate required nodes and verify availability - num_nodes_required = self.num_workers // self.num_workers_per_node - num_nodes_available = len(worker_nodes) - assert num_nodes_required <= num_nodes_available - - # Create worker info entries - one per GPU across all needed nodes - worker_nodes = worker_nodes[:num_nodes_required] - for worker_node_id, worker_node in enumerate(worker_nodes): - for _ in range(self.num_workers_per_node): - worker_node_info.append( - NodeInfo( - worker_node["NodeID"], worker_node_id, worker_node["NodeName"] - ) - ) - - return worker_node_info, num_nodes_required - - def initialize_workers(self, **kwargs): - self.worker_init_kwargs = kwargs - ray.get([w.initialize.remote(**kwargs) for i, w in enumerate(self.workers)]) - self.workers_initialized = True - - def run(self, *args, **kwargs): - if not self.workers_initialized: - raise ValueError("""Cannot run workers without initializing them first. 
- Please call the initialize_workers method of your cluster coordinator first.""") - - worker_results = ray.get([w.run.remote(*args, **kwargs) for w in self.workers]) - return worker_results - - -class Worker: - def __init__( - self, - process_id, - world_size, - physical_node_id, - physical_node_ip, - master_addr: str, - num_workers_per_node: int, - ): - self.process_id = process_id - self.world_size = world_size - self.physical_node_id = physical_node_id - self.host_ip = physical_node_ip - self.master_addr = master_addr - self.logical_gpu_id = int(os.environ["CUDA_VISIBLE_DEVICES"]) - print( - f"DEBUG: {self.logical_gpu_id=} {(process_id, world_size, physical_node_id, physical_node_ip, master_addr, num_workers_per_node)=}" - ) - self.num_workers_per_node = num_workers_per_node - - def get_process_id(self): - return self.process_id - - def get_host_ip(self): - return self.host_ip - - def get_logical_gpu_id(self): - return self.logical_gpu_id - - def get_physical_node_id(self): - return self.physical_node_id - - def initialize(self): - # Set distributed training environment variables - os.environ["RANK"] = str(self.process_id) - os.environ["WORLD_SIZE"] = str(self.world_size) - os.environ["LOCAL_RANK"] = str(self.logical_gpu_id) - os.environ["LOCAL_WORLD_SIZE"] = str(self.num_workers_per_node) - os.environ["MASTER_ADDR"] = self.master_addr - os.environ["MASTER_PORT"] = "29500" - - dist.init_process_group("nccl") - - def run(self, *args, **kwargs): - raise NotImplementedError - - -@ray.remote -class ModelTrainer(Worker): - def __init__( - self, - process_id, - world_size, - physical_node_id, - physical_node_ip, - master_addr, - num_workers_per_node, - ): - super().__init__( - process_id, - world_size, - physical_node_id, - physical_node_ip, - master_addr, - num_workers_per_node, - ) - - def run(self, hf_model_name): - rank = dist.get_rank() - world_size = dist.get_world_size() - ####local_device = torch.device(f"cuda:{rank}") - 
####torch.cuda.set_device(local_device) - - print(f"[Rank {rank}] Loading model {hf_model_name} on CPU...") - model = AutoModelForCausalLM.from_pretrained( - hf_model_name, - device_map="cpu", - torch_dtype=torch.bfloat16, - ) - - tokenizer = AutoTokenizer.from_pretrained(hf_model_name) - if tokenizer.pad_token is None: - tokenizer.pad_token = tokenizer.eos_token - - # TODO: could oom? - # ------------------------------------------------ - # 3) Move to GPU + Composable FSDP - # (Initialize device mesh, shard submodules, then shard entire model) - # ------------------------------------------------ - model.cuda() - - # Create a device mesh with 'world_size' GPUs in a 1D arrangement. - mesh = init_device_mesh("cuda", (world_size,)) - - param_dtype = torch.bfloat16 - reduce_dtype = torch.float32 - buffer_dtype = torch.float32 - - mp = MixedPrecision( - param_dtype=param_dtype, - reduce_dtype=reduce_dtype, - buffer_dtype=buffer_dtype, - ) - - model = FullyShardedDataParallel( - model, - device_mesh=mesh, - auto_wrap_policy=size_based_auto_wrap_policy, - mixed_precision=mp, - ) - - # Optionally register "generate" as the forward method so FSDP can handle it properly. 
- register_fsdp_forward_method(model, "generate") - - optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) - model.train() - - num_epochs = 2 - num_batches = 10 - batch_size = 2 - seq_length = 16 - vocab_size = tokenizer.vocab_size or 32000 - - print(f"[Rank {rank}] Starting synthetic training...") - - torch.manual_seed(42) - torch.cuda.manual_seed_all(42) - random.seed(42) - for epoch in range(num_epochs): - for step in range(num_batches): - input_ids = torch.ones( - (batch_size, seq_length), device="cuda", dtype=torch.long - ) * (vocab_size - 1) - attention_mask = torch.ones_like(input_ids) - labels = input_ids.clone() - - optimizer.zero_grad() - outputs = model( - input_ids=input_ids, attention_mask=attention_mask, labels=labels - ) - loss = torch.square(outputs.logits.view(-1)[0]) - loss.backward() - optimizer.step() - - if rank == 0: - print( - f"Epoch: {epoch}, Step: {step}, Loss: {loss.item():.8f}", - flush=True, - ) - - if rank == 0: - expected_loss = 0.012502908706665039 - print( - f"[Rank {rank}] Testing loss is close to expected loss: {expected_loss}" - ) - torch.testing.assert_close(loss.item(), expected_loss) - print("Yay! Loss was close :)") - - -if __name__ == "__main__": - ray.init(address="auto", logging_level=0) - print(json.dumps(json.loads(os.environ["RAY_JOB_CONFIG_JSON_ENV_VAR"]), indent=4)) - driver_args = json.loads(os.environ["RAY_JOB_CONFIG_JSON_ENV_VAR"])["runtime_env"][ - "driver_args" - ] - - # TODO: very simple, need to think thru CLI - trainer_args = driver_args["trainer"] - trainer_resources = trainer_args.pop("resources") - worker_resources = WorkerGroupResources(**trainer_resources) - - coordinator = RayClusterCoordinator(ModelTrainer, worker_resources) - coordinator.initialize_workers() - print("Initialized workers") - # Get the job configuration set during launch. 
- # This is automatically set by Ray - coordinator.run(**trainer_args) - print("Finished") diff --git a/tests/functional/sft.sh b/tests/functional/sft.sh index 0e2298c983..b9836886d6 100755 --- a/tests/functional/sft.sh +++ b/tests/functional/sft.sh @@ -2,6 +2,8 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) PROJECT_ROOT=$(realpath $SCRIPT_DIR/../..) +# Mark the current repo as safe, since wandb fetchs metadata about the repo +git config --global --add safe.directory $PROJECT_ROOT set -eou pipefail @@ -10,23 +12,25 @@ JSON_METRICS=$LOG_DIR/$(basename $0 .sh).json RUN_LOG=$LOG_DIR/$(basename $0 .sh).log export RAY_DEDUP_LOGS=0 export UV_CACHE_DIR=$PROJECT_ROOT/uv_cache +export PYTHONPATH=${PROJECT_ROOT}:${PYTHONPATH:-} mkdir -p $LOG_DIR cd $PROJECT_ROOT -uv run $PROJECT_ROOT/examples/run_sft.py \ +python -u $PROJECT_ROOT/examples/run_sft.py \ cluster.gpus_per_node=2 \ sft.max_num_steps=10 \ logger.tensorboard_enabled=true \ logger.log_dir=$LOG_DIR \ logger.wandb_enabled=false \ + checkpointing.enabled=false \ $@ \ 2>&1 | tee $RUN_LOG cd $SCRIPT_DIR -uv run json_dump_tb_logs.py $LOG_DIR --output_path $JSON_METRICS +python json_dump_tb_logs.py $LOG_DIR --output_path $JSON_METRICS -uv run check_metrics.py $JSON_METRICS \ - 'data["train/loss"]["9"] < 600' \ - 'data["timing/train/sft_train_step"]["9"] < 0.25' +# TODO: loss is very noisy, this check is mainly for sanity of immediate divergence +python check_metrics.py $JSON_METRICS \ + 'data["train/loss"]["9"] < 1500' \ diff --git a/tests/run_functional_in_docker.sh b/tests/run_functional_in_docker.sh new file mode 100755 index 0000000000..e3fd403ba1 --- /dev/null +++ b/tests/run_functional_in_docker.sh @@ -0,0 +1,47 @@ +#!/bin/bash +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +PROJECT_ROOT=$(realpath $SCRIPT_DIR/..) + +set -eou pipefail + +# Ensure Docker is installed +if ! 
command -v docker &> /dev/null; then + echo "Error: Docker is not installed or not in PATH." + exit 1 +fi + +# CONTAINER is expected to be set as an environment variable +if [[ -z "${CONTAINER:-}" ]]; then + echo "Error: CONTAINER environment variable is not set." + echo "Usage: CONTAINER= $0