Trieye is a Python library designed to provide asynchronous statistics collection, processing, logging, and data persistence for machine learning experiments, particularly those using Ray for distributed computation. It aims to decouple these common operational concerns from the main application logic, offering a configurable and reusable solution via a dedicated Ray actor.
Key Features:
- Ray Actor Integration: Core functionality is exposed via a
TrieyeActorRay actor for asynchronous operation, minimizing impact on the main training loop. Configurable fault tolerance (max_restarts). - Configurable Statistics: Define metrics, aggregation methods (mean, sum, rate, latest, etc.), logging frequency (by step or time), and logging targets (MLflow, TensorBoard, console) via Pydantic models (
TrieyeConfig,StatsConfig,MetricConfig). - Flexible Event System: Applications send raw metric data points (
RawMetricEvent) to the actor, tagged with a global step and optional context dictionary. Events with non-finite values are automatically skipped. - Asynchronous Processing: The actor buffers raw events (
ActorState) and processes/aggregates them periodically (ActorLogic,StatsProcessor) based on configuration, preventing blocking in the main application. - MLflow & TensorBoard Integration: Automatically initializes and manages MLflow runs and TensorBoard writers within the actor, logging processed metrics and specified artifacts (checkpoints, buffers, configs).
- Centralized Data Persistence: Handles saving and loading of application state (e.g., model checkpoints, replay buffers) using
cloudpickleand Pydantic schemas (ActorLogic,Serializer). Manages all file paths internally within a structured directory (<ROOT_DATA_DIR>/<app_name>/runs/<run_name>/) based on the providedTrieyeConfig. Logs artifacts to MLflow. Includes logic for auto-resuming from the latest checkpoint of the most recent previous run. - Structured Configuration: Uses Pydantic for clear, validated configuration (
TrieyeConfig,PersistenceConfig,StatsConfig). - Testability: Designed with dependency injection in mind, allowing components like MLflow/TensorBoard clients to be mocked during testing.
pip install trieyeOr, for development:
git clone https://github.com/lguibr/trieye.git
cd trieye
pip install -e .[dev]Dependencies: Requires Python 3.10+ and relies on ray[default], mlflow, tensorboard, pydantic, cloudpickle, numpy, and torch.
The library separates concerns into distinct components, orchestrated by the TrieyeActor:
TrieyeActor(trieye/actor.py): The central Ray actor (@ray.remote). Acts as a façade, receiving external calls (log_event,save_training_state,load_initial_state,shutdown). Manages thread safety (threading.Lock) and delegates core tasks toActorLogic. Initializes and owns tracking clients (MLflow, TensorBoard) and thePathManagerunless injected for testing. Handles actor lifecycle and shutdown. Provides methods likeget_actor_name()andget_run_base_dir_str()for external components to query its identity and paths.ActorState(trieye/actor_state.py): Manages the internal, mutable state within the actor: raw event buffers, latest metric values, event timestamps for rate calculation, and tracking of the last processed step/time. Provides methods for adding events, retrieving data for processing, clearing processed data, and getting/setting persistable state (e.g.,last_processed_step).ActorLogic(trieye/actor_logic.py): Encapsulates the core business logic, independent of Ray actor specifics. Orchestrates interactions betweenActorState,PathManager,Serializer, andStatsProcessor. Contains the logic for processing stats, saving/loading checkpoints and buffers, handling auto-resume, and saving configuration files.TrieyeConfig(trieye/config.py): Top-level Pydantic configuration model, containingPersistenceConfigandStatsConfig. Defines application/run names. Passed to theTrieyeActoron initialization.PersistenceConfig(trieye/config.py): Defines the root data directory (ROOT_DATA_DIR), application name (APP_NAME), run name (RUN_NAME), buffer save frequency (BUFFER_SAVE_FREQ_STEPS), checkpoint frequency (CHECKPOINT_SAVE_FREQ_STEPS), auto-resume behavior (AUTO_RESUME_LATEST), and explicit load paths (LOAD_CHECKPOINT_PATH,LOAD_BUFFER_PATH). Does NOT define subdirectory names or specific filenames; these are internal details managed byPathManager.StatsConfig(trieye/config.py): Defines metric processing rules (aggregation, frequency, targets) via a list ofMetricConfig.MetricConfig(trieye/config.py): Defines a single metric to be tracked (name, source event, aggregation, logging frequency/targets, etc.). Includes validation for rate metrics.RawMetricEvent,CheckpointData,BufferData,LoadedTrainingState,LogContext(trieye/schemas.py): Pydantic models for structuring data (events, saved state, logging context).BufferDatastoreslist[Any], allowing applications to store arbitrary buffer content.CheckpointDataincludes actor state for seamless resumption.PathManager(trieye/path_manager.py): Internal component responsible for deriving all necessary absolute paths (checkpoints, buffers, logs, TensorBoard, MLflow URI) based only on thePersistenceConfigprovided (ROOT_DATA_DIR,APP_NAME,RUN_NAME). It encapsulates the directory structure logic (e.g.,<ROOT>/<APP>/runs/<RUN>/checkpoints/).Serializer(trieye/serializer.py): Handles serialization/deserialization usingcloudpickle(for checkpoints/buffers) and JSON (for configs). Includes logic to prepare optimizer state (move to CPU) before saving.StatsProcessor(trieye/stats_processor.py): Performs aggregation (mean, sum, rate, etc.) and logging logic based onStatsConfig. Interacts with MLflow Client and TensorBoard Writer. Used byActorLogic.exceptions.py(trieye/exceptions.py): Custom exception classes (ConfigurationError,SerializationError,ProcessingError).
import logging
import random
import time
import ray
# Import necessary components from trieye
from trieye import (
DEFAULT_METRICS, # Example using default metrics
BufferData,
CheckpointData,
MetricConfig,
PersistenceConfig, # Import PersistenceConfig if customizing
RawMetricEvent,
StatsConfig,
TrieyeActor,
TrieyeConfig,
)
# Configure logging for the application
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 1. Configure Trieye
# Example: Customize persistence and add one custom metric to defaults
my_metrics = DEFAULT_METRICS + [
MetricConfig(name="Custom/MyValue", source="custom", aggregation="mean", log_frequency_steps=20)
]
# --- Create TrieyeConfig ---
# Provide app_name and optionally run_name.
# Customize persistence behavior (frequencies, auto-resume) if needed.
# Trieye will handle path generation internally.
trieye_config = TrieyeConfig(
app_name="my_rl_app",
run_name=f"experiment_{time.strftime('%Y%m%d_%H%M%S')}",
persistence=PersistenceConfig( # Optional: Customize persistence behavior
BUFFER_SAVE_FREQ_STEPS=50, # Save buffer more frequently
CHECKPOINT_SAVE_FREQ_STEPS=100, # Save checkpoint every 100 steps
AUTO_RESUME_LATEST=True, # Enable auto-resume
# ROOT_DATA_DIR=".my_app_data" # Example: change root dir if needed
),
stats=StatsConfig( # Optional: Customize stats
processing_interval_seconds=2.0, # Process stats every 2 seconds
metrics=my_metrics # Use the combined list
)
)
# 2. Initialize Ray (if not already done)
if not ray.is_initialized():
# Set lower Ray logging level to reduce noise if desired
ray.init(logging_level=logging.WARNING)
# 3. Start the TrieyeActor
# Give the actor a unique name based on the run_name for potential reconnection
actor_name = f"trieye_actor_{trieye_config.run_name}"
try:
# Use get_actor for resilience if script restarts and actor still exists
trieye_actor = ray.get_actor(actor_name)
logger.info(f"Reconnected to existing TrieyeActor '{actor_name}'.")
except ValueError:
logger.info(f"Creating new TrieyeActor '{actor_name}'.")
# Pass the configured TrieyeConfig object
trieye_actor = TrieyeActor.options(name=actor_name, lifetime="detached").remote(config=trieye_config)
# Optional: Wait for actor to be ready by calling a method
try:
run_id = ray.get(trieye_actor.get_mlflow_run_id.remote(), timeout=10)
logger.info(f"TrieyeActor ready. MLflow Run ID: {run_id}")
# Get the run directory managed by the actor (optional)
run_dir = ray.get(trieye_actor.get_run_base_dir_str.remote(), timeout=5)
logger.info(f"TrieyeActor managing run directory: {run_dir}")
except Exception as e:
logger.error(f"Error waiting for TrieyeActor: {e}. Exiting.")
ray.shutdown()
exit(1)
# --- In your training loop ---
global_step = 0
buffer_size = 0
episodes_played = 0
my_buffer: list = []
my_model_state: dict = {}
my_optimizer_state: dict = {}
# Load initial state (example)
# Use the logic handler within the actor for loading
logger.info("Attempting to load initial state...")
# Actor handles finding the correct checkpoint/buffer based on its config
initial_state = ray.get(trieye_actor.load_initial_state.remote())
if initial_state.checkpoint_data:
cp_data = initial_state.checkpoint_data
global_step = cp_data.global_step
episodes_played = cp_data.episodes_played
my_model_state = cp_data.model_state_dict
my_optimizer_state = cp_data.optimizer_state_dict
# Actor state is restored internally by load_initial_state
logger.info(f"Resuming from Checkpoint: Step={global_step}, Episodes={episodes_played}")
# ... apply model/optimizer state to your actual model/optimizer ...
if initial_state.buffer_data:
my_buffer = initial_state.buffer_data.buffer_list
buffer_size = len(my_buffer)
logger.info(f"Loaded Buffer: {buffer_size} items")
# ... potentially validate/process loaded buffer content ...
logger.info("Starting training loop...")
try:
for i in range(global_step, global_step + 100): # Example loop continuing from loaded step
time.sleep(0.1)
current_step = i + 1 # Typically step increments after processing
# Simulate adding to buffer
my_buffer.append(f"experience_{current_step}")
buffer_size = len(my_buffer)
# Simulate training step
loss = random.random() * 0.1
my_model_state = {"param": current_step} # Update mock state
my_optimizer_state = {"opt_param": current_step}
# --- Log Metrics ---
# Use default metric names if defined in config
trieye_actor.log_event.remote(
RawMetricEvent(name="Loss/Total", value=loss, global_step=current_step)
)
# Log event for rate calculation (must match rate_numerator_event in config)
trieye_actor.log_event.remote(
RawMetricEvent(name="step_completed", value=1, global_step=current_step)
)
# Log custom metric
trieye_actor.log_event.remote(
RawMetricEvent(name="Custom/MyValue", value=current_step * 2, global_step=current_step)
)
# Log buffer size
trieye_actor.log_event.remote(
RawMetricEvent(name="Buffer/Size", value=buffer_size, global_step=current_step)
)
# Simulate episode end occasionally
if current_step % 10 == 0:
episodes_played += 1
score = random.random() * 100
length = random.randint(50, 150)
# Log event with context (must match raw_event_name and context_key in config)
trieye_actor.log_event.remote(
RawMetricEvent(
name="episode_end",
value=1, # Value often unused when context is primary data
global_step=current_step,
context={"score": score, "length": length}
)
)
# Log progress
trieye_actor.log_event.remote(
RawMetricEvent(name="Progress/Episodes_Played", value=episodes_played, global_step=current_step)
)
# --- Trigger Stats Processing ---
# Actor handles interval check internally. Fire-and-forget.
trieye_actor.process_and_log.remote(current_step)
# --- Save State Periodically ---
# Check frequencies based on the config passed to the actor
should_save_buffer = (current_step % trieye_config.persistence.BUFFER_SAVE_FREQ_STEPS == 0)
should_save_checkpoint = (current_step % trieye_config.persistence.CHECKPOINT_SAVE_FREQ_STEPS == 0)
if should_save_checkpoint:
logger.info(f"Requesting save at step {current_step}, save buffer: {should_save_buffer}")
# Pass actual state dicts and buffer content
mock_model_config = {"layers": 2} # Example config
mock_env_config = {"id": "env-v1"} # Example config
trieye_actor.save_training_state.remote(
nn_state_dict=my_model_state,
optimizer_state_dict=my_optimizer_state,
buffer_content=my_buffer, # Pass the actual buffer list
global_step=current_step,
episodes_played=episodes_played,
total_simulations_run=current_step * 100, # Example simulation count
save_buffer=should_save_buffer, # Pass flag based on config check
model_config_dict=mock_model_config,
env_config_dict=mock_env_config,
user_data={"custom_info": "value"}, # Optional extra data
is_best=False # Example: only save 'best' based on some evaluation condition
)
# Update global step for next iteration
global_step = current_step
except KeyboardInterrupt:
logger.info("Training interrupted by user.")
finally:
# --- Cleanup ---
logger.info("Training finished or interrupted. Shutting down TrieyeActor...")
if trieye_actor:
try:
# Force final stats processing with the last known step
logger.info(f"Forcing final log processing at step {global_step}...")
ray.get(trieye_actor.force_process_and_log.remote(global_step), timeout=15)
# Explicitly call shutdown to close files, TB writer, and potentially MLflow run
logger.info("Calling actor shutdown...")
ray.get(trieye_actor.shutdown.remote(), timeout=15)
logger.info("TrieyeActor shutdown complete.")
except Exception as e:
logger.error(f"Error during TrieyeActor final processing or shutdown: {e}")
# Optional: Force kill the actor if shutdown hangs or fails
logger.warning("Attempting to kill actor after shutdown error.")
try:
ray.kill(trieye_actor)
except Exception as kill_e:
logger.error(f"Error killing TrieyeActor: {kill_e}")
if ray.is_initialized():
ray.shutdown()
logger.info("Ray shut down. Done.")TrieyeConfig: Top-level config passed to theTrieyeActor.app_name: Namespace for data storage (<ROOT_DATA_DIR>/<app_name>).run_name: Specific identifier for the current run (defaults to timestamp).persistence:PersistenceConfiginstance.stats:StatsConfiginstance.
PersistenceConfig: Defines persistence behavior.ROOT_DATA_DIR: Root directory for all Trieye data (default:.trieye_data).SAVE_BUFFER: Whether to save the buffer (default:True).BUFFER_SAVE_FREQ_STEPS: Save buffer every N steps (default: 1000).CHECKPOINT_SAVE_FREQ_STEPS: Save checkpoint every N steps (default: 1000).AUTO_RESUME_LATEST: Enable auto-resume from previous run (default:True).LOAD_CHECKPOINT_PATH/LOAD_BUFFER_PATH: Explicit paths to load, overriding auto-resume (default:None).- Note: Specific paths like MLflow URI, TensorBoard dir, checkpoint dir are derived internally by
PathManagerbased onROOT_DATA_DIR,APP_NAME, andRUN_NAME.
StatsConfig:processing_interval_seconds: How often the actor processes buffered stats (e.g.,1.0for every second).metrics: A list ofMetricConfigobjects defining each metric to track.
MetricConfig:name: Final name used for logging (e.g., "Episode/Score"). Must be unique withinStatsConfig.source: Origin identifier (e.g., "worker", "trainer", "custom"). Used for organization/filtering (currently informational).raw_event_name: Thenamefield used inRawMetricEventif different from the finalname. IfNone, usesname. Important: This is the key the actor looks for in incoming events.aggregation: How to combine multiple raw values within a processing interval (mean,sum,latest,rate,min,max,std,count).log_frequency_steps/log_frequency_seconds: Control how often the aggregated metric is logged. Set to 0 to disable that frequency type. If both are > 0, logging occurs if either condition is met. If both are 0, the metric is aggregated but never logged automatically (useful for internal state).log_to: List of targets (mlflow,tensorboard,console).x_axis: Primary x-axis for logging (global_step,wall_time,episode). Note:wall_timeandepisodesupport might be limited depending on logging backend capabilities and how steps are provided.global_stepis the most common and reliable.context_key: If the value comes from thecontextdictionary withinRawMetricEvent, specify the key here (e.g., forepisode_endevents withcontext={"score": 123}). Thevaluefield of theRawMetricEventis ignored in this case.rate_numerator_event: Required ifaggregation="rate". Specifies theraw_event_namewhose summed values act as the numerator for the rate calculation (events per second). For example, ifrate_numerator_event="step_completed"and the event value is always 1, this calculates steps/sec. If the event value represents simulations run in that step, it calculates simulations/sec.
Testing Trieye involves several layers, following best practices for Ray applications:
- Unit Tests (Components): Individual components like
PathManager,Serializer,ActorState, andStatsProcessorare tested in isolation using standardpytestfixtures and mocks (unittest.mock). These tests verify the core logic without involving Ray. Seetests/test_path_manager.py,tests/test_serializer.py,tests/test_actor_state.py,tests/test_stats_processor.py. - Unit Tests (Actor Logic - Local Mode): The
TrieyeActor's methods are tested using Ray's local mode (ray.init(local_mode=True)). This executes the actor's code synchronously in the main process, allowing direct interaction and state inspection, bypassing Ray's pickling and scheduling. Dependencies like MLflow/TensorBoard clients are injected as mocks. This approach provides high code coverage for the actor's internal logic. Seetests/test_actor.py(tests usingtrieye_actor_localfixture). - Integration Tests (Remote Actor): A few tests run the
TrieyeActoras a remote actor (@ray.remote) on a minimal, multi-process Ray cluster initialized bypytest. These tests verify the actor's behavior in a distributed setting, including method calls viaactor.method.remote(), state persistence across calls, and interaction with injected mocks for external services. These tests ensure the actor functions correctly within the Ray ecosystem. Seetests/test_actor.py(tests marked with@pytest.mark.integrationusingtrieye_actor_integrationfixture). - Mocking: External dependencies (MLflow, TensorBoard, file I/O via
PathManager) are mocked usingunittest.mock.MagicMockand injected into the relevant components (TrieyeActor,ActorLogic,StatsProcessor) during testing. This isolates tests from external services and filesystem side effects. Module-level functions (likemlflow.start_run) are patched usingunittest.mock.patchwhere necessary. - Fixtures:
pytestfixtures (tests/conftest.py) provide reusable setup for configurations (TrieyeConfig), temporary directories (tmp_path), mock objects (mock_mlflow_client,mock_tb_writer), Ray initialization (session-scoped cluster, function-scoped local mode), and test data (RawMetricEvent,CheckpointData). - Coverage: Code coverage is measured using
pytest-cov, aiming for high coverage (>80%) of the core logic. Ray local mode helps include actor code in coverage reports.
This multi-layered approach ensures both the internal logic and the distributed behavior of the TrieyeActor are thoroughly validated.
Contributions are welcome! Please open an issue to discuss changes or submit a pull request. Ensure tests pass (pytest tests) and code meets formatting/linting standards (ruff check . --fix && ruff format .). Update type hints (mypy .) as needed.
This project is licensed under the MIT License - see the LICENSE file for details.
