diff --git a/doc/conf.py b/doc/conf.py index 3a8c5ac5155f..d6c124f39153 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -18,7 +18,7 @@ # These 4 lines added to enable ReadTheDocs to work. import mock -MOCK_MODULES = ["libraylib", "IPython", "numpy", "funcsigs", "subprocess32", "protobuf", "colorama", "graphviz", "cloudpickle", "ray.internal.graph_pb2"] +MOCK_MODULES = ["numpy", "funcsigs", "colorama", "cloudpickle"] for mod_name in MOCK_MODULES: sys.modules[mod_name] = mock.Mock() diff --git a/doc/install-on-macosx.md b/doc/install-on-macosx.md index a45784e66699..f21cb4f3daa9 100644 --- a/doc/install-on-macosx.md +++ b/doc/install-on-macosx.md @@ -16,10 +16,9 @@ install [Anaconda](https://www.continuum.io/downloads). ``` brew update -brew install git cmake automake autoconf libtool boost graphviz +brew install git cmake automake autoconf libtool boost sudo easy_install pip -sudo pip install ipython --user -sudo pip install numpy funcsigs subprocess32 protobuf colorama graphviz --ignore-installed six +sudo pip install numpy funcsigs colorama --ignore-installed six sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. ``` diff --git a/doc/install-on-ubuntu.md b/doc/install-on-ubuntu.md index 8d832fb736f7..b9cfc3c2d29a 100644 --- a/doc/install-on-ubuntu.md +++ b/doc/install-on-ubuntu.md @@ -14,8 +14,8 @@ First install the dependencies. We currently do not support Python 3. 
``` sudo apt-get update -sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip graphviz -sudo pip install ipython funcsigs subprocess32 protobuf colorama graphviz +sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip +sudo pip install funcsigs colorama sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. ``` diff --git a/install-dependencies.sh b/install-dependencies.sh index 8c283e3efc41..0fcf5f28c5a7 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -30,15 +30,14 @@ fi if [[ $platform == "linux" ]]; then # These commands must be kept in sync with the installation instructions. sudo apt-get update - sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip graphviz - sudo pip install ipython funcsigs subprocess32 protobuf colorama graphviz redis + sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip + sudo pip install funcsigs colorama redis sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. elif [[ $platform == "macosx" ]]; then # These commands must be kept in sync with the installation instructions. 
- brew install git cmake automake autoconf libtool boost graphviz + brew install git cmake automake autoconf libtool boost sudo easy_install pip - sudo pip install ipython --user - sudo pip install numpy funcsigs subprocess32 protobuf colorama graphviz redis --ignore-installed six + sudo pip install numpy funcsigs colorama redis --ignore-installed six sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. fi diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 75a9b7a6dd54..b6c08da2977a 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -9,6 +9,7 @@ # Ray modules import config +import plasma # all_processes is a list of the scheduler, object store, and worker processes # that have been started by this services module if Ray is being used in local @@ -80,15 +81,8 @@ def start_objstore(node_ip_address, redis_address, cleanup): store_name = "/tmp/ray_plasma_store{}".format(random_name()) p1 = subprocess.Popen([plasma_store_executable, "-s", store_name]) - plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_manager") manager_name = "/tmp/ray_plasma_manager{}".format(random_name()) - manager_port = new_port() - p2 = subprocess.Popen([plasma_manager_executable, - "-s", store_name, - "-m", manager_name, - "-h", node_ip_address, - "-p", str(manager_port), - "-r", redis_address]) + p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address) if cleanup: all_processes.append(p1) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index a3370cf9e801..5a0b33d32cca 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -1086,8 +1086,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo # We record the traceback and notify the scheduler. 
traceback_str = format_error_message(traceback.format_exc()) error_key = "ReusableVariableReinitializeError:{}".format(random_string()) - worker.redis_client.hmset(error_key, {"task_instance_id": "NOTIMPLEMENTED", - "task_id": "NOTIMPLEMENTED", + worker.redis_client.hmset(error_key, {"task_id": "NOTIMPLEMENTED", "function_id": function_id.id(), "function_name": function_name, "message": traceback_str}) @@ -1151,7 +1150,6 @@ def _export_reusable_variable(name, reusable, worker=global_worker): """ if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: raise Exception("_export_reusable_variable can only be called on a driver.") - reusable_variable_id = name key = "ReusableVariables:{}".format(reusable_variable_id) worker.redis_client.hmset(key, {"name": name, @@ -1161,11 +1159,10 @@ def _export_reusable_variable(name, reusable, worker=global_worker): worker.driver_export_counter += 1 def export_remote_function(function_id, func_name, func, num_return_vals, worker=global_worker): + if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: + raise Exception("export_remote_function can only be called on a driver.") key = "RemoteFunction:{}".format(function_id.id()) - worker.num_return_vals[function_id.id()] = num_return_vals - - pickled_func = pickling.dumps(func) worker.redis_client.hmset(key, {"function_id": function_id.id(), "name": func_name, diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index a869d136a5a7..c838cb247a63 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -1,6 +1,8 @@ +import ctypes import os +import random import socket -import ctypes +import subprocess import time Addr = ctypes.c_ubyte * 4 @@ -287,3 +289,49 @@ def get_next_notification(self): assert len(message_data) == PLASMA_ID_SIZE break return message_data
+
+def start_plasma_manager(store_name, manager_name, redis_address, num_retries=5, use_valgrind=False, node_ip_address="127.0.0.1"):
+  """Start a plasma manager and return its process handle and port.
+
+  A new random port is chosen for every attempt. If the manager process has
+  already exited shortly after launch (e.g., because the port was in use), the
+  launch is retried, up to num_retries attempts in total.
+
+  Args:
+    store_name (str): The name of the plasma store socket.
+    manager_name (str): The name of the plasma manager socket.
+    redis_address (str): The address of the Redis server.
+    num_retries (int): The maximum number of attempts to start the manager.
+    use_valgrind (bool): True if the Plasma manager should be started inside of
+      valgrind and False otherwise.
+    node_ip_address (str): The address passed to the manager via "-h".
+
+  Returns:
+    A tuple of the subprocess.Popen object for the Plasma manager and the port
+      that was passed to the manager.
+
+  Raises:
+    Exception: An exception is raised if the manager could not be properly
+      started.
+  """
+  plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_manager")
+  for attempt in range(num_retries):
+    if attempt > 0:
+      print("Plasma manager failed to start, retrying now.")
+    port = random.randint(10000, 65535)
+    command = [plasma_manager_executable,
+               "-s", store_name,
+               "-m", manager_name,
+               "-h", node_ip_address,
+               "-p", str(port),
+               "-r", redis_address]
+    if use_valgrind:
+      command = ["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command
+    process = subprocess.Popen(command)
+    # This sleep is critical. If the plasma_manager fails to start because the
+    # port is already in use, then we need it to fail within 0.1 seconds.
+    time.sleep(0.1)
+    # poll() returns None while the process is still running.
+    if process.poll() is None:
+      return process, port
+  raise Exception("Couldn't start plasma manager.")
diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 9613126edf3c..caeaa79fe911 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -53,32 +53,6 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffe unit_test.assertEqual(client1.get_metadata(object_id)[:], client2.get_metadata(object_id)[:]) -def start_plasma_manager(store_name, host_name, redis_port, use_valgrind=False): - """Start a plasma manager and return the ports it listens on.""" - plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_manager") - num_retries = 5 - port = None - process = None - while num_retries >= 0: - port = random.randint(10000, 50000) - command = [plasma_manager_executable, - "-s", store_name, - "-m", host_name, - "-h", "127.0.0.1", - "-p", str(port), - "-r", "{addr}:{port}".format(addr="127.0.0.1", port=redis_port)] - print("Try to start plasma manager on port " + str(port)) - if use_valgrind: - process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) - else: - process = subprocess.Popen(command) - time.sleep(0.1) - # See if the process has terminated - if process.poll() == None: - return process, port - num_retries = num_retries - 1 - raise Exception("Couldn't start plasma manager") - # Check if the redis-server binary is present. redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server") if not os.path.exists(redis_path): @@ -266,8 +240,9 @@ def setUp(self): time.sleep(0.1) # Start two PlasmaManagers.
- self.p4, self.port1 = start_plasma_manager(store_name1, manager_name1, redis_port, USE_VALGRIND) - self.p5, self.port2 = start_plasma_manager(store_name2, manager_name2, redis_port, USE_VALGRIND) + redis_address = "{}:{}".format("127.0.0.1", redis_port) + self.p4, self.port1 = plasma.start_plasma_manager(store_name1, manager_name1, redis_address, use_valgrind=USE_VALGRIND) + self.p5, self.port2 = plasma.start_plasma_manager(store_name2, manager_name2, redis_address, use_valgrind=USE_VALGRIND) # Connect two PlasmaClients. self.client1 = plasma.PlasmaClient(store_name1, manager_name1)