From 74b28032c6c7cc35ac4b381dde3d8b0a0a001300 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Wed, 26 Mar 2025 12:13:22 -0700 Subject: [PATCH 1/4] Fix wandb gpu monitoring bug https://github.com/NVIDIA/reinforcer/issues/83 Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/utils/logger.py | 83 +++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 10 deletions(-) diff --git a/nemo_reinforcer/utils/logger.py b/nemo_reinforcer/utils/logger.py index dca4181681..232ff4f146 100644 --- a/nemo_reinforcer/utils/logger.py +++ b/nemo_reinforcer/utils/logger.py @@ -68,7 +68,11 @@ class LoggerInterface(ABC): @abstractmethod def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None ) -> None: """Log a dictionary of metrics.""" pass @@ -87,7 +91,11 @@ def __init__(self, cfg: TensorboardConfig, log_dir: Optional[str] = None): print(f"Initialized TensorboardLogger at {log_dir}") def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None # ignored in TensorBoard ) -> None: """Log metrics to Tensorboard. @@ -95,6 +103,7 @@ def log_metrics( metrics: Dict of metrics to log step: Global step value prefix: Optional prefix for metric names + step_metric: Optional step metric name (ignored in TensorBoard) """ for name, value in metrics.items(): if prefix: @@ -120,8 +129,25 @@ def __init__(self, cfg: WandbConfig, log_dir: Optional[str] = None): f"Initialized WandbLogger for project {cfg.get('project')}, run {cfg.get('name')} at {log_dir}" ) + def define_metric( + self, + name: str, + step_metric: Optional[str] = None, + ) -> None: + """Define a metric with custom step metric. + + Args: + name: Name of the metric or pattern (e.g. 'ray/*') + step_metric: Optional name of the step metric to use + """ + self.run.define_metric(name, step_metric=step_metric) + def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None ) -> None: """Log metrics to wandb. @@ -129,11 +155,18 @@ def log_metrics( metrics: Dict of metrics to log step: Global step value prefix: Optional prefix for metric names + step_metric: Optional name of a field in metrics to use as step instead + of the provided step value """ if prefix: - metrics = {f"{prefix}/{k}": v for k, v in metrics.items()} + metrics = {f"{prefix}/{k}" if k != step_metric else k: v for k, v in metrics.items()} - self.run.log(metrics, step=step) + # If step_metric is provided, use the corresponding value from metrics as step + if step_metric and step_metric in metrics: + # commit=False so the step does not get incremented + self.run.log(metrics, commit=False) + else: + self.run.log(metrics, step=step) def log_hyperparams(self, params: Dict[str, Any]) -> None: """Log hyperparameters to wandb. @@ -151,6 +184,8 @@ def __init__( self, collection_interval: int | float, flush_interval: int | float, + metric_prefix: str, + step_metric: str, parent_logger: Optional["Logger"] = None, ): """Initialize the GPU monitor. @@ -158,10 +193,13 @@ def __init__( Args: collection_interval: Interval in seconds to collect GPU metrics flush_interval: Interval in seconds to flush metrics to parent logger + step_metric: Name of the field to use as the step metric parent_logger: Logger to receive the collected metrics """ self.collection_interval = collection_interval self.flush_interval = flush_interval + self.metric_prefix = metric_prefix + self.step_metric = step_metric self.parent_logger = parent_logger self.metrics_buffer = [] # Store metrics with timestamps self.last_flush_time = time.time() @@ -357,7 +395,17 @@ def flush(self): for entry in self.metrics_buffer: step = entry["step"] metrics = entry["metrics"] - self.parent_logger.log_metrics(metrics, step, prefix="ray") + + # Add the step metric directly to metrics for use as step_metric + metrics[self.step_metric] = step + + # Pass step_metric as the step_metric to use it as the step value in WandB + self.parent_logger.log_metrics( + metrics, + step=step, + prefix=self.metric_prefix, + step_metric=self.step_metric + ) # Clear buffer after logging self.metrics_buffer = [] @@ -380,6 +428,7 @@ def __init__(self, cfg: LoggerConfig): - gpu_flush_interval """ self.loggers = [] + self.wandb_logger = None self.base_log_dir = cfg["log_dir"] os.makedirs(self.base_log_dir, exist_ok=True) @@ -387,8 +436,8 @@ def __init__(self, cfg: LoggerConfig): if cfg["wandb_enabled"]: wandb_log_dir = os.path.join(self.base_log_dir, "wandb") os.makedirs(wandb_log_dir, exist_ok=True) - wandb_logger = WandbLogger(cfg["wandb"], log_dir=wandb_log_dir) - self.loggers.append(wandb_logger) + self.wandb_logger = WandbLogger(cfg["wandb"], log_dir=wandb_log_dir) + self.loggers.append(self.wandb_logger) if cfg["tensorboard_enabled"]: tensorboard_log_dir = os.path.join(self.base_log_dir, "tensorboard") @@ -401,9 +450,17 @@ def __init__(self, cfg: LoggerConfig): # Initialize GPU monitoring if requested self.gpu_monitor = None if cfg["monitor_gpus"]: + # Define ray metrics to use ray_step as step metric - only if monitoring is enabled + metric_prefix = "ray" + step_metric = f"{metric_prefix}/ray_step" + if cfg["wandb_enabled"] and self.wandb_logger: + self.wandb_logger.define_metric(f"{metric_prefix}/*", step_metric=step_metric) + self.gpu_monitor = RayGpuMonitorLogger( collection_interval=cfg["gpu_monitoring"]["collection_interval"], flush_interval=cfg["gpu_monitoring"]["flush_interval"], + metric_prefix=metric_prefix, + step_metric=step_metric, parent_logger=self, ) self.gpu_monitor.start() @@ -412,7 +469,11 @@ def __init__(self, cfg: LoggerConfig): print("No loggers initialized") def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None ) -> None: """Log metrics to all enabled backends. @@ -420,9 +481,11 @@ def log_metrics( metrics: Dict of metrics to log step: Global step value prefix: Optional prefix for metric names + step_metric: Optional name of a field in metrics to use as step instead + of the provided step value (currently only needed for wandb) """ for logger in self.loggers: - logger.log_metrics(metrics, step, prefix) + logger.log_metrics(metrics, step, prefix, step_metric) def log_hyperparams(self, params: Dict[str, Any]) -> None: """Log hyperparameters to all enabled backends. From f7ad8a1cd4ae7ea8052addf1f255522f3241ff95 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Wed, 26 Mar 2025 13:33:49 -0700 Subject: [PATCH 2/4] Unit tests Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/utils/logger.py | 3 +- tests/unit/utils/test_logger.py | 304 ++++++++++++++++++++++++++++++-- 2 files changed, 291 insertions(+), 16 deletions(-) diff --git a/nemo_reinforcer/utils/logger.py b/nemo_reinforcer/utils/logger.py index 232ff4f146..3674a27606 100644 --- a/nemo_reinforcer/utils/logger.py +++ b/nemo_reinforcer/utils/logger.py @@ -399,7 +399,7 @@ def flush(self): # Add the step metric directly to metrics for use as step_metric metrics[self.step_metric] = step - # Pass step_metric as the step_metric to use it as the step value in WandB + # Pass step_metric as the step_metric to use it as the step value in wandb self.parent_logger.log_metrics( metrics, step=step, @@ -450,7 +450,6 @@ def __init__(self, cfg: LoggerConfig): # Initialize GPU monitoring if requested self.gpu_monitor = None if cfg["monitor_gpus"]: - # Define ray metrics to use ray_step as step metric - only if monitoring is enabled metric_prefix = "ray" step_metric = f"{metric_prefix}/ray_step" if cfg["wandb_enabled"] and self.wandb_logger: diff --git a/tests/unit/utils/test_logger.py b/tests/unit/utils/test_logger.py index a8d982266b..8b6130f7a7 100644 --- a/tests/unit/utils/test_logger.py +++ b/tests/unit/utils/test_logger.py @@ -186,6 +186,67 @@ def test_log_metrics_with_prefix(self, mock_wandb): expected_metrics = {"train/loss": 0.5, "train/accuracy": 0.8} mock_run.log.assert_called_once_with(expected_metrics, step=step) + @patch("nemo_reinforcer.utils.logger.wandb") + def test_log_metrics_with_step_metric(self, mock_wandb): + """Test logging metrics with a step metric to WandbLogger.""" + cfg = {} + logger = WandbLogger(cfg) + + # Define step metric + step_metric = "iteration" + + # Include the step metric in the metrics + metrics = {"loss": 0.5, "accuracy": 0.8, "iteration": 15} + step = 10 # This should be ignored when step_metric is provided + + logger.log_metrics(metrics, step, step_metric=step_metric) + + # Check that log was called with metrics and commit=False + # When using step_metric, step should be ignored and commit=False should be used + mock_run = mock_wandb.init.return_value + mock_run.log.assert_called_once_with(metrics, commit=False) + + @patch("nemo_reinforcer.utils.logger.wandb") + def test_log_metrics_with_prefix_and_step_metric(self, mock_wandb): + """Test logging metrics with both prefix and step metric.""" + cfg = {} + logger = WandbLogger(cfg) + + # Define prefix and step metric + prefix = "train" + step_metric = "train/iteration" + + # Include the step metric in the metrics + metrics = {"loss": 0.5, "accuracy": 0.8, "iteration": 15} + step = 10 # This should be ignored when step_metric is provided + + logger.log_metrics(metrics, step, prefix=prefix, step_metric=step_metric) + + # Check that log was called with prefixed metrics and commit=False + # The step_metric key gets prefixed based on the current implementation + mock_run = mock_wandb.init.return_value + expected_metrics = { + "train/loss": 0.5, + "train/accuracy": 0.8, + "train/iteration": 15, + } + mock_run.log.assert_called_once_with(expected_metrics, commit=False) + + @patch("nemo_reinforcer.utils.logger.wandb") + def test_define_metric(self, mock_wandb): + """Test defining a metric with a custom step metric.""" + cfg = {} + logger = WandbLogger(cfg) + + # Define metric pattern and step metric + logger.define_metric("ray/*", step_metric="ray/ray_step") + + # Check that define_metric was called + mock_run = mock_wandb.init.return_value + mock_run.define_metric.assert_called_once_with( + "ray/*", step_metric="ray/ray_step" + ) + @patch("nemo_reinforcer.utils.logger.wandb") def test_log_hyperparams(self, mock_wandb): """Test logging hyperparameters to WandbLogger.""" @@ -219,11 +280,13 @@ def __init__(self): self.logged_metrics = [] self.logged_steps = [] self.logged_prefixes = [] + self.logged_step_metrics = [] - def log_metrics(self, metrics, step, prefix=""): + def log_metrics(self, metrics, step, prefix="", step_metric=None): self.logged_metrics.append(metrics) self.logged_steps.append(step) self.logged_prefixes.append(prefix) + self.logged_step_metrics.append(step_metric) return MockLogger() @@ -235,12 +298,18 @@ def test_init(self, mock_ray): # Initialize the monitor with standard settings monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Verify initialization parameters assert monitor.collection_interval == 10.0 assert monitor.flush_interval == 60.0 + assert monitor.metric_prefix == "test" + assert monitor.step_metric == "test/step" assert monitor.parent_logger is None assert monitor.metrics_buffer == [] assert monitor.is_running is False @@ -255,7 +324,11 @@ def test_start(self, mock_thread, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Start the monitor @@ -277,7 +350,11 @@ def test_start_ray_not_initialized(self, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Starting should raise a ValueError @@ -293,7 +370,11 @@ def test_stop(self, mock_thread, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Start the monitor @@ -318,7 +399,11 @@ def test_parse_gpu_metric(self, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Create a sample with GPU utilization metric @@ -405,7 +490,11 @@ def test_fetch_and_parse_metrics(self, mock_get, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Mock the _parse_gpu_metric method to return expected values @@ -445,7 +534,11 @@ def test_collect_metrics(self, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Mock the _fetch_and_parse_metrics method @@ -481,6 +574,8 @@ def test_flush_empty_buffer(self, mock_ray, mock_parent_logger): monitor = RayGpuMonitorLogger( collection_interval=10.0, flush_interval=60.0, + metric_prefix="ray", + step_metric="ray/ray_step", parent_logger=mock_parent_logger, ) @@ -500,6 +595,8 @@ def test_flush(self, mock_ray, mock_parent_logger): monitor = RayGpuMonitorLogger( collection_interval=10.0, flush_interval=60.0, + metric_prefix="ray", + step_metric="ray/ray_step", parent_logger=mock_parent_logger, ) @@ -520,23 +617,68 @@ def test_flush(self, mock_ray, mock_parent_logger): # Verify parent logger's log_metrics was called for each entry assert len(mock_parent_logger.logged_metrics) == 2 - assert mock_parent_logger.logged_metrics[0] == { + + # First metrics entry should include the step metric + expected_first_metrics = { "node.0.gpu.0.gpu": 75.5, "node.0.gpu.0.memory": 4096.0, + "ray/ray_step": 10, # Step metric added } + assert mock_parent_logger.logged_metrics[0] == expected_first_metrics assert mock_parent_logger.logged_steps[0] == 10 assert mock_parent_logger.logged_prefixes[0] == "ray" + assert mock_parent_logger.logged_step_metrics[0] == "ray/ray_step" - assert mock_parent_logger.logged_metrics[1] == { + # Second metrics entry should include the step metric + expected_second_metrics = { "node.0.gpu.0.gpu": 80.0, "node.0.gpu.0.memory": 5120.0, + "ray/ray_step": 20, # Step metric added } + assert mock_parent_logger.logged_metrics[1] == expected_second_metrics assert mock_parent_logger.logged_steps[1] == 20 assert mock_parent_logger.logged_prefixes[1] == "ray" + assert mock_parent_logger.logged_step_metrics[1] == "ray/ray_step" # Verify buffer was cleared assert monitor.metrics_buffer == [] + @patch("nemo_reinforcer.utils.logger.ray") + def test_flush_with_custom_prefix(self, mock_ray, mock_parent_logger): + """Test flush method with custom metric prefix.""" + # Mock ray.is_initialized to return True + mock_ray.is_initialized.return_value = True + + # Initialize the monitor with parent logger and custom prefix + custom_prefix = "custom_metrics" + custom_step_metric = "custom_metrics/step" + monitor = RayGpuMonitorLogger( + collection_interval=10.0, + flush_interval=60.0, + metric_prefix=custom_prefix, + step_metric=custom_step_metric, + parent_logger=mock_parent_logger, + ) + + # Add test metrics to buffer + monitor.metrics_buffer = [ + { + "step": 15, + "metrics": {"node.0.gpu.0.gpu": 60.0}, + } + ] + + # Call flush + monitor.flush() + + # Verify parent logger's log_metrics was called with the custom prefix + assert len(mock_parent_logger.logged_metrics) == 1 + expected_metrics = {"node.0.gpu.0.gpu": 60.0, "custom_metrics/step": 15} + assert mock_parent_logger.logged_metrics[0] == expected_metrics + assert mock_parent_logger.logged_steps[0] == 15 + assert mock_parent_logger.logged_prefixes[0] == custom_prefix + assert mock_parent_logger.logged_step_metrics[0] == custom_step_metric + @patch("nemo_reinforcer.utils.logger.ray") @patch("nemo_reinforcer.utils.logger.time") def test_collection_loop(self, mock_time, mock_ray): @@ -554,7 +696,11 @@ def test_collection_loop(self, mock_time, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Set start time and running flag @@ -591,6 +737,89 @@ def side_effect(): # Verify flush was called (flush_interval elapsed) mock_flush.assert_called_once() + @patch("nemo_reinforcer.utils.logger.WandbLogger") + @patch("nemo_reinforcer.utils.logger.TensorboardLogger") + @patch("nemo_reinforcer.utils.logger.RayGpuMonitorLogger") + def test_init_with_gpu_monitoring( + self, mock_gpu_monitor, mock_tb_logger, mock_wandb_logger, temp_dir + ): + """Test initialization with GPU monitoring enabled.""" + cfg = { + "wandb_enabled": True, + "tensorboard_enabled": True, + "monitor_gpus": True, + "gpu_monitoring": { + "collection_interval": 15.0, + "flush_interval": 45.0, + }, + "wandb": {"project": "test-project"}, + "tensorboard": {"log_dir": "test_logs"}, + "log_dir": temp_dir, + } + logger = Logger(cfg) + + # Check that regular loggers were initialized + assert len(logger.loggers) == 2 + mock_wandb_logger.assert_called_once() + mock_tb_logger.assert_called_once() + + # Check that GPU monitor was initialized with correct parameters + mock_gpu_monitor.assert_called_once_with( + collection_interval=15.0, + flush_interval=45.0, + metric_prefix="ray", + step_metric="ray/ray_step", + parent_logger=logger, + ) + + # Check that GPU monitor was started + mock_gpu_instance = mock_gpu_monitor.return_value + mock_gpu_instance.start.assert_called_once() + + # Check that wandb metrics are defined with the step metric + mock_wandb_instance = mock_wandb_logger.return_value + mock_wandb_instance.define_metric.assert_called_once_with( + "ray/*", step_metric="ray/ray_step" + ) + + @patch("nemo_reinforcer.utils.logger.WandbLogger") + @patch("nemo_reinforcer.utils.logger.TensorboardLogger") + @patch("nemo_reinforcer.utils.logger.RayGpuMonitorLogger") + def test_gpu_monitoring_without_wandb( + self, mock_gpu_monitor, mock_tb_logger, mock_wandb_logger, temp_dir + ): + """Test GPU monitoring initialization when wandb is disabled.""" + cfg = { + "wandb_enabled": False, + "tensorboard_enabled": True, + "monitor_gpus": True, + "gpu_monitoring": { + "collection_interval": 15.0, + "flush_interval": 45.0, + }, + "tensorboard": {"log_dir": "test_logs"}, + "log_dir": temp_dir, + } + logger = Logger(cfg) + + # Check that only tensorboard logger was initialized + assert len(logger.loggers) == 1 + mock_wandb_logger.assert_not_called() + mock_tb_logger.assert_called_once() + + # Check that GPU monitor was initialized with correct parameters + mock_gpu_monitor.assert_called_once_with( + collection_interval=15.0, + flush_interval=45.0, + metric_prefix="ray", + step_metric="ray/ray_step", + parent_logger=logger, + ) + + # Since wandb is disabled, define_metric should not be called + mock_wandb_instance = mock_wandb_logger.return_value + assert not mock_wandb_instance.define_metric.called + class TestLogger: """Test the main Logger class.""" @@ -702,8 +931,8 @@ def test_log_metrics(self, mock_tb_logger, mock_wandb_logger, temp_dir): logger.log_metrics(metrics, step) # Check that log_metrics was called on both loggers - mock_wandb_instance.log_metrics.assert_called_once_with(metrics, step, "") - mock_tb_instance.log_metrics.assert_called_once_with(metrics, step, "") + mock_wandb_instance.log_metrics.assert_called_once_with(metrics, step, "", None) + mock_tb_instance.log_metrics.assert_called_once_with(metrics, step, "", None) @patch("nemo_reinforcer.utils.logger.WandbLogger") @patch("nemo_reinforcer.utils.logger.TensorboardLogger") @@ -758,9 +987,56 @@ def test_init_with_gpu_monitoring( # Check that GPU monitor was initialized with correct parameters mock_gpu_monitor.assert_called_once_with( - collection_interval=15.0, flush_interval=45.0, parent_logger=logger + collection_interval=15.0, + flush_interval=45.0, + metric_prefix="ray", + step_metric="ray/ray_step", + parent_logger=logger, ) # Check that GPU monitor was started mock_gpu_instance = mock_gpu_monitor.return_value mock_gpu_instance.start.assert_called_once() + + # Check that wandb metrics are defined with the step metric + mock_wandb_instance = mock_wandb_logger.return_value + mock_wandb_instance.define_metric.assert_called_once_with( + "ray/*", step_metric="ray/ray_step" + ) + + @patch("nemo_reinforcer.utils.logger.WandbLogger") + @patch("nemo_reinforcer.utils.logger.TensorboardLogger") + def test_log_metrics_with_prefix_and_step_metric( + self, mock_tb_logger, mock_wandb_logger, temp_dir + ): + """Test logging metrics with prefix and step_metric.""" + cfg = { + "wandb_enabled": True, + "tensorboard_enabled": True, + "monitor_gpus": False, + "wandb": {"project": "test-project"}, + "tensorboard": {"log_dir": "test_logs"}, + "log_dir": temp_dir, + } + logger = Logger(cfg) + + # Create mock logger instances + mock_wandb_instance = mock_wandb_logger.return_value + mock_tb_instance = mock_tb_logger.return_value + + # Create metrics with a step metric field + metrics = {"loss": 0.5, "accuracy": 0.8, "iteration": 15} + step = 10 + prefix = "train" + step_metric = "iteration" + + # Log metrics with prefix and step_metric + logger.log_metrics(metrics, step, prefix=prefix, step_metric=step_metric) + + # Check that log_metrics was called on both loggers with correct parameters + mock_wandb_instance.log_metrics.assert_called_once_with( + metrics, step, prefix, step_metric + ) + mock_tb_instance.log_metrics.assert_called_once_with( + metrics, step, prefix, step_metric + ) From 2eff876c04a0b1b71967ea7ae4c14f681878d07c Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Wed, 26 Mar 2025 14:39:20 -0700 Subject: [PATCH 3/4] ruff Signed-off-by: Yi-Fu Wu --- nemo_reinforcer/utils/logger.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/nemo_reinforcer/utils/logger.py b/nemo_reinforcer/utils/logger.py index 4a22d1a6e1..bc0157d564 100644 --- a/nemo_reinforcer/utils/logger.py +++ b/nemo_reinforcer/utils/logger.py @@ -72,7 +72,7 @@ def log_metrics( metrics: Dict[str, Any], step: int, prefix: Optional[str] = "", - step_metric: Optional[str] = None + step_metric: Optional[str] = None, ) -> None: """Log a dictionary of metrics.""" pass @@ -95,7 +95,7 @@ def log_metrics( metrics: Dict[str, Any], step: int, prefix: Optional[str] = "", - step_metric: Optional[str] = None # ignored in TensorBoard + step_metric: Optional[str] = None, # ignored in TensorBoard ) -> None: """Log metrics to Tensorboard. @@ -147,7 +147,7 @@ def log_metrics( metrics: Dict[str, Any], step: int, prefix: Optional[str] = "", - step_metric: Optional[str] = None + step_metric: Optional[str] = None, ) -> None: """Log metrics to wandb. @@ -159,7 +159,10 @@ def log_metrics( of the provided step value """ if prefix: - metrics = {f"{prefix}/{k}" if k != step_metric else k: v for k, v in metrics.items()} + metrics = { + f"{prefix}/{k}" if k != step_metric else k: v + for k, v in metrics.items() + } # If step_metric is provided, use the corresponding value from metrics as step if step_metric and step_metric in metrics: @@ -472,7 +475,7 @@ def flush(self): metrics, step=step, prefix=self.metric_prefix, - step_metric=self.step_metric + step_metric=self.step_metric, ) # Clear buffer after logging @@ -521,7 +524,9 @@ def __init__(self, cfg: LoggerConfig): metric_prefix = "ray" step_metric = f"{metric_prefix}/ray_step" if cfg["wandb_enabled"] and self.wandb_logger: - self.wandb_logger.define_metric(f"{metric_prefix}/*", step_metric=step_metric) + self.wandb_logger.define_metric( + f"{metric_prefix}/*", step_metric=step_metric + ) self.gpu_monitor = RayGpuMonitorLogger( collection_interval=cfg["gpu_monitoring"]["collection_interval"], @@ -540,7 +545,7 @@ def log_metrics( metrics: Dict[str, Any], step: int, prefix: Optional[str] = "", - step_metric: Optional[str] = None + step_metric: Optional[str] = None, ) -> None: """Log metrics to all enabled backends. From 52e8c11e9f88d861a0723ccdd235ca1ebc4aee15 Mon Sep 17 00:00:00 2001 From: Yi-Fu Wu Date: Mon, 31 Mar 2025 07:32:50 -0700 Subject: [PATCH 4/4] Fix unit tests Signed-off-by: Yi-Fu Wu --- tests/unit/conftest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 1dbbd6a169..6056effa57 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -113,6 +113,8 @@ def session_data(request, init_ray_cluster): logger = RayGpuMonitorLogger( collection_interval=float("inf"), flush_interval=float("inf"), + metric_prefix="test", + step_metric="test/step", parent_logger=None, ) unit_test_data["gpu_types"] = list(set(logger._collect_gpu_sku().values())) @@ -209,6 +211,8 @@ def ray_gpu_monitor(init_ray_cluster): gpu_monitor = RayGpuMonitorLogger( collection_interval=1, flush_interval=float("inf"), # Disabling flushing since we will do it manually + metric_prefix="test", + step_metric="test/step", parent_logger=None, ) gpu_monitor.start()