Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .github/workflows/_run_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ on:
default: 10
SCRIPT:
type: string
description: Test script to execute
description: Test script to execute in container
required: true
AFTER_SCRIPT:
type: string
description: Script to run after main test
description: Script to run after main test in container
required: false
default: ":"
FINAL_SCRIPT_EXTERNAL:
type: string
description: Script to run after SCRIPT and AFTER_SCRIPT, but outside container (useful for logging)
required: false
default: ":"
IS_OPTIONAL:
Expand Down Expand Up @@ -163,6 +168,15 @@ jobs:
)
docker exec nemo_container_${{ github.run_id }} bash -eux -o pipefail -c "$cmd"

- name: final_script_external
if: always() && inputs.FINAL_SCRIPT_EXTERNAL != ':'
run: |
cmd=$(cat <<"RUN_TEST_EOF"
${{ inputs.FINAL_SCRIPT_EXTERNAL }}
RUN_TEST_EOF
)
bash -eux -o pipefail -c "$cmd"

- name: Container shutdown
if: always()
run: |
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/cicd-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ jobs:
SCRIPT: |
cd ${REINFORCER_REPO_DIR}
uv run --extra test bash -x ./tests/run_unit.sh
FINAL_SCRIPT_EXTERNAL: |
cat <<EOF | tee -a $GITHUB_STEP_SUMMARY
# Unit test results
\`\`\`json
$(cat tests/unit/unit_results.json || echo "n/a")
\`\`\`
EOF
secrets:
HF_TOKEN: ${{ secrets.HF_TOKEN }}

Expand Down
65 changes: 65 additions & 0 deletions docs/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,71 @@ CONTAINER=... bash tests/run_unit_in_docker.sh

The required `CONTAINER` can be built by following the instructions in the [docker documentation](docker.md).

### Tracking metrics in unit tests

Unit tests may also log metrics to a fixture. The fixture is called `tracker` and has the following API:
```python
# Track an arbitrary metric (must be json serializable)
tracker.track(metric_name, metric_value)
# Log the maximum memory across the entire cluster. Okay for tests since they are run serially.
tracker.log_max_mem(metric_name)
# Returns the maximum memory. Useful if you are measuring changes in memory.
tracker.get_max_mem()
```

Including the `tracker` fixture also tracks the elapsed time for the test implicitly.

Here is an example test:
```python
def test_exponentiate(tracker):
starting_mem = tracker.get_max_mem()
base = 2
exponent = 4
result = base ** exponent
tracker.track("result", result)
tracker.log_max_mem("memory_after_exponentiating")
change_in_mem = tracker.get_max_mem() - starting_mem
tracker.track("change_in_mem", change_in_mem)
assert result == 16
```

Which would produce this file in `tests/unit/unit_results.json`:
```json
{
"exit_status": 0,
"git_commit": "f1062bd3fd95fc64443e2d9ee4a35fc654ba897e",
"start_time": "2025-03-24 23:34:12",
"metrics": {
"test_hf_ray_policy::test_hf_policy_generation": {
"avg_prob_mult_error": 1.0000039339065552,
"mean_lps": -1.5399343967437744,
"_elapsed": 17.323044061660767
}
},
"gpu_types": [
"NVIDIA H100 80GB HBM3"
],
"coverage": 24.55897613282601
}
```

:::{tip}
Past unit test results are logged in `tests/unit/unit_results/`. These are helpful for viewing trends over time and across commits.

Here's an example `jq` command to view trends:

```sh
jq -r '[.start_time, .git_commit, .metrics["test_hf_ray_policy::test_hf_policy_generation"].avg_prob_mult_error] | @tsv' tests/unit/unit_results/*

# Example output:
#2025-03-24 23:35:39 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
#2025-03-24 23:36:37 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
#2025-03-24 23:37:37 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
#2025-03-24 23:38:14 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
#2025-03-24 23:38:50 778d288bb5d2edfd3eec4d07bb7dffffad5ef21b 1.0000039339065552
```
:::

## Functional tests

:::{important}
Expand Down
96 changes: 82 additions & 14 deletions nemo_reinforcer/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ def log_hyperparams(self, params: Dict[str, Any]) -> None:
self.run.config.update(params)


class GpuMetricSnapshot(TypedDict):
    """A single buffered snapshot of GPU metrics tied to a logging step."""

    # Logging step at which this snapshot was collected; used when the
    # buffer is flushed so metrics are logged against their original step.
    step: int
    # Metric-name -> value mapping collected from the Ray cluster at that step.
    metrics: Dict[str, Any]


class RayGpuMonitorLogger:
"""Monitor GPU utilization across a Ray cluster and log metrics to a parent logger."""

Expand All @@ -163,7 +168,9 @@ def __init__(
self.collection_interval = collection_interval
self.flush_interval = flush_interval
self.parent_logger = parent_logger
self.metrics_buffer = [] # Store metrics with timestamps
self.metrics_buffer: list[
GpuMetricSnapshot
] = [] # Store metrics with timestamps
self.last_flush_time = time.time()
self.is_running = False
self.collection_thread = None
Expand Down Expand Up @@ -228,7 +235,9 @@ def _collection_loop(self):

time.sleep(self.collection_interval)
except Exception as e:
print(f"Error in GPU monitoring collection loop: {e}")
print(
f"Error in GPU monitoring collection loop or stopped abruptly: {e}"
)
time.sleep(self.collection_interval) # Continue despite errors

def _parse_gpu_metric(self, sample: Sample, node_idx: int) -> Dict[str, Any]:
Expand All @@ -241,7 +250,6 @@ def _parse_gpu_metric(self, sample: Sample, node_idx: int) -> Dict[str, Any]:
Returns:
Dictionary with metric name and value
"""
# TODO: Consider plumbing {'GpuDeviceName': 'NVIDIA H100 80GB HBM3'}
# Expected labels for GPU metrics
expected_labels = ["GpuIndex"]
for label in expected_labels:
Expand All @@ -266,12 +274,72 @@ def _parse_gpu_metric(self, sample: Sample, node_idx: int) -> Dict[str, Any]:
metric_name = f"node.{node_idx}.gpu.{index}.{metric_name}"
return {metric_name: value}

def _parse_gpu_sku(self, sample: Sample, node_idx: int) -> Dict[str, str]:
"""Parse a GPU metric sample into a standardized format.

Args:
sample: Prometheus metric sample
node_idx: Index of the node

Returns:
Dictionary with metric name and value
"""
# TODO: Consider plumbing {'GpuDeviceName': 'NVIDIA H100 80GB HBM3'}
# Expected labels for GPU metrics
expected_labels = ["GpuIndex", "GpuDeviceName"]
for label in expected_labels:
if label not in sample.labels:
# This is probably a CPU node
return {}

metric_name = sample.name
# Only return SKU if the metric is one of these which publish these metrics
if (
metric_name != "ray_node_gpus_utilization"
and metric_name != "ray_node_gram_used"
):
# Skip unexpected metrics
return {}

labels = sample.labels
index = labels["GpuIndex"]
value = labels["GpuDeviceName"]

metric_name = f"node.{node_idx}.gpu.{index}.type"
return {metric_name: value}

def _collect_gpu_sku(self) -> Dict[str, str]:
    """Gather the GPU device type (SKU) for every GPU on every Ray node.

    Internal helper; users are not expected to call this directly.

    Returns:
        Mapping of ``node.<i>.gpu.<j>.type`` keys to device-name strings
        for all Ray nodes.
    """
    # TODO: We can re-use the same path for metrics because even though both
    # utilization and memory metrics duplicate the GPU metadata information;
    # since the metadata is the same for each node, we can overwrite it and
    # expect them to be the same
    return self._collect(metrics=False, sku=True)

def _collect_metrics(self) -> Dict[str, Any]:
    """Gather GPU utilization/memory metrics from every Ray node.

    Returns:
        Mapping of per-node, per-GPU metric names to their current values.
    """
    return self._collect(metrics=True, sku=False)

def _collect(self, metrics: bool = False, sku: bool = False) -> Dict[str, Any]:
"""Collect GPU metrics from all Ray nodes.

Returns:
Dictionary of collected metrics
"""
assert metrics ^ sku, (
f"Must collect either metrics or sku, not both: {metrics=}, {sku=}"
)
parser_fn = self._parse_gpu_metric if metrics else self._parse_gpu_sku

if not ray.is_initialized():
print("Ray is not initialized. Cannot collect GPU metrics.")
return {}
Expand All @@ -295,7 +363,9 @@ def _collect_metrics(self) -> Dict[str, Any]:
# Process each node's metrics
collected_metrics = {}
for node_idx, metric_address in enumerate(unique_metric_addresses):
gpu_metrics = self._fetch_and_parse_metrics(node_idx, metric_address)
gpu_metrics = self._fetch_and_parse_metrics(
node_idx, metric_address, parser_fn
)
collected_metrics.update(gpu_metrics)

return collected_metrics
Expand All @@ -304,7 +374,7 @@ def _collect_metrics(self) -> Dict[str, Any]:
print(f"Error collecting GPU metrics: {e}")
return {}

def _fetch_and_parse_metrics(self, node_idx, metric_address):
def _fetch_and_parse_metrics(self, node_idx, metric_address, parser_fn):
"""Fetch metrics from a node and parse GPU metrics.

Args:
Expand Down Expand Up @@ -335,7 +405,7 @@ def _fetch_and_parse_metrics(self, node_idx, metric_address):
continue

for sample in family.samples:
metrics = self._parse_gpu_metric(sample, node_idx)
metrics = parser_fn(sample, node_idx)
gpu_metrics.update(metrics)

return gpu_metrics
Expand All @@ -346,18 +416,16 @@ def _fetch_and_parse_metrics(self, node_idx, metric_address):

def flush(self):
"""Flush collected metrics to the parent logger."""
if not self.parent_logger:
return

with self.lock:
if not self.metrics_buffer:
return

# Log each set of metrics with its original step
for entry in self.metrics_buffer:
step = entry["step"]
metrics = entry["metrics"]
self.parent_logger.log_metrics(metrics, step, prefix="ray")
if self.parent_logger:
# Log each set of metrics with its original step
for entry in self.metrics_buffer:
step = entry["step"]
metrics = entry["metrics"]
self.parent_logger.log_metrics(metrics, step, prefix="ray")

# Clear buffer after logging
self.metrics_buffer = []
Expand Down
2 changes: 1 addition & 1 deletion tests/run_unit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export RAY_DEDUP_LOGS=0

# Run unit tests
echo "Running unit tests..."
if ! pytest unit/ -s -rA "$@"; then
if ! pytest unit/ --cov=nemo_reinforcer --cov-report=term --cov-report=json -s -rA "$@"; then
echo "[ERROR]: Unit tests failed."
exit 1
fi
Expand Down
Loading