diff --git a/src/art/backend.py b/src/art/backend.py index 6997cfa02..b1b6f78ff 100644 --- a/src/art/backend.py +++ b/src/art/backend.py @@ -55,16 +55,14 @@ async def _get_step(self, model: "TrainableModel") -> int: response.raise_for_status() return response.json() - async def _delete_checkpoints( + async def _delete_checkpoint_files( self, model: "TrainableModel", - benchmark: str, - benchmark_smoothing: float, + steps_to_keep: list[int], ) -> None: response = await self._client.post( - "/_delete_checkpoints", - json=model.safe_model_dump(), - params={"benchmark": benchmark, "benchmark_smoothing": benchmark_smoothing}, + "/_delete_checkpoint_files", + json={"model": model.safe_model_dump(), "steps_to_keep": steps_to_keep}, ) response.raise_for_status() @@ -82,23 +80,6 @@ async def _prepare_backend_for_training( base_url, api_key = tuple(response.json()) return base_url, api_key - async def _log( - self, - model: "Model", - trajectory_groups: list[TrajectoryGroup], - split: str = "val", - ) -> None: - response = await self._client.post( - "/_log", - json={ - "model": model.safe_model_dump(), - "trajectory_groups": [tg.model_dump() for tg in trajectory_groups], - "split": split, - }, - timeout=None, - ) - response.raise_for_status() - async def _train_model( self, model: "TrainableModel", diff --git a/src/art/cli.py b/src/art/cli.py index d6f17cf8c..dd64fc634 100644 --- a/src/art/cli.py +++ b/src/art/cli.py @@ -173,7 +173,13 @@ async def art_error_handler(request: Request, exc: ARTError): app.post("/close")(backend.close) app.post("/register")(backend.register) app.post("/_get_step")(backend._get_step) - app.post("/_delete_checkpoints")(backend._delete_checkpoints) + + @app.post("/_delete_checkpoint_files") + async def _delete_checkpoint_files( + model: TrainableModel = Body(...), + steps_to_keep: list[int] = Body(...), + ): + await backend._delete_checkpoint_files(model, steps_to_keep) @app.post("/_prepare_backend_for_training") async def _prepare_backend_for_training( @@ -182,13 +188,7 @@ async def _prepare_backend_for_training( ): return await backend._prepare_backend_for_training(model, config) - @app.post("/_log") - async def _log( - model: Model, - trajectory_groups: list[TrajectoryGroup], - split: str = Body("val"), - ): - await backend._log(model, trajectory_groups, split) + # Note: /_log endpoint removed - logging now handled by frontend (Model.log()) @app.post("/_train_model") async def _train_model( diff --git a/src/art/local/backend.py b/src/art/local/backend.py index 2251929c8..22b3f4f09 100644 --- a/src/art/local/backend.py +++ b/src/art/local/backend.py @@ -1,5 +1,4 @@ import asyncio -from datetime import datetime import json import math import os @@ -11,32 +10,24 @@ import aiohttp import numpy as np from openai import AsyncOpenAI -import polars as pl import torch from tqdm import auto as tqdm from transformers import AutoImageProcessor, AutoTokenizer from transformers.image_processing_utils import BaseImageProcessor from transformers.tokenization_utils_base import PreTrainedTokenizerBase from typing_extensions import Self -import wandb -from wandb.sdk.wandb_run import Run -import weave -from weave.trace.weave_client import WeaveClient -from art.utils.old_benchmarking.calculate_step_metrics import calculate_step_std_dev from art.utils.output_dirs import ( get_default_art_path, get_model_dir, get_output_dir_from_model_properties, get_step_checkpoint_dir, - get_trajectories_split_dir, ) from art.utils.s3 import ( ExcludableOption, pull_model_from_s3, push_model_to_s3, ) -from art.utils.trajectory_logging import write_trajectory_groups_parquet from mp_actors import close_proxy, move_to_child_process from .. import dev @@ -79,8 +70,6 @@ def __init__(self, *, in_process: bool = False, path: str | None = None) -> None self._services: dict[str, ModelService] = {} self._tokenizers: dict[str, PreTrainedTokenizerBase] = {} self._image_processors: dict[str, BaseImageProcessor | None] = {} - self._wandb_runs: dict[str, Run] = {} - self._weave_clients: dict[str, WeaveClient] = {} def __enter__(self) -> Self: return self @@ -123,9 +112,10 @@ async def register( auto_migrate_on_register(output_dir) - # Initialize wandb and weave early if this is a trainable model + # Initialize wandb early if this is a trainable model + # (wandb initialization is now handled by the model's _get_wandb_run method) if model.trainable and "WANDB_API_KEY" in os.environ: - _ = self._get_wandb_run(model) + _ = model._get_wandb_run() async def _get_service(self, model: TrainableModel) -> ModelService: from ..dev.get_model_config import get_model_config @@ -243,33 +233,15 @@ def __get_step(self, model: Model) -> int: # Non-trainable models do not have checkpoints/steps; default to 0 return 0 - async def _delete_checkpoints( + async def _delete_checkpoint_files( self, model: TrainableModel, - benchmark: str, - benchmark_smoothing: float, + steps_to_keep: list[int], ) -> None: + """Delete checkpoint files, keeping only the specified steps.""" from ..tinker.service import TinkerService output_dir = get_model_dir(model=model, art_path=self._path) - # Keep the latest step - steps_to_keep = [get_model_step(model, self._path)] - try: - best_step = ( - pl.read_ndjson(f"{output_dir}/history.jsonl") - .drop_nulls(subset=[benchmark]) - .group_by("step") - .mean() - .with_columns(pl.col(benchmark).ewm_mean(alpha=benchmark_smoothing)) - .sort(benchmark) - .select(pl.col("step").last()) - .item() - ) - steps_to_keep.append(best_step) - except FileNotFoundError: - print(f'"{output_dir}/history.jsonl" not found') - except pl.exceptions.ColumnNotFoundError: - print(f'No "{benchmark}" metric found in history') service = await self._get_service(model) if isinstance(service, TinkerService): await service.delete_checkpoints(steps_to_keep) @@ -364,54 +336,7 @@ async def _monitor_openai_server( raise # Otherwise, continue and try again - async def _log( - self, - model: Model, - trajectory_groups: list[TrajectoryGroup], - split: str = "val", - ) -> None: - # Save logs for trajectory groups - parent_dir = get_trajectories_split_dir( - get_model_dir(model=model, art_path=self._path), split - ) - os.makedirs(parent_dir, exist_ok=True) - - # Get the file name for the current iteration, or default to 0 for non-trainable models - iteration = self.__get_step(model) - file_name = f"{iteration:04d}.parquet" - - # Write the logs to Parquet file (with ZSTD compression) - write_trajectory_groups_parquet(trajectory_groups, f"{parent_dir}/{file_name}") - - # Collect all metrics (including reward) across all trajectories - all_metrics: dict[str, list[float]] = {"reward": [], "exception_rate": []} - - for group in trajectory_groups: - for trajectory in group: - if isinstance(trajectory, BaseException): - all_metrics["exception_rate"].append(1) - continue - else: - all_metrics["exception_rate"].append(0) - # Add reward metric - all_metrics["reward"].append(trajectory.reward) - - # Collect other custom metrics - for metric, value in trajectory.metrics.items(): - if metric not in all_metrics: - all_metrics[metric] = [] - all_metrics[metric].append(float(value)) - - # Calculate averages for all metrics - averages = {} - for metric, values in all_metrics.items(): - if len(values) > 0: - averages[metric] = sum(values) / len(values) - - # Calculate average standard deviation of rewards within groups - averages["reward_std_dev"] = calculate_step_std_dev(trajectory_groups) - - self._log_metrics(model, averages, split) + # Note: _log() method has been moved to the Model class (frontend) def _trajectory_log(self, trajectory: Trajectory) -> str: """Format a trajectory into a readable log string.""" @@ -436,9 +361,7 @@ async def _train_model( if verbose: print("Starting _train_model") service = await self._get_service(model) - if verbose: - print("Logging training data to disk...") - await self._log(model, trajectory_groups, "train") + # Note: Logging is now handled by the frontend (Model.train() calls Model.log()) if verbose: print("Packing tensors...") @@ -491,27 +414,18 @@ async def _train_model( if isinstance(service, UnslothService): await service.register_lora_for_step(next_step, next_checkpoint_dir) - # Log metrics showing no groups were trainable - self._log_metrics( - model, - { - "num_groups_submitted": num_groups_submitted, - "num_groups_trainable": 0, - }, - "train", - step=next_step, - ) + # Yield metrics showing no groups were trainable + # (the frontend will handle logging) + yield { + "num_groups_submitted": num_groups_submitted, + "num_groups_trainable": 0, + "num_gradient_steps": 0, + } return disk_packed_tensors = packed_tensors_to_dir( packed_tensors, f"{get_model_dir(model=model, art_path=self._path)}/tensors" ) - if dev_config.get("scale_learning_rate_by_reward_std_dev", False): - config = config.model_copy( - update={ - "learning_rate": config.learning_rate - * self._get_reward_std_dev_learning_rate_multiplier(model) - } - ) + # Note: scale_learning_rate_by_reward_std_dev is now handled by the frontend (Model.train()) results: list[dict[str, float]] = [] estimated_gradient_steps = disk_packed_tensors["num_sequences"] if torchtune_args := (model._internal_config or dev.InternalModelConfig()).get( @@ -537,171 +451,12 @@ async def _train_model( pbar.update(1) pbar.set_postfix(result) pbar.close() - if verbose: - print("Logging metrics...") - data = { - k: sum(d.get(k, 0) for d in results) / sum(1 for d in results if k in d) - for k in {k for d in results for k in d} - } - # Add group counting metrics - data["num_groups_submitted"] = num_groups_submitted - data["num_groups_trainable"] = num_groups_trainable - # Get the current step after training - current_step = self.__get_step(model) - self._log_metrics(model, data, "train", step=current_step) + # Note: Metrics logging is now handled by the frontend (Model.train()) if verbose: print("_train_model complete") - def _get_reward_std_dev_learning_rate_multiplier( - self, model: TrainableModel - ) -> float: - output_dir = get_model_dir(model=model, art_path=self._path) - learning_rate_multiplier = 1.0 # Default prior - try: - std_dev_history = ( - pl.read_ndjson(f"{output_dir}/history.jsonl") - .drop_nulls(subset=["train/reward_std_dev"]) - .group_by("step") - .mean() - .sort("step") - ) - - # Fit linear regression to std_dev_history - if len(std_dev_history) > 1: - steps = std_dev_history["step"].to_numpy() - std_devs = std_dev_history["train/reward_std_dev"].to_numpy() - - # Fit linear regression: y = mx + b - # polyfit returns [coefficient, intercept] for degree 1 - coefficient, intercept = np.polyfit(steps, std_devs, deg=1) - - # Get prediction for the last step - last_step = steps[-1] - last_step_prediction = coefficient * last_step + intercept - last_step_actual = std_devs[-1] - - # Calculate R-squared and adjusted R-squared - predictions = coefficient * steps + intercept - ss_residual = np.sum((std_devs - predictions) ** 2) - ss_total = np.sum((std_devs - np.mean(std_devs)) ** 2) - r_squared = 1 - (ss_residual / ss_total) if ss_total > 0 else 0 - - # Adjusted R-squared accounts for sample size - # For simple linear regression: adj_R² = 1 - (1 - R²) * (n - 1) / (n - 2) - n_samples = len(steps) - if n_samples > 2: - adjusted_r_squared = 1 - (1 - r_squared) * (n_samples - 1) / ( - n_samples - 2 - ) - else: - adjusted_r_squared = ( - 0 # Not enough samples for meaningful adjustment - ) - - # Calculate learning rate multiplier - # raw_multiplier = last_step_prediction / intercept (if intercept > 0) - # adjusted by goodness of fit: multiplier = 1 + adj_R² * (raw_multiplier - 1) - if intercept > 0: - raw_multiplier = last_step_prediction / intercept - # learning_rate_multiplier = 1 + adjusted_r_squared * ( - # raw_multiplier - 1 - # ) - learning_rate_multiplier = raw_multiplier - else: - # If intercept <= 0, can't calculate meaningful ratio, stick with prior - raw_multiplier = 1.0 - learning_rate_multiplier = 1.0 - - print(f"Regression fitted: y = {coefficient:.6f}x + {intercept:.6f}") - print(f" Coefficient (slope): {coefficient:.6f}") - print(f" Intercept: {intercept:.6f}") - print(f" R-squared: {r_squared:.4f}") - print( - f" Adjusted R-squared: {adjusted_r_squared:.4f} (n={n_samples} samples)" - ) - print( - f" Last step ({last_step}) prediction: {last_step_prediction:.6f}" - ) - print(f" Last step actual value: {last_step_actual:.6f}") - print( - f" Prediction error: {abs(last_step_actual - last_step_prediction):.6f}" - ) - print(f" Raw LR multiplier (pred/intercept): {raw_multiplier:.4f}") - print(f" Adjusted LR multiplier: {learning_rate_multiplier:.4f}") - else: - print( - f"Not enough data points to fit regression (need at least 2, got {len(std_dev_history)})" - ) - - except FileNotFoundError: - print(f'"{output_dir}/history.jsonl" not found') - except pl.exceptions.ColumnNotFoundError: - print(f'No "train/reward_std_dev" metric found in history') - - return learning_rate_multiplier - - def _log_metrics( - self, - model: Model, - metrics: dict[str, float], - split: str, - step: int | None = None, - ) -> None: - metrics = {f"{split}/{metric}": value for metric, value in metrics.items()} - step = step if step is not None else self.__get_step(model) - - with open( - f"{get_model_dir(model=model, art_path=self._path)}/history.jsonl", "a" - ) as f: - f.write( - json.dumps( - { - k: v for k, v in metrics.items() if v == v - } # Filter out NaN values - | {"step": step, "recorded_at": datetime.now().isoformat()} - ) - + "\n" - ) - - # If we have a W&B run, log the data there - if run := self._get_wandb_run(model): - run.log({"training_step": step, **metrics}) - - def _get_wandb_run(self, model: Model) -> Run | None: - if "WANDB_API_KEY" not in os.environ: - return None - if ( - model.name not in self._wandb_runs - or self._wandb_runs[model.name]._is_finished - ): - run = wandb.init( - project=model.project, - name=model.name, - id=model.name, - resume="allow", - settings=wandb.Settings( - x_stats_open_metrics_endpoints={ - "vllm": "http://localhost:8000/metrics", - }, - x_stats_open_metrics_filters=( - "vllm.vllm:num_requests_waiting", - "vllm.vllm:num_requests_running", - ), - ), - ) - self._wandb_runs[model.name] = run - - # Define training_step as the x-axis for all metrics. - # This allows out-of-order logging (e.g., async validation for previous steps). - wandb.define_metric("training_step") - wandb.define_metric("train/*", step_metric="training_step") - wandb.define_metric("val/*", step_metric="training_step") - os.environ["WEAVE_PRINT_CALL_LINK"] = os.getenv( - "WEAVE_PRINT_CALL_LINK", "False" - ) - os.environ["WEAVE_LOG_LEVEL"] = os.getenv("WEAVE_LOG_LEVEL", "CRITICAL") - self._weave_clients[model.name] = weave.init(model.project) - return self._wandb_runs[model.name] + # Note: _get_reward_std_dev_learning_rate_multiplier and _log_metrics + # have been moved to the Model class (frontend) # ------------------------------------------------------------------ # Experimental support for S3 diff --git a/src/art/model.py b/src/art/model.py index b652a2813..b9d47a233 100644 --- a/src/art/model.py +++ b/src/art/model.py @@ -1,15 +1,23 @@ +from datetime import datetime +import json +import os from typing import TYPE_CHECKING, Generic, Iterable, Optional, TypeVar, cast, overload import httpx from openai import AsyncOpenAI, DefaultAsyncHttpxClient +import polars as pl from pydantic import BaseModel from typing_extensions import Never from . import dev from .trajectories import Trajectory, TrajectoryGroup from .types import TrainConfig +from .utils.old_benchmarking.calculate_step_metrics import calculate_step_std_dev +from .utils.trajectory_logging import write_trajectory_groups_parquet if TYPE_CHECKING: + from wandb.sdk.wandb_run import Run + from art.backend import Backend @@ -68,10 +76,15 @@ class Model( # inference endpoint. inference_model_name: str | None = None + # --- Frontend logging configuration --- + base_path: str = ".art" # Same default as LocalBackend for backward compat + report_metrics: list[str] | None = None # None = default (wandb if key present) + _backend: Optional["Backend"] = None _s3_bucket: str | None = None _s3_prefix: str | None = None _openai_client: AsyncOpenAI | None = None + _wandb_run: Optional["Run"] = None # Private, for lazy wandb initialization def __init__( self, @@ -84,6 +97,8 @@ def __init__( inference_api_key: str | None = None, inference_base_url: str | None = None, inference_model_name: str | None = None, + base_path: str = ".art", + report_metrics: list[str] | None = None, **kwargs: Never, ) -> None: super().__init__( @@ -94,6 +109,8 @@ def __init__( inference_api_key=inference_api_key, inference_base_url=inference_base_url, inference_model_name=inference_model_name, + base_path=base_path, + report_metrics=report_metrics, **kwargs, ) @@ -109,6 +126,8 @@ def __new__( inference_api_key: str | None = None, inference_base_url: str | None = None, inference_model_name: str | None = None, + base_path: str = ".art", + report_metrics: list[str] | None = None, ) -> "Model[None]": ... @overload @@ -123,6 +142,8 @@ def __new__( inference_api_key: str | None = None, inference_base_url: str | None = None, inference_model_name: str | None = None, + base_path: str = ".art", + report_metrics: list[str] | None = None, ) -> "Model[ModelConfig]": ... def __new__( @@ -225,6 +246,71 @@ def get_inference_name(self, step: int | None = None) -> str: return f"{base_name}@{step}" return base_name + def _get_output_dir(self) -> str: + """Get the output directory for this model.""" + return f"{self.base_path}/{self.project}/models/{self.name}" + + def _get_wandb_run(self) -> Optional["Run"]: + """Get or create the wandb run for this model.""" + import wandb + + if "WANDB_API_KEY" not in os.environ: + return None + if self._wandb_run is None or self._wandb_run._is_finished: + run = wandb.init( + project=self.project, + name=self.name, + id=self.name, + resume="allow", + settings=wandb.Settings( + x_stats_open_metrics_endpoints={ + "vllm": "http://localhost:8000/metrics", + }, + x_stats_open_metrics_filters=( + "vllm.vllm:num_requests_waiting", + "vllm.vllm:num_requests_running", + ), + ), + ) + self._wandb_run = run + + # Define training_step as the x-axis for all metrics. + # This allows out-of-order logging (e.g., async validation for previous steps). + wandb.define_metric("training_step") + wandb.define_metric("train/*", step_metric="training_step") + wandb.define_metric("val/*", step_metric="training_step") + return self._wandb_run + + def _log_metrics( + self, + metrics: dict[str, float], + split: str, + step: int, + ) -> None: + """Log metrics to history.jsonl and optionally wandb.""" + prefixed = {f"{split}/{k}": v for k, v in metrics.items()} + output_dir = self._get_output_dir() + + # Write to history.jsonl + with open(f"{output_dir}/history.jsonl", "a") as f: + f.write( + json.dumps( + { + k: v for k, v in prefixed.items() if v == v + } # Filter out NaN values + | {"step": step, "recorded_at": datetime.now().isoformat()} + ) + + "\n" + ) + + # Log to wandb if enabled + should_log_wandb = ( + self.report_metrics is None and "WANDB_API_KEY" in os.environ + ) or (self.report_metrics is not None and "wandb" in self.report_metrics) + if should_log_wandb: + if run := self._get_wandb_run(): + run.log({"training_step": step, **prefixed}) + async def log( self, trajectories: Iterable[Trajectory | BaseException] | Iterable[TrajectoryGroup], @@ -237,6 +323,7 @@ async def log( trajectories: A batch of trajectories or trajectory groups. split: The evaluation's split. Defaults to "val". """ + # Convert to list[TrajectoryGroup] if any(isinstance(t, Trajectory) for t in trajectories) or any( isinstance(t, BaseException) for t in trajectories ): @@ -247,12 +334,60 @@ async def log( ] else: trajectory_groups = cast(list[TrajectoryGroup], list(trajectories)) - await self.backend()._log( - self, - trajectory_groups, - split, + + # Get the current step + step = await self.get_step() if self.trainable else 0 + + # Ensure output directories exist + output_dir = self._get_output_dir() + trajectories_dir = f"{output_dir}/trajectories/{split}" + os.makedirs(trajectories_dir, exist_ok=True) + + # 1. Write parquet + file_name = f"{step:04d}.parquet" + write_trajectory_groups_parquet( + trajectory_groups, f"{trajectories_dir}/{file_name}" ) + # 2. Calculate aggregate metrics + all_metrics: dict[str, list[float]] = {"reward": [], "exception_rate": []} + + for group in trajectory_groups: + for trajectory in group: + if isinstance(trajectory, BaseException): + all_metrics["exception_rate"].append(1) + continue + else: + all_metrics["exception_rate"].append(0) + # Add reward metric + all_metrics["reward"].append(trajectory.reward) + + # Collect other custom metrics + for metric, value in trajectory.metrics.items(): + if metric not in all_metrics: + all_metrics[metric] = [] + all_metrics[metric].append(float(value)) + + # Calculate averages for all metrics + averages: dict[str, float] = {} + for metric, values in all_metrics.items(): + if len(values) > 0: + averages[metric] = sum(values) / len(values) + + # Calculate average standard deviation of rewards within groups + averages["reward_std_dev"] = calculate_step_std_dev(trajectory_groups) + + # 3. Log metrics (writes to history.jsonl and wandb) + self._log_metrics(averages, split, step) + + async def get_step(self) -> int: + """ + Get the model's current training step. For non-trainable models, returns 0. + """ + if self.trainable: + return await self.backend()._get_step(self) # type: ignore + return 0 + # --------------------------------------------------------------------------- # Trainable models @@ -277,6 +412,8 @@ def __init__( id: str | None = None, config: ModelConfig | None = None, base_model: str, + base_path: str = ".art", + report_metrics: list[str] | None = None, _internal_config: dev.InternalModelConfig | None = None, **kwargs: Never, ) -> None: @@ -287,6 +424,8 @@ def __init__( id=id, config=config, base_model=base_model, # type: ignore + base_path=base_path, + report_metrics=report_metrics, **kwargs, ) if _internal_config is not None: @@ -303,6 +442,8 @@ def __new__( id: str | None = None, config: None = None, base_model: str, + base_path: str = ".art", + report_metrics: list[str] | None = None, _internal_config: dev.InternalModelConfig | None = None, ) -> "TrainableModel[None]": ... @@ -316,6 +457,8 @@ def __new__( id: str | None = None, config: ModelConfig, base_model: str, + base_path: str = ".art", + report_metrics: list[str] | None = None, _internal_config: dev.InternalModelConfig | None = None, ) -> "TrainableModel[ModelConfig]": ... @@ -360,12 +503,6 @@ async def register( or self.name ) - async def get_step(self) -> int: - """ - Get the model's current training step. - """ - return await self.backend()._get_step(self) - async def delete_checkpoints( self, best_checkpoint_metric: str = "val/reward" ) -> None: @@ -376,9 +513,28 @@ async def delete_checkpoints( best_checkpoint_metric: The metric to use to determine the best checkpoint. Defaults to "val/reward". """ - await self.backend()._delete_checkpoints( - self, best_checkpoint_metric, benchmark_smoothing=1.0 - ) + output_dir = self._get_output_dir() + steps_to_keep = [await self.get_step()] # Keep latest + + # Read history.jsonl to find best step + try: + best_step = ( + pl.read_ndjson(f"{output_dir}/history.jsonl") + .drop_nulls(subset=[best_checkpoint_metric]) + .group_by("step") + .mean() + .sort(best_checkpoint_metric) + .select(pl.col("step").last()) + .item() + ) + steps_to_keep.append(best_step) + except FileNotFoundError: + print(f'"{output_dir}/history.jsonl" not found') + except pl.exceptions.ColumnNotFoundError: + print(f'No "{best_checkpoint_metric}" metric found in history') + + # Backend only does file deletion + await self.backend()._delete_checkpoint_files(self, steps_to_keep) async def train( self, @@ -396,7 +552,27 @@ async def train( _config: Additional configuration that is subject to change and not yet part of the public API. Use at your own risk. """ - async for _ in self.backend()._train_model( - self, list(trajectory_groups), config, _config or {}, verbose + groups_list = list(trajectory_groups) + _config = _config or {} + + # 1. Log trajectories first (frontend handles this now) + await self.log(groups_list, split="train") + + # 2. Train (backend no longer logs internally) + training_metrics: list[dict[str, float]] = [] + async for metrics in self.backend()._train_model( + self, groups_list, config, _config, verbose ): - pass + training_metrics.append(metrics) + + # 3. Log training metrics (loss, gradient norms, etc.) + if training_metrics: + avg_metrics = { + k: sum(d.get(k, 0) for d in training_metrics) + / sum(1 for d in training_metrics if k in d) + for k in {k for d in training_metrics for k in d} + if k != "num_gradient_steps" + } + # Get the current step after training + step = await self.get_step() + self._log_metrics(avg_metrics, "train", step) diff --git a/src/art/serverless/backend.py b/src/art/serverless/backend.py index b33dda1e6..181f8d881 100644 --- a/src/art/serverless/backend.py +++ b/src/art/serverless/backend.py @@ -101,26 +101,19 @@ async def _get_step(self, model: "Model") -> int: # Non-trainable models do not have checkpoints/steps; default to 0 return 0 - async def _delete_checkpoints( + async def _delete_checkpoint_files( self, model: "TrainableModel", - benchmark: str, - benchmark_smoothing: float, + steps_to_keep: list[int], ) -> None: - # TODO: potentially implement benchmark smoothing + """Delete checkpoint files, keeping only the specified steps.""" assert model.id is not None, "Model ID is required" - benchmark_values: dict[int, float] = {} + # Get all checkpoint steps + all_steps: list[int] = [] async for checkpoint in self._client.models.checkpoints.list(model_id=model.id): - benchmark_values[checkpoint.step] = checkpoint.metrics.get( - benchmark, -float("inf") - ) - max_step = max(benchmark_values.keys()) - max_benchmark_value = max(benchmark_values.values()) - if steps_to_delete := [ - step - for step, benchmark_value in benchmark_values.items() - if step != max_step and benchmark_value != max_benchmark_value - ]: + all_steps.append(checkpoint.step) + # Delete all steps not in steps_to_keep + if steps_to_delete := [step for step in all_steps if step not in steps_to_keep]: await self._client.models.checkpoints.delete( model_id=model.id, steps=steps_to_delete, @@ -133,20 +126,8 @@ async def _prepare_backend_for_training( ) -> tuple[str, str]: return str(self._base_url), self._client.api_key - async def _log( - self, - model: "Model", - trajectory_groups: list[TrajectoryGroup], - split: str = "val", - ) -> None: - # TODO: log trajectories to local file system? - if not model.trainable: - print(f"Model {model.name} is not trainable; skipping logging.") - return - assert model.id is not None, "Model ID is required" - await self._client.models.log( - model_id=model.id, trajectory_groups=trajectory_groups, split=split - ) + # Note: _log() method has been moved to the Model class (frontend) + # Trajectories are now saved locally by the Model.log() method async def _train_model( self, diff --git a/src/art/vllm/server.py b/src/art/vllm/server.py index 988f637fa..20f62db4d 100644 --- a/src/art/vllm/server.py +++ b/src/art/vllm/server.py @@ -44,11 +44,9 @@ async def openai_server_task( # We must subclass ChatCompletionRequest before importing api_server # or logprobs will not always be returned subclass_chat_completion_request() - from vllm.entrypoints.openai import api_server - # Capture the OpenAIServingModels instance so dynamically added LoRAs # are reflected in the model list. - from vllm.entrypoints.openai import serving_models + from vllm.entrypoints.openai import api_server, serving_models serving_models_any = cast(Any, serving_models) if not getattr(serving_models_any, "_art_openai_serving_models_patched", False): diff --git a/tests/unit/test_frontend_logging.py b/tests/unit/test_frontend_logging.py new file mode 100644 index 000000000..8ae0f9453 --- /dev/null +++ b/tests/unit/test_frontend_logging.py @@ -0,0 +1,521 @@ +""" +Tests for frontend trajectory logging (Model.log() implementation). + +Tests verify: +1. Parquet files written by Model.log() are readable by existing infrastructure +2. history.jsonl format is compatible with existing readers +3. File paths match LocalBackend locations exactly +4. Metrics are calculated and prefixed correctly +""" + +import json +import os +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import polars as pl +import pytest + +from art import Model, TrainableModel, Trajectory, TrajectoryGroup +from art.utils.trajectory_logging import read_trajectory_groups_parquet + + +class TestFrontendLoggingCompatibility: + """Test that trajectories logged via frontend are readable by existing infra.""" + + @pytest.fixture + def sample_trajectories(self) -> list[Trajectory]: + """Create sample trajectories for testing.""" + return [ + Trajectory( + reward=0.8, + metrics={"duration": 5.2, "tokens": 100}, + metadata={"trace_id": "abc-123"}, + messages_and_choices=[ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ], + logs=["log1", "log2"], + ), + Trajectory( + reward=0.9, + metrics={"duration": 3.1, "tokens": 50}, + metadata={"trace_id": "def-456"}, + messages_and_choices=[ + {"role": "user", "content": "What's 2+2?"}, + {"role": "assistant", "content": "4"}, + ], + logs=[], + ), + ] + + @pytest.fixture + def sample_trajectory_groups( + self, sample_trajectories: list[Trajectory] + ) -> list[TrajectoryGroup]: + """Create sample trajectory groups for testing.""" + return [ + TrajectoryGroup( + trajectories=[sample_trajectories[0]], + exceptions=[], + ), + TrajectoryGroup( + trajectories=[sample_trajectories[1]], + exceptions=[], + ), + ] + + @pytest.mark.asyncio + async def test_parquet_readable_by_read_trajectory_groups_parquet( + self, tmp_path: Path, sample_trajectory_groups: list[TrajectoryGroup] + ): + """Direct parquet reader compatibility.""" + model = Model( + name="test-model", + project="test-project", + base_path=str(tmp_path), + ) + + # Mock get_step to return 0 for non-trainable model + await model.log(sample_trajectory_groups, split="val") + + # Verify readable by existing utility + parquet_path = ( + tmp_path / "test-project/models/test-model/trajectories/val/0000.parquet" + ) + assert parquet_path.exists(), f"Parquet file not found at {parquet_path}" + + loaded = read_trajectory_groups_parquet(parquet_path) + assert len(loaded) == 2 + assert loaded[0].trajectories[0].reward == 0.8 + assert loaded[1].trajectories[0].reward == 0.9 + + @pytest.mark.asyncio + async def test_parquet_schema_preserved( + self, tmp_path: Path, sample_trajectory_groups: list[TrajectoryGroup] + ): + """Verify parquet schema contains expected fields.""" + import pyarrow.parquet as pq + + model = Model( + name="test-model", + project="test-project", + base_path=str(tmp_path), + ) + + await model.log(sample_trajectory_groups, split="val") + + parquet_path = ( + tmp_path / "test-project/models/test-model/trajectories/val/0000.parquet" + ) + table = pq.read_table(parquet_path) + + # Check expected columns exist + expected_columns = [ + "group_index", + "reward", + "metrics", + "metadata", + "tools", + "logs", + "messages", + ] + for col in expected_columns: + assert col in table.column_names, f"Missing column: {col}" + + +class TestHistoryJsonlCompatibility: + """Test history.jsonl format compatibility.""" + + @pytest.fixture + def sample_trajectory_groups(self) -> list[TrajectoryGroup]: + """Create sample trajectory groups for testing.""" + return [ + TrajectoryGroup( + trajectories=[ + Trajectory( + reward=0.8, + metrics={"custom_metric": 42.0}, + messages_and_choices=[ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi!"}, + ], + ), + Trajectory( + reward=0.6, + metrics={"custom_metric": 38.0}, + messages_and_choices=[ + {"role": "user", "content": "Bye"}, + {"role": "assistant", "content": "Goodbye!"}, + ], + ), + ], + exceptions=[], + ) + ] + + @pytest.mark.asyncio + async def test_history_jsonl_format( + self, tmp_path: Path, sample_trajectory_groups: list[TrajectoryGroup] + ): + """Verify history.jsonl has correct format for downstream readers.""" + model = Model( + name="test-model", + project="test-project", + base_path=str(tmp_path), + ) + + await model.log(sample_trajectory_groups, split="val") + + history_path = tmp_path / "test-project/models/test-model/history.jsonl" + assert history_path.exists() + + with open(history_path) as f: + entry = json.loads(f.readline()) + + # Verify required fields + assert "step" in entry + assert "recorded_at" in entry + assert "val/reward" in entry # Prefixed metric + + @pytest.mark.asyncio + async def test_history_readable_by_polars( + self, tmp_path: Path, sample_trajectory_groups: list[TrajectoryGroup] + ): + """Verify history.jsonl is readable by pl.read_ndjson (used by delete_checkpoints).""" + model = Model( + name="test-model", + project="test-project", + base_path=str(tmp_path), + ) + + await model.log(sample_trajectory_groups, split="val") + + history_path = tmp_path / "test-project/models/test-model/history.jsonl" + df = pl.read_ndjson(str(history_path)) + + assert "step" in df.columns + assert "val/reward" in df.columns + assert "val/reward_std_dev" in df.columns + + @pytest.mark.asyncio + async def test_history_appends_entries( + self, tmp_path: Path, sample_trajectory_groups: list[TrajectoryGroup] + ): + """Verify multiple log calls append to history.jsonl.""" + model = Model( + name="test-model", + project="test-project", + base_path=str(tmp_path), + ) + + # Log twice + await model.log(sample_trajectory_groups, split="val") + await model.log(sample_trajectory_groups, split="train") + + history_path = tmp_path / "test-project/models/test-model/history.jsonl" + df = pl.read_ndjson(str(history_path)) + + # Should have 2 entries + assert len(df) == 2 + + # Check both splits are present + columns = df.columns + assert any("val/" in col for col in columns) + assert any("train/" in col for col in columns) + + +class TestPathStructure: + """Test that file paths match LocalBackend locations exactly.""" + + @pytest.mark.asyncio + async def test_file_locations_match_localbackend(self, tmp_path: Path): + """Verify files are written to expected paths.""" + model = Model( + name="mymodel", + project="myproj", + base_path=str(tmp_path), + ) + + trajectories = [ + TrajectoryGroup( + trajectories=[ + Trajectory( + reward=0.5, + messages_and_choices=[{"role": "user", "content": "test"}], + ) + ], + exceptions=[], + ) + ] + + await model.log(trajectories, split="val") + + # Verify exact paths + assert ( + tmp_path / "myproj/models/mymodel/trajectories/val/0000.parquet" + ).exists() + assert (tmp_path / "myproj/models/mymodel/history.jsonl").exists() + + @pytest.mark.asyncio + async def test_step_numbering_format(self, tmp_path: Path): + """Verify step numbers are zero-padded to 4 digits.""" + # Create a mock trainable model with step > 0 + model = TrainableModel( + name="mymodel", + project="myproj", + base_model="gpt-4", + base_path=str(tmp_path), + ) + + # Mock the backend and get_step + mock_backend = MagicMock() + mock_backend._get_step = AsyncMock(return_value=42) + model._backend = mock_backend + + trajectories = [ + TrajectoryGroup( + trajectories=[ + Trajectory( + reward=0.5, + messages_and_choices=[{"role": "user", "content": "test"}], + ) + ], + exceptions=[], + ) + ] + + await model.log(trajectories, split="train") + + # Verify zero-padded step in filename + assert ( + tmp_path / "myproj/models/mymodel/trajectories/train/0042.parquet" + ).exists() + + +class TestMetricCalculation: + """Test metric calculation and formatting.""" + + @pytest.mark.asyncio + async def test_metric_prefixes(self, tmp_path: Path): + """Verify metrics are prefixed with split name.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + ) + + trajectories = [ + TrajectoryGroup( + trajectories=[ + Trajectory( + reward=0.7, + metrics={"custom": 1.0}, + messages_and_choices=[{"role": "user", "content": "test"}], + ) + ], + exceptions=[], + ) + ] + + await model.log(trajectories, split="val") + + history_path = tmp_path / "test/models/test/history.jsonl" + with open(history_path) as f: + entry = json.loads(f.readline()) + + # All metrics should be prefixed (except step and recorded_at) + metric_keys = [k for k in entry.keys() if k not in ["step", "recorded_at"]] + assert all(k.startswith("val/") for k in metric_keys), ( + f"Not all metrics prefixed: {metric_keys}" + ) + + @pytest.mark.asyncio + async def test_standard_metrics_present(self, tmp_path: Path): + """Verify standard metrics (reward, exception_rate, reward_std_dev) are computed.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + ) + + trajectory_groups = [ + TrajectoryGroup( + trajectories=[ + Trajectory( + reward=0.8, + messages_and_choices=[{"role": "user", "content": "test1"}], + ), + Trajectory( + reward=0.6, + messages_and_choices=[{"role": "user", "content": "test2"}], + ), + ], + exceptions=[], + ) + ] + + await model.log(trajectory_groups, split="val") + + history_path = tmp_path / "test/models/test/history.jsonl" + with open(history_path) as f: + entry = json.loads(f.readline()) + + assert "val/reward" in entry + assert "val/exception_rate" in entry + assert "val/reward_std_dev" in entry + + # Check reward average is correct + assert entry["val/reward"] == 0.7 # (0.8 + 0.6) / 2 + + @pytest.mark.asyncio + async def test_exception_rate_calculation(self, tmp_path: Path): + """Verify exception_rate is calculated correctly for successful trajectories.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + ) + + # TrajectoryGroup stores trajectories and exceptions separately + # The Model.log() iterates over the group which yields trajectories and exceptions + trajectory_groups = [ + TrajectoryGroup( + trajectories=[ + Trajectory( + reward=0.5, + messages_and_choices=[{"role": "user", "content": "test"}], + ) + ], + exceptions=[], + ) + ] + + await model.log(trajectory_groups, split="val") + + history_path = tmp_path / "test/models/test/history.jsonl" + with open(history_path) as f: + entry = json.loads(f.readline()) + + # All successful trajectories = 0% exception rate + assert entry["val/exception_rate"] == 0.0 + + +class TestWandbIntegration: + """Test wandb integration logic (without mocking wandb itself).""" + + @pytest.mark.asyncio + async def test_wandb_not_called_without_api_key(self, tmp_path: Path): + """Verify _get_wandb_run returns None without WANDB_API_KEY.""" + # Ensure WANDB_API_KEY is not set + env_backup = os.environ.get("WANDB_API_KEY") + if "WANDB_API_KEY" in os.environ: + del os.environ["WANDB_API_KEY"] + + try: + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + ) + + # Verify _get_wandb_run returns None when no API key + result = model._get_wandb_run() + assert result is None + finally: + if env_backup is not None: + os.environ["WANDB_API_KEY"] = env_backup + + def test_should_log_wandb_logic_default(self, tmp_path: Path): + """Test the should_log_wandb logic with default report_metrics.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + report_metrics=None, # Default + ) + + # With no API key and default report_metrics, should not log + env_backup = os.environ.get("WANDB_API_KEY") + if "WANDB_API_KEY" in os.environ: + del os.environ["WANDB_API_KEY"] + try: + should_log = ( + model.report_metrics is None and "WANDB_API_KEY" in os.environ + ) or (model.report_metrics is not None and "wandb" in model.report_metrics) + assert should_log is False + finally: + if env_backup is not None: + os.environ["WANDB_API_KEY"] = env_backup + + def test_should_log_wandb_logic_with_key(self, tmp_path: Path): + """Test the should_log_wandb logic with WANDB_API_KEY present.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + report_metrics=None, # Default + ) + + # With API key and default report_metrics, should log + with patch.dict(os.environ, {"WANDB_API_KEY": "test-key"}): + should_log = ( + model.report_metrics is None and "WANDB_API_KEY" in os.environ + ) or (model.report_metrics is not None and "wandb" in model.report_metrics) + assert should_log is True + + def test_should_log_wandb_logic_explicit_wandb(self, tmp_path: Path): + """Test the should_log_wandb logic with explicit wandb in report_metrics.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + report_metrics=["wandb"], + ) + + # With explicit wandb in report_metrics, should log regardless of env var + should_log = ( + model.report_metrics is None and "WANDB_API_KEY" in os.environ + ) or (model.report_metrics is not None and "wandb" in model.report_metrics) + assert should_log is True + + def test_should_log_wandb_logic_empty_list(self, tmp_path: Path): + """Test the should_log_wandb logic with empty report_metrics list.""" + model = Model( + name="test", + project="test", + base_path=str(tmp_path), + report_metrics=[], # Explicit empty list + ) + + # With empty report_metrics, should not log even with API key + with patch.dict(os.environ, {"WANDB_API_KEY": "test-key"}): + should_log = ( + model.report_metrics is None and "WANDB_API_KEY" in os.environ + ) or (model.report_metrics is not None and "wandb" in model.report_metrics) + assert should_log is False + + +class TestModelAttributes: + """Test new Model attributes.""" + + def test_base_path_default(self): + """Verify base_path defaults to '.art'.""" + model = Model(name="test", project="test") + assert model.base_path == ".art" + + def test_base_path_custom(self): + """Verify base_path can be customized.""" + model = Model(name="test", project="test", base_path="/custom/path") + assert model.base_path == "/custom/path" + + def test_report_metrics_default(self): + """Verify report_metrics defaults to None.""" + model = Model(name="test", project="test") + assert model.report_metrics is None + + def test_report_metrics_custom(self): + """Verify report_metrics can be customized.""" + model = Model(name="test", project="test", report_metrics=["wandb", "custom"]) + assert model.report_metrics == ["wandb", "custom"]