Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

# These 4 lines added to enable ReadTheDocs to work.
import mock
MOCK_MODULES = ["libraylib", "IPython", "numpy", "funcsigs", "subprocess32", "protobuf", "colorama", "graphviz", "cloudpickle", "ray.internal.graph_pb2"]
MOCK_MODULES = ["numpy", "funcsigs", "colorama", "cloudpickle"]
for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock()

Expand Down
5 changes: 2 additions & 3 deletions doc/install-on-macosx.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ install [Anaconda](https://www.continuum.io/downloads).

```
brew update
brew install git cmake automake autoconf libtool boost graphviz
brew install git cmake automake autoconf libtool boost
sudo easy_install pip
sudo pip install ipython --user
sudo pip install numpy funcsigs subprocess32 protobuf colorama graphviz --ignore-installed six
sudo pip install numpy funcsigs colorama --ignore-installed six
sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples.
```

Expand Down
4 changes: 2 additions & 2 deletions doc/install-on-ubuntu.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ First install the dependencies. We currently do not support Python 3.

```
sudo apt-get update
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip graphviz
sudo pip install ipython funcsigs subprocess32 protobuf colorama graphviz
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip
sudo pip install funcsigs colorama
sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples.
```

Expand Down
9 changes: 4 additions & 5 deletions install-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ fi
if [[ $platform == "linux" ]]; then
# These commands must be kept in sync with the installation instructions.
sudo apt-get update
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip graphviz
sudo pip install ipython funcsigs subprocess32 protobuf colorama graphviz redis
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip
sudo pip install funcsigs colorama redis
sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples.
elif [[ $platform == "macosx" ]]; then
# These commands must be kept in sync with the installation instructions.
brew install git cmake automake autoconf libtool boost graphviz
brew install git cmake automake autoconf libtool boost
sudo easy_install pip
sudo pip install ipython --user
sudo pip install numpy funcsigs subprocess32 protobuf colorama graphviz redis --ignore-installed six
sudo pip install numpy funcsigs colorama redis --ignore-installed six
sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples.
fi

Expand Down
10 changes: 2 additions & 8 deletions lib/python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# Ray modules
import config
import plasma

# all_processes is a list of the scheduler, object store, and worker processes
# that have been started by this services module if Ray is being used in local
Expand Down Expand Up @@ -80,15 +81,8 @@ def start_objstore(node_ip_address, redis_address, cleanup):
store_name = "/tmp/ray_plasma_store{}".format(random_name())
p1 = subprocess.Popen([plasma_store_executable, "-s", store_name])

plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_manager")
manager_name = "/tmp/ray_plasma_manager{}".format(random_name())
manager_port = new_port()
p2 = subprocess.Popen([plasma_manager_executable,
"-s", store_name,
"-m", manager_name,
"-h", node_ip_address,
"-p", str(manager_port),
"-r", redis_address])
p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address)

if cleanup:
all_processes.append(p1)
Expand Down
9 changes: 3 additions & 6 deletions lib/python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,8 +1086,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo
# We record the traceback and notify the scheduler.
traceback_str = format_error_message(traceback.format_exc())
error_key = "ReusableVariableReinitializeError:{}".format(random_string())
worker.redis_client.hmset(error_key, {"task_instance_id": "NOTIMPLEMENTED",
"task_id": "NOTIMPLEMENTED",
worker.redis_client.hmset(error_key, {"task_id": "NOTIMPLEMENTED",
"function_id": function_id.id(),
"function_name": function_name,
"message": traceback_str})
Expand Down Expand Up @@ -1151,7 +1150,6 @@ def _export_reusable_variable(name, reusable, worker=global_worker):
"""
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("_export_reusable_variable can only be called on a driver.")

reusable_variable_id = name
key = "ReusableVariables:{}".format(reusable_variable_id)
worker.redis_client.hmset(key, {"name": name,
Expand All @@ -1161,11 +1159,10 @@ def _export_reusable_variable(name, reusable, worker=global_worker):
worker.driver_export_counter += 1

def export_remote_function(function_id, func_name, func, num_return_vals, worker=global_worker):
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("export_remote_function can only be called on a driver.")
key = "RemoteFunction:{}".format(function_id.id())

worker.num_return_vals[function_id.id()] = num_return_vals


pickled_func = pickling.dumps(func)
worker.redis_client.hmset(key, {"function_id": function_id.id(),
"name": func_name,
Expand Down
49 changes: 48 additions & 1 deletion src/plasma/lib/python/plasma.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import ctypes
import os
import random
import socket
import ctypes
import subprocess
import time

Addr = ctypes.c_ubyte * 4
Expand Down Expand Up @@ -287,3 +289,48 @@ def get_next_notification(self):
assert len(message_data) == PLASMA_ID_SIZE
break
return message_data

def start_plasma_manager(store_name, manager_name, redis_address, num_retries=5, use_valgrind=False):
"""Start a plasma manager and return the ports it listens on.

Args:
store_name (str): The name of the plasma store socket.
manager_name (str): The name of the plasma manager socket.
redis_address (str): The address of the Redis server.
use_valgrind (bool): True if the Plasma manager should be started inside of
valgrind and False otherwise.

Returns:
The process ID of the Plasma manager and the port that the manager is
listening on.

Raises:
Exception: An exception is raised if the manager could not be properly
started.
"""
plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_manager")
port = None
process = None
counter = 0
while counter < num_retries:
if counter > 0:
print("Plasma manager failed to start, retrying now.")
port = random.randint(10000, 65535)
command = [plasma_manager_executable,
"-s", store_name,
"-m", manager_name,
"-h", "127.0.0.1",
"-p", str(port),
"-r", redis_address]
if use_valgrind:
process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command)
else:
process = subprocess.Popen(command)
# This sleep is critical. If the plasma_manager fails to start because the
# port is already in use, then we need it to fail within 0.1 seconds.
time.sleep(0.1)
# See if the process has terminated
if process.poll() == None:
return process, port
counter += 1
raise Exception("Couldn't start plasma manager.")
31 changes: 3 additions & 28 deletions src/plasma/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,6 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffe
unit_test.assertEqual(client1.get_metadata(object_id)[:],
client2.get_metadata(object_id)[:])

def start_plasma_manager(store_name, host_name, redis_port, use_valgrind=False):
"""Start a plasma manager and return the ports it listens on."""
plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_manager")
num_retries = 5
port = None
process = None
while num_retries >= 0:
port = random.randint(10000, 50000)
command = [plasma_manager_executable,
"-s", store_name,
"-m", host_name,
"-h", "127.0.0.1",
"-p", str(port),
"-r", "{addr}:{port}".format(addr="127.0.0.1", port=redis_port)]
print("Try to start plasma manager on port " + str(port))
if use_valgrind:
process = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command)
else:
process = subprocess.Popen(command)
time.sleep(0.1)
# See if the process has terminated
if process.poll() == None:
return process, port
num_retries = num_retries - 1
raise Exception("Couldn't start plasma manager")

# Check if the redis-server binary is present.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server")
if not os.path.exists(redis_path):
Expand Down Expand Up @@ -266,8 +240,9 @@ def setUp(self):
time.sleep(0.1)

# Start two PlasmaManagers.
self.p4, self.port1 = start_plasma_manager(store_name1, manager_name1, redis_port, USE_VALGRIND)
self.p5, self.port2 = start_plasma_manager(store_name2, manager_name2, redis_port, USE_VALGRIND)
redis_address = "{}:{}".format("127.0.0.1", redis_port)
self.p4, self.port1 = plasma.start_plasma_manager(store_name1, manager_name1, redis_address, use_valgrind=USE_VALGRIND)
self.p5, self.port2 = plasma.start_plasma_manager(store_name2, manager_name2, redis_address, use_valgrind=USE_VALGRIND)

# Connect two PlasmaClients.
self.client1 = plasma.PlasmaClient(store_name1, manager_name1)
Expand Down