From 98ddc2deca26a3fa1b434aead7325e351e7f95d0 Mon Sep 17 00:00:00 2001 From: Aditya Gudimella Date: Sun, 20 Sep 2020 16:14:41 -0700 Subject: [PATCH 1/5] Replaced Prioritized Experience Replay with normal Experience replay to create AsyncSAC. --- rllib/agents/sac/apex.py | 267 +++++++++++++++++++++++++++- rllib/tests/agents/parameters.py | 205 ++++++++++----------- rllib/tests/agents/test_learning.py | 1 + 3 files changed, 369 insertions(+), 104 deletions(-) diff --git a/rllib/agents/sac/apex.py b/rllib/agents/sac/apex.py index 6465f8de7b39..c2ac8a80cd8d 100644 --- a/rllib/agents/sac/apex.py +++ b/rllib/agents/sac/apex.py @@ -1,5 +1,33 @@ -from ray.rllib.agents.dqn.apex import apex_execution_plan +import collections +import copy + +import ray +from ray.rllib.agents.dqn.apex import UpdateWorkerWeights +from ray.rllib.agents.dqn.dqn import calculate_rr_weights +from ray.rllib.agents.dqn.learner_thread import LearnerThread from ray.rllib.agents.sac.sac import DEFAULT_CONFIG as SAC_CONFIG, SACTrainer +from ray.rllib.evaluation.worker_set import WorkerSet +from ray.rllib.execution.common import ( + STEPS_TRAINED_COUNTER, + _get_shared_metrics, +) +from ray.rllib.execution.concurrency_ops import Concurrently, Dequeue, Enqueue +from ray.rllib.execution.metric_ops import StandardMetricsReporting +from ray.rllib.execution.replay_buffer import ( + LocalReplayBuffer, + ReplayBuffer, +) +from ray.rllib.execution.replay_ops import Replay, StoreToReplayBuffer +from ray.rllib.execution.rollout_ops import ParallelRollouts +from ray.rllib.execution.train_ops import UpdateTargetNetwork +from ray.rllib.utils.timer import TimerStat +from ray.rllib.utils.actors import create_colocated +from ray.util.iter import ParallelIteratorWorker +from ray.rllib.policy.sample_batch import ( + SampleBatch, + DEFAULT_POLICY_ID, + MultiAgentBatch, +) # yapf: disable # __sphinx_doc_begin__ @@ -41,6 +69,241 @@ # __sphinx_doc_end__ # yapf: enable + +class LocalAsyncReplayBuffer(LocalReplayBuffer): + """A replay buffer shard. + + Ray actors are single-threaded, so for scalability multiple replay actors + may be created to increase parallelism.""" + + def __init__( + self, + num_shards, + learning_starts, + buffer_size, + replay_batch_size, + prioritized_replay_alpha=0.6, + prioritized_replay_beta=0.4, + prioritized_replay_eps=1e-6, + multiagent_sync_replay=False, + ): + self.replay_starts = learning_starts // num_shards + self.buffer_size = buffer_size // num_shards + self.replay_batch_size = replay_batch_size + self.prioritized_replay_beta = prioritized_replay_beta + self.prioritized_replay_eps = prioritized_replay_eps + self.multiagent_sync_replay = multiagent_sync_replay + + def gen_replay(): + while True: + yield self.replay() + + ParallelIteratorWorker.__init__(self, gen_replay, False) + + def new_buffer(): + return ReplayBuffer(self.buffer_size) + + self.replay_buffers = collections.defaultdict(new_buffer) + + # Metrics + self.add_batch_timer = TimerStat() + self.replay_timer = TimerStat() + self.update_priorities_timer = TimerStat() + self.num_added = 0 + + # Make externally accessible for testing. + global _local_replay_buffer + _local_replay_buffer = self + # If set, return this instead of the usual data for testing. + self._fake_batch = None + + def replay(self): + if self._fake_batch: + fake_batch = SampleBatch(self._fake_batch) + return MultiAgentBatch({DEFAULT_POLICY_ID: fake_batch}, fake_batch.count) + + if self.num_added < self.replay_starts: + return None + + with self.replay_timer: + samples = {} + idxes = None + for policy_id, replay_buffer in self.replay_buffers.items(): + if self.multiagent_sync_replay: + if idxes is None: + idxes = replay_buffer.sample_idxes(self.replay_batch_size) + else: + idxes = replay_buffer.sample_idxes(self.replay_batch_size) + ( + obses_t, + actions, + rewards, + obses_tp1, + dones, + ) = replay_buffer.sample_with_idxes(idxes) + samples[policy_id] = SampleBatch( + { + "obs": obses_t, + "actions": actions, + "rewards": rewards, + "new_obs": obses_tp1, + "dones": dones, + # "weights": weights, + # "batch_indexes": batch_indexes, + } + ) + return MultiAgentBatch(samples, self.replay_batch_size) + + def stats(self, debug=False): + stat = { + "add_batch_time_ms": round(1000 * self.add_batch_timer.mean, 3), + "replay_time_ms": round(1000 * self.replay_timer.mean, 3), + } + for policy_id, replay_buffer in self.replay_buffers.items(): + stat.update( + {"policy_{}".format(policy_id): replay_buffer.stats(debug=debug)} + ) + return stat + + +ReplayActor = ray.remote(num_cpus=0)(LocalAsyncReplayBuffer) + + +def async_execution_plan(workers: WorkerSet, config: dict): + # Create a number of replay buffer actors. + num_replay_buffer_shards = config["optimizer"]["num_replay_buffer_shards"] + replay_actors = create_colocated( + ReplayActor, + [ + num_replay_buffer_shards, + config["learning_starts"], + config["buffer_size"], + config["train_batch_size"], + config["prioritized_replay_alpha"], + config["prioritized_replay_beta"], + config["prioritized_replay_eps"], + ], + num_replay_buffer_shards, + ) + + # Start the learner thread. + learner_thread = LearnerThread(workers.local_worker()) + learner_thread.start() + + # Update experience priorities post learning. + def update_prio_and_stats(item: ("ActorHandle", dict, int)): + actor, prio_dict, count = item + # actor.update_priorities.remote(prio_dict) + metrics = _get_shared_metrics() + # Manually update the steps trained counter since the learner thread + # is executing outside the pipeline. + metrics.counters[STEPS_TRAINED_COUNTER] += count + metrics.timers["learner_dequeue"] = learner_thread.queue_timer + metrics.timers["learner_grad"] = learner_thread.grad_timer + metrics.timers["learner_overall"] = learner_thread.overall_timer + + # We execute the following steps concurrently: + # (1) Generate rollouts and store them in our replay buffer actors. Update + # the weights of the worker that generated the batch. + parallel_rollouts_mode = config.get("parallel_rollouts_mode", "async") + num_async = config.get("parallel_rollouts_num_async") + # This could be set to None explicitly + if not num_async: + num_async = 2 + rollouts = ParallelRollouts( + workers, mode=parallel_rollouts_mode, num_async=num_async + ) + store_op = rollouts.for_each(StoreToReplayBuffer(actors=replay_actors)) + if config.get("execution_plan_custom_store_ops"): + custom_store_ops = config["execution_plan_custom_store_ops"] + store_op = store_op.for_each(custom_store_ops(workers, config)) + # Only need to update workers if there are remote workers. + if workers.remote_workers(): + store_op = store_op.zip_with_source_actor().for_each( + UpdateWorkerWeights( + learner_thread, + workers, + max_weight_sync_delay=(config["optimizer"]["max_weight_sync_delay"]), + ) + ) + + # (2) Read experiences from the replay buffer actors and send to the + # learner thread via its in-queue. + if config.get("before_learn_on_batch"): + before_learn_on_batch = config["before_learn_on_batch"] + before_learn_on_batch = before_learn_on_batch(workers, config) + else: + before_learn_on_batch = lambda b: b + replay_op = ( + Replay(actors=replay_actors, num_async=4) + .for_each(before_learn_on_batch) + .zip_with_source_actor() + .for_each(Enqueue(learner_thread.inqueue)) + ) + + # (3) Get priorities back from learner thread and apply them to the + # replay buffer actors. + update_op = ( + Dequeue(learner_thread.outqueue, check=learner_thread.is_alive) + .for_each(update_prio_and_stats) + .for_each( + UpdateTargetNetwork( + workers, config["target_network_update_freq"], by_steps_trained=True + ) + ) + ) + + if config["training_intensity"]: + # Execute (1), (2) with a fixed intensity ratio. + rr_weights = calculate_rr_weights(config) + ["*"] + merged_op = Concurrently( + [store_op, replay_op, update_op], + mode="round_robin", + output_indexes=[2], + round_robin_weights=rr_weights, + ) + else: + # Execute (1), (2), (3) asynchronously as fast as possible. Only output + # items from (3) since metrics aren't available before then. + merged_op = Concurrently( + [store_op, replay_op, update_op], mode="async", output_indexes=[2] + ) + + # Add in extra replay and learner metrics to the training result. + def add_apex_metrics(result): + replay_stats = ray.get( + replay_actors[0].stats.remote(config["optimizer"].get("debug")) + ) + exploration_infos = workers.foreach_trainable_policy( + lambda p, _: p.get_exploration_info() + ) + result["info"].update( + { + "exploration_infos": exploration_infos, + "learner_queue": learner_thread.learner_queue_size.stats(), + "learner": copy.deepcopy(learner_thread.stats), + "replay_shard_0": replay_stats, + } + ) + return result + + # Only report metrics from the workers with the lowest 1/3 of epsilons. + selected_workers = None + worker_amount_to_collect_metrics_from = config.get( + "worker_amount_to_collect_metrics_from" + ) + if worker_amount_to_collect_metrics_from: + selected_workers = workers.remote_workers()[ + -len(workers.remote_workers()) // worker_amount_to_collect_metrics_from : + ] + + return StandardMetricsReporting( + merged_op, workers, config, selected_workers=selected_workers + ).for_each(add_apex_metrics) + + ApexSACTrainer = SACTrainer.with_updates( - name="APEX_SAC", default_config=APEX_SAC_DEFAULT_CONFIG, execution_plan=apex_execution_plan + name="APEX_SAC", + default_config=APEX_SAC_DEFAULT_CONFIG, + execution_plan=async_execution_plan, ) diff --git a/rllib/tests/agents/parameters.py b/rllib/tests/agents/parameters.py index 4a24dc14724a..3e27a8d735a8 100644 --- a/rllib/tests/agents/parameters.py +++ b/rllib/tests/agents/parameters.py @@ -212,117 +212,118 @@ def astuple(self): ] = [ x.astuple() for x in chain( - TestAgentParams.for_cart_pole( - algorithm=DiscreteActionSpaceAlgorithm.PPO, - config_updates={ - "num_gpus": 2, - "_fake_gpus": True, - "num_workers": 1, - "lr": 0.0003, - "observation_filter": "MeanStdFilter", - "num_sgd_iter": 6, - "vf_share_layers": True, - "vf_loss_coeff": 0.01, - "model": {"fcnet_hiddens": [32], "fcnet_activation": "linear"}, - }, - n_iter=200, - threshold=150.0, - ), - TestAgentParams.for_pendulum( - algorithm=ContinuousActionSpaceAlgorithm.APEX_DDPG, - config_updates={ - "use_huber": True, - "clip_rewards": False, - "num_workers": 4, - "n_step": 1, - "target_network_update_freq": 50000, - "tau": 1.0, - }, - n_iter=200, - threshold=-750.0, - ), - TestAgentParams.for_cart_pole( - algorithm=DiscreteActionSpaceAlgorithm.APEX_DQN, - config_updates={ - "target_network_update_freq": 20000, - "num_workers": 4, - "num_envs_per_worker": 8, - "train_batch_size": 64, - "gamma": 0.95, - }, - n_iter=200, - threshold=150.0, - ), - TestAgentParams.for_cart_pole( - algorithm=DiscreteActionSpaceAlgorithm.SAC, - config_updates={ - "num_workers": 4, - "twin_q": True, - "soft_horizon": True, - "clip_actions": False, - "normalize_actions": True, - "learning_starts": 0, - "prioritized_replay": True, - "Q_model": {"fcnet_hiddens": [64, 64]}, - "policy_model": {"fcnet_hiddens": [64, 64],}, - }, - n_iter=200, - threshold=100.0, - ), - TestAgentParams.for_cart_pole( - algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, - config_updates={ - "seed": 42, - "num_workers": 8, - }, - n_iter=100, - threshold=175.0, - ), + # TestAgentParams.for_cart_pole( + # algorithm=DiscreteActionSpaceAlgorithm.PPO, + # config_updates={ + # "num_gpus": 2, + # "_fake_gpus": True, + # "num_workers": 1, + # "lr": 0.0003, + # "observation_filter": "MeanStdFilter", + # "num_sgd_iter": 6, + # "vf_share_layers": True, + # "vf_loss_coeff": 0.01, + # "model": {"fcnet_hiddens": [32], "fcnet_activation": "linear"}, + # }, + # n_iter=200, + # threshold=150.0, + # ), + # TestAgentParams.for_pendulum( + # algorithm=ContinuousActionSpaceAlgorithm.APEX_DDPG, + # config_updates={ + # "use_huber": True, + # "clip_rewards": False, + # "num_workers": 4, + # "n_step": 1, + # "target_network_update_freq": 50000, + # "tau": 1.0, + # }, + # n_iter=200, + # threshold=-750.0, + # ), + # TestAgentParams.for_cart_pole( + # algorithm=DiscreteActionSpaceAlgorithm.APEX_DQN, + # config_updates={ + # "target_network_update_freq": 20000, + # "num_workers": 4, + # "num_envs_per_worker": 8, + # "train_batch_size": 64, + # "gamma": 0.95, + # }, + # n_iter=200, + # threshold=150.0, + # ), + # TestAgentParams.for_cart_pole( + # algorithm=DiscreteActionSpaceAlgorithm.SAC, + # config_updates={ + # "num_workers": 4, + # "twin_q": True, + # "soft_horizon": True, + # "clip_actions": False, + # "normalize_actions": True, + # "learning_starts": 0, + # "prioritized_replay": True, + # "Q_model": {"fcnet_hiddens": [64, 64]}, + # "policy_model": {"fcnet_hiddens": [64, 64],}, + # }, + # n_iter=200, + # threshold=100.0, + # ), + # TestAgentParams.for_cart_pole( + # algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, + # config_updates={ + # "seed": 42, + # "num_workers": 8, + # }, + # n_iter=100, + # threshold=175.0, + # ), TestAgentParams.for_pendulum( algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, config_updates={ - "num_workers": 8, + "num_workers": 3, + "num_envs_per_worker": 2, "exploration_config": {"type": "StochasticSampling"}, - "no_done_at_end": True, }, # TODO: Delete next line before landing PR + frameworks=[Framework.TensorFlow], n_iter=200, threshold=-350., ), - TestAgentParams.for_pendulum( - algorithm=DiscreteActionSpaceAlgorithm.SAC, - config_updates={ - "horizon": 200, - "soft_horizon": True, - "Q_model": {"fcnet_activation": "relu", "fcnet_hiddens": [256, 256]}, - "policy_model": { - "fcnet_activation": "relu", - "fcnet_hiddens": [256, 256], - }, - "tau": 0.005, - "target_entropy": "auto", - "no_done_at_end": True, - "n_step": 1, - "rollout_fragment_length": 1, - "prioritized_replay": True, - "train_batch_size": 256, - "target_network_update_freq": 1, - "timesteps_per_iteration": 1000, - "learning_starts": 256, - "optimization": { - "actor_learning_rate": 0.0003, - "critic_learning_rate": 0.0003, - "entropy_learning_rate": 0.0003, - }, - "num_workers": 4, - "num_gpus": 0, - "clip_actions": False, - "normalize_actions": True, - "metrics_smoothing_episodes": 5, - }, - n_iter=200, - threshold=-750.0, - ), + # TestAgentParams.for_pendulum( + # algorithm=DiscreteActionSpaceAlgorithm.SAC, + # config_updates={ + # "horizon": 200, + # "soft_horizon": True, + # "Q_model": {"fcnet_activation": "relu", "fcnet_hiddens": [256, 256]}, + # "policy_model": { + # "fcnet_activation": "relu", + # "fcnet_hiddens": [256, 256], + # }, + # "tau": 0.005, + # "target_entropy": "auto", + # "no_done_at_end": True, + # "n_step": 1, + # "rollout_fragment_length": 1, + # "prioritized_replay": True, + # "train_batch_size": 256, + # "target_network_update_freq": 1, + # "timesteps_per_iteration": 1000, + # "learning_starts": 256, + # "optimization": { + # "actor_learning_rate": 0.0003, + # "critic_learning_rate": 0.0003, + # "entropy_learning_rate": 0.0003, + # }, + # "num_workers": 4, + # "num_gpus": 0, + # "clip_actions": False, + # "normalize_actions": True, + # "metrics_smoothing_episodes": 5, + # }, + # n_iter=200, + # threshold=-750.0, + # ), ) ] diff --git a/rllib/tests/agents/test_learning.py b/rllib/tests/agents/test_learning.py index 3e55f79bf583..07d4cdc29bed 100644 --- a/rllib/tests/agents/test_learning.py +++ b/rllib/tests/agents/test_learning.py @@ -80,6 +80,7 @@ def test_monotonically_improving_algorithms_can_converge_with_different_framewor for i in range(n_iter): results = trainer.train() episode_reward_mean = results["episode_reward_mean"] + print(f"{i}: {episode_reward_mean}") if episode_reward_mean >= threshold: learnt = True break From bd81b686bac485cba25570e6df1103fea303a97f Mon Sep 17 00:00:00 2001 From: Aditya Gudimella Date: Sun, 20 Sep 2020 16:39:08 -0700 Subject: [PATCH 2/5] Setting prioritized_replay in config now uses PrioritizedReplay correctly. --- rllib/agents/sac/apex.py | 10 ++++++---- rllib/tests/agents/parameters.py | 5 +++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/rllib/agents/sac/apex.py b/rllib/agents/sac/apex.py index c2ac8a80cd8d..24f5524622b3 100644 --- a/rllib/agents/sac/apex.py +++ b/rllib/agents/sac/apex.py @@ -15,7 +15,7 @@ from ray.rllib.execution.metric_ops import StandardMetricsReporting from ray.rllib.execution.replay_buffer import ( LocalReplayBuffer, - ReplayBuffer, + ReplayActor, ReplayBuffer, ) from ray.rllib.execution.replay_ops import Replay, StoreToReplayBuffer from ray.rllib.execution.rollout_ops import ParallelRollouts @@ -166,14 +166,15 @@ def stats(self, debug=False): return stat -ReplayActor = ray.remote(num_cpus=0)(LocalAsyncReplayBuffer) +AsyncReplayActor = ray.remote(num_cpus=0)(LocalAsyncReplayBuffer) def async_execution_plan(workers: WorkerSet, config: dict): # Create a number of replay buffer actors. num_replay_buffer_shards = config["optimizer"]["num_replay_buffer_shards"] + replay_actor_cls = ReplayActor if config["prioritized_replay"] else AsyncReplayActor replay_actors = create_colocated( - ReplayActor, + replay_actor_cls, [ num_replay_buffer_shards, config["learning_starts"], @@ -193,7 +194,8 @@ def async_execution_plan(workers: WorkerSet, config: dict): # Update experience priorities post learning. def update_prio_and_stats(item: ("ActorHandle", dict, int)): actor, prio_dict, count = item - # actor.update_priorities.remote(prio_dict) + if config["prioritized_replay"]: + actor.update_priorities.remote(prio_dict) metrics = _get_shared_metrics() # Manually update the steps trained counter since the learner thread # is executing outside the pipeline. diff --git a/rllib/tests/agents/parameters.py b/rllib/tests/agents/parameters.py index 3e27a8d735a8..ce3e8fc54c34 100644 --- a/rllib/tests/agents/parameters.py +++ b/rllib/tests/agents/parameters.py @@ -281,9 +281,10 @@ def astuple(self): TestAgentParams.for_pendulum( algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, config_updates={ - "num_workers": 3, - "num_envs_per_worker": 2, + "num_workers": 8, + # "num_envs_per_worker": 2, "exploration_config": {"type": "StochasticSampling"}, + "prioritized_replay": False, }, # TODO: Delete next line before landing PR frameworks=[Framework.TensorFlow], From fde03281b976c4f9eed219f3153a00b52069a808 Mon Sep 17 00:00:00 2001 From: Aditya Gudimella Date: Sun, 20 Sep 2020 17:04:13 -0700 Subject: [PATCH 3/5] Renamed LocalAsyncReplayBuffer and AsyncReplayActor to better reflect usage --- rllib/agents/sac/apex.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rllib/agents/sac/apex.py b/rllib/agents/sac/apex.py index 24f5524622b3..b779502bf061 100644 --- a/rllib/agents/sac/apex.py +++ b/rllib/agents/sac/apex.py @@ -70,7 +70,7 @@ # yapf: enable -class LocalAsyncReplayBuffer(LocalReplayBuffer): +class LocalVanillaReplayBuffer(LocalReplayBuffer): """A replay buffer shard. Ray actors are single-threaded, so for scalability multiple replay actors @@ -166,13 +166,13 @@ def stats(self, debug=False): return stat -AsyncReplayActor = ray.remote(num_cpus=0)(LocalAsyncReplayBuffer) +VanillaReplayActor = ray.remote(num_cpus=0)(LocalVanillaReplayBuffer) def async_execution_plan(workers: WorkerSet, config: dict): # Create a number of replay buffer actors. num_replay_buffer_shards = config["optimizer"]["num_replay_buffer_shards"] - replay_actor_cls = ReplayActor if config["prioritized_replay"] else AsyncReplayActor + replay_actor_cls = ReplayActor if config["prioritized_replay"] else VanillaReplayActor replay_actors = create_colocated( replay_actor_cls, [ From 46e3cf53a6f1d92e0339e45c7738422747a3ad5d Mon Sep 17 00:00:00 2001 From: Aditya Gudimella Date: Sun, 20 Sep 2020 18:13:21 -0700 Subject: [PATCH 4/5] Added test with prioritized_replay set to True --- rllib/tests/agents/parameters.py | 215 +++++++++++++++------------- rllib/tests/agents/test_learning.py | 3 +- 2 files changed, 113 insertions(+), 105 deletions(-) diff --git a/rllib/tests/agents/parameters.py b/rllib/tests/agents/parameters.py index ce3e8fc54c34..6eb5d9fb236d 100644 --- a/rllib/tests/agents/parameters.py +++ b/rllib/tests/agents/parameters.py @@ -212,119 +212,128 @@ def astuple(self): ] = [ x.astuple() for x in chain( - # TestAgentParams.for_cart_pole( - # algorithm=DiscreteActionSpaceAlgorithm.PPO, - # config_updates={ - # "num_gpus": 2, - # "_fake_gpus": True, - # "num_workers": 1, - # "lr": 0.0003, - # "observation_filter": "MeanStdFilter", - # "num_sgd_iter": 6, - # "vf_share_layers": True, - # "vf_loss_coeff": 0.01, - # "model": {"fcnet_hiddens": [32], "fcnet_activation": "linear"}, - # }, - # n_iter=200, - # threshold=150.0, - # ), - # TestAgentParams.for_pendulum( - # algorithm=ContinuousActionSpaceAlgorithm.APEX_DDPG, - # config_updates={ - # "use_huber": True, - # "clip_rewards": False, - # "num_workers": 4, - # "n_step": 1, - # "target_network_update_freq": 50000, - # "tau": 1.0, - # }, - # n_iter=200, - # threshold=-750.0, - # ), - # TestAgentParams.for_cart_pole( - # algorithm=DiscreteActionSpaceAlgorithm.APEX_DQN, - # config_updates={ - # "target_network_update_freq": 20000, - # "num_workers": 4, - # "num_envs_per_worker": 8, - # "train_batch_size": 64, - # "gamma": 0.95, - # }, - # n_iter=200, - # threshold=150.0, - # ), - # TestAgentParams.for_cart_pole( - # algorithm=DiscreteActionSpaceAlgorithm.SAC, - # config_updates={ - # "num_workers": 4, - # "twin_q": True, - # "soft_horizon": True, - # "clip_actions": False, - # "normalize_actions": True, - # "learning_starts": 0, - # "prioritized_replay": True, - # "Q_model": {"fcnet_hiddens": [64, 64]}, - # "policy_model": {"fcnet_hiddens": [64, 64],}, - # }, - # n_iter=200, - # threshold=100.0, - # ), - # TestAgentParams.for_cart_pole( - # algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, - # config_updates={ - # "seed": 42, - # "num_workers": 8, - # }, - # n_iter=100, - # threshold=175.0, - # ), + TestAgentParams.for_cart_pole( + algorithm=DiscreteActionSpaceAlgorithm.PPO, + config_updates={ + "num_gpus": 2, + "_fake_gpus": True, + "num_workers": 1, + "lr": 0.0003, + "observation_filter": "MeanStdFilter", + "num_sgd_iter": 6, + "vf_share_layers": True, + "vf_loss_coeff": 0.01, + "model": {"fcnet_hiddens": [32], "fcnet_activation": "linear"}, + }, + n_iter=200, + threshold=150.0, + ), + TestAgentParams.for_pendulum( + algorithm=ContinuousActionSpaceAlgorithm.APEX_DDPG, + config_updates={ + "use_huber": True, + "clip_rewards": False, + "num_workers": 4, + "n_step": 1, + "target_network_update_freq": 50000, + "tau": 1.0, + }, + n_iter=200, + threshold=-750.0, + ), + TestAgentParams.for_cart_pole( + algorithm=DiscreteActionSpaceAlgorithm.APEX_DQN, + config_updates={ + "target_network_update_freq": 20000, + "num_workers": 4, + "num_envs_per_worker": 8, + "train_batch_size": 64, + "gamma": 0.95, + }, + n_iter=200, + threshold=150.0, + ), + TestAgentParams.for_cart_pole( + algorithm=DiscreteActionSpaceAlgorithm.SAC, + config_updates={ + "num_workers": 4, + "twin_q": True, + "soft_horizon": True, + "clip_actions": False, + "normalize_actions": True, + "learning_starts": 0, + "prioritized_replay": True, + "Q_model": {"fcnet_hiddens": [64, 64]}, + "policy_model": {"fcnet_hiddens": [64, 64],}, + }, + n_iter=200, + threshold=100.0, + ), + TestAgentParams.for_cart_pole( + algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, + config_updates={ + "seed": 42, + "num_workers": 8, + }, + n_iter=100, + threshold=175.0, + ), TestAgentParams.for_pendulum( algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, config_updates={ "num_workers": 8, - # "num_envs_per_worker": 2, "exploration_config": {"type": "StochasticSampling"}, "prioritized_replay": False, + "no_done_at_end": False, }, - # TODO: Delete next line before landing PR - frameworks=[Framework.TensorFlow], n_iter=200, threshold=-350., ), - # TestAgentParams.for_pendulum( - # algorithm=DiscreteActionSpaceAlgorithm.SAC, - # config_updates={ - # "horizon": 200, - # "soft_horizon": True, - # "Q_model": {"fcnet_activation": "relu", "fcnet_hiddens": [256, 256]}, - # "policy_model": { - # "fcnet_activation": "relu", - # "fcnet_hiddens": [256, 256], - # }, - # "tau": 0.005, - # "target_entropy": "auto", - # "no_done_at_end": True, - # "n_step": 1, - # "rollout_fragment_length": 1, - # "prioritized_replay": True, - # "train_batch_size": 256, - # "target_network_update_freq": 1, - # "timesteps_per_iteration": 1000, - # "learning_starts": 256, - # "optimization": { - # "actor_learning_rate": 0.0003, - # "critic_learning_rate": 0.0003, - # "entropy_learning_rate": 0.0003, - # }, - # "num_workers": 4, - # "num_gpus": 0, - # "clip_actions": False, - # "normalize_actions": True, - # "metrics_smoothing_episodes": 5, - # }, - # n_iter=200, - # threshold=-750.0, - # ), + TestAgentParams.for_pendulum( + algorithm=ContinuousActionSpaceAlgorithm.APEX_SAC, + config_updates={ + "num_workers": 8, + "exploration_config": {"type": "StochasticSampling"}, + "prioritized_replay": True, + "no_done_at_end": True + }, + n_iter=200, + threshold=-350., + ), + TestAgentParams.for_pendulum( + algorithm=DiscreteActionSpaceAlgorithm.SAC, + config_updates={ + "horizon": 200, + "soft_horizon": True, + "Q_model": {"fcnet_activation": "relu", "fcnet_hiddens": [256, 256]}, + "policy_model": { + "fcnet_activation": "relu", + "fcnet_hiddens": [256, 256], + }, + "tau": 0.005, + "target_entropy": "auto", + "no_done_at_end": True, + "n_step": 1, + "rollout_fragment_length": 1, + "prioritized_replay": True, + "train_batch_size": 256, + "target_network_update_freq": 1, + "timesteps_per_iteration": 1000, + "learning_starts": 256, + "optimization": { + "actor_learning_rate": 0.0003, + "critic_learning_rate": 0.0003, + "entropy_learning_rate": 0.0003, + }, + "num_workers": 4, + "num_gpus": 0, + "clip_actions": False, + "normalize_actions": True, + "metrics_smoothing_episodes": 5, + }, + n_iter=200, + threshold=-750.0, + ), ) ] diff --git a/rllib/tests/agents/test_learning.py b/rllib/tests/agents/test_learning.py index 07d4cdc29bed..beeb1e9ff881 100644 --- a/rllib/tests/agents/test_learning.py +++ b/rllib/tests/agents/test_learning.py @@ -77,10 +77,9 @@ def test_monotonically_improving_algorithms_can_converge_with_different_framewor """ learnt = False episode_reward_mean = -float("inf") - for i in range(n_iter): + for _ in range(n_iter): results = trainer.train() episode_reward_mean = results["episode_reward_mean"] - print(f"{i}: {episode_reward_mean}") if episode_reward_mean >= threshold: learnt = True break From 2d260d63bc38c50d0aa810d145988b91518d3462 Mon Sep 17 00:00:00 2001 From: Aditya Gudimella Date: Mon, 21 Sep 2020 12:51:25 -0700 Subject: [PATCH 5/5] Cleaned up code. --- rllib/agents/dqn/apex.py | 27 ++-- rllib/agents/sac/apex.py | 266 +------------------------------ rllib/execution/replay_buffer.py | 105 ++++++++++++ 3 files changed, 124 insertions(+), 274 deletions(-) diff --git a/rllib/agents/dqn/apex.py b/rllib/agents/dqn/apex.py index c3fd13008e47..bdc1ba512160 100644 --- a/rllib/agents/dqn/apex.py +++ b/rllib/agents/dqn/apex.py @@ -13,7 +13,7 @@ from ray.rllib.execution.replay_ops import StoreToReplayBuffer, Replay from ray.rllib.execution.train_ops import UpdateTargetNetwork from ray.rllib.execution.metric_ops import StandardMetricsReporting -from ray.rllib.execution.replay_buffer import ReplayActor +from ray.rllib.execution.replay_buffer import ReplayActor, VanillaReplayActor from ray.rllib.utils import merge_dicts from ray.rllib.utils.actors import create_colocated @@ -88,15 +88,21 @@ def __call__(self, item: ("ActorHandle", SampleBatchType)): def apex_execution_plan(workers: WorkerSet, config: dict): # Create a number of replay buffer actors. num_replay_buffer_shards = config["optimizer"]["num_replay_buffer_shards"] - replay_actors = create_colocated(ReplayActor, [ + replay_actor_cls = ReplayActor if config[ + "prioritized_replay"] else VanillaReplayActor + replay_actors = create_colocated( + replay_actor_cls, + [ + num_replay_buffer_shards, + config["learning_starts"], + config["buffer_size"], + config["train_batch_size"], + config["prioritized_replay_alpha"], + config["prioritized_replay_beta"], + config["prioritized_replay_eps"], + ], num_replay_buffer_shards, - config["learning_starts"], - config["buffer_size"], - config["train_batch_size"], - config["prioritized_replay_alpha"], - config["prioritized_replay_beta"], - config["prioritized_replay_eps"], - ], num_replay_buffer_shards) + ) # Start the learner thread. learner_thread = LearnerThread(workers.local_worker()) @@ -105,7 +111,8 @@ def apex_execution_plan(workers: WorkerSet, config: dict): # Update experience priorities post learning. def update_prio_and_stats(item: ("ActorHandle", dict, int)): actor, prio_dict, count = item - actor.update_priorities.remote(prio_dict) + if config["prioritized_replay"]: + actor.update_priorities.remote(prio_dict) metrics = _get_shared_metrics() # Manually update the steps trained counter since the learner thread # is executing outside the pipeline. diff --git a/rllib/agents/sac/apex.py b/rllib/agents/sac/apex.py index b779502bf061..f8ea337da690 100644 --- a/rllib/agents/sac/apex.py +++ b/rllib/agents/sac/apex.py @@ -1,33 +1,5 @@ -import collections -import copy - -import ray -from ray.rllib.agents.dqn.apex import UpdateWorkerWeights -from ray.rllib.agents.dqn.dqn import calculate_rr_weights -from ray.rllib.agents.dqn.learner_thread import LearnerThread +from ray.rllib.agents.dqn.apex import apex_execution_plan from ray.rllib.agents.sac.sac import DEFAULT_CONFIG as SAC_CONFIG, SACTrainer -from ray.rllib.evaluation.worker_set import WorkerSet -from ray.rllib.execution.common import ( - STEPS_TRAINED_COUNTER, - _get_shared_metrics, -) -from ray.rllib.execution.concurrency_ops import Concurrently, Dequeue, Enqueue -from ray.rllib.execution.metric_ops import StandardMetricsReporting -from ray.rllib.execution.replay_buffer import ( - LocalReplayBuffer, - ReplayActor, ReplayBuffer, -) -from ray.rllib.execution.replay_ops import Replay, StoreToReplayBuffer -from ray.rllib.execution.rollout_ops import ParallelRollouts -from ray.rllib.execution.train_ops import UpdateTargetNetwork -from ray.rllib.utils.timer import TimerStat -from ray.rllib.utils.actors import create_colocated -from ray.util.iter import ParallelIteratorWorker -from ray.rllib.policy.sample_batch import ( - SampleBatch, - DEFAULT_POLICY_ID, - MultiAgentBatch, -) # yapf: disable # __sphinx_doc_begin__ @@ -70,242 +42,8 @@ # yapf: enable -class LocalVanillaReplayBuffer(LocalReplayBuffer): - """A replay buffer shard. - - Ray actors are single-threaded, so for scalability multiple replay actors - may be created to increase parallelism.""" - - def __init__( - self, - num_shards, - learning_starts, - buffer_size, - replay_batch_size, - prioritized_replay_alpha=0.6, - prioritized_replay_beta=0.4, - prioritized_replay_eps=1e-6, - multiagent_sync_replay=False, - ): - self.replay_starts = learning_starts // num_shards - self.buffer_size = buffer_size // num_shards - self.replay_batch_size = replay_batch_size - self.prioritized_replay_beta = prioritized_replay_beta - self.prioritized_replay_eps = prioritized_replay_eps - self.multiagent_sync_replay = multiagent_sync_replay - - def gen_replay(): - while True: - yield self.replay() - - ParallelIteratorWorker.__init__(self, gen_replay, False) - - def new_buffer(): - return ReplayBuffer(self.buffer_size) - - self.replay_buffers = collections.defaultdict(new_buffer) - - # Metrics - self.add_batch_timer = TimerStat() - self.replay_timer = TimerStat() - self.update_priorities_timer = TimerStat() - self.num_added = 0 - - # Make externally accessible for testing. - global _local_replay_buffer - _local_replay_buffer = self - # If set, return this instead of the usual data for testing. - self._fake_batch = None - - def replay(self): - if self._fake_batch: - fake_batch = SampleBatch(self._fake_batch) - return MultiAgentBatch({DEFAULT_POLICY_ID: fake_batch}, fake_batch.count) - - if self.num_added < self.replay_starts: - return None - - with self.replay_timer: - samples = {} - idxes = None - for policy_id, replay_buffer in self.replay_buffers.items(): - if self.multiagent_sync_replay: - if idxes is None: - idxes = replay_buffer.sample_idxes(self.replay_batch_size) - else: - idxes = replay_buffer.sample_idxes(self.replay_batch_size) - ( - obses_t, - actions, - rewards, - obses_tp1, - dones, - ) = replay_buffer.sample_with_idxes(idxes) - samples[policy_id] = SampleBatch( - { - "obs": obses_t, - "actions": actions, - "rewards": rewards, - "new_obs": obses_tp1, - "dones": dones, - # "weights": weights, - # "batch_indexes": batch_indexes, - } - ) - return MultiAgentBatch(samples, self.replay_batch_size) - - def stats(self, debug=False): - stat = { - "add_batch_time_ms": round(1000 * self.add_batch_timer.mean, 3), - "replay_time_ms": round(1000 * self.replay_timer.mean, 3), - } - for policy_id, replay_buffer in self.replay_buffers.items(): - stat.update( - {"policy_{}".format(policy_id): replay_buffer.stats(debug=debug)} - ) - return stat - - -VanillaReplayActor = ray.remote(num_cpus=0)(LocalVanillaReplayBuffer) - - -def async_execution_plan(workers: WorkerSet, config: dict): - # Create a number of replay buffer actors. - num_replay_buffer_shards = config["optimizer"]["num_replay_buffer_shards"] - replay_actor_cls = ReplayActor if config["prioritized_replay"] else VanillaReplayActor - replay_actors = create_colocated( - replay_actor_cls, - [ - num_replay_buffer_shards, - config["learning_starts"], - config["buffer_size"], - config["train_batch_size"], - config["prioritized_replay_alpha"], - config["prioritized_replay_beta"], - config["prioritized_replay_eps"], - ], - num_replay_buffer_shards, - ) - - # Start the learner thread. - learner_thread = LearnerThread(workers.local_worker()) - learner_thread.start() - - # Update experience priorities post learning. - def update_prio_and_stats(item: ("ActorHandle", dict, int)): - actor, prio_dict, count = item - if config["prioritized_replay"]: - actor.update_priorities.remote(prio_dict) - metrics = _get_shared_metrics() - # Manually update the steps trained counter since the learner thread - # is executing outside the pipeline. - metrics.counters[STEPS_TRAINED_COUNTER] += count - metrics.timers["learner_dequeue"] = learner_thread.queue_timer - metrics.timers["learner_grad"] = learner_thread.grad_timer - metrics.timers["learner_overall"] = learner_thread.overall_timer - - # We execute the following steps concurrently: - # (1) Generate rollouts and store them in our replay buffer actors. Update - # the weights of the worker that generated the batch. - parallel_rollouts_mode = config.get("parallel_rollouts_mode", "async") - num_async = config.get("parallel_rollouts_num_async") - # This could be set to None explicitly - if not num_async: - num_async = 2 - rollouts = ParallelRollouts( - workers, mode=parallel_rollouts_mode, num_async=num_async - ) - store_op = rollouts.for_each(StoreToReplayBuffer(actors=replay_actors)) - if config.get("execution_plan_custom_store_ops"): - custom_store_ops = config["execution_plan_custom_store_ops"] - store_op = store_op.for_each(custom_store_ops(workers, config)) - # Only need to update workers if there are remote workers. - if workers.remote_workers(): - store_op = store_op.zip_with_source_actor().for_each( - UpdateWorkerWeights( - learner_thread, - workers, - max_weight_sync_delay=(config["optimizer"]["max_weight_sync_delay"]), - ) - ) - - # (2) Read experiences from the replay buffer actors and send to the - # learner thread via its in-queue. - if config.get("before_learn_on_batch"): - before_learn_on_batch = config["before_learn_on_batch"] - before_learn_on_batch = before_learn_on_batch(workers, config) - else: - before_learn_on_batch = lambda b: b - replay_op = ( - Replay(actors=replay_actors, num_async=4) - .for_each(before_learn_on_batch) - .zip_with_source_actor() - .for_each(Enqueue(learner_thread.inqueue)) - ) - - # (3) Get priorities back from learner thread and apply them to the - # replay buffer actors. - update_op = ( - Dequeue(learner_thread.outqueue, check=learner_thread.is_alive) - .for_each(update_prio_and_stats) - .for_each( - UpdateTargetNetwork( - workers, config["target_network_update_freq"], by_steps_trained=True - ) - ) - ) - - if config["training_intensity"]: - # Execute (1), (2) with a fixed intensity ratio. - rr_weights = calculate_rr_weights(config) + ["*"] - merged_op = Concurrently( - [store_op, replay_op, update_op], - mode="round_robin", - output_indexes=[2], - round_robin_weights=rr_weights, - ) - else: - # Execute (1), (2), (3) asynchronously as fast as possible. Only output - # items from (3) since metrics aren't available before then. - merged_op = Concurrently( - [store_op, replay_op, update_op], mode="async", output_indexes=[2] - ) - - # Add in extra replay and learner metrics to the training result. - def add_apex_metrics(result): - replay_stats = ray.get( - replay_actors[0].stats.remote(config["optimizer"].get("debug")) - ) - exploration_infos = workers.foreach_trainable_policy( - lambda p, _: p.get_exploration_info() - ) - result["info"].update( - { - "exploration_infos": exploration_infos, - "learner_queue": learner_thread.learner_queue_size.stats(), - "learner": copy.deepcopy(learner_thread.stats), - "replay_shard_0": replay_stats, - } - ) - return result - - # Only report metrics from the workers with the lowest 1/3 of epsilons. - selected_workers = None - worker_amount_to_collect_metrics_from = config.get( - "worker_amount_to_collect_metrics_from" - ) - if worker_amount_to_collect_metrics_from: - selected_workers = workers.remote_workers()[ - -len(workers.remote_workers()) // worker_amount_to_collect_metrics_from : - ] - - return StandardMetricsReporting( - merged_op, workers, config, selected_workers=selected_workers - ).for_each(add_apex_metrics) - - ApexSACTrainer = SACTrainer.with_updates( name="APEX_SAC", default_config=APEX_SAC_DEFAULT_CONFIG, - execution_plan=async_execution_plan, + execution_plan=apex_execution_plan, ) diff --git a/rllib/execution/replay_buffer.py b/rllib/execution/replay_buffer.py index a1ecab230fa3..25d6e57d6b44 100644 --- a/rllib/execution/replay_buffer.py +++ b/rllib/execution/replay_buffer.py @@ -416,4 +416,109 @@ def stats(self, debug=False): return stat +# Visible for testing. +_local_vanilla_replay_buffer = None + + +class LocalVanillaReplayBuffer(LocalReplayBuffer): + """A replay buffer shard. + + Ray actors are single-threaded, so for scalability multiple replay actors + may be created to increase parallelism.""" + + def __init__( + self, + num_shards, + learning_starts, + buffer_size, + replay_batch_size, + prioritized_replay_alpha=0.6, + prioritized_replay_beta=0.4, + prioritized_replay_eps=1e-6, + multiagent_sync_replay=False, + ): + self.replay_starts = learning_starts // num_shards + self.buffer_size = buffer_size // num_shards + self.replay_batch_size = replay_batch_size + self.prioritized_replay_beta = prioritized_replay_beta + self.prioritized_replay_eps = prioritized_replay_eps + self.multiagent_sync_replay = multiagent_sync_replay + + def gen_replay(): + while True: + yield self.replay() + + ParallelIteratorWorker.__init__(self, gen_replay, False) + + def new_buffer(): + return ReplayBuffer(self.buffer_size) + + self.replay_buffers = collections.defaultdict(new_buffer) + + # Metrics + self.add_batch_timer = TimerStat() + self.replay_timer = TimerStat() + self.update_priorities_timer = TimerStat() + self.num_added = 0 + + # Make externally accessible for testing. + global _local_vanilla_replay_buffer + _local_vanilla_replay_buffer = self + # If set, return this instead of the usual data for testing. + self._fake_batch = None + + @staticmethod + def get_instance_for_testing(): + global _local_vanilla_replay_buffer + return _local_vanilla_replay_buffer + + def replay(self): + if self._fake_batch: + fake_batch = SampleBatch(self._fake_batch) + return MultiAgentBatch({DEFAULT_POLICY_ID: fake_batch}, fake_batch.count) + + if self.num_added < self.replay_starts: + return None + + with self.replay_timer: + samples = {} + idxes = None + for policy_id, replay_buffer in self.replay_buffers.items(): + if self.multiagent_sync_replay: + if idxes is None: + idxes = replay_buffer.sample_idxes(self.replay_batch_size) + else: + idxes = replay_buffer.sample_idxes(self.replay_batch_size) + ( + obses_t, + actions, + rewards, + obses_tp1, + dones, + ) = replay_buffer.sample_with_idxes(idxes) + samples[policy_id] = SampleBatch( + { + "obs": obses_t, + "actions": actions, + "rewards": rewards, + "new_obs": obses_tp1, + "dones": dones, + } + ) + return MultiAgentBatch(samples, self.replay_batch_size) + + def stats(self, debug=False): + stat = { + "add_batch_time_ms": round(1000 * self.add_batch_timer.mean, 3), + "replay_time_ms": round(1000 * self.replay_timer.mean, 3), + } + for policy_id, replay_buffer in self.replay_buffers.items(): + stat.update( + {"policy_{}".format(policy_id): replay_buffer.stats(debug=debug)} + ) + return stat + + +VanillaReplayActor = ray.remote(num_cpus=0)(LocalVanillaReplayBuffer) + ReplayActor = ray.remote(num_cpus=0)(LocalReplayBuffer)