diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..54c85285 Binary files /dev/null and b/.DS_Store differ diff --git a/profiling_results.prof b/profiling_results.prof new file mode 100644 index 00000000..bd0df6c8 Binary files /dev/null and b/profiling_results.prof differ diff --git a/syllabus/.DS_Store b/syllabus/.DS_Store new file mode 100644 index 00000000..6bbb9e98 Binary files /dev/null and b/syllabus/.DS_Store differ diff --git a/syllabus/core/.DS_Store b/syllabus/core/.DS_Store new file mode 100644 index 00000000..14b5a4e2 Binary files /dev/null and b/syllabus/core/.DS_Store differ diff --git a/syllabus/core/curriculum_base.py b/syllabus/core/curriculum_base.py index 4b027268..95bb3b86 100644 --- a/syllabus/core/curriculum_base.py +++ b/syllabus/core/curriculum_base.py @@ -3,9 +3,10 @@ from typing import Any, Callable, List, Tuple, Union import numpy as np -from gymnasium.spaces import Dict - +from gymnasium.spaces import Dict, Box +import random from syllabus.task_space import TaskSpace +from itertools import product from .stat_recorder import StatRecorder @@ -14,25 +15,36 @@ class Curriculum: """Base class and API for defining curricula to interface with Gym environments. """ - def __init__(self, task_space: TaskSpace, random_start_tasks: int = 0, task_names: Callable = None, record_stats: bool = False) -> None: + def __init__(self, task_space: TaskSpace, random_start_tasks: int = 0, task_names: Callable = None, record_stats: bool = False, warmup_strategy: str = None, warmup_samples: int = 0) -> None: """Initialize the base Curriculum :param task_space: the environment's task space from which new tasks are sampled - TODO: Implement this in a way that works with any curriculum, maybe as a wrapper - :param random_start_tasks: Number of uniform random tasks to sample before using the algorithm's sample method, defaults to 0 - TODO: Use task space for this + :param random_start_tasks: Number of tasks to sample randomly at the start, defaults to 0 :param task_names: Names of the tasks in the task space, defaults to None + :param record_stats: Boolean to indicate if statistics should be recorded, defaults to False + :param warmup_strategy: Strategy for warmup, defaults to None + :param warmup_samples: Number of warmup samples, defaults to 0 """ assert isinstance(task_space, TaskSpace), f"task_space must be a TaskSpace object. Got {type(task_space)} instead." self.task_space = task_space - self.random_start_tasks = random_start_tasks self.completed_tasks = 0 self.task_names = task_names if task_names is not None else lambda task, idx: idx self.n_updates = 0 + self.startup_sampled_tasks = 0 + self.warmup_strategy = warmup_strategy + self.warmup_tasks = warmup_samples + self.fix_curr_index = 0 self.stat_recorder = StatRecorder(self.task_space, task_names=task_names) if record_stats else None - if self.num_tasks == 0: + if warmup_strategy == "fix" and isinstance(self.task_space.gym_space, Box): + self.fix_box_space = self._initialize_fixed_grid() + + if self.num_tasks is None: + warnings.warn("Task space is continuous. Number of warmup tasks can't be compared to the task space size.") + elif self.num_tasks == 0: warnings.warn("Task space is empty. 
This will cause errors during sampling if no tasks are added.") + elif warmup_samples > self.num_tasks: + warnings.warn("Number of warmup tasks is larger than task space, some tasks will be replayed during warmup.") @property def requires_step_updates(self) -> bool: @@ -182,14 +194,47 @@ def _sample_distribution(self) -> List[float]: Any curriculum that maintains a true probability distribution should implement this method to retrieve it. """ raise NotImplementedError + + def _initialize_fixed_grid(self): + dims = self.task_space.gym_space.shape[0] + samples_per_dim = int(round(pow(self.warmup_tasks,(1 / dims)))) + ranges = [np.linspace(self.task_space.gym_space.low[i], self.task_space.gym_space.high[i], samples_per_dim) + for i in range(dims)] + all_points = list(product(*ranges)) + sampled_tasks = [tuple(point) for point in all_points] + + return sampled_tasks + + def _should_use_startup_sampling(self) -> bool: + return self.warmup_strategy != "none" and self.startup_sampled_tasks < self.warmup_tasks + + def _startup_sample(self, k: int) -> List: + sampled_tasks = [] + + if isinstance(self.task_space.gym_space, Box): + if self.warmup_strategy == "fix": + sampled_tasks = self.fix_box_space + self.fix_curr_index = (self.fix_curr_index + self.warmup_tasks) % len(sampled_tasks) + elif self.warmup_strategy == "random": + sampled_tasks = [self.task_space.gym_space.sample() for _ in range(k)] + + else: + if self.warmup_strategy == "fix": + if self.fix_curr_index + k > self.num_tasks: + sampled_tasks = self.tasks[self.fix_curr_index:self.num_tasks] + self.fix_curr_index = self.fix_curr_index + k - self.num_tasks + sampled_tasks.extend(self.tasks[0:(self.fix_curr_index)]) + else: + sampled_tasks = self.tasks[self.fix_curr_index:self.fix_curr_index + k] + self.fix_curr_index += k - def _should_use_startup_sampling(self) -> bool: - return self.random_start_tasks > 0 and self.completed_tasks < self.random_start_tasks - - def _startup_sample(self) -> List: - task_dist = [0.0 / self.num_tasks for _ in range(self.num_tasks)] - task_dist[0] = 1.0 - return task_dist + elif self.warmup_strategy == "random": + # Allows sampling with replacement, making duplicates possible if k > num_tasks. + indices = random.choices(range(self.num_tasks), k=k) + sampled_tasks = [self.tasks[idx] for idx in indices] + + self.startup_sampled_tasks += k + return sampled_tasks def sample(self, k: int = 1) -> Union[List, Any]: """Sample k tasks from the curriculum. @@ -200,14 +245,20 @@ def sample(self, k: int = 1) -> Union[List, Any]: # assert self.num_tasks > 0, "Task space is empty. Please add tasks to the curriculum before sampling." 
if self._should_use_startup_sampling(): - return self._startup_sample() - - # Use list of indices because np.choice does not play nice with tuple tasks - # tasks = self.tasks - n_tasks = self.num_tasks + tasks = self._startup_sample(k) + # Check if the startup sampling has satisfied the request or if there's no progress (no tasks returned) + if len(tasks) > 0 and len(tasks) < k: # Check if we need to add more tasks + additional_tasks = self.sample(k=k-len(tasks)) + tasks.extend(additional_tasks) + return tasks + task_dist = self._sample_distribution() - task_idx = np.random.choice(list(range(n_tasks)), size=k, p=task_dist) - return task_idx + + # Normal sampling process + tasks = self.tasks + n_tasks = len(tasks) + task_idx = np.random.choice(range(n_tasks), size=k, p=task_dist) + return [tasks[i] for i in task_idx] def log_metrics(self, writer, step=None, log_full_dist=False): """Log the task distribution to the provided tensorboard writer. diff --git a/syllabus/core/environment_sync_wrapper.py b/syllabus/core/environment_sync_wrapper.py index 4d968d6b..8faa3b40 100644 --- a/syllabus/core/environment_sync_wrapper.py +++ b/syllabus/core/environment_sync_wrapper.py @@ -87,6 +87,7 @@ def reset(self, *args, **kwargs): def step(self, action): obs, rew, term, trunc, info = step_api_compatibility(self.env.step(action), output_truncation_bool=True) + info["task"] = self.task_space.encode(self.get_task()) self.episode_length += 1 self.episode_return += rew self.task_progress = info.get("task_completion", 0.0) @@ -491,4 +492,4 @@ def add_task(self, task): def __getattr__(self, attr): env_attr = getattr(self.env, attr, None) if env_attr: - return env_attr + return env_attr \ No newline at end of file diff --git a/syllabus/curricula/.DS_Store b/syllabus/curricula/.DS_Store new file mode 100644 index 00000000..227266ec Binary files /dev/null and b/syllabus/curricula/.DS_Store differ diff --git a/syllabus/curricula/annealing_box.py b/syllabus/curricula/annealing_box.py index 101981c7..8dc8dd84 100644 --- a/syllabus/curricula/annealing_box.py +++ b/syllabus/curricula/annealing_box.py @@ -48,6 +48,9 @@ def sample(self, k: int = 1) -> Union[List, Any]: Sample k tasks from the curriculum. 
""" # Linear annealing from start_values to end_values + if self._should_use_startup_sampling(): + return self._startup_sample(k) + annealed_values = ( self.start_values + (self.end_values - self.start_values) * np.minimum(self.current_step, self.total_steps) / self.total_steps diff --git a/syllabus/curricula/plr/central_plr_wrapper.py b/syllabus/curricula/plr/central_plr_wrapper.py index 7f69ea85..5092ce7b 100644 --- a/syllabus/curricula/plr/central_plr_wrapper.py +++ b/syllabus/curricula/plr/central_plr_wrapper.py @@ -215,9 +215,9 @@ def _sample_distribution(self) -> List[float]: def sample(self, k: int = 1) -> Union[List, Any]: self.num_samples += 1 if self._should_use_startup_sampling(): - return self._startup_sample() - else: - return [self._task_sampler.sample() for _ in range(k)] + return self._startup_sample(k) + + return [self._task_sampler.sample() for _ in range(k)] def _enumerate_tasks(self, space): assert isinstance(space, Discrete) or isinstance(space, MultiDiscrete), f"Unsupported task space {space}: Expected Discrete or MultiDiscrete" diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index fd7027f6..bc07cc86 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -216,9 +216,9 @@ def _sample_distribution(self) -> List[float]: def sample(self, k: int = 1) -> Union[List, Any]: if self._should_use_startup_sampling(): - return self._startup_sample() - else: - return [self._task_sampler.sample() for _ in range(k)] + return self._startup_sample(k) + + return [self._task_sampler.sample() for _ in range(k)] def update_on_step(self, task, obs, rew, term, trunc, info, env_id: int = None) -> None: """ diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index c1e97a18..6a6dc219 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -311,6 +311,8 @@ def sample_weights(self): self.staleness_temperature, self.task_staleness, ) + if np.isclose(np.sum(staleness_weights), 0): + staleness_weights = np.ones_like(staleness_weights, dtype=float) / len(staleness_weights) staleness_weights = staleness_weights * (1 - self.unseen_task_weights) z = np.sum(staleness_weights) if z > 0: diff --git a/syllabus/curricula/sequential.py b/syllabus/curricula/sequential.py index 0dea9b92..8d9e4cf2 100644 --- a/syllabus/curricula/sequential.py +++ b/syllabus/curricula/sequential.py @@ -15,13 +15,15 @@ class SequentialCurriculum(Curriculum): REQUIRES_EPISODE_UPDATES = True REQUIRES_CENTRAL_UPDATES = False - def __init__(self, curriculum_list: List[Curriculum], stopping_conditions: List[Any], *curriculum_args, return_buffer_size: int = 1000, **curriculum_kwargs): + def __init__(self, curriculum_list: List[Curriculum], stopping_conditions: List[Any], *curriculum_args, custom_metrics: dict = None, return_buffer_size: int = 1000, **curriculum_kwargs): super().__init__(*curriculum_args, **curriculum_kwargs) assert len(curriculum_list) > 0, "Must provide at least one curriculum" assert len(stopping_conditions) == len(curriculum_list) - 1, f"Stopping conditions must be one less than the number of curricula. Final curriculum is used for the remainder of training. Expected {len(curriculum_list) - 1}, got {len(stopping_conditions)}." if len(curriculum_list) == 1: - warnings.warn("Your sequential curriculum only containes one element. Consider using that element directly instead.") + warnings.warn("Your sequential curriculum only contains one element. 
Consider using that element directly instead.") + self.custom_metrics = custom_metrics if custom_metrics is not None else {} # Stores the functions that can calculate the data during training + self.metric_values = {metric: None for metric in self.custom_metrics.keys()} # Initialize the metric values dictionary with None self.curriculum_list = self._parse_curriculum_list(curriculum_list) self.stopping_conditions = self._parse_stopping_conditions(stopping_conditions) self._curriculum_index = 0 @@ -83,23 +85,25 @@ def _parse_condition_string(self, condition: str) -> Callable: try: metric, comparator, value = clauses - - if metric == "steps": - metric_fn = self._get_steps - elif metric == "total_steps": - metric_fn = self._get_total_steps - elif metric == "episodes": - metric_fn = self._get_episodes - elif metric == "total_episodes": - metric_fn = self._get_total_episodes - elif metric == "tasks": - metric_fn = self._get_tasks - elif metric == "total_tasks": - metric_fn = self._get_total_tasks - elif metric == "episode_return": - metric_fn = self._get_episode_return + if metric in self.metric_values: + metric_fn = lambda: self.metric_values[metric] else: - raise ValueError(f"Invalid metric name: {metric}") + if metric == "steps": + metric_fn = self._get_steps + elif metric == "total_steps": + metric_fn = self._get_total_steps + elif metric == "episodes": + metric_fn = self._get_episodes + elif metric == "total_episodes": + metric_fn = self._get_total_episodes + elif metric == "tasks": + metric_fn = self._get_tasks + elif metric == "total_tasks": + metric_fn = self._get_total_tasks + elif metric == "episode_return": + metric_fn = self._get_episode_return + else: + raise ValueError(f"Invalid metric name: {metric}") if comparator == '<': return lambda: metric_fn() < float(value) @@ -164,6 +168,7 @@ def sample(self, k: int = 1) -> Union[List, Any]: return recoded_tasks def update_on_episode(self, episode_return, episode_len, episode_task, env_id=None): + print(self.metric_values) self.n_episodes += 1 self.total_episodes += 1 self.n_steps += episode_len diff --git a/syllabus/curricula/simple_box.py b/syllabus/curricula/simple_box.py index 7f14572c..d3388a81 100644 --- a/syllabus/curricula/simple_box.py +++ b/syllabus/curricula/simple_box.py @@ -60,14 +60,7 @@ def sample(self, k: int = 1) -> Union[List, Any]: """ Sample k tasks from the curriculum. 
""" - return [self.max_range for _ in range(k)] - - def log_metrics(self, writer, step=None): - print("Logging", flush=True) - try: - import wandb - writer.log({"range_min": self.max_range[0]}, step=step) - writer.log({"range_max": self.max_range[1]}, step=step) - - except ImportError: - pass + if self._should_use_startup_sampling(): + return self._startup_sample(k) + + return [self.max_range for _ in range(k)] \ No newline at end of file diff --git a/syllabus/examples/experimental/sb3_procgen_plr.py b/syllabus/examples/experimental/sb3_procgen_plr.py index f9cde9f9..a5285994 100644 --- a/syllabus/examples/experimental/sb3_procgen_plr.py +++ b/syllabus/examples/experimental/sb3_procgen_plr.py @@ -1,38 +1,181 @@ +import argparse +import os +import random +import time +from distutils.util import strtobool from typing import Callable -import gym +import gym as openai_gym +import gymnasium as gym +import numpy as np import procgen # noqa: F401 +import torch import wandb +from procgen import ProcgenEnv +from shimmy.openai_gym_compatibility import GymV21CompatibilityV0 from stable_baselines3 import PPO from stable_baselines3.common.callbacks import BaseCallback, CallbackList -from stable_baselines3.common.vec_env import (DummyVecEnv, VecMonitor, - VecNormalize) -from syllabus.core import (MultiProcessingSyncWrapper, - make_multiprocessing_curriculum) -from syllabus.curricula import CentralizedPrioritizedLevelReplay +from stable_baselines3.common.vec_env import DummyVecEnv, VecMonitor, VecNormalize +from syllabus.core import MultiProcessingSyncWrapper, make_multiprocessing_curriculum +from syllabus.curricula import CentralizedPrioritizedLevelReplay, DomainRandomization +from syllabus.examples.models import SB3ResNetBase, Sb3ProcgenAgent from syllabus.examples.task_wrappers import ProcgenTaskWrapper +from syllabus.examples.utils.vecenv import VecMonitor as SyllabusVecMonitor, VecNormalize as SyllabusVecNormalize, VecExtractDictObs as SyllabusVecExtractDictObs + +from torch.utils.tensorboard import SummaryWriter from wandb.integration.sb3 import WandbCallback -def make_env(task_queue, update_queue, start_level=0, num_levels=1): +def parse_args(): + # fmt: off + parser = argparse.ArgumentParser() + parser.add_argument("--exp-name", type=str, default=os.path.basename(__file__).rstrip(".py"), + help="the name of this experiment") + parser.add_argument("--seed", type=int, default=1, + help="seed of the experiment") + parser.add_argument("--torch-deterministic", type=lambda x: bool(strtobool(x)), default=True, nargs="?", const=True, + help="if toggled, `torch.backends.cudnn.deterministic=False`") + parser.add_argument("--cuda", type=lambda x: bool(strtobool(x)), default=True, nargs="?", const=True, + help="if toggled, cuda will be enabled by default") + parser.add_argument("--track", type=lambda x: bool(strtobool(x)), default=False, nargs="?", const=True, + help="if toggled, this experiment will be tracked with Weights and Biases") + parser.add_argument("--wandb-project-name", type=str, default="syllabus", + help="the wandb's project name") + parser.add_argument("--wandb-entity", type=str, default=None, + help="the entity (team) of wandb's project") + parser.add_argument("--capture-video", type=lambda x: bool(strtobool(x)), default=False, nargs="?", const=True, + help="weather to capture videos of the agent performances (check out `videos` folder)") + parser.add_argument("--logging-dir", type=str, default=".", + help="the base directory for logging and wandb storage.") + + # Algorithm specific arguments 
+ parser.add_argument("--env-id", type=str, default="starpilot", + help="the id of the environment") + parser.add_argument("--total-timesteps", type=int, default=int(25e6), + help="total timesteps of the experiments") + parser.add_argument("--learning-rate", type=float, default=5e-4, + help="the learning rate of the optimizer") + parser.add_argument("--num-envs", type=int, default=64, + help="the number of parallel game environments") + parser.add_argument("--num-steps", type=int, default=256, + help="the number of steps to run in each environment per policy rollout") + parser.add_argument("--anneal-lr", type=lambda x: bool(strtobool(x)), default=False, nargs="?", const=True, + help="Toggle learning rate annealing for policy and value networks") + parser.add_argument("--gae", type=lambda x: bool(strtobool(x)), default=True, nargs="?", const=True, + help="Use GAE for advantage computation") + parser.add_argument("--gamma", type=float, default=0.999, + help="the discount factor gamma") + parser.add_argument("--gae-lambda", type=float, default=0.95, + help="the lambda for the general advantage estimation") + parser.add_argument("--num-minibatches", type=int, default=8, + help="the number of mini-batches") + parser.add_argument("--update-epochs", type=int, default=3, + help="the K epochs to update the policy") + parser.add_argument("--norm-adv", type=lambda x: bool(strtobool(x)), default=True, nargs="?", const=True, + help="Toggles advantages normalization") + parser.add_argument("--clip-coef", type=float, default=0.2, + help="the surrogate clipping coefficient") + parser.add_argument("--clip-vloss", type=lambda x: bool(strtobool(x)), default=True, nargs="?", const=True, + help="Toggles whether or not to use a clipped loss for the value function, as per the paper.") + parser.add_argument("--ent-coef", type=float, default=0.01, + help="coefficient of the entropy") + parser.add_argument("--vf-coef", type=float, default=0.5, + help="coefficient of the value function") + parser.add_argument("--max-grad-norm", type=float, default=0.5, + help="the maximum norm for the gradient clipping") + parser.add_argument("--target-kl", type=float, default=None, + help="the target KL divergence threshold") + + # Procgen arguments + parser.add_argument("--full-dist", type=lambda x: bool(strtobool(x)), default=True, nargs="?", const=True, + help="Train on full distribution of levels.") + + # Curriculum arguments + parser.add_argument("--curriculum", type=lambda x: bool(strtobool(x)), default=False, nargs="?", const=True, + help="if toggled, this experiment will use curriculum learning") + parser.add_argument("--curriculum-method", type=str, default="plr", + help="curriculum method to use") + parser.add_argument("--num-eval-episodes", type=int, default=10, + help="the number of episodes to evaluate the agent on after each policy update.") + + args = parser.parse_args() + args.batch_size = int(args.num_envs * args.num_steps) + args.minibatch_size = int(args.batch_size // args.num_minibatches) + # fmt: on + return args + + +PROCGEN_RETURN_BOUNDS = { + "coinrun": (5, 10), + "starpilot": (2.5, 64), + "caveflyer": (3.5, 12), + "dodgeball": (1.5, 19), + "fruitbot": (-1.5, 32.4), + "chaser": (0.5, 13), + "miner": (1.5, 13), + "jumper": (3, 10), + "leaper": (3, 10), + "maze": (5, 10), + "bigfish": (1, 40), + "heist": (3.5, 10), + "climber": (2, 12.6), + "plunder": (4.5, 30), + "ninja": (3.5, 10), + "bossfight": (0.5, 13), +} + +def make_env(env_id, seed, curriculum=None, start_level=0, num_levels=1): def thunk(): - env = 
gym.make("procgen-bigfish-v0", distribution_mode="easy", start_level=start_level, num_levels=num_levels) - env = ProcgenTaskWrapper(env) - env = MultiProcessingSyncWrapper( - env, - task_queue, - update_queue, - update_on_step=False, - task_space=env.task_space, - ) + env = openai_gym.make(f"procgen-{env_id}-v0", distribution_mode="easy", start_level=start_level, num_levels=num_levels) + env = GymV21CompatibilityV0(env=env) + if curriculum is not None: + components = curriculum.get_components() # This must be safe to call here + env = ProcgenTaskWrapper(env, env_id, seed=seed) + env = MultiProcessingSyncWrapper( + env=env, + components=components, + update_on_step=False, + task_space=env.task_space, + ) return env return thunk +def level_replay_evaluate_sb3(env_name, model, num_episodes, num_levels=0): + eval_envs = ProcgenEnv( + num_envs=args.num_eval_episodes, + env_name=env_name, + num_levels=num_levels, + start_level=0, + distribution_mode="easy", + paint_vel_info=False + ) + model.policy.set_training_mode(False) + eval_envs = SyllabusVecExtractDictObs(eval_envs, "rgb") + eval_envs = SyllabusVecMonitor(venv=eval_envs, filename=None, keep_buf=100) + eval_envs = SyllabusVecNormalize(venv=eval_envs, ob=False, ret=True) + eval_obs, _ = eval_envs.reset() + eval_episode_rewards = [-1] * num_episodes + + while -1 in eval_episode_rewards: + with torch.no_grad(): + eval_action, _states = model.predict(eval_obs, deterministic=False) + eval_obs, _, _, _, infos = eval_envs.step(eval_action) + for i, info in enumerate(infos): + if 'episode' in info.keys() and eval_episode_rewards[i] == -1: + eval_episode_rewards[i] = info['episode']['r'] + + model.policy.set_training_mode(True) + mean_returns = np.mean(eval_episode_rewards) + stddev_returns = np.std(eval_episode_rewards) + env_min, env_max = PROCGEN_RETURN_BOUNDS[args.env_id] + normalized_mean_returns = (mean_returns - env_min) / (env_max - env_min) + return mean_returns, stddev_returns, normalized_mean_returns + + def wrap_vecenv(vecenv): - vecenv.is_vector_env = True vecenv = VecMonitor(venv=vecenv, filename=None) - vecenv = VecNormalize(venv=vecenv, norm_obs=False, norm_reward=True) + vecenv = VecNormalize(venv=vecenv, norm_obs=False, norm_reward=True, training=True) return vecenv @@ -42,75 +185,154 @@ class CustomCallback(BaseCallback): :param verbose: Verbosity level: 0 for no output, 1 for info messages, 2 for debug messages """ - def __init__(self, curriculum, verbose=0): + def __init__(self, curriculum, model, verbose=0): super().__init__(verbose) self.curriculum = curriculum + self.model = model def _on_step(self) -> bool: - tasks = self.training_env.venv.venv.venv.get_attr("task") - - update = { - "update_type": "on_demand", - "metrics": { - "value": self.locals["values"], - "next_value": self.locals["values"], - "rew": self.locals["rewards"], - "dones": self.locals["dones"], - "tasks": tasks, - }, - } - self.curriculum.update_curriculum(update) + if self.curriculum is not None and type(self.curriculum.curriculum) is CentralizedPrioritizedLevelReplay: + self.update_locals(self.locals) + tasks = [i["task"] for i in self.locals["infos"]] + obs = self.locals['new_obs'] + obs_tensor = torch.tensor(obs, dtype=torch.float32).to(self.model.device) + with torch.no_grad(): + new_value = self.model.policy.predict_values(obs_tensor) + update = { + "update_type": "on_demand", + "metrics": { + "value": self.locals["values"], + "next_value": new_value, + "rew": self.locals["rewards"], + "dones": self.locals["dones"], + "tasks": tasks, + }, + } + 
self.curriculum.update(update) + del obs_tensor + del obs return True + def _on_rollout_end(self) -> None: + mean_eval_returns, stddev_eval_returns, normalized_mean_eval_returns = level_replay_evaluate_sb3(args.env_id, self.model, args.num_eval_episodes, num_levels=0) + mean_train_returns, stddev_train_returns, normalized_mean_train_returns = level_replay_evaluate_sb3(args.env_id, self.model, args.num_eval_episodes, num_levels=200) + writer.add_scalar("test_eval/mean_episode_return", mean_eval_returns, self.num_timesteps) + writer.add_scalar("test_eval/normalized_mean_eval_return", normalized_mean_eval_returns, self.num_timesteps) + writer.add_scalar("test_eval/stddev_eval_return", stddev_eval_returns, self.num_timesteps) + writer.add_scalar("train_eval/mean_episode_return", mean_train_returns, self.num_timesteps) + writer.add_scalar("train_eval/normalized_mean_train_return", normalized_mean_train_returns, self.num_timesteps) + writer.add_scalar("train_eval/stddev_train_return", stddev_train_returns, self.num_timesteps) + if self.curriculum is not None: + self.curriculum.log_metrics(writer, step=self.num_timesteps) -def linear_schedule(initial_value: float) -> Callable[[float], float]: - def func(progress_remaining: float) -> float: - return progress_remaining * initial_value - return func +if __name__ == "__main__": + args = parse_args() + run_name = f"{args.env_id}__{args.exp_name}__{args.seed}__{int(time.time())}" + if args.track: + import wandb -run = wandb.init( - project="sb3", - entity="ryansullivan", - sync_tensorboard=True, # auto-upload sb3's tensorboard metrics - save_code=True, # optional -) + run = wandb.init( + project=args.wandb_project_name, + entity=args.wandb_entity, + sync_tensorboard=True, + config=vars(args), + name=run_name, + monitor_gym=True, + save_code=True, + dir=args.logging_dir, + ) + + writer = SummaryWriter(os.path.join(args.logging_dir, "./runs/{run_name}")) + writer.add_text( + "hyperparameters", + "|param|value|\n|-|-|\n%s" % ("\n".join([f"|{key}|{value}|" for key, value in vars(args).items()])), + ) + + # TRY NOT TO MODIFY: seeding + random.seed(args.seed) + np.random.seed(args.seed) + torch.manual_seed(args.seed) + torch.backends.cudnn.deterministic = args.torch_deterministic + + device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu") + print("Device:", device) + # Curriculum setup + curriculum = None + if args.curriculum: + sample_env = openai_gym.make(f"procgen-{args.env_id}-v0") + sample_env = GymV21CompatibilityV0(env=sample_env) + sample_env = ProcgenTaskWrapper(sample_env, args.env_id, seed=args.seed) -sample_env = gym.make("procgen-bigfish-v0") -sample_env = ProcgenTaskWrapper(sample_env) -curriculum = CentralizedPrioritizedLevelReplay(sample_env.task_space, num_processes=64, num_steps=256) -curriculum, task_queue, update_queue = make_multiprocessing_curriculum(curriculum) -venv = DummyVecEnv( - [ - make_env(task_queue, update_queue, num_levels=0) - for i in range(64) + # Intialize Curriculum Method + if args.curriculum_method == "plr": + print("Using prioritized level replay.") + curriculum = CentralizedPrioritizedLevelReplay( + sample_env.task_space, + num_steps=args.num_steps, + num_processes=args.num_envs, + gamma=args.gamma, + gae_lambda=args.gae_lambda, + task_sampler_kwargs_dict={"strategy": "value_l1"} + ) + elif args.curriculum_method == "dr": + print("Using domain randomization.") + curriculum = DomainRandomization(sample_env.task_space) + else: + raise ValueError(f"Unknown curriculum method 
{args.curriculum_method}") + curriculum = make_multiprocessing_curriculum(curriculum) + del sample_env + + + # env setup + print("Creating env") + venv_fn = [ + make_env( + args.env_id, + args.seed + i, + curriculum=curriculum if args.curriculum else None, + num_levels=1 if args.curriculum else 0 + ) + for i in range(args.num_envs) ] -) -venv = wrap_vecenv(venv) - -model = PPO( - "CnnPolicy", - venv, - verbose=1, - n_steps=256, - learning_rate=linear_schedule(0.0005), - gamma=0.999, - gae_lambda=0.95, - n_epochs=3, - clip_range_vf=0.2, - ent_coef=0.01, - batch_size=256 * 64, - tensorboard_log="runs/testing" -) - -wandb_callback = WandbCallback( - model_save_path=f"models/{run.id}", - verbose=2, -) -plr_callback = CustomCallback(curriculum) -callback = CallbackList([wandb_callback, plr_callback]) -model.learn( - 25000000, - callback=callback, -) + venv = DummyVecEnv(venv_fn) + venv = wrap_vecenv(venv) + assert isinstance(venv.action_space, gym.spaces.discrete.Discrete), "only discrete action space is supported" + + print("Creating model") + model = PPO( + Sb3ProcgenAgent, + venv, + verbose=1, + n_steps=256, + learning_rate=0.0005, + gamma=0.999, + gae_lambda=0.95, + n_epochs=3, + clip_range_vf=0.2, + ent_coef=0.01, + batch_size=2048, + policy_kwargs={ + "hidden_size": 256, + "features_extractor_class": SB3ResNetBase, + "features_extractor_kwargs": {'num_inputs': 3, "features_dim": 256}, + "normalize_images": False + } + ) + + plr_callback = CustomCallback(curriculum, model, venv) + + if args.track: + wandb_callback = WandbCallback( + model_save_path=f"models/{run.id}", + verbose=2, + ) + callback = CallbackList([wandb_callback, plr_callback]) + else: + callback = plr_callback + + model.learn( + 25000000, + callback=callback, + ) diff --git a/syllabus/examples/models/__init__.py b/syllabus/examples/models/__init__.py index 120782f4..92dbdf99 100644 --- a/syllabus/examples/models/__init__.py +++ b/syllabus/examples/models/__init__.py @@ -1,3 +1,3 @@ from .minigrid_model import MinigridAgent -from .procgen_model import ProcgenAgent +from .procgen_model import ProcgenAgent, SB3ResNetBase, Sb3ProcgenAgent # from .nethack_model import ChaoticDwarf diff --git a/syllabus/examples/models/procgen_model.py b/syllabus/examples/models/procgen_model.py index 6e6a2db9..c44377e7 100644 --- a/syllabus/examples/models/procgen_model.py +++ b/syllabus/examples/models/procgen_model.py @@ -1,8 +1,13 @@ +from typing import Callable + import numpy as np import torch import torch.nn as nn import torch.nn.functional as F +import gymnasium as gym +from stable_baselines3.common.torch_layers import MlpExtractor +from stable_baselines3.common.policies import ActorCriticPolicy def init(module, weight_init, bias_init, gain=1): weight_init(module.weight.data, gain=gain) @@ -153,8 +158,10 @@ def __init__(self, obs_shape, num_actions, arch='small', base_kwargs=None): self.base = base(obs_shape[0], **base_kwargs) self.dist = Categorical(self.base.output_size, num_actions) - self.latent_dim_pi = 256 + self.critic_linear = init_(nn.Linear(base_kwargs.get("hidden_size"), 1)) + self.latent_dim_vf = 256 + self.latent_dim_pi = 256 @property def is_recurrent(self): @@ -165,6 +172,16 @@ def recurrent_hidden_state_size(self): """Size of rnn_hx.""" return self.base.recurrent_hidden_state_size + def forward_critic(self, features): + values, _ = self.base(features) + return values + + def forward_actor(self, features): + value, actor_features = self.base(features) + dist = self.dist(actor_features) + dist = dist.sample().float() + return 
dist + def forward(self, inputs): value, actor_features, rnn_hxs = self.base(inputs, None, None) dist = self.dist(actor_features) @@ -238,8 +255,6 @@ def __init__(self, num_inputs, recurrent=False, hidden_size=64): init_tanh_(nn.Linear(num_inputs, hidden_size)), nn.Tanh(), init_tanh_(nn.Linear(hidden_size, hidden_size)), nn.Tanh()) - self.critic_linear = init_(nn.Linear(hidden_size, 1)) - self.train() def forward(self, inputs): @@ -248,7 +263,7 @@ def forward(self, inputs): hidden_critic = self.critic(x) hidden_actor = self.actor(x) - return self.critic_linear(hidden_critic), hidden_actor + return hidden_critic, hidden_actor class BasicBlock(nn.Module): @@ -281,9 +296,9 @@ def forward(self, x): class ResNetBase(NNBase): """ - Residual Network + Residual Network """ - def __init__(self, num_inputs, recurrent=False, hidden_size=256, channels=[16, 32, 32]): + def __init__(self, num_inputs=16, recurrent=False, hidden_size=256, channels=[16, 32, 32]): super(ResNetBase, self).__init__(recurrent, num_inputs, hidden_size) self.layer1 = self._make_layer(num_inputs, channels[0]) @@ -294,7 +309,6 @@ def __init__(self, num_inputs, recurrent=False, hidden_size=256, channels=[16, 3 self.relu = nn.ReLU() self.fc = init_relu_(nn.Linear(2048, hidden_size)) - self.critic_linear = init_(nn.Linear(hidden_size, 1)) apply_init_(self.modules()) @@ -317,11 +331,9 @@ def forward(self, inputs): x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) - x = self.relu(self.flatten(x)) x = self.relu(self.fc(x)) - - return self.critic_linear(x), x + return x class SmallNetBase(NNBase): @@ -338,7 +350,6 @@ def __init__(self, num_inputs, recurrent=False, hidden_size=256): self.relu = nn.ReLU() self.fc = init_relu_(nn.Linear(2048, hidden_size)) - self.critic_linear = init_(nn.Linear(hidden_size, 1)) apply_init_(self.modules()) @@ -346,30 +357,31 @@ def __init__(self, num_inputs, recurrent=False, hidden_size=256): def forward(self, inputs): x = inputs - x = self.relu(self.conv1(x)) x = self.relu(self.conv2(x)) x = self.flatten(x) x = self.relu(self.fc(x)) - - return self.critic_linear(x), x + return x class ProcgenAgent(Policy): def __init__(self, obs_shape, num_actions, arch='small', base_kwargs=None): + h, w, c = obs_shape shape = (c, h, w) super().__init__(shape, num_actions, arch=arch, base_kwargs=base_kwargs) def get_value(self, x): new_x = x.permute((0, 3, 1, 2)) / 255.0 - value, _ = self.base(new_x) + features = self.base(new_x) + value = self.critic_linear(features) return value def get_action_and_value(self, x, action=None, full_log_probs=False, deterministic=False): new_x = x.permute((0, 3, 1, 2)) / 255.0 - value, actor_features = self.base(new_x) - dist = self.dist(actor_features) + features = self.base(new_x) + value = self.critic_linear(features) + dist = self.dist(features) if action is None: action = dist.mode() if deterministic else dist.sample() @@ -381,3 +393,58 @@ def get_action_and_value(self, x, action=None, full_log_probs=False, determinist return torch.squeeze(action), action_log_probs, dist_entropy, value, log_probs return torch.squeeze(action), action_log_probs, dist_entropy, value + + +class SB3ResNetBase(ResNetBase): + def __init__(self, observation_space, features_dim: int = 0, **kwargs) -> None: + super().__init__(**kwargs) + assert features_dim > 0 + self._observation_space = observation_space + self._features_dim = features_dim + + @property + def features_dim(self) -> int: + return self._features_dim + + def forward(self, inputs): + return super().forward(inputs / 255.0) + + +class 
Sb3ProcgenAgent(ActorCriticPolicy): + def __init__( + self, + observation_space: gym.spaces.Space, + action_space: gym.spaces.Space, + lr_schedule: Callable[[float], float], + *args, + hidden_size=256, + **kwargs + ): + + self.shape = observation_space.shape + self.num_actions = action_space.n + self.hidden_size = hidden_size + + super(Sb3ProcgenAgent, self).__init__( + observation_space, + action_space, + lr_schedule, + *args, + **kwargs + ) + self.ortho_init = False + self.action_net.is_target_net = True + self.value_net.is_target_net = True + self.apply(self.init_weights) + + def _build_mlp_extractor(self) -> None: + self.mlp_extractor = MlpExtractor(self.hidden_size, [], None) + + def init_weights(self, m, **kwargs): + if hasattr(m, 'is_target_net') and m.is_target_net: + if m is self.action_net: + nn.init.orthogonal_(m.weight, gain=0.01) + nn.init.constant_(m.bias, 0) + if m is self.value_net: + nn.init.orthogonal_(m.weight, gain=1.0) + nn.init.constant_(m.bias, 0) \ No newline at end of file diff --git a/syllabus/examples/task_wrappers/cartpole_task_wrapper.py b/syllabus/examples/task_wrappers/cartpole_task_wrapper.py index af319e8a..f07cb6d0 100644 --- a/syllabus/examples/task_wrappers/cartpole_task_wrapper.py +++ b/syllabus/examples/task_wrappers/cartpole_task_wrapper.py @@ -1,3 +1,5 @@ +import warnings + from gymnasium.spaces import Box from syllabus.core import TaskWrapper @@ -15,6 +17,9 @@ def reset(self, *args, **kwargs): self.total_reward = 0 if "new_task" in kwargs: new_task = kwargs.pop("new_task") + if new_task[0] > new_task[1]: + warnings.warn("Provided lower bound was higher than upper bound. Swapping the bounds.") + new_task = sorted(new_task) self.task = new_task return self.env.reset(options={"low": self.task[0], "high": self.task[1]}) diff --git a/syllabus/examples/training_scripts/cleanrl_procgen_centralplr.py b/syllabus/examples/training_scripts/cleanrl_procgen_centralplr.py index c002772f..0c66e954 100644 --- a/syllabus/examples/training_scripts/cleanrl_procgen_centralplr.py +++ b/syllabus/examples/training_scripts/cleanrl_procgen_centralplr.py @@ -22,7 +22,7 @@ from torch.utils.tensorboard import SummaryWriter from syllabus.core import MultiProcessingSyncWrapper, make_multiprocessing_curriculum -from syllabus.curricula import CentralizedPrioritizedLevelReplay, DomainRandomization, SyncedBatchedDomainRandomization, LearningProgressCurriculum, SequentialCurriculum +from syllabus.curricula import CentralizedPrioritizedLevelReplay, DomainRandomization, LearningProgressCurriculum, SequentialCurriculum from syllabus.examples.models import ProcgenAgent from syllabus.examples.task_wrappers import ProcgenTaskWrapper from syllabus.examples.utils.vecenv import VecMonitor, VecNormalize, VecExtractDictObs @@ -150,6 +150,46 @@ def wrap_vecenv(vecenv): return vecenv +def slow_level_replay_evaluate( + env_name, + policy, + num_episodes, + device, + num_levels=0 +): + policy.eval() + + eval_envs = ProcgenEnv( + num_envs=1, env_name=env_name, num_levels=num_levels, start_level=0, distribution_mode="easy", paint_vel_info=False + ) + eval_envs = VecExtractDictObs(eval_envs, "rgb") + eval_envs = wrap_vecenv(eval_envs) + + # Seed environments + seeds = [int.from_bytes(os.urandom(3), byteorder="little") for _ in range(num_episodes)] + for i, seed in enumerate(seeds): + eval_envs.seed(seed, i) + + eval_obs, _ = eval_envs.reset() + eval_episode_rewards = [] + + while len(eval_episode_rewards) < num_episodes: + with torch.no_grad(): + eval_action, _, _, _ = 
policy.get_action_and_value(torch.Tensor(eval_obs).to(device), deterministic=False) + + eval_obs, _, truncs, terms, infos = eval_envs.step(eval_action.cpu().numpy()) + for i, info in enumerate(infos): + if 'episode' in info.keys(): + eval_episode_rewards.append(info['episode']['r']) + + mean_returns = np.mean(eval_episode_rewards) + stddev_returns = np.std(eval_episode_rewards) + env_min, env_max = PROCGEN_RETURN_BOUNDS[args.env_id] + normalized_mean_returns = (mean_returns - env_min) / (env_max - env_min) + policy.train() + return mean_returns, stddev_returns, normalized_mean_returns + + def level_replay_evaluate( env_name, policy, @@ -176,7 +216,6 @@ def level_replay_evaluate( if 'episode' in info.keys() and eval_episode_rewards[i] == -1: eval_episode_rewards[i] = info['episode']['r'] - # print(eval_episode_rewards) mean_returns = np.mean(eval_episode_rewards) stddev_returns = np.std(eval_episode_rewards) env_min, env_max = PROCGEN_RETURN_BOUNDS[args.env_id] @@ -344,7 +383,6 @@ def level_replay_evaluate( with torch.no_grad(): next_value = agent.get_value(next_obs) tasks = envs.get_attr("task") - update = { "update_type": "on_demand", "metrics": { @@ -459,12 +497,12 @@ def level_replay_evaluate( explained_var = np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y # Evaluate agent - mean_eval_returns, stddev_eval_returns, normalized_mean_eval_returns = level_replay_evaluate( - args.env_id, agent, args.num_eval_episodes, device, num_levels=0 - ) - mean_train_returns, stddev_train_returns, normalized_mean_train_returns = level_replay_evaluate( - args.env_id, agent, args.num_eval_episodes, device, num_levels=200 - ) + # mean_eval_returns, stddev_eval_returns, normalized_mean_eval_returns = level_replay_evaluate( + # args.env_id, agent, args.num_eval_episodes, device, num_levels=0 + # ) + # mean_train_returns, stddev_train_returns, normalized_mean_train_returns = level_replay_evaluate( + # args.env_id, agent, args.num_eval_episodes, device, num_levels=200 + # ) # TRY NOT TO MODIFY: record rewards for plotting purposes writer.add_scalar("charts/learning_rate", optimizer.param_groups[0]["lr"], global_step) @@ -479,13 +517,13 @@ def level_replay_evaluate( print("SPS:", int(global_step / (time.time() - start_time))) writer.add_scalar("charts/SPS", int(global_step / (time.time() - start_time)), global_step) - writer.add_scalar("test_eval/mean_episode_return", mean_eval_returns, global_step) - writer.add_scalar("test_eval/normalized_mean_eval_return", normalized_mean_eval_returns, global_step) - writer.add_scalar("test_eval/stddev_eval_return", stddev_eval_returns, global_step) + # writer.add_scalar("test_eval/mean_episode_return", mean_eval_returns, global_step) + # writer.add_scalar("test_eval/normalized_mean_eval_return", normalized_mean_eval_returns, global_step) + # writer.add_scalar("test_eval/stddev_eval_return", mean_eval_returns, global_step) - writer.add_scalar("train_eval/mean_episode_return", mean_train_returns, global_step) - writer.add_scalar("train_eval/normalized_mean_train_return", normalized_mean_train_returns, global_step) - writer.add_scalar("train_eval/stddev_train_return", stddev_train_returns, global_step) + # writer.add_scalar("train_eval/mean_episode_return", mean_train_returns, global_step) + # writer.add_scalar("train_eval/normalized_mean_train_return", normalized_mean_train_returns, global_step) + # writer.add_scalar("train_eval/stddev_train_return", mean_train_returns, global_step) writer.add_scalar("curriculum/completed_episodes", completed_episodes, step) diff 
--git a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py index c895564a..e372d281 100644 --- a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py +++ b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py @@ -154,6 +154,41 @@ def wrap_vecenv(vecenv): return vecenv +def full_level_replay_evaluate( + env_name, + policy, + num_episodes, + device, + num_levels=1 # Not used +): + policy.eval() + + eval_envs = ProcgenEnv( + num_envs=args.num_eval_episodes, env_name=env_name, num_levels=1, start_level=0, distribution_mode="easy", paint_vel_info=False + ) + eval_envs = VecExtractDictObs(eval_envs, "rgb") + eval_envs = wrap_vecenv(eval_envs) + + eval_obs, _ = eval_envs.reset() + eval_episode_rewards = [-1] * num_episodes + + while -1 in eval_episode_rewards: + with torch.no_grad(): + eval_action, _, _, _ = policy.get_action_and_value(torch.Tensor(eval_obs).to(device), deterministic=False) + + eval_obs, _, truncs, terms, infos = eval_envs.step(eval_action.cpu().numpy()) + for i, info in enumerate(infos): + if 'episode' in info.keys() and eval_episode_rewards[i] == -1: + eval_episode_rewards[i] = info['episode']['r'] + + mean_returns = np.mean(eval_episode_rewards) + stddev_returns = np.std(eval_episode_rewards) + env_min, env_max = PROCGEN_RETURN_BOUNDS[args.env_id] + normalized_mean_returns = (mean_returns - env_min) / (env_max - env_min) + policy.train() + return mean_returns, stddev_returns, normalized_mean_returns + + def level_replay_evaluate( env_name, policy, @@ -319,6 +354,17 @@ def get_action(obs): ) envs = wrap_vecenv(envs) + assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported" + print("Creating agent") + + agent = ProcgenAgent( + envs.single_observation_space.shape, + envs.single_action_space.n, + arch="large", + base_kwargs={'recurrent': False, 'hidden_size': 256} + ).to(device) + optimizer = optim.Adam(agent.parameters(), lr=args.learning_rate, eps=1e-5) + # ALGO Logic: Storage setup obs = torch.zeros((args.num_steps, args.num_envs) + envs.single_observation_space.shape).to(device) actions = torch.zeros((args.num_steps, args.num_envs) + envs.single_action_space.shape).to(device) @@ -500,4 +546,4 @@ def get_action(obs): writer.add_scalar("curriculum/completed_episodes", completed_episodes, step) envs.close() - writer.close() + writer.close() \ No newline at end of file diff --git a/syllabus/tests/utils.py b/syllabus/tests/utils.py index 20cbb20e..33d73e37 100644 --- a/syllabus/tests/utils.py +++ b/syllabus/tests/utils.py @@ -49,7 +49,6 @@ def evaluate_random_policy_gymnasium(make_env, num_episodes=100, seeds=None): episode_returns.append(episode_return) avg_return = sum(episode_returns) / len(episode_returns) - # print(f"Average Episodic Return: {avg_return}") return avg_return, episode_returns diff --git a/tests/.DS_Store b/tests/.DS_Store new file mode 100644 index 00000000..ab561e57 Binary files /dev/null and b/tests/.DS_Store differ diff --git a/tests/multiprocessing_smoke_tests.py b/tests/multiprocessing_smoke_tests.py index eec37360..aadda9e6 100644 --- a/tests/multiprocessing_smoke_tests.py +++ b/tests/multiprocessing_smoke_tests.py @@ -15,7 +15,7 @@ run_ray_multiprocess, run_single_process) N_ENVS = 2 -N_EPISODES = 2 +N_EPISODES = 20 nethack_env = create_nethack_env() @@ -45,7 +45,6 @@ test_names = [curriculum_args[0].__name__ for curriculum_args in curricula] - @pytest.mark.parametrize("curriculum, env_fn, args, kwargs", curricula, 
ids=test_names) def test_multiprocessing_sync_single_process(curriculum, env_fn, args, kwargs): # Test single process speed @@ -53,31 +52,24 @@ def test_multiprocessing_sync_single_process(curriculum, env_fn, args, kwargs): single_kwargs = kwargs.copy() if "num_processes" in single_kwargs: single_kwargs["num_processes"] = 1 - test_curriculum = curriculum(*args, **single_kwargs) + test_curriculum = curriculum(*args, warmup_strategy="random", warmup_samples=10, **kwargs) native_speed = run_single_process(env_fn, curriculum=test_curriculum, num_envs=1, num_episodes=N_EPISODES) print(f"PASSED: single process test (1 env) passed: {native_speed:.2f}s") - @pytest.mark.parametrize("curriculum, env_fn, args, kwargs", curricula, ids=test_names) def test_multiprocessing_sync_queue_multi_process(curriculum, env_fn, args, kwargs): # Test Queue multiprocess speed with Syllabus - test_curriculum = curriculum(*args, **kwargs) + test_curriculum = curriculum(*args, warmup_strategy="fix", warmup_samples=10, **kwargs) test_curriculum = make_multiprocessing_curriculum(test_curriculum) print("\nRUNNING: Python multiprocess test with Syllabus...") native_syllabus_speed = run_native_multiprocess(env_fn, curriculum=test_curriculum, num_envs=N_ENVS, num_episodes=N_EPISODES) print(f"PASSED: Python multiprocess test with Syllabus: {native_syllabus_speed:.2f}s") - -# @pytest.mark.parametrize("curriculum, env_fn, args, kwargs", curricula, ids=test_names) -# def test_multiprocessing_sync_ray_multi_process(curriculum, env_fn, args, kwargs, ray_session): -# # Test Ray multiprocess speed with Syllabus -# test_curriculum = curriculum(*args, **kwargs) -# test_curriculum = make_ray_curriculum(test_curriculum) -# print("\nRUNNING: Ray multiprocess test with Syllabus...") -# ray_syllabus_speed = run_ray_multiprocess(env_fn, num_envs=N_ENVS, num_episodes=N_EPISODES) -# print(f"PASSED: Ray multiprocess test with Syllabus: {ray_syllabus_speed:.2f}s") - - -if __name__ == "__main__": - test_multiprocessing_sync_single_process(*curricula[2]) - test_multiprocessing_sync_queue_multi_process(*curricula[2]) +@pytest.mark.parametrize("curriculum, env_fn, args, kwargs", curricula, ids=test_names) +def test_multiprocessing_sync_ray_multi_process(curriculum, env_fn, args, kwargs, ray_session): + # Test Ray multiprocess speed with Syllabus + test_curriculum = curriculum(*args, warmup_strategy="random", warmup_samples=10, **kwargs) + test_curriculum = make_ray_curriculum(test_curriculum) + print("\nRUNNING: Ray multiprocess test with Syllabus...") + ray_syllabus_speed = run_ray_multiprocess(env_fn, num_envs=N_ENVS, num_episodes=N_EPISODES) + print(f"PASSED: Ray multiprocess test with Syllabus: {ray_syllabus_speed:.2f}s") diff --git a/tests/multiprocessing_sync_tests.py b/tests/multiprocessing_sync_tests.py index 2c5666df..13701034 100644 --- a/tests/multiprocessing_sync_tests.py +++ b/tests/multiprocessing_sync_tests.py @@ -29,8 +29,8 @@ def generate_environment(num_episodes=N_EPISODES): def test_single_process(): sample_env = generate_environment() test_curriculum = SyncTestCurriculum(N_ENVS, N_EPISODES, sample_env.task_space) - native_speed = run_single_process( - create_gymnasium_synctest_env, env_args=(N_EPISODES,), curriculum=test_curriculum, num_envs=N_ENVS, num_episodes=N_EPISODES + run_single_process( + create_synctest_env, env_args=(N_EPISODES,), curriculum=test_curriculum, num_envs=N_ENVS, num_episodes=N_EPISODES ) evaluate_curriculum(test_curriculum, num_envs=N_ENVS) @@ -39,8 +39,8 @@ def test_queue_multiprocess(): sample_env = 
generate_environment() test_curriculum = SyncTestCurriculum(N_ENVS, N_EPISODES, sample_env.task_space) test_curriculum = make_multiprocessing_curriculum(test_curriculum, sequential_start=False) - native_syllabus_speed = run_native_multiprocess( - create_gymnasium_synctest_env, env_args=(N_EPISODES,), curriculum=test_curriculum, num_envs=N_ENVS, num_episodes=N_EPISODES + run_native_multiprocess( + create_synctest_env, env_args=(N_EPISODES,), curriculum=test_curriculum, num_envs=N_ENVS, num_episodes=N_EPISODES ) evaluate_curriculum(test_curriculum.curriculum) @@ -49,7 +49,7 @@ def test_ray_multiprocess(ray_session): sample_env = generate_environment() test_curriculum = SyncTestCurriculum(N_ENVS, N_EPISODES, sample_env.task_space) test_curriculum = make_ray_curriculum(test_curriculum) - ray_syllabus_speed = run_ray_multiprocess(create_gymnasium_synctest_env, env_args=(N_EPISODES,), num_envs=N_ENVS, num_episodes=N_EPISODES) + run_ray_multiprocess(create_synctest_env, env_args=(N_EPISODES,), num_envs=N_ENVS, num_episodes=N_EPISODES) # TODO: Implement Ray checks # evaluate_curriculum(test_curriculum) diff --git a/tests/sequential_curriculum_test.py b/tests/sequential_curriculum_test.py new file mode 100644 index 00000000..add1a37d --- /dev/null +++ b/tests/sequential_curriculum_test.py @@ -0,0 +1,86 @@ +from copy import deepcopy +import pytest +from nle.env.tasks import NetHackEat, NetHackScore +from syllabus.core import make_multiprocessing_curriculum +from syllabus.curricula import SequentialCurriculum, NoopCurriculum, DomainRandomization +from syllabus.task_space import TaskSpace +from syllabus.tests.utils import create_nethack_env, run_native_multiprocess, run_single_process, run_set_length +from unittest.mock import patch +import logging + +# Set up logging +logging.basicConfig(filename='test_output.log', level=logging.INFO, format='%(asctime)s - %(message)s') +logger = logging.getLogger() + +def create_env(): + return create_nethack_env + +def run_curriculum(curriculum, env_fn): + # Test single process speed + native_speed = run_single_process(env_fn, curriculum=curriculum, num_envs=1, num_episodes=4) + +def run_gymnasium_episode(env, new_task=None, curriculum=None, env_id=0): + """Run a single episode of the environment.""" + if new_task is not None: + obs = env.reset(new_task=new_task) + else: + obs = env.reset() + + term = trunc = False + ep_rew = 0 + ep_len = 0 + + task_completion = 0 + while not (term or trunc): + action = env.action_space.sample() + obs, rew, term, trunc, info = env.step(action) + + task_completion += 1 + info['task_completion'] += task_completion + if curriculum.custom_metrics: + for name, func in curriculum.custom_metrics.items(): + curriculum.metric_values[name] = func(obs, info) + + logger.info(info) + + if curriculum and curriculum.requires_step_updates: + curriculum.update_on_step(env.task_space.encode(env.task), obs, rew, term, trunc, info, env_id=env_id) + curriculum.update_task_progress(env.task_space.encode(env.task), info["task_completion"], env_id=env_id) + ep_rew += rew + ep_len += 1 + + if curriculum and curriculum.requires_episode_updates: + curriculum.update_on_episode(ep_rew, ep_len, env.task_space.encode(env.task), env_id=env_id) + return ep_rew + +def test_custom_sequential_curriculum(create_env): + env = create_env() + curricula = [] + stopping = [] + + # Custom metrics definition + custom_metrics = { + "my_custom_function_1": lambda obs, info: sum(sum(obs['glyphs'])) + info['task_completion'], + "my_custom_function_2": lambda obs, info: 
info['task_completion'], + } + + # Stage 1 - Survival + stage1 = [0, 1, 2, 3] + stopping.append("steps<100") + + # Stage 2 - Harvest Equipment + stage2 = [4, 5] + stopping.append("my_custom_function_1<0") + + # Stage 3 - Equip Weapons + stage3 = [6, 7] + + curricula = [stage1, stage2, stage3] + curriculum = SequentialCurriculum(curricula, stopping, env.task_space, custom_metrics=custom_metrics) + + with patch('syllabus.tests.utils.run_gymnasium_episode', side_effect=lambda env, new_task, curriculum, env_id: run_gymnasium_episode(env, new_task, curriculum, env_id)): + run_curriculum(curriculum, create_env) + +if __name__ == "__main__": + test_custom_sequential_curriculum(create_nethack_env) + logger.info("All tests passed!") diff --git a/tests/single_process_test.py b/tests/single_process_test.py new file mode 100644 index 00000000..445692a3 --- /dev/null +++ b/tests/single_process_test.py @@ -0,0 +1,96 @@ +""" Test curriculum synchronization across multiple processes. """ +import pytest +from nle.env.tasks import NetHackScore, NetHackScout, NetHackStaircase + +from syllabus.core import make_multiprocessing_curriculum, make_ray_curriculum +from syllabus.curricula import (AnnealingBoxCurriculum, + CentralizedPrioritizedLevelReplay, + DomainRandomization, + LearningProgressCurriculum, NoopCurriculum, + PrioritizedLevelReplay, SequentialCurriculum, + SimpleBoxCurriculum) +from syllabus.tests import (create_cartpole_env, create_nethack_env, + get_test_values, run_native_multiprocess, + run_ray_multiprocess, run_single_process, run_episode) + +N_ENVS = 1 +N_EPISODES = 34 + + +nethack_env = create_nethack_env() +cartpole_env = create_cartpole_env() + +curricula = [ + (NoopCurriculum, create_nethack_env, (NetHackScore, nethack_env.task_space), {}), + (DomainRandomization, create_nethack_env, (nethack_env.task_space,), {}), + # (LearningProgressCurriculum, create_nethack_env, (nethack_env.task_space,), {}), + (CentralizedPrioritizedLevelReplay, create_nethack_env, (nethack_env.task_space,), { + "device": "cpu", "suppress_usage_warnings": True, "num_processes": N_ENVS + }), + (PrioritizedLevelReplay, create_nethack_env, (nethack_env.task_space, nethack_env.observation_space), { + "get_value": get_test_values, + "device": "cpu", + "num_processes": N_ENVS, + "num_steps": 2048 + }), + (SimpleBoxCurriculum, create_cartpole_env, (cartpole_env.task_space,), {}), + (AnnealingBoxCurriculum, create_cartpole_env, (cartpole_env.task_space,), { + 'start_values': [-0.02, 0.02], + 'end_values': [-0.3, 0.3], + 'total_steps': [10] + }), + (SequentialCurriculum, create_nethack_env, ([ + CentralizedPrioritizedLevelReplay(nethack_env.task_space, device="cpu", suppress_usage_warnings=True, num_processes=N_ENVS, warmup_strategy = 'random', warmup_samples = 1), + PrioritizedLevelReplay(nethack_env.task_space, nethack_env.observation_space, get_value=get_test_values, device="cpu", num_processes=N_ENVS, num_steps=2048, warmup_strategy = 'fix', warmup_samples = 1), + NetHackScore, + [NetHackScout, NetHackStaircase] + ], ["steps>1000", "episodes>=50", "tasks>20"], nethack_env.task_space), {}), +] + +test_names = [curriculum_args[0].__name__ for curriculum_args in curricula] +@pytest.mark.parametrize("curriculum, env_fn, args, kwargs", curricula, ids=test_names) +def test_multiprocessing_sync_single_process_fix(curriculum, env_fn, args, kwargs): + # Test single process speed + print("RUNNING: Python single process test (1 env)...") + single_kwargs = kwargs.copy() + if "num_processes" in single_kwargs: + 
single_kwargs["num_processes"] = 1 + test_curriculum = curriculum(*args, warmup_strategy="fix", warmup_samples=26, **kwargs) + env = env_fn(env_args=(), env_kwargs={}) + ep_rews = [] + for i in range(N_EPISODES): + if test_curriculum: + if i == N_EPISODES - 9 and (not isinstance(test_curriculum, NoopCurriculum)): + assert test_curriculum._should_use_startup_sampling() + if i == N_EPISODES - 8 and (not isinstance(test_curriculum, NoopCurriculum)): + assert not test_curriculum._should_use_startup_sampling() + task = env.task_space.decode(test_curriculum.sample()[0]) + ep_rews.append(run_episode(env, new_task=task, curriculum=test_curriculum, env_id=0)) + else: + ep_rews.append(run_episode(env)) + env.close() + print("PASSED: single process test on fix sampling (1 env) passed") + +test_names = [curriculum_args[0].__name__ for curriculum_args in curricula] +@pytest.mark.parametrize("curriculum, env_fn, args, kwargs", curricula, ids=test_names) +def test_multiprocessing_sync_single_process_random(curriculum, env_fn, args, kwargs): + # Test single process speed + print("RUNNING: Python single process test (1 env)...") + single_kwargs = kwargs.copy() + if "num_processes" in single_kwargs: + single_kwargs["num_processes"] = 1 + test_curriculum = curriculum(*args, warmup_strategy="random", warmup_samples=26, **kwargs) + env = env_fn(env_args=(), env_kwargs={}) + ep_rews = [] + for i in range(N_EPISODES): + if test_curriculum: + if i == N_EPISODES - 9 and (not isinstance(test_curriculum, NoopCurriculum)): + assert test_curriculum._should_use_startup_sampling() + if i == N_EPISODES - 8 and (not isinstance(test_curriculum, NoopCurriculum)): + assert not test_curriculum._should_use_startup_sampling() + task = env.task_space.decode(test_curriculum.sample()[0]) + ep_rews.append(run_episode(env, new_task=task, curriculum=test_curriculum, env_id=0)) + else: + ep_rews.append(run_episode(env)) + env.close() + print("PASSED: single process test on random sampling (1 env) passed") diff --git a/tests/unit_tests/sequential.py b/tests/unit_tests/sequential.py index e8531424..ed7092da 100644 --- a/tests/unit_tests/sequential.py +++ b/tests/unit_tests/sequential.py @@ -73,8 +73,8 @@ def test_curriculum_sequence_3step(create_env): if __name__ == "__main__": - # test_parsing_condition_operators(create_nethack_env) - # test_parsing_compount_conditions(create_nethack_env) + test_parsing_condition_operators(create_nethack_env) + test_parsing_compount_conditions(create_nethack_env) # test_curriculum_sequence_2step(create_nethack_env) - test_curriculum_sequence_3step(create_nethack_env) + # test_curriculum_sequence_3step(create_nethack_env) print("All tests passed!")