diff --git a/nemo_reinforcer/utils/logger.py b/nemo_reinforcer/utils/logger.py index 42143e7593..bc0157d564 100644 --- a/nemo_reinforcer/utils/logger.py +++ b/nemo_reinforcer/utils/logger.py @@ -68,7 +68,11 @@ class LoggerInterface(ABC): @abstractmethod def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None, ) -> None: """Log a dictionary of metrics.""" pass @@ -87,7 +91,11 @@ def __init__(self, cfg: TensorboardConfig, log_dir: Optional[str] = None): print(f"Initialized TensorboardLogger at {log_dir}") def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None, # ignored in TensorBoard ) -> None: """Log metrics to Tensorboard. @@ -95,6 +103,7 @@ def log_metrics( metrics: Dict of metrics to log step: Global step value prefix: Optional prefix for metric names + step_metric: Optional step metric name (ignored in TensorBoard) """ for name, value in metrics.items(): if prefix: @@ -120,8 +129,25 @@ def __init__(self, cfg: WandbConfig, log_dir: Optional[str] = None): f"Initialized WandbLogger for project {cfg.get('project')}, run {cfg.get('name')} at {log_dir}" ) + def define_metric( + self, + name: str, + step_metric: Optional[str] = None, + ) -> None: + """Define a metric with custom step metric. + + Args: + name: Name of the metric or pattern (e.g. 'ray/*') + step_metric: Optional name of the step metric to use + """ + self.run.define_metric(name, step_metric=step_metric) + def log_metrics( - self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = "" + self, + metrics: Dict[str, Any], + step: int, + prefix: Optional[str] = "", + step_metric: Optional[str] = None, ) -> None: """Log metrics to wandb. 
@@ -129,11 +155,21 @@ def log_metrics( metrics: Dict of metrics to log step: Global step value prefix: Optional prefix for metric names + step_metric: Optional name of a field in metrics to use as step instead + of the provided step value """ if prefix: - metrics = {f"{prefix}/{k}": v for k, v in metrics.items()} - - self.run.log(metrics, step=step) + metrics = { + f"{prefix}/{k}" if k != step_metric else k: v + for k, v in metrics.items() + } + + # If step_metric is provided, use the corresponding value from metrics as step + if step_metric and step_metric in metrics: + # commit=False so the step does not get incremented + self.run.log(metrics, commit=False) + else: + self.run.log(metrics, step=step) def log_hyperparams(self, params: Dict[str, Any]) -> None: """Log hyperparameters to wandb. @@ -156,6 +192,8 @@ def __init__( self, collection_interval: int | float, flush_interval: int | float, + metric_prefix: str, + step_metric: str, parent_logger: Optional["Logger"] = None, ): """Initialize the GPU monitor. 
@@ -163,10 +201,14 @@ def __init__(
         Args:
             collection_interval: Interval in seconds to collect GPU metrics
             flush_interval: Interval in seconds to flush metrics to parent logger
+            metric_prefix: Prefix to apply to metric names when logging
+            step_metric: Name of the field to use as the step metric
             parent_logger: Logger to receive the collected metrics
         """
         self.collection_interval = collection_interval
         self.flush_interval = flush_interval
+        self.metric_prefix = metric_prefix
+        self.step_metric = step_metric
         self.parent_logger = parent_logger
         self.metrics_buffer: list[
             GpuMetricSnapshot
@@ -425,7 +467,17 @@ def flush(self):
         for entry in self.metrics_buffer:
             step = entry["step"]
             metrics = entry["metrics"]
-            self.parent_logger.log_metrics(metrics, step, prefix="ray")
+
+            # Add the step metric directly to metrics for use as step_metric
+            metrics[self.step_metric] = step
+
+            # Pass step_metric so wandb can use this metric's value as the step
+            self.parent_logger.log_metrics(
+                metrics,
+                step=step,
+                prefix=self.metric_prefix,
+                step_metric=self.step_metric,
+            )

         # Clear buffer after logging
         self.metrics_buffer = []
@@ -448,6 +500,7 @@ def __init__(self, cfg: LoggerConfig):
             - gpu_flush_interval
         """
         self.loggers = []
+        self.wandb_logger = None
         self.base_log_dir = cfg["log_dir"]
         os.makedirs(self.base_log_dir, exist_ok=True)

@@ -455,8 +508,8 @@ def __init__(self, cfg: LoggerConfig):
         if cfg["wandb_enabled"]:
             wandb_log_dir = os.path.join(self.base_log_dir, "wandb")
             os.makedirs(wandb_log_dir, exist_ok=True)
-            wandb_logger = WandbLogger(cfg["wandb"], log_dir=wandb_log_dir)
-            self.loggers.append(wandb_logger)
+            self.wandb_logger = WandbLogger(cfg["wandb"], log_dir=wandb_log_dir)
+            self.loggers.append(self.wandb_logger)

         if cfg["tensorboard_enabled"]:
             tensorboard_log_dir = os.path.join(self.base_log_dir, "tensorboard")
@@ -469,9 +522,18 @@ def __init__(self, cfg: LoggerConfig):
         # Initialize GPU monitoring if requested
         self.gpu_monitor = None
         if cfg["monitor_gpus"]:
+            metric_prefix = "ray"
+            step_metric = f"{metric_prefix}/ray_step"
+            if cfg["wandb_enabled"] and self.wandb_logger:
+                self.wandb_logger.define_metric(
+                    f"{metric_prefix}/*", step_metric=step_metric
+                )
+
             self.gpu_monitor = RayGpuMonitorLogger(
                 collection_interval=cfg["gpu_monitoring"]["collection_interval"],
                 flush_interval=cfg["gpu_monitoring"]["flush_interval"],
+                metric_prefix=metric_prefix,
+                step_metric=step_metric,
                 parent_logger=self,
             )
             self.gpu_monitor.start()
@@ -480,7 +542,11 @@ def __init__(self, cfg: LoggerConfig):
             print("No loggers initialized")

     def log_metrics(
-        self, metrics: Dict[str, Any], step: int, prefix: Optional[str] = ""
+        self,
+        metrics: Dict[str, Any],
+        step: int,
+        prefix: Optional[str] = "",
+        step_metric: Optional[str] = None,
     ) -> None:
         """Log metrics to all enabled backends.

@@ -488,9 +554,11 @@
             metrics: Dict of metrics to log
             step: Global step value
             prefix: Optional prefix for metric names
+            step_metric: Optional name of a field in metrics to use as step instead
+                of the provided step value (currently only needed for wandb)
         """
         for logger in self.loggers:
-            logger.log_metrics(metrics, step, prefix)
+            logger.log_metrics(metrics, step, prefix, step_metric)

     def log_hyperparams(self, params: Dict[str, Any]) -> None:
         """Log hyperparameters to all enabled backends.
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 1dbbd6a169..6056effa57 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -113,6 +113,8 @@ def session_data(request, init_ray_cluster): logger = RayGpuMonitorLogger( collection_interval=float("inf"), flush_interval=float("inf"), + metric_prefix="test", + step_metric="test/step", parent_logger=None, ) unit_test_data["gpu_types"] = list(set(logger._collect_gpu_sku().values())) @@ -209,6 +211,8 @@ def ray_gpu_monitor(init_ray_cluster): gpu_monitor = RayGpuMonitorLogger( collection_interval=1, flush_interval=float("inf"), # Disabling flushing since we will do it manually + metric_prefix="test", + step_metric="test/step", parent_logger=None, ) gpu_monitor.start() diff --git a/tests/unit/utils/test_logger.py b/tests/unit/utils/test_logger.py index dd1c24fb27..ac2ce9cc43 100644 --- a/tests/unit/utils/test_logger.py +++ b/tests/unit/utils/test_logger.py @@ -186,6 +186,67 @@ def test_log_metrics_with_prefix(self, mock_wandb): expected_metrics = {"train/loss": 0.5, "train/accuracy": 0.8} mock_run.log.assert_called_once_with(expected_metrics, step=step) + @patch("nemo_reinforcer.utils.logger.wandb") + def test_log_metrics_with_step_metric(self, mock_wandb): + """Test logging metrics with a step metric to WandbLogger.""" + cfg = {} + logger = WandbLogger(cfg) + + # Define step metric + step_metric = "iteration" + + # Include the step metric in the metrics + metrics = {"loss": 0.5, "accuracy": 0.8, "iteration": 15} + step = 10 # This should be ignored when step_metric is provided + + logger.log_metrics(metrics, step, step_metric=step_metric) + + # Check that log was called with metrics and commit=False + # When using step_metric, step should be ignored and commit=False should be used + mock_run = mock_wandb.init.return_value + mock_run.log.assert_called_once_with(metrics, commit=False) + + @patch("nemo_reinforcer.utils.logger.wandb") + def test_log_metrics_with_prefix_and_step_metric(self, 
mock_wandb): + """Test logging metrics with both prefix and step metric.""" + cfg = {} + logger = WandbLogger(cfg) + + # Define prefix and step metric + prefix = "train" + step_metric = "train/iteration" + + # Include the step metric in the metrics + metrics = {"loss": 0.5, "accuracy": 0.8, "iteration": 15} + step = 10 # This should be ignored when step_metric is provided + + logger.log_metrics(metrics, step, prefix=prefix, step_metric=step_metric) + + # Check that log was called with prefixed metrics and commit=False + # The step_metric key gets prefixed based on the current implementation + mock_run = mock_wandb.init.return_value + expected_metrics = { + "train/loss": 0.5, + "train/accuracy": 0.8, + "train/iteration": 15, + } + mock_run.log.assert_called_once_with(expected_metrics, commit=False) + + @patch("nemo_reinforcer.utils.logger.wandb") + def test_define_metric(self, mock_wandb): + """Test defining a metric with a custom step metric.""" + cfg = {} + logger = WandbLogger(cfg) + + # Define metric pattern and step metric + logger.define_metric("ray/*", step_metric="ray/ray_step") + + # Check that define_metric was called + mock_run = mock_wandb.init.return_value + mock_run.define_metric.assert_called_once_with( + "ray/*", step_metric="ray/ray_step" + ) + @patch("nemo_reinforcer.utils.logger.wandb") def test_log_hyperparams(self, mock_wandb): """Test logging hyperparameters to WandbLogger.""" @@ -219,11 +280,13 @@ def __init__(self): self.logged_metrics = [] self.logged_steps = [] self.logged_prefixes = [] + self.logged_step_metrics = [] - def log_metrics(self, metrics, step, prefix=""): + def log_metrics(self, metrics, step, prefix="", step_metric=None): self.logged_metrics.append(metrics) self.logged_steps.append(step) self.logged_prefixes.append(prefix) + self.logged_step_metrics.append(step_metric) return MockLogger() @@ -235,12 +298,18 @@ def test_init(self, mock_ray): # Initialize the monitor with standard settings monitor = RayGpuMonitorLogger( - 
collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Verify initialization parameters assert monitor.collection_interval == 10.0 assert monitor.flush_interval == 60.0 + assert monitor.metric_prefix == "test" + assert monitor.step_metric == "test/step" assert monitor.parent_logger is None assert monitor.metrics_buffer == [] assert monitor.is_running is False @@ -255,7 +324,11 @@ def test_start(self, mock_thread, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Start the monitor @@ -277,7 +350,11 @@ def test_start_ray_not_initialized(self, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Starting should raise a ValueError @@ -293,7 +370,11 @@ def test_stop(self, mock_thread, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Start the monitor @@ -318,7 +399,11 @@ def test_parse_gpu_metric(self, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Create a sample with GPU utilization metric @@ -405,7 +490,11 @@ def test_fetch_and_parse_metrics(self, mock_get, mock_ray): # Initialize the monitor 
monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Mock the _parse_gpu_metric method to return expected values @@ -447,7 +536,11 @@ def test_collect_metrics(self, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + step_metric="test/step", + parent_logger=None, ) # Mock the _fetch_and_parse_metrics method @@ -483,6 +576,8 @@ def test_flush_empty_buffer(self, mock_ray, mock_parent_logger): monitor = RayGpuMonitorLogger( collection_interval=10.0, flush_interval=60.0, + metric_prefix="ray", + step_metric="ray/ray_step", parent_logger=mock_parent_logger, ) @@ -502,6 +597,8 @@ def test_flush(self, mock_ray, mock_parent_logger): monitor = RayGpuMonitorLogger( collection_interval=10.0, flush_interval=60.0, + metric_prefix="ray", + step_metric="ray/ray_step", parent_logger=mock_parent_logger, ) @@ -522,23 +619,68 @@ def test_flush(self, mock_ray, mock_parent_logger): # Verify parent logger's log_metrics was called for each entry assert len(mock_parent_logger.logged_metrics) == 2 - assert mock_parent_logger.logged_metrics[0] == { + + # First metrics entry should include the step metric + expected_first_metrics = { "node.0.gpu.0.gpu": 75.5, "node.0.gpu.0.memory": 4096.0, + "ray/ray_step": 10, # Step metric added } + assert mock_parent_logger.logged_metrics[0] == expected_first_metrics assert mock_parent_logger.logged_steps[0] == 10 assert mock_parent_logger.logged_prefixes[0] == "ray" + assert mock_parent_logger.logged_step_metrics[0] == "ray/ray_step" - assert mock_parent_logger.logged_metrics[1] == { + # Second metrics entry should include the step metric + expected_second_metrics = { "node.0.gpu.0.gpu": 80.0, "node.0.gpu.0.memory": 5120.0, + 
"ray/ray_step": 20, # Step metric added } + assert mock_parent_logger.logged_metrics[1] == expected_second_metrics assert mock_parent_logger.logged_steps[1] == 20 assert mock_parent_logger.logged_prefixes[1] == "ray" + assert mock_parent_logger.logged_step_metrics[1] == "ray/ray_step" # Verify buffer was cleared assert monitor.metrics_buffer == [] + @patch("nemo_reinforcer.utils.logger.ray") + def test_flush_with_custom_prefix(self, mock_ray, mock_parent_logger): + """Test flush method with custom metric prefix.""" + # Mock ray.is_initialized to return True + mock_ray.is_initialized.return_value = True + + # Initialize the monitor with parent logger and custom prefix + custom_prefix = "custom_metrics" + custom_step_metric = "custom_metrics/step" + monitor = RayGpuMonitorLogger( + collection_interval=10.0, + flush_interval=60.0, + metric_prefix=custom_prefix, + step_metric=custom_step_metric, + parent_logger=mock_parent_logger, + ) + + # Add test metrics to buffer + monitor.metrics_buffer = [ + { + "step": 15, + "metrics": {"node.0.gpu.0.gpu": 60.0}, + } + ] + + # Call flush + monitor.flush() + + # Verify parent logger's log_metrics was called with the custom prefix + assert len(mock_parent_logger.logged_metrics) == 1 + expected_metrics = {"node.0.gpu.0.gpu": 60.0, "custom_metrics/step": 15} + assert mock_parent_logger.logged_metrics[0] == expected_metrics + assert mock_parent_logger.logged_steps[0] == 15 + assert mock_parent_logger.logged_prefixes[0] == custom_prefix + assert mock_parent_logger.logged_step_metrics[0] == custom_step_metric + @patch("nemo_reinforcer.utils.logger.ray") @patch("nemo_reinforcer.utils.logger.time") def test_collection_loop(self, mock_time, mock_ray): @@ -556,7 +698,11 @@ def test_collection_loop(self, mock_time, mock_ray): # Initialize the monitor monitor = RayGpuMonitorLogger( - collection_interval=10.0, flush_interval=60.0, parent_logger=None + collection_interval=10.0, + flush_interval=60.0, + metric_prefix="test", + 
step_metric="test/step", + parent_logger=None, ) # Set start time and running flag @@ -593,6 +739,89 @@ def side_effect(): # Verify flush was called (flush_interval elapsed) mock_flush.assert_called_once() + @patch("nemo_reinforcer.utils.logger.WandbLogger") + @patch("nemo_reinforcer.utils.logger.TensorboardLogger") + @patch("nemo_reinforcer.utils.logger.RayGpuMonitorLogger") + def test_init_with_gpu_monitoring( + self, mock_gpu_monitor, mock_tb_logger, mock_wandb_logger, temp_dir + ): + """Test initialization with GPU monitoring enabled.""" + cfg = { + "wandb_enabled": True, + "tensorboard_enabled": True, + "monitor_gpus": True, + "gpu_monitoring": { + "collection_interval": 15.0, + "flush_interval": 45.0, + }, + "wandb": {"project": "test-project"}, + "tensorboard": {"log_dir": "test_logs"}, + "log_dir": temp_dir, + } + logger = Logger(cfg) + + # Check that regular loggers were initialized + assert len(logger.loggers) == 2 + mock_wandb_logger.assert_called_once() + mock_tb_logger.assert_called_once() + + # Check that GPU monitor was initialized with correct parameters + mock_gpu_monitor.assert_called_once_with( + collection_interval=15.0, + flush_interval=45.0, + metric_prefix="ray", + step_metric="ray/ray_step", + parent_logger=logger, + ) + + # Check that GPU monitor was started + mock_gpu_instance = mock_gpu_monitor.return_value + mock_gpu_instance.start.assert_called_once() + + # Check that wandb metrics are defined with the step metric + mock_wandb_instance = mock_wandb_logger.return_value + mock_wandb_instance.define_metric.assert_called_once_with( + "ray/*", step_metric="ray/ray_step" + ) + + @patch("nemo_reinforcer.utils.logger.WandbLogger") + @patch("nemo_reinforcer.utils.logger.TensorboardLogger") + @patch("nemo_reinforcer.utils.logger.RayGpuMonitorLogger") + def test_gpu_monitoring_without_wandb( + self, mock_gpu_monitor, mock_tb_logger, mock_wandb_logger, temp_dir + ): + """Test GPU monitoring initialization when wandb is disabled.""" + cfg = { + 
"wandb_enabled": False, + "tensorboard_enabled": True, + "monitor_gpus": True, + "gpu_monitoring": { + "collection_interval": 15.0, + "flush_interval": 45.0, + }, + "tensorboard": {"log_dir": "test_logs"}, + "log_dir": temp_dir, + } + logger = Logger(cfg) + + # Check that only tensorboard logger was initialized + assert len(logger.loggers) == 1 + mock_wandb_logger.assert_not_called() + mock_tb_logger.assert_called_once() + + # Check that GPU monitor was initialized with correct parameters + mock_gpu_monitor.assert_called_once_with( + collection_interval=15.0, + flush_interval=45.0, + metric_prefix="ray", + step_metric="ray/ray_step", + parent_logger=logger, + ) + + # Since wandb is disabled, define_metric should not be called + mock_wandb_instance = mock_wandb_logger.return_value + assert not mock_wandb_instance.define_metric.called + class TestLogger: """Test the main Logger class.""" @@ -704,8 +933,8 @@ def test_log_metrics(self, mock_tb_logger, mock_wandb_logger, temp_dir): logger.log_metrics(metrics, step) # Check that log_metrics was called on both loggers - mock_wandb_instance.log_metrics.assert_called_once_with(metrics, step, "") - mock_tb_instance.log_metrics.assert_called_once_with(metrics, step, "") + mock_wandb_instance.log_metrics.assert_called_once_with(metrics, step, "", None) + mock_tb_instance.log_metrics.assert_called_once_with(metrics, step, "", None) @patch("nemo_reinforcer.utils.logger.WandbLogger") @patch("nemo_reinforcer.utils.logger.TensorboardLogger") @@ -760,9 +989,56 @@ def test_init_with_gpu_monitoring( # Check that GPU monitor was initialized with correct parameters mock_gpu_monitor.assert_called_once_with( - collection_interval=15.0, flush_interval=45.0, parent_logger=logger + collection_interval=15.0, + flush_interval=45.0, + metric_prefix="ray", + step_metric="ray/ray_step", + parent_logger=logger, ) # Check that GPU monitor was started mock_gpu_instance = mock_gpu_monitor.return_value mock_gpu_instance.start.assert_called_once() + + 
# Check that wandb metrics are defined with the step metric + mock_wandb_instance = mock_wandb_logger.return_value + mock_wandb_instance.define_metric.assert_called_once_with( + "ray/*", step_metric="ray/ray_step" + ) + + @patch("nemo_reinforcer.utils.logger.WandbLogger") + @patch("nemo_reinforcer.utils.logger.TensorboardLogger") + def test_log_metrics_with_prefix_and_step_metric( + self, mock_tb_logger, mock_wandb_logger, temp_dir + ): + """Test logging metrics with prefix and step_metric.""" + cfg = { + "wandb_enabled": True, + "tensorboard_enabled": True, + "monitor_gpus": False, + "wandb": {"project": "test-project"}, + "tensorboard": {"log_dir": "test_logs"}, + "log_dir": temp_dir, + } + logger = Logger(cfg) + + # Create mock logger instances + mock_wandb_instance = mock_wandb_logger.return_value + mock_tb_instance = mock_tb_logger.return_value + + # Create metrics with a step metric field + metrics = {"loss": 0.5, "accuracy": 0.8, "iteration": 15} + step = 10 + prefix = "train" + step_metric = "iteration" + + # Log metrics with prefix and step_metric + logger.log_metrics(metrics, step, prefix=prefix, step_metric=step_metric) + + # Check that log_metrics was called on both loggers with correct parameters + mock_wandb_instance.log_metrics.assert_called_once_with( + metrics, step, prefix, step_metric + ) + mock_tb_instance.log_metrics.assert_called_once_with( + metrics, step, prefix, step_metric + )