From fbdbb205964513e63fe76f8407614510ed5a215f Mon Sep 17 00:00:00 2001
From: Terry Kong
Date: Sun, 23 Mar 2025 22:34:23 -0700
Subject: [PATCH 1/5] feat: add tracker fixture for unit test logging

Signed-off-by: Terry Kong
---
 docs/testing.md                               |  47 ++++++
 nemo_reinforcer/utils/logger.py               |  96 ++++++++++--
 tests/run_unit.sh                             |   2 +-
 tests/unit/conftest.py                        | 146 ++++++++++++++++++
 .../unit/models/policy/test_hf_ray_policy.py  |   4 +-
 tests/unit/utils/test_logger.py               |   8 +-
 6 files changed, 284 insertions(+), 19 deletions(-)

diff --git a/docs/testing.md b/docs/testing.md
index 610067961c..d9a7c44b1d 100644
--- a/docs/testing.md
+++ b/docs/testing.md
@@ -32,6 +32,53 @@ CONTAINER=... bash tests/run_unit_in_docker.sh
 The required `CONTAINER` can be built by following the instructions in the
 [docker documentation](docker.md).
 
+### Tracking metrics in unit tests
+
+Unit tests may also log metrics to a fixture. The fixture is called `tracker` and has the following API:
+```python
+# Track an arbitrary metric (must be JSON serializable)
+tracker.track(metric_name, metric_value)
+# Log the maximum memory across the entire cluster. This is okay for tests since they run serially.
+tracker.log_max_mem(metric_name)
+# Returns the maximum memory. Useful if you are measuring changes in memory.
+tracker.get_max_mem()
+```
+
+Including the `tracker` fixture also implicitly tracks the elapsed time of the test.
+
+Here is an example test:
+```python
+def test_exponentiate(tracker):
+    starting_mem = tracker.get_max_mem()
+    base = 2
+    exponent = 4
+    result = base ** exponent
+    tracker.track("result", result)
+    tracker.log_max_mem("memory_after_exponentiating")
+    change_in_mem = tracker.get_max_mem() - starting_mem
+    tracker.track("change_in_mem", change_in_mem)
+    assert result == 16
+```
+
+Which would produce this file in `tests/unit/unit_results.json`:
+```json
+{
+  "exit_status": 0,
+  "metrics": {
+    "test_logger::test_exponentiate": {
+      "result": 16,
+      "memory_after_exponentiating": 551.0,
+      "change_in_mem": 0.0,
+      "_elapsed": 0.1190798282623291
+    }
+  },
+  "gpu_types": [
+    "NVIDIA H100 80GB HBM3"
+  ],
+  "coverage": 24.55897613282601
+}
+```
+
 ## Functional tests
 
 :::{important}
diff --git a/nemo_reinforcer/utils/logger.py b/nemo_reinforcer/utils/logger.py
index dca4181681..42143e7593 100644
--- a/nemo_reinforcer/utils/logger.py
+++ b/nemo_reinforcer/utils/logger.py
@@ -144,6 +144,11 @@ def log_hyperparams(self, params: Dict[str, Any]) -> None:
             self.run.config.update(params)
 
 
+class GpuMetricSnapshot(TypedDict):
+    step: int
+    metrics: Dict[str, Any]
+
+
 class RayGpuMonitorLogger:
     """Monitor GPU utilization across a Ray cluster and log metrics to a parent logger."""
 
@@ -163,7 +168,9 @@ def __init__(
         self.collection_interval = collection_interval
         self.flush_interval = flush_interval
         self.parent_logger = parent_logger
-        self.metrics_buffer = []  # Store metrics with timestamps
+        self.metrics_buffer: list[
+            GpuMetricSnapshot
+        ] = []  # Store metrics with timestamps
         self.last_flush_time = time.time()
         self.is_running = False
         self.collection_thread = None
@@ -228,7 +235,9 @@ def _collection_loop(self):
 
                 time.sleep(self.collection_interval)
             except Exception as e:
-                print(f"Error in GPU monitoring collection loop: {e}")
+                print(
+                    f"Error in GPU monitoring collection loop or stopped abruptly: {e}"
+                )
                 time.sleep(self.collection_interval)  # Continue despite errors
 
     def _parse_gpu_metric(self, sample: Sample, node_idx: int) -> Dict[str, Any]:
@@ -241,7 +250,6 @@ def _parse_gpu_metric(self, sample: Sample, node_idx: int) -> Dict[str, Any]:
         Returns:
            Dictionary with metric name and value
         """
-        # TODO: Consider plumbing {'GpuDeviceName': 'NVIDIA H100 80GB HBM3'}
         # Expected labels for GPU metrics
         expected_labels = ["GpuIndex"]
         for label in expected_labels:
@@ -266,12 +274,72 @@ def _parse_gpu_metric(self, sample: Sample, node_idx: int) -> Dict[str, Any]:
         metric_name = f"node.{node_idx}.gpu.{index}.{metric_name}"
         return {metric_name: value}
 
+    def _parse_gpu_sku(self, sample: Sample, node_idx: int) -> Dict[str, str]:
+        """Parse a GPU metric sample and extract the GPU device name (SKU).
+
+        Args:
+            sample: Prometheus metric sample
+            node_idx: Index of the node
+
+        Returns:
+            Dictionary mapping a `node.<node>.gpu.<index>.type` key to the device name
+        """
+        # TODO: Consider plumbing {'GpuDeviceName': 'NVIDIA H100 80GB HBM3'}
+        # Expected labels for GPU metrics
+        expected_labels = ["GpuIndex", "GpuDeviceName"]
+        for label in expected_labels:
+            if label not in sample.labels:
+                # This is probably a CPU node
+                return {}
+
+        metric_name = sample.name
+        # Only return the SKU for metrics that are known to publish these labels
+        if (
+            metric_name != "ray_node_gpus_utilization"
+            and metric_name != "ray_node_gram_used"
+        ):
+            # Skip unexpected metrics
+            return {}
+
+        labels = sample.labels
+        index = labels["GpuIndex"]
+        value = labels["GpuDeviceName"]
+
+        metric_name = f"node.{node_idx}.gpu.{index}.type"
+        return {metric_name: value}
+
+    def _collect_gpu_sku(self) -> Dict[str, str]:
+        """Collect GPU SKUs from all Ray nodes.
+
+        Note: This is an internal API and users are not expected to call this.
+
+        Returns:
+            Dictionary of SKU types on all Ray nodes
+        """
+        # TODO: We can reuse the same code path as metric collection: both the
+        # utilization and memory metrics duplicate the GPU metadata, but the
+        # metadata is identical per node, so overwriting it is safe.
+        return self._collect(sku=True)
+
     def _collect_metrics(self) -> Dict[str, Any]:
         """Collect GPU metrics from all Ray nodes.
 
         Returns:
             Dictionary of collected metrics
         """
+        return self._collect(metrics=True)
+
+    def _collect(self, metrics: bool = False, sku: bool = False) -> Dict[str, Any]:
+        """Collect GPU metrics or GPU SKUs from all Ray nodes.
+
+        Returns:
+            Dictionary of collected metrics (or SKUs)
+        """
+        assert metrics ^ sku, (
+            f"Must collect either metrics or sku, not both: {metrics=}, {sku=}"
+        )
+        parser_fn = self._parse_gpu_metric if metrics else self._parse_gpu_sku
+
         if not ray.is_initialized():
             print("Ray is not initialized. Cannot collect GPU metrics.")
             return {}
@@ -295,7 +363,9 @@ def _collect_metrics(self) -> Dict[str, Any]:
             # Process each node's metrics
             collected_metrics = {}
             for node_idx, metric_address in enumerate(unique_metric_addresses):
-                gpu_metrics = self._fetch_and_parse_metrics(node_idx, metric_address)
+                gpu_metrics = self._fetch_and_parse_metrics(
+                    node_idx, metric_address, parser_fn
+                )
                 collected_metrics.update(gpu_metrics)
 
             return collected_metrics
@@ -304,7 +374,7 @@ def _collect_metrics(self) -> Dict[str, Any]:
             print(f"Error collecting GPU metrics: {e}")
             return {}
 
-    def _fetch_and_parse_metrics(self, node_idx, metric_address):
+    def _fetch_and_parse_metrics(self, node_idx, metric_address, parser_fn):
         """Fetch metrics from a node and parse GPU metrics.
 
         Args:
@@ -335,7 +405,7 @@ def _fetch_and_parse_metrics(self, node_idx, metric_address):
                     continue
 
                 for sample in family.samples:
-                    metrics = self._parse_gpu_metric(sample, node_idx)
+                    metrics = parser_fn(sample, node_idx)
                     gpu_metrics.update(metrics)
 
             return gpu_metrics
@@ -346,18 +416,16 @@ def _fetch_and_parse_metrics(self, node_idx, metric_address):
 
     def flush(self):
         """Flush collected metrics to the parent logger."""
-        if not self.parent_logger:
-            return
-
         with self.lock:
             if not self.metrics_buffer:
                 return
 
-            # Log each set of metrics with its original step
-            for entry in self.metrics_buffer:
-                step = entry["step"]
-                metrics = entry["metrics"]
-                self.parent_logger.log_metrics(metrics, step, prefix="ray")
+            if self.parent_logger:
+                # Log each set of metrics with its original step
+                for entry in self.metrics_buffer:
+                    step = entry["step"]
+                    metrics = entry["metrics"]
+                    self.parent_logger.log_metrics(metrics, step, prefix="ray")
 
             # Clear buffer after logging
             self.metrics_buffer = []
diff --git a/tests/run_unit.sh b/tests/run_unit.sh
index e826a18a60..f51ff49ff6 100755
--- a/tests/run_unit.sh
+++ b/tests/run_unit.sh
@@ -33,7 +33,7 @@ export RAY_DEDUP_LOGS=0
 
 # Run unit tests
 echo "Running unit tests..."
-if ! pytest unit/ -s -rA "$@"; then
+if ! pytest unit/ --cov=nemo_reinforcer --cov-report=term --cov-report=json -s -rA "$@"; then
     echo "[ERROR]: Unit tests failed."
     exit 1
 fi
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 67de3a36af..707f38c6e6 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -11,7 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from io import StringIO
+import time
 import pytest
+from nemo_reinforcer.utils.logger import GPUMonitoringConfig
+from tests import unit
 import torch
 import torch.distributed as dist
 import torch.multiprocessing as mp
@@ -28,7 +32,131 @@
 import random
 from typing import Callable
 import ray
+import json
 from nemo_reinforcer.distributed.virtual_cluster import init_ray
+from typing import TypedDict
+from datetime import datetime
+
+dir_path = os.path.dirname(os.path.abspath(__file__))
+
+UNIT_RESULTS_FILE = os.path.join(dir_path, "unit_results.json")
+UNIT_RESULTS_FILE_DATED = os.path.join(
+    dir_path, f"unit_results/{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
+)
+
+
+class UnitTestData(TypedDict):
+    exit_status: int | str
+    metrics: dict
+    gpu_types: list[str]
+    coverage: dict
+
+
+def pytest_sessionstart(session):
+    # Delete the unit results file at the start of a new test session
+    if os.path.exists(UNIT_RESULTS_FILE):
+        try:
+            os.remove(UNIT_RESULTS_FILE)
+            print(f"Deleted existing results file: {UNIT_RESULTS_FILE}")
+        except Exception as e:
+            print(f"Warning: Failed to delete results file: {e}")
+    session.config._unit_test_data = UnitTestData(
+        exit_status="was not set", metrics={}, gpu_types=[], coverage={}
+    )
+
+
+@pytest.fixture(scope="session", autouse=True)
+def session_data(request, init_ray_cluster):
+    """Session-level fixture to store and save metrics data.
+
+    This fixture tracks both metrics from tests and metadata about the test environment.
+    The metrics are stored in the 'metrics' dictionary.
+
+    It's set to autouse so that we track metadata and coverage even if no selected
+    test explicitly tracks metrics.
+    """
+    # Pass init_ray_cluster so that we can access ray metadata
+
+    ############################################################
+    # 1. Gather all the unit test data                         #
+    ############################################################
+    unit_test_data: UnitTestData = request.config._unit_test_data
+    yield unit_test_data
+
+    ############################################################
+    # 2. Gather the ray metadata                               #
+    ############################################################
+    from nemo_reinforcer.utils.logger import RayGpuMonitorLogger
+
+    logger = RayGpuMonitorLogger(
+        collection_interval=float("inf"),
+        flush_interval=float("inf"),
+        parent_logger=None,
+    )
+    unit_test_data["gpu_types"] = list(set(logger._collect_gpu_sku().values()))
+
+    ############################################################
+    # 3. Gather the coverage data                              #
+    ############################################################
+    # We directly access the coverage controller from the plugin manager
+    # so we can access the coverage total before the pytest session finishes.
+    if request.config.pluginmanager.hasplugin("_cov"):
+        plugin = request.config.pluginmanager.getplugin("_cov")
+        if plugin.cov_controller:
+            cov_controller = plugin.cov_controller
+            # We currently don't use the cov_report since we can always access the coverage.json later, but
+            # in the future if we want to report the coverage more granularly as part of the session finish,
+            # we can access it here.
+            cov_report = StringIO()
+            cov_total = cov_controller.summary(cov_report)
+            unit_test_data["coverage"] = cov_total
+
+
+@pytest.fixture
+def tracker(request, session_data, ray_gpu_monitor):
+    """Test-level fixture that automatically captures test function info."""
+    # Get fully qualified test name (module::test_function)
+    module_name = request.module.__name__
+    test_name = request.function.__name__
+    qualified_name = f"{module_name}::{test_name}"
+
+    # Initialize an empty dict for this test if it doesn't exist
+    if qualified_name not in session_data["metrics"]:
+        session_data["metrics"][qualified_name] = {}
+
+    class Tracker:
+        def track(self, metric_name: str, value):
+            """Track an arbitrary metric."""
+            session_data["metrics"][qualified_name][metric_name] = value
+
+        def get_max_mem(self):
+            metrics = ray_gpu_monitor._collect_metrics()
+            max_mem = 0
+            for m_name, m_value in metrics.items():
+                if m_name.endswith(".memory"):
+                    max_mem = max(max_mem, m_value)
+            return max_mem
+
+        def log_max_mem(self, metric_name: str):
+            session_data["metrics"][qualified_name][metric_name] = self.get_max_mem()
+
+    start_time = time.time()
+    yield Tracker()
+    end_time = time.time()
+    # Prefix with `_` to indicate it's automatically collected
+    session_data["metrics"][qualified_name]["_elapsed"] = end_time - start_time
+
+
+def pytest_sessionfinish(session, exitstatus):
+    data = session.config._unit_test_data
+    data["exit_status"] = exitstatus
+    print(f"\nSaving unit test data to {UNIT_RESULTS_FILE}")
+    print(f"and saving to {UNIT_RESULTS_FILE_DATED}")
+    with open(UNIT_RESULTS_FILE, "w") as f:
+        json.dump(data, f, indent=2)
+    os.makedirs(os.path.dirname(UNIT_RESULTS_FILE_DATED), exist_ok=True)
+    with open(UNIT_RESULTS_FILE_DATED, "w") as f:
+        json.dump(data, f, indent=2)
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -42,6 +170,24 @@ def init_ray_cluster():
     ray.shutdown()
 
 
+@pytest.fixture(scope="session", autouse=True)
+def ray_gpu_monitor(init_ray_cluster):
+    """Start a session-wide Ray GPU monitor and stop it afterward.
+
+    This fixture doesn't need to be called directly.
+    """
+    from nemo_reinforcer.utils.logger import RayGpuMonitorLogger
+
+    gpu_monitor = RayGpuMonitorLogger(
+        collection_interval=1,
+        flush_interval=float("inf"),  # Disable flushing since we will do it manually
+        parent_logger=None,
+    )
+    gpu_monitor.start()
+    yield gpu_monitor
+    gpu_monitor.stop()
+
+
 def _setup_distributed(rank, world_size, port, backend="nccl"):
     """Initialize the distributed environment for a test (internal use only)"""
     os.environ["MASTER_ADDR"] = "localhost"
diff --git a/tests/unit/models/policy/test_hf_ray_policy.py b/tests/unit/models/policy/test_hf_ray_policy.py
index ae80b3fd1e..29d8cbcbee 100644
--- a/tests/unit/models/policy/test_hf_ray_policy.py
+++ b/tests/unit/models/policy/test_hf_ray_policy.py
@@ -364,7 +364,7 @@ def generation_setup():
 
 
 @pytest.mark.timeout(180)
-def test_hf_policy_generation(generation_setup):
+def test_hf_policy_generation(generation_setup, tracker):
     policy, cluster, data, tokenizer, prompts, expected_generations = generation_setup
 
     # Verify resources were created properly
@@ -415,6 +415,7 @@ def test_hf_policy_generation(generation_setup):
         torch.exp(torch.abs(results["logprobs"] - fprop_results["logprobs"]))
     )
     print(f"avg prob mult error: {avg_prob_mult_error}")
+    tracker.track("avg_prob_mult_error", float(avg_prob_mult_error))
     assert avg_prob_mult_error <= 1.025
 
     # get logprobs for the expected generations
@@ -439,6 +440,7 @@ def test_hf_policy_generation(generation_setup):
     expected_logprobs = policy.get_logprobs(expected_data)["logprobs"]
 
     mean_lps = torch.mean(expected_logprobs * expected_tokenized["attention_mask"])
+    tracker.track("mean_lps", float(mean_lps))
     assert mean_lps > -1.7, "Expected logprobs should be greater than -1.7"
     assert mean_lps < -1.4, "Expected logprobs should be less than -1.4"
 
diff --git a/tests/unit/utils/test_logger.py b/tests/unit/utils/test_logger.py
index a8d982266b..dd1c24fb27 100644
--- a/tests/unit/utils/test_logger.py
+++ b/tests/unit/utils/test_logger.py
@@ -417,7 +417,9 @@ def test_fetch_and_parse_metrics(self, mock_get, mock_ray):
 
         # Call the method
         result = monitor._fetch_and_parse_metrics(
-            node_idx=2, metric_address="test_ip:test_port"
+            node_idx=2,
+            metric_address="test_ip:test_port",
+            parser_fn=monitor._parse_gpu_metric,
         )
 
         # Verify request was made correctly
@@ -460,8 +462,8 @@ def test_collect_metrics(self, mock_ray):
 
         # Verify _fetch_and_parse_metrics was called for each node
         assert mock_fetch.call_count == 2
-        mock_fetch.assert_any_call(0, "10.0.0.1:8080")
-        mock_fetch.assert_any_call(1, "10.0.0.2:8080")
+        mock_fetch.assert_any_call(0, "10.0.0.1:8080", monitor._parse_gpu_metric)
+        mock_fetch.assert_any_call(1, "10.0.0.2:8080", monitor._parse_gpu_metric)
 
         # Verify the result combines metrics from all nodes
         assert result == {
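To make the SKU-collection path above concrete: `_collect(sku=True)` reuses the same scraping plumbing as metric collection but swaps in `_parse_gpu_sku`, which reads the device name from the Prometheus labels instead of the sample value. Below is a minimal, hedged sketch (not part of the diff; the sample values are invented) of what the parser produces for one sample, using the `Sample` type from `prometheus_client`:

```python
# Illustrative sketch only; the sample below is hand-built with made-up values.
from prometheus_client.samples import Sample

# One GPU sample shaped like what a Ray node's metrics endpoint exposes.
sample = Sample(
    name="ray_node_gram_used",
    labels={"GpuIndex": "0", "GpuDeviceName": "NVIDIA H100 80GB HBM3"},
    value=551.0,
)

# _parse_gpu_sku(sample, node_idx=0) ignores the sample value and keys the
# device name by node and GPU index:
#   {"node.0.gpu.0.type": "NVIDIA H100 80GB HBM3"}
# Samples without the GpuIndex/GpuDeviceName labels (e.g. from CPU-only nodes)
# produce {} and are skipped.
```

Since the device name is repeated on every matching sample, the `session_data` fixture can reduce the collected values to unique SKUs with a plain `set()`, which is how `gpu_types` in `unit_results.json` is derived.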
From 5a425414c1a36119564aa048a039075688f41eed Mon Sep 17 00:00:00 2001
From: Terry Kong
Date: Sun, 23 Mar 2025 23:02:46 -0700
Subject: [PATCH 2/5] log the metrics

Signed-off-by: Terry Kong
---
 .github/workflows/_run_test.yml | 18 ++++++++++++++++--
 .github/workflows/cicd-main.yml |  7 +++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/_run_test.yml b/.github/workflows/_run_test.yml
index 813719503c..3081d9b204 100644
--- a/.github/workflows/_run_test.yml
+++ b/.github/workflows/_run_test.yml
@@ -27,11 +27,16 @@ on:
         default: 10
       SCRIPT:
         type: string
-        description: Test script to execute
+        description: Test script to execute in container
        required: true
       AFTER_SCRIPT:
         type: string
-        description: Script to run after main test
+        description: Script to run after main test in container
         required: false
         default: ":"
+      FINAL_SCRIPT_EXTERNAL:
+        type: string
+        description: Script to run after SCRIPT and AFTER_SCRIPT, but outside the container (useful for logging)
+        required: false
+        default: ":"
       IS_OPTIONAL:
@@ -163,6 +168,15 @@ jobs:
           )
           docker exec nemo_container_${{ github.run_id }} bash -eux -o pipefail -c "$cmd"
 
+      - name: final_script_external
+        if: always() && inputs.FINAL_SCRIPT_EXTERNAL != ':'
+        run: |
+          cmd=$(cat <<"RUN_TEST_EOF"
+          ${{ inputs.FINAL_SCRIPT_EXTERNAL }}
+          RUN_TEST_EOF
+          )
+          bash -eux -o pipefail -c "$cmd"
+
       - name: Container shutdown
         if: always()
         run: |
diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml
index 0e2a289c62..62d274a9b7 100644
--- a/.github/workflows/cicd-main.yml
+++ b/.github/workflows/cicd-main.yml
@@ -154,6 +154,13 @@ jobs:
       SCRIPT: |
         cd ${REINFORCER_REPO_DIR}
         uv run --extra test bash -x ./tests/run_unit.sh
+      FINAL_SCRIPT_EXTERNAL: |
+        cat <

Date: Mon, 24 Mar 2025 16:01:52 -0700
Subject: [PATCH 3/5] escape backticks in step summary

Signed-off-by: Terry Kong
---
 .github/workflows/cicd-main.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml
index 62d274a9b7..05dcefaf86 100644
--- a/.github/workflows/cicd-main.yml
+++ b/.github/workflows/cicd-main.yml
@@ -157,9 +157,9 @@ jobs:
       FINAL_SCRIPT_EXTERNAL: |
         cat <

Date: Mon, 24 Mar 2025 23:30:36 -0700
Subject: [PATCH 4/5] add git commit to the meta

Signed-off-by: Terry Kong
---
 tests/unit/conftest.py | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 707f38c6e6..3e8ca5a801 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -47,6 +47,7 @@
 
 class UnitTestData(TypedDict):
     exit_status: int | str
+    git_commit: str
     metrics: dict
     gpu_types: list[str]
     coverage: dict
@@ -60,8 +61,27 @@ def pytest_sessionstart(session):
             print(f"Deleted existing results file: {UNIT_RESULTS_FILE}")
         except Exception as e:
             print(f"Warning: Failed to delete results file: {e}")
+
+    # Get the git commit hash
+    try:
+        import subprocess
+
+        result = subprocess.run(
+            ["git", "-C", dir_path, "rev-parse", "HEAD"],
+            capture_output=True,
+            text=True,
+            check=True,
+        )
+        git_commit = result.stdout.strip()
+    except Exception as e:
+        git_commit = f"Error getting git commit: {str(e)}"
+
     session.config._unit_test_data = UnitTestData(
-        exit_status="was not set", metrics={}, gpu_types=[], coverage={}
+        exit_status="was not set",
+        git_commit=git_commit,
+        metrics={},
+        gpu_types=[],
+        coverage={},
     )

From 8b7a88e65e4400fae21e8f8c7f8377c67102394e Mon Sep 17 00:00:00 2001
From: Terry Kong
Date: Tue, 25 Mar 2025 17:35:50 -0700
Subject: [PATCH 5/5] add start time

Signed-off-by: Terry Kong
---
 docs/testing.md        | 28 +++++++++++++++++++++++-----
 tests/unit/conftest.py |  2 ++
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/docs/testing.md b/docs/testing.md
index d9a7c44b1d..570c9c8696 100644
--- a/docs/testing.md
+++ b/docs/testing.md
@@ -64,12 +64,13 @@ Which would produce this file in `tests/unit/unit_results.json`:
 ```json
 {
   "exit_status": 0,
+  "git_commit": "f1062bd3fd95fc64443e2d9ee4a35fc654ba897e",
+  "start_time": "2025-03-24 23:34:12",
   "metrics": {
-    "test_logger::test_exponentiate": {
-      "result": 16,
-      "memory_after_exponentiating": 551.0,
-      "change_in_mem": 0.0,
-      "_elapsed": 0.1190798282623291
+    "test_hf_ray_policy::test_hf_policy_generation": {
+      "avg_prob_mult_error": 1.0000039339065552,
+      "mean_lps": -1.5399343967437744,
+      "_elapsed": 17.323044061660767
     }
   },
   "gpu_types": [
     "NVIDIA H100 80GB HBM3"
   ],
   "coverage": 24.55897613282601
 }
 ```
 
+:::{tip}
+Past unit test results are logged in `tests/unit/unit_results/`. These are helpful for viewing trends over time and across commits.
+
+Here's an example `jq` command to view trends:
+
+```sh
+jq -r '[.start_time, .git_commit, .metrics["test_hf_ray_policy::test_hf_policy_generation"].avg_prob_mult_error] | @tsv' tests/unit/unit_results/*
+
+# Example output:
+#2025-03-24 23:35:39 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
+#2025-03-24 23:36:37 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
+#2025-03-24 23:37:37 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
+#2025-03-24 23:38:14 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
+#2025-03-24 23:38:50 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
+```
+:::
+
 ## Functional tests
 
 :::{important}
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 3e8ca5a801..b13d6588a4 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -48,6 +48,7 @@
 class UnitTestData(TypedDict):
     exit_status: int | str
     git_commit: str
+    start_time: str
     metrics: dict
     gpu_types: list[str]
     coverage: dict
@@ -79,6 +80,7 @@ def pytest_sessionstart(session):
     session.config._unit_test_data = UnitTestData(
         exit_status="was not set",
         git_commit=git_commit,
+        start_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
         metrics={},
         gpu_types=[],
         coverage={},
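As a companion to the `jq` one-liner added to `docs/testing.md` above, the same trend query can be written as a small Python script over the dated result files. This is a minimal sketch, not part of the patch series: it assumes only the `unit_results` schema introduced here (`start_time`, `git_commit`, and the nested `metrics` dict), and the test and metric names are the ones from the example output; swap in any metric recorded via the `tracker` fixture.

```python
#!/usr/bin/env python3
# Minimal sketch: print a TSV trend of one tracked metric across dated runs.
# Assumes the unit_results schema from this patch series; the test and metric
# names below are illustrative, not a fixed API.
import glob
import json
import os

RESULTS_GLOB = os.path.join("tests", "unit", "unit_results", "*.json")
TEST_NAME = "test_hf_ray_policy::test_hf_policy_generation"
METRIC = "avg_prob_mult_error"

# Lexicographic sort matches chronological order because the dated files are
# named with a %Y%m%d_%H%M%S timestamp (see UNIT_RESULTS_FILE_DATED above).
for path in sorted(glob.glob(RESULTS_GLOB)):
    with open(path) as f:
        data = json.load(f)
    value = data.get("metrics", {}).get(TEST_NAME, {}).get(METRIC)
    if value is not None:
        start = data.get("start_time", "?")
        commit = data.get("git_commit", "?")
        print(f"{start}\t{commit}\t{value}")
```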