Merged

50 commits
8ec3bf0  Create .keep (xinzhu-cai, Jun 3, 2019)
282baf8  update .keep path (xinzhu-cai, Jun 5, 2019)
d859497  Merge remote-tracking branch 'origin/master' (xinzhu-cai, Jun 5, 2019)
7d3243e  add rl_pong example (xinzhu-cai, Jun 12, 2019)
6559769  Merge remote-tracking branch 'origin/master' (xinzhu-cai, Jun 12, 2019)
d55f604  Merge branch 'master' of https://github.com/HzhElena/analytics-zoo (xinzhu-cai, Jun 12, 2019)
76aa07a  move to rl_pong direction (xinzhu-cai, Jun 12, 2019)
829db75  remove original file (xinzhu-cai, Jun 12, 2019)
ecb8210  add parameter server example (xinzhu-cai, Jun 13, 2019)
759496d  Merge remote-tracking branch 'origin/master' (xinzhu-cai, Jun 13, 2019)
0f18a23  add license (xinzhu-cai, Jun 17, 2019)
65c6292  Merge remote-tracking branch 'origin/master' (xinzhu-cai, Jun 17, 2019)
4270ee7  PEP8 checks (xinzhu-cai, Jun 20, 2019)
868e587  PEP8 checks (xinzhu-cai, Jun 20, 2019)
c2cfeb0  Add into integration test (xinzhu-cai, Jul 1, 2019)
35c04cc  Wrap tests into bash function (xinzhu-cai, Jul 1, 2019)
fea3fd9  Update license (xinzhu-cai, Jul 1, 2019)
3b45a8a  PEP8 checks (xinzhu-cai, Jul 1, 2019)
4149640  Correct syntax of rl_pong (xinzhu-cai, Jul 2, 2019)
879bf6f  modify run-pytests to check version before test ray (Jul 24, 2019)
cab59ce  test pyspark version and spark home (Jul 24, 2019)
9b211e1  add check spark_home's pyspark in case pyspark can't be found (Jul 24, 2019)
9d111cf  add check version before run ray examples (Jul 26, 2019)
b5331df  Merge remote-tracking branch 'upstream/master' (Jul 26, 2019)
f52e1da  change spark home (Jul 26, 2019)
4cb3b3f  change spark home (Jul 26, 2019)
f6458f4  install packages which are needed in ray examples (Jul 26, 2019)
b22d43e  check error (Jul 29, 2019)
548dd47  fix error (Jul 29, 2019)
7e24eaf  change execution to spark-submit (Jul 29, 2019)
cb002d6  change memory (Jul 29, 2019)
27b4f61  change object memory to test (Jul 30, 2019)
16a89a7  add atari_py dependency (Jul 30, 2019)
22a8dbd  remove .keep (Jul 31, 2019)
e742dea  Merge remote-tracking branch 'upstream/master' (Jul 31, 2019)
b38d70e  move ray test to new files (Jul 31, 2019)
ec6ac1f  change some ray-pip lines into function (Jul 31, 2019)
431352e  remove rl_pong and fix parameter_server iterations (Jul 31, 2019)
9d64142  add iteration (Jul 31, 2019)
b55796a  change iterate, print info (Jul 31, 2019)
5645bc4  add more info (Jul 31, 2019)
713232a  add __init__ files (Aug 5, 2019)
10df254  change ray to rayexample to avoid conflict and change spark-submit to… (Aug 5, 2019)
4361680  renamed foreach_evaluator to foreach_worker because rllib update and … (Aug 6, 2019)
b16a2c3  add a dedicated file for the ray test (Aug 13, 2019)
e6ca49c  PEP8 check fix (Aug 13, 2019)
23e7bc9  PEP8 check fix (Aug 13, 2019)
2da491e  remove test_split (Aug 13, 2019)
c178fa8  remove --doctest-modules about ray (Aug 13, 2019)
bd91d6d  add time.sleep (Aug 13, 2019)
2 changes: 0 additions & 2 deletions pyzoo/dev/run-pytests
@@ -22,15 +22,13 @@ cd "`dirname $0`"

export PYSPARK_PYTHON=python
export PYSPARK_DRIVER_PYTHON=python

py_version="$(python -V 2>&1)"

python -m pytest -v --doctest-modules ../zoo \
--ignore=../zoo/pipeline/api/keras2 \
--ignore=../zoo/tfpark/text \
--ignore=../zoo/examples \
--ignore=../zoo/ray/

exit_status_1=$?
if [ $exit_status_1 -ne 0 ];
then
2 changes: 2 additions & 0 deletions pyzoo/test/zoo/ray/test_ray_on_local.py
@@ -19,6 +19,7 @@
import psutil
import pytest
import ray
import time

from zoo import init_spark_on_local
from zoo.ray.util.raycontext import RayContext
@@ -44,6 +45,7 @@ def test_local(self):
        print([ray.get(actor.hostname.remote()) for actor in actors])
        ray_ctx.stop()
        sc.stop()
        time.sleep(1)
        for process_info in ray_ctx.ray_processesMonitor.process_infos:
            for pid in process_info.pids:
                assert not psutil.pid_exists(pid)
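The test above uses a fixed one-second sleep to give the Ray processes time to exit before asserting that their PIDs are gone. A minimal sketch of an alternative that polls for termination with a timeout instead of a fixed sleep is shown below; the helper name and the process_infos/pids shape are assumptions inferred from the test code, not part of this pull request.

```python
import time

import psutil


def wait_for_ray_exit(process_infos, timeout=10.0, interval=0.2):
    """Poll until every tracked Ray PID has exited, or fail after `timeout` seconds."""
    deadline = time.time() + timeout
    pids = {pid for info in process_infos for pid in info.pids}
    while time.time() < deadline:
        # Drop the PIDs that have already exited and keep waiting on the rest.
        pids = {pid for pid in pids if psutil.pid_exists(pid)}
        if not pids:
            return
        time.sleep(interval)
    raise AssertionError("Ray processes still alive: {}".format(sorted(pids)))
```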
16 changes: 8 additions & 8 deletions pyzoo/test/zoo/ray/test_util.py
@@ -25,14 +25,14 @@

class TestUtil(TestCase):

    def test_split(self):
        vector = np.ones([10])
        result = rutils.split(vector, 4)
        assert len(result) == 4
        assert len(result[0]) == 3
        assert len(result[1]) == 3
        assert len(result[2]) == 2
        assert len(result[3]) == 2
    # def test_split(self):
    #     vector = np.ones([10])
    #     result = rutils.split(vector, 4)
    #     assert len(result) == 4
    #     assert len(result[0]) == 3
    #     assert len(result[1]) == 3
    #     assert len(result[2]) == 2
    #     assert len(result[3]) == 2

    def test_resource_to_bytes(self):
        assert 10 == rutils.resourceToBytes("10b")
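For reference, the commented-out assertions describe splitting a length-10 vector into four chunks of sizes 3, 3, 2 and 2. Below is a minimal sketch of that behaviour, assuming rutils.split is equivalent to numpy's array_split; this equivalence is an assumption, not something confirmed by the diff.

```python
import numpy as np


def split(vector, num_chunks):
    # Split into near-equal pieces; the first (len(vector) % num_chunks) chunks
    # receive one extra element, so 10 elements over 4 chunks gives 3, 3, 2, 2.
    return np.array_split(vector, num_chunks)


chunks = split(np.ones([10]), 4)
assert [len(c) for c in chunks] == [3, 3, 2, 2]
```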
15 changes: 15 additions & 0 deletions pyzoo/zoo/examples/rayexample/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15 changes: 15 additions & 0 deletions pyzoo/zoo/examples/rayexample/parameter_server/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
121 changes: 121 additions & 0 deletions pyzoo/zoo/examples/rayexample/parameter_server/async_parameter_server.py
@@ -0,0 +1,121 @@
# This file is adapted from https://github.com/ray-project/ray/blob
# /master/examples/parameter_server/async_parameter_server.py
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import argparse
import time

import ray
import model

from zoo import init_spark_on_yarn, init_spark_on_local
from zoo.ray.util.raycontext import RayContext

os.environ["LANG"] = "C.UTF-8"
parser = argparse.ArgumentParser(description="Run the asynchronous parameter "
                                             "server example.")
parser.add_argument("--num-workers", default=4, type=int,
                    help="The number of workers to use.")
parser.add_argument("--iterations", default=10, type=int,
                    help="The number of iterations to run.")
parser.add_argument("--hadoop_conf", type=str,
                    help="Turn on yarn mode by passing the path to the hadoop "
                         "configuration folder. Otherwise, turn on local mode.")


@ray.remote
class ParameterServer(object):
    def __init__(self, keys, values):
        # These values will be mutated, so we must create a copy that is not
        # backed by the object store.
        values = [value.copy() for value in values]
        self.weights = dict(zip(keys, values))

    def push(self, keys, values):
        for key, value in zip(keys, values):
            self.weights[key] += value

    def pull(self, keys):
        return [self.weights[key] for key in keys]


@ray.remote
def worker_task(ps, worker_index, batch_size=50):
    # Download MNIST.
    print("Worker " + str(worker_index))
    mnist = model.download_mnist_retry(seed=worker_index)

    # Initialize the model.
    net = model.SimpleCNN()
    keys = net.get_weights()[0]

    while True:
        # Get the current weights from the parameter server.
        weights = ray.get(ps.pull.remote(keys))
        net.set_weights(keys, weights)
        # Compute an update and push it to the parameter server.
        xs, ys = mnist.train.next_batch(batch_size)
        gradients = net.compute_update(xs, ys)
        ps.push.remote(keys, gradients)

if __name__ == "__main__":
    args = parser.parse_args()
    if args.hadoop_conf:
        slave_num = 2
        sc = init_spark_on_yarn(
            hadoop_conf=args.hadoop_conf,
            conda_name="ray36",
            num_executor=slave_num,
            executor_cores=28,
            executor_memory="10g",
            driver_memory="2g",
            driver_cores=4,
            extra_executor_memory_for_ray="30g")
        ray_ctx = RayContext(sc=sc, object_store_memory="25g")
    else:
        sc = init_spark_on_local(cores=4)
        ray_ctx = RayContext(sc=sc, object_store_memory="4g")
    ray_ctx.init()

    # Create a parameter server with some random weights.
    net = model.SimpleCNN()
    all_keys, all_values = net.get_weights()
    ps = ParameterServer.remote(all_keys, all_values)

    # Start some training tasks.
    worker_tasks = [worker_task.remote(ps, i) for i in range(args.num_workers)]

    # Download MNIST.
    mnist = model.download_mnist_retry()
    print("Begin iteration")
    i = 0
    while i < args.iterations:
        # Get and evaluate the current model.
        print("-----Iteration" + str(i) + "------")
        current_weights = ray.get(ps.pull.remote(all_keys))
        net.set_weights(all_keys, current_weights)
        test_xs, test_ys = mnist.test.next_batch(1000)
        accuracy = net.compute_accuracy(test_xs, test_ys)
        print("Iteration {}: accuracy is {}".format(i, accuracy))
        i += 1
        time.sleep(1)
    ray_ctx.stop()
    sc.stop()
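The example above imports a companion model module that is not included in this diff. The sketch below is only an assumed outline of the interface that module would need to expose, inferred from the calls in the example (the placeholder bodies and the max_num_retries parameter are illustrative); in Ray's original parameter server example this module wraps a small TensorFlow MNIST CNN.

```python
# Assumed outline of the companion model.py used by async_parameter_server.py;
# the bodies are placeholders, not the actual implementation.


class SimpleCNN(object):
    def get_weights(self):
        # Return (keys, values): variable names and their current weight arrays.
        raise NotImplementedError

    def set_weights(self, keys, values):
        # Overwrite the named variables with the given weight arrays.
        raise NotImplementedError

    def compute_update(self, xs, ys):
        # Return per-variable gradient arrays for one training batch.
        raise NotImplementedError

    def compute_accuracy(self, xs, ys):
        # Return classification accuracy on the given batch.
        raise NotImplementedError


def download_mnist_retry(seed=0, max_num_retries=20):
    # Download MNIST with retries on transient failures and return a dataset
    # object exposing .train and .test with next_batch(batch_size).
    raise NotImplementedError
```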