From 44ada470e934424bf582e91f94b80774e5180db4 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 25 Aug 2017 16:12:08 -0700 Subject: [PATCH 01/14] export symbols --- python/pyarrow/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 68ae017ce71..abbc125c5ad 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -90,7 +90,10 @@ # Serialization from pyarrow.lib import (deserialize_from, deserialize, serialize, serialize_to, read_serialized, - SerializedPyObject) + SerializedPyObject, + # This is temporary + register_type, type_to_type_id, whitelisted_types, + types_to_pickle, custom_serializers, custom_deserializers) from pyarrow.filesystem import FileSystem, LocalFileSystem From e1924a45f5024de589d6ec71436d57acf42f64db Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 25 Aug 2017 22:51:49 -0700 Subject: [PATCH 02/14] add put and get --- python/pyarrow/lib.pxd | 9 +++++++++ python/pyarrow/plasma.pyx | 20 ++++++++++++++++++-- python/pyarrow/serialization.pxi | 5 ----- python/pyarrow/tests/test_plasma.py | 8 +++++++- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 48a58f7b826..c377be37e20 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -330,6 +330,15 @@ cdef class NativeFile: cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader) cdef get_writer(object source, shared_ptr[OutputStream]* writer) +cdef class SerializedPyObject: + cdef: + CSerializedPyObject data + + cdef readonly: + object base + + cdef _write_to(self, OutputStream* stream) + cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf) cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type) cdef public object pyarrow_wrap_field(const shared_ptr[CField]& field) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 
befa283d85b..9e094ac1bfa 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -26,7 +26,9 @@ from libcpp.vector cimport vector as c_vector from libc.stdint cimport int64_t, uint8_t, uintptr_t from cpython.pycapsule cimport * -from pyarrow.lib cimport Buffer, NativeFile, check_status +import pyarrow + +from pyarrow.lib cimport Buffer, NativeFile, check_status, SerializedPyObject from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer, CFixedSizeBufferWriter, CStatus) @@ -296,7 +298,7 @@ cdef class PlasmaClient: ---------- object_ids : list A list of ObjectIDs used to identify some objects. - timeout_ms :int + timeout_ms : int The number of milliseconds that the get call should block before timing out and returning. Pass -1 if the call should block and 0 if the call should return immediately. @@ -588,3 +590,17 @@ def connect(store_socket_name, manager_socket_name, int release_delay, result.manager_socket_name, release_delay, num_retries)) return result + +def put(PlasmaClient client, value): + cdef ObjectID object_id = ObjectID.from_random() + cdef SerializedPyObject serialized = pyarrow.serialize(value) + buffer = client.create(object_id, serialized.total_bytes) + serialized.write_to(buffer) + return object_id + +def get(PlasmaClient client, object_ids, timeout_ms=-1): + results = [] + buffers = client.get(object_ids, timeout_ms) + for buffer in buffers: + results.append(pyarrow.deserialize(buffer)) + return results diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index 3ee34eed5ed..a38a527d3e5 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -135,11 +135,6 @@ cdef class SerializedPyObject: """ Arrow-serialized representation of Python object """ - cdef: - CSerializedPyObject data - - cdef readonly: - object base property total_bytes: diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 04162bbbbad..2b4e063b236 100644 --- 
a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -119,7 +119,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, """ if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store") + plasma_store_executable = "plasma_store" plasma_store_name = "/tmp/plasma_store{}".format(random_name()) command = [plasma_store_executable, "-s", plasma_store_name, @@ -273,6 +273,12 @@ def test_get(self): else: assert results[i] is None + def test_put_and_get(self): + value = ["hello", "world", 3, 1.0] + object_id = pa.plasma.put(self.plasma_client, value) + [result] = pa.plasma.get(self.plasma_client, [object_id]) + assert result == value + def test_store_arrow_objects(self): data = np.random.randn(10, 4) # Write an arrow object. From 3518c71a1c7c9fa3d3ee20264bdc09c906405ec4 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 16:09:21 -0700 Subject: [PATCH 03/14] fix --- python/pyarrow/__init__.py | 4 +++- python/pyarrow/plasma.pyx | 12 +++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index abbc125c5ad..3e269d13e52 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -85,12 +85,14 @@ ArrowIOError, ArrowMemoryError, ArrowNotImplementedError, - ArrowTypeError) + ArrowTypeError, + PlasmaObjectExists) # Serialization from pyarrow.lib import (deserialize_from, deserialize, serialize, serialize_to, read_serialized, SerializedPyObject, + SerializationException, DeserializationException, # This is temporary register_type, type_to_type_id, whitelisted_types, types_to_pickle, custom_serializers, custom_deserializers) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 9e094ac1bfa..51300039a44 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -591,12 +591,14 
@@ def connect(store_socket_name, manager_socket_name, int release_delay, release_delay, num_retries)) return result -def put(PlasmaClient client, value): - cdef ObjectID object_id = ObjectID.from_random() +def put(PlasmaClient client, value, object_id=None): + cdef ObjectID id = object_id if object_id else ObjectID.from_random() cdef SerializedPyObject serialized = pyarrow.serialize(value) - buffer = client.create(object_id, serialized.total_bytes) - serialized.write_to(buffer) - return object_id + buffer = client.create(id, serialized.total_bytes) + stream = pyarrow.FixedSizeBufferOutputStream(buffer) + stream.set_memcopy_threads(4) + serialized.write_to(stream) + return id def get(PlasmaClient client, object_ids, timeout_ms=-1): results = [] From eb9694a0163b0da832e4be6f485219db79d8f0e8 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 17:08:50 -0700 Subject: [PATCH 04/14] implement timeouts --- python/pyarrow/plasma.pyx | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 51300039a44..47dd790d80e 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -160,6 +160,13 @@ cdef class ObjectID: return self.data.binary() +cdef class ObjectNotAvailable: + """ + Placeholder for an object that was not available within the given timeout. + """ + pass + + cdef class PlasmaBuffer(Buffer): """ This is the type returned by calls to get with a PlasmaClient. 
@@ -598,11 +605,17 @@ def put(PlasmaClient client, value, object_id=None): stream = pyarrow.FixedSizeBufferOutputStream(buffer) stream.set_memcopy_threads(4) serialized.write_to(stream) + client.seal(id) return id def get(PlasmaClient client, object_ids, timeout_ms=-1): results = [] buffers = client.get(object_ids, timeout_ms) - for buffer in buffers: - results.append(pyarrow.deserialize(buffer)) + for i in range(len(object_ids)): + # buffers[i] is None if this object was not available within the + # timeout + if buffers[i]: + results.append(pyarrow.deserialize(buffers[i])) + else: + results.append(ObjectNotAvailable) return results From c0449543dc3723312e61f5513cf32fe34d7e5d5b Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 17:28:40 -0700 Subject: [PATCH 05/14] add documentation --- python/pyarrow/__init__.py | 5 +--- python/pyarrow/plasma.pyx | 39 ++++++++++++++++++++++++++++- python/pyarrow/tests/test_plasma.py | 4 +++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 3e269d13e52..985e262bced 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -92,10 +92,7 @@ from pyarrow.lib import (deserialize_from, deserialize, serialize, serialize_to, read_serialized, SerializedPyObject, - SerializationException, DeserializationException, - # This is temporary - register_type, type_to_type_id, whitelisted_types, - types_to_pickle, custom_serializers, custom_deserializers) + SerializationException, DeserializationException) from pyarrow.filesystem import FileSystem, LocalFileSystem diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 47dd790d80e..981cc933c32 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -585,7 +585,7 @@ def connect(store_socket_name, manager_socket_name, int release_delay, The maximum number of objects that the client will keep and delay releasing (for caching reasons). 
num_retries : int, default -1 - Number of times tor ty to connect to plasma store. Default value of -1 + Number of times to try to connect to plasma store. Default value of -1 uses the default (50) """ cdef PlasmaClient result = PlasmaClient() @@ -599,6 +599,23 @@ def connect(store_socket_name, manager_socket_name, int release_delay, return result def put(PlasmaClient client, value, object_id=None): + """ + Store a Python value into the object store. + + Parameters + ---------- + client : PlasmaClient + The client connected to the object store we put the value in. + value : object + A Python object to store. + object_id : ObjectID, default None + If this is provided, the specified object ID will be used to refer + to the object. + + Returns + ------- + The object ID associated to the Python object. + """ cdef ObjectID id = object_id if object_id else ObjectID.from_random() cdef SerializedPyObject serialized = pyarrow.serialize(value) buffer = client.create(id, serialized.total_bytes) @@ -609,6 +626,26 @@ def put(PlasmaClient client, value, object_id=None): return id def get(PlasmaClient client, object_ids, timeout_ms=-1): + """ + Get one or more Python values from the object store. + + Parameters + ---------- + client : PlasmaClient + The client connected to the object store we get objects from. + object_ids : list + The list of object IDs associated to the values we get from the store. + timeout_ms : int, default -1 + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list + List of Python values for the data associated with the object_ids + and ObjectNotAvailable if the object was not available. 
+ """ results = [] buffers = client.get(object_ids, timeout_ms) for i in range(len(object_ids)): diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 2b4e063b236..df278025743 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -279,6 +279,10 @@ def test_put_and_get(self): [result] = pa.plasma.get(self.plasma_client, [object_id]) assert result == value + object_id = pa.plasma.ObjectID(np.bytes.random(20)) + [result] = pa.plasma.get(self.plasma_client, [object_id], timeout_ms=0) + assert result == pa.plasma.ObjectNotAvailable + def test_store_arrow_objects(self): data = np.random.randn(10, 4) # Write an arrow object. From 36f67d6d9d45cdb016e8c143bdd053b9cd56e942 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 18:26:01 -0700 Subject: [PATCH 06/14] implement ObjectID.from_random --- python/pyarrow/plasma.pyx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 981cc933c32..714f4789ff1 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -159,6 +159,12 @@ cdef class ObjectID: """ return self.data.binary() + @staticmethod + def from_random(): + cdef ObjectID result + result.data = CUniqueID.from_random() + return result + cdef class ObjectNotAvailable: """ From 20b119e3ce99c2897f34271ed967505030d6f7c2 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 19:37:30 -0700 Subject: [PATCH 07/14] make it possible to get single objects --- python/pyarrow/plasma.pyx | 32 +++++++++++++++++------------ python/pyarrow/tests/test_plasma.py | 3 +++ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 714f4789ff1..60bd799417f 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -26,6 +26,7 @@ from libcpp.vector cimport vector as c_vector from libc.stdint cimport int64_t, uint8_t, uintptr_t from 
cpython.pycapsule cimport * +import collections import pyarrow from pyarrow.lib cimport Buffer, NativeFile, check_status, SerializedPyObject @@ -43,6 +44,9 @@ cdef extern from "plasma/common.h" nogil: @staticmethod CUniqueID from_binary(const c_string& binary) + @staticmethod + CUniqueID from_random() + c_bool operator==(const CUniqueID& rhs) const c_string hex() const @@ -161,9 +165,8 @@ cdef class ObjectID: @staticmethod def from_random(): - cdef ObjectID result - result.data = CUniqueID.from_random() - return result + cdef CUniqueID data = CUniqueID.from_random() + return ObjectID(data.binary()) cdef class ObjectNotAvailable: @@ -652,13 +655,16 @@ def get(PlasmaClient client, object_ids, timeout_ms=-1): List of Python values for the data associated with the object_ids and ObjectNotAvailable if the object was not available. """ - results = [] - buffers = client.get(object_ids, timeout_ms) - for i in range(len(object_ids)): - # buffers[i] is None if this object was not available within the - # timeout - if buffers[i]: - results.append(pyarrow.deserialize(buffers[i])) - else: - results.append(ObjectNotAvailable) - return results + if isinstance(object_ids, collections.Sequence): + results = [] + buffers = client.get(object_ids, timeout_ms) + for i in range(len(object_ids)): + # buffers[i] is None if this object was not available within the + # timeout + if buffers[i]: + results.append(pyarrow.deserialize(buffers[i])) + else: + results.append(ObjectNotAvailable) + return results + else: + return get(client, [object_ids], timeout_ms)[0] diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index df278025743..1502bb36ddc 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -279,6 +279,9 @@ def test_put_and_get(self): [result] = pa.plasma.get(self.plasma_client, [object_id]) assert result == value + result = pa.plasma.get(self.plasma_client, object_id) + assert result == value + object_id = 
pa.plasma.ObjectID(np.bytes.random(20)) [result] = pa.plasma.get(self.plasma_client, [object_id], timeout_ms=0) assert result == pa.plasma.ObjectNotAvailable From 44c3b3dc53681c5620480d53fefa7ede173c0b31 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 19:53:02 -0700 Subject: [PATCH 08/14] fixes --- python/pyarrow/lib.pxd | 9 --------- python/pyarrow/plasma.pyx | 17 +++++++++-------- python/pyarrow/serialization.pxi | 5 +++++ python/pyarrow/tests/test_plasma.py | 4 ++-- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index c377be37e20..48a58f7b826 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -330,15 +330,6 @@ cdef class NativeFile: cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader) cdef get_writer(object source, shared_ptr[OutputStream]* writer) -cdef class SerializedPyObject: - cdef: - CSerializedPyObject data - - cdef readonly: - object base - - cdef _write_to(self, OutputStream* stream) - cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf) cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type) cdef public object pyarrow_wrap_field(const shared_ptr[CField]& field) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 60bd799417f..cc69df23fd4 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -29,7 +29,7 @@ from cpython.pycapsule cimport * import collections import pyarrow -from pyarrow.lib cimport Buffer, NativeFile, check_status, SerializedPyObject +from pyarrow.lib cimport Buffer, NativeFile, check_status from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer, CFixedSizeBufferWriter, CStatus) @@ -616,7 +616,7 @@ def put(PlasmaClient client, value, object_id=None): client : PlasmaClient The client connected to the object store we put the value in. value : object - A Python object to store. + A Python object to store. 
Currently only lists are supported. object_id : ObjectID, default None If this is provided, the specified object ID will be used to refer to the object. @@ -626,7 +626,7 @@ def put(PlasmaClient client, value, object_id=None): The object ID associated to the Python object. """ cdef ObjectID id = object_id if object_id else ObjectID.from_random() - cdef SerializedPyObject serialized = pyarrow.serialize(value) + serialized = pyarrow.serialize(value) buffer = client.create(id, serialized.total_bytes) stream = pyarrow.FixedSizeBufferOutputStream(buffer) stream.set_memcopy_threads(4) @@ -642,8 +642,9 @@ def get(PlasmaClient client, object_ids, timeout_ms=-1): ---------- client : PlasmaClient The client connected to the object store we get objects from. - object_ids : list - The list of object IDs associated to the values we get from the store. + object_ids : list or ObjectID + Object ID or list of object IDs associated to the values we get from + the store. timeout_ms : int, default -1 The number of milliseconds that the get call should block before timing out and returning. Pass -1 if the call should block and 0 @@ -651,9 +652,9 @@ def get(PlasmaClient client, object_ids, timeout_ms=-1): Returns ------- - list - List of Python values for the data associated with the object_ids - and ObjectNotAvailable if the object was not available. + list or object + Python value or list of Python values for the data associated with + the object_ids and ObjectNotAvailable if the object was not available. 
""" if isinstance(object_ids, collections.Sequence): results = [] diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index a38a527d3e5..3ee34eed5ed 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -135,6 +135,11 @@ cdef class SerializedPyObject: """ Arrow-serialized representation of Python object """ + cdef: + CSerializedPyObject data + + cdef readonly: + object base property total_bytes: diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 1502bb36ddc..4a82da3ded7 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -119,7 +119,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, """ if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - plasma_store_executable = "plasma_store" + plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store") plasma_store_name = "/tmp/plasma_store{}".format(random_name()) command = [plasma_store_executable, "-s", plasma_store_name, @@ -282,7 +282,7 @@ def test_put_and_get(self): result = pa.plasma.get(self.plasma_client, object_id) assert result == value - object_id = pa.plasma.ObjectID(np.bytes.random(20)) + object_id = pa.plasma.ObjectID.from_random() [result] = pa.plasma.get(self.plasma_client, [object_id], timeout_ms=0) assert result == pa.plasma.ObjectNotAvailable From 0049c67437b4596508633dec1b55f91d35691ba9 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 20:47:31 -0700 Subject: [PATCH 09/14] fix flake8 linting --- python/pyarrow/tests/test_serialization.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index 013d86ebf9c..12bf65be69c 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -88,6 +88,7 @@ def 
array_custom_serializer(obj): def array_custom_deserializer(serialized_obj): return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1])) + pa.lib.register_type(np.ndarray, 20 * b"\x00", pickle=False, custom_serializer=array_custom_serializer, custom_deserializer=array_custom_deserializer) From cf4bf24fc02a34cef10c7f60ffac699ed44a35bc Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 27 Aug 2017 11:15:17 -0700 Subject: [PATCH 10/14] add type information --- python/pyarrow/plasma.pyx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index cc69df23fd4..8887b72b449 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -607,7 +607,7 @@ def connect(store_socket_name, manager_socket_name, int release_delay, release_delay, num_retries)) return result -def put(PlasmaClient client, value, object_id=None): +def put(PlasmaClient client, list value, ObjectID object_id=None): """ Store a Python value into the object store. @@ -615,7 +615,7 @@ def put(PlasmaClient client, value, object_id=None): ---------- client : PlasmaClient The client connected to the object store we put the value in. - value : object + value : list A Python object to store. Currently only lists are supported. object_id : ObjectID, default None If this is provided, the specified object ID will be used to refer @@ -634,7 +634,7 @@ def put(PlasmaClient client, value, object_id=None): client.seal(id) return id -def get(PlasmaClient client, object_ids, timeout_ms=-1): +def get(PlasmaClient client, object_ids, int timeout_ms=-1): """ Get one or more Python values from the object store. 
From 5921148e63e89c087a1a5ac71d87b1a7499ed9db Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 28 Aug 2017 00:03:18 -0700 Subject: [PATCH 11/14] move put and get into PlasmaClient --- python/pyarrow/plasma.pyx | 124 ++++++++++++++-------------- python/pyarrow/tests/test_plasma.py | 28 +++---- 2 files changed, 74 insertions(+), 78 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 8887b72b449..586781ba377 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -303,7 +303,7 @@ cdef class PlasmaClient: metadata.size(), &data)) return self._make_mutable_plasma_buffer(object_id, data, data_size) - def get(self, object_ids, timeout_ms=-1): + def get_buffer(self, object_ids, timeout_ms=-1): """ Returns data buffer from the PlasmaStore based on object ID. @@ -370,6 +370,65 @@ cdef class PlasmaClient: object_buffers[i].metadata_size)) return result + def put(self, list value, ObjectID object_id=None): + """ + Store a Python value into the object store. + + Parameters + ---------- + value : list + A Python object to store. Currently only lists are supported. + object_id : ObjectID, default None + If this is provided, the specified object ID will be used to refer + to the object. + + Returns + ------- + The object ID associated to the Python object. + """ + cdef ObjectID target_id = object_id if object_id else ObjectID.from_random() + serialized = pyarrow.serialize(value) + buffer = self.create(target_id, serialized.total_bytes) + stream = pyarrow.FixedSizeBufferOutputStream(buffer) + stream.set_memcopy_threads(4) + serialized.write_to(stream) + self.seal(target_id) + return target_id + + def get(self, object_ids, int timeout_ms=-1): + """ + Get one or more Python values from the object store. + + Parameters + ---------- + object_ids : list or ObjectID + Object ID or list of object IDs associated to the values we get from + the store. 
+ timeout_ms : int, default -1 + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list or object + Python value or list of Python values for the data associated with + the object_ids and ObjectNotAvailable if the object was not available. + """ + if isinstance(object_ids, collections.Sequence): + results = [] + buffers = self.get_buffer(object_ids, timeout_ms) + for i in range(len(object_ids)): + # buffers[i] is None if this object was not available within the + # timeout + if buffers[i]: + results.append(pyarrow.deserialize(buffers[i])) + else: + results.append(ObjectNotAvailable) + return results + else: + return self.get([object_ids], timeout_ms)[0] + def seal(self, ObjectID object_id): """ Seal the buffer in the PlasmaStore for a particular object ID. @@ -606,66 +665,3 @@ def connect(store_socket_name, manager_socket_name, int release_delay, result.manager_socket_name, release_delay, num_retries)) return result - -def put(PlasmaClient client, list value, ObjectID object_id=None): - """ - Store a Python value into the object store. - - Parameters - ---------- - client : PlasmaClient - The client connected to the object store we put the value in. - value : list - A Python object to store. Currently only lists are supported. - object_id : ObjectID, default None - If this is provided, the specified object ID will be used to refer - to the object. - - Returns - ------- - The object ID associated to the Python object. 
- """ - cdef ObjectID id = object_id if object_id else ObjectID.from_random() - serialized = pyarrow.serialize(value) - buffer = client.create(id, serialized.total_bytes) - stream = pyarrow.FixedSizeBufferOutputStream(buffer) - stream.set_memcopy_threads(4) - serialized.write_to(stream) - client.seal(id) - return id - -def get(PlasmaClient client, object_ids, int timeout_ms=-1): - """ - Get one or more Python values from the object store. - - Parameters - ---------- - client : PlasmaClient - The client connected to the object store we get objects from. - object_ids : list or ObjectID - Object ID or list of object IDs associated to the values we get from - the store. - timeout_ms : int, default -1 - The number of milliseconds that the get call should block before - timing out and returning. Pass -1 if the call should block and 0 - if the call should return immediately. - - Returns - ------- - list or object - Python value or list of Python values for the data associated with - the object_ids and ObjectNotAvailable if the object was not available. 
- """ - if isinstance(object_ids, collections.Sequence): - results = [] - buffers = client.get(object_ids, timeout_ms) - for i in range(len(object_ids)): - # buffers[i] is None if this object was not available within the - # timeout - if buffers[i]: - results.append(pyarrow.deserialize(buffers[i])) - else: - results.append(ObjectNotAvailable) - return results - else: - return get(client, [object_ids], timeout_ms)[0] diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 4a82da3ded7..d62508e7ae2 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -82,8 +82,8 @@ def create_object(client, data_size, metadata_size, seal=True): def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None): import pyarrow.plasma as plasma - client1_buff = client1.get([object_id])[0] - client2_buff = client2.get([object_id])[0] + client1_buff = client1.get_buffer([object_id])[0] + client2_buff = client2.get_buffer([object_id])[0] client1_metadata = client1.get_metadata([object_id])[0] client2_metadata = client2.get_metadata([object_id])[0] assert len(client1_buff) == len(client2_buff) @@ -187,7 +187,7 @@ def test_create(self): # Seal the object. self.plasma_client.seal(object_id) # Get the object. - memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0], + memory_buffer = np.frombuffer(self.plasma_client.get_buffer([object_id])[0], dtype="uint8") for i in range(length): assert memory_buffer[i] == i % 256 @@ -209,7 +209,7 @@ def test_create_with_metadata(self): self.plasma_client.seal(object_id) # Get the object. memory_buffer = np.frombuffer( - self.plasma_client.get([object_id])[0], dtype="uint8") + self.plasma_client.get_buffer([object_id])[0], dtype="uint8") for i in range(length): assert memory_buffer[i] == i % 256 # Get the metadata. @@ -241,7 +241,7 @@ def test_get(self): # Test timing out of get with various timeouts. 
for timeout in [0, 10, 100, 1000]: object_ids = [random_object_id() for _ in range(num_object_ids)] - results = self.plasma_client.get(object_ids, timeout_ms=timeout) + results = self.plasma_client.get_buffer(object_ids, timeout_ms=timeout) assert results == num_object_ids * [None] data_buffers = [] @@ -256,8 +256,8 @@ def test_get(self): # Test timing out from some but not all get calls with various # timeouts. for timeout in [0, 10, 100, 1000]: - data_results = self.plasma_client.get(object_ids, - timeout_ms=timeout) + data_results = self.plasma_client.get_buffer(object_ids, + timeout_ms=timeout) # metadata_results = self.plasma_client.get_metadata( # object_ids, timeout_ms=timeout) for i in range(num_object_ids): @@ -275,15 +275,15 @@ def test_get(self): def test_put_and_get(self): value = ["hello", "world", 3, 1.0] - object_id = pa.plasma.put(self.plasma_client, value) - [result] = pa.plasma.get(self.plasma_client, [object_id]) + object_id = self.plasma_client.put(value) + [result] = self.plasma_client.get([object_id]) assert result == value - result = pa.plasma.get(self.plasma_client, object_id) + result = self.plasma_client.get(object_id) assert result == value object_id = pa.plasma.ObjectID.from_random() - [result] = pa.plasma.get(self.plasma_client, [object_id], timeout_ms=0) + [result] = self.plasma_client.get([object_id], timeout_ms=0) assert result == pa.plasma.ObjectNotAvailable def test_store_arrow_objects(self): @@ -297,7 +297,7 @@ def test_store_arrow_objects(self): pa.write_tensor(tensor, stream) self.plasma_client.seal(object_id) # Read the arrow object. - [tensor] = self.plasma_client.get([object_id]) + [tensor] = self.plasma_client.get_buffer([object_id]) reader = pa.BufferReader(tensor) array = pa.read_tensor(reader).to_numpy() # Assert that they are equal. @@ -326,7 +326,7 @@ def test_store_pandas_dataframe(self): self.plasma_client.seal(object_id) # Read the DataFrame. 
- [data] = self.plasma_client.get([object_id]) + [data] = self.plasma_client.get_buffer([object_id]) reader = pa.RecordBatchStreamReader(pa.BufferReader(data)) result = reader.get_next_batch().to_pandas() @@ -564,7 +564,7 @@ def test_illegal_functionality(self): # with pytest.raises(Exception): # illegal_assignment() # Get the object. - memory_buffer = self.plasma_client.get([object_id])[0] + memory_buffer = self.plasma_client.get_buffer([object_id])[0] # Make sure the object is read only. def illegal_assignment(): From 8c36903755de117dabbeafebfd59fe62c57937e4 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 28 Aug 2017 00:12:53 -0700 Subject: [PATCH 12/14] support full API --- python/pyarrow/plasma.pyx | 13 ++++++++----- python/pyarrow/tests/test_plasma.py | 18 +++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 586781ba377..d940db0735d 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -370,14 +370,14 @@ cdef class PlasmaClient: object_buffers[i].metadata_size)) return result - def put(self, list value, ObjectID object_id=None): + def put(self, object value, ObjectID object_id=None): """ Store a Python value into the object store. Parameters ---------- - value : list - A Python object to store. Currently only lists are supported. + value : object + A Python object to store. object_id : ObjectID, default None If this is provided, the specified object ID will be used to refer to the object. @@ -387,7 +387,9 @@ cdef class PlasmaClient: The object ID associated to the Python object. 
""" cdef ObjectID target_id = object_id if object_id else ObjectID.from_random() - serialized = pyarrow.serialize(value) + # TODO(pcm): Make serialization code support non-sequences and + # get rid of packing the value into a list here (and unpacking in get) + serialized = pyarrow.serialize([value]) buffer = self.create(target_id, serialized.total_bytes) stream = pyarrow.FixedSizeBufferOutputStream(buffer) stream.set_memcopy_threads(4) @@ -422,7 +424,8 @@ cdef class PlasmaClient: # buffers[i] is None if this object was not available within the # timeout if buffers[i]: - results.append(pyarrow.deserialize(buffers[i])) + value, = pyarrow.deserialize(buffers[i]) + results.append(value) else: results.append(ObjectNotAvailable) return results diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index d62508e7ae2..847a45fd8a0 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -274,17 +274,17 @@ def test_get(self): assert results[i] is None def test_put_and_get(self): - value = ["hello", "world", 3, 1.0] - object_id = self.plasma_client.put(value) - [result] = self.plasma_client.get([object_id]) - assert result == value + for value in [["hello", "world", 3, 1.0], None, "hello"]: + object_id = self.plasma_client.put(value) + [result] = self.plasma_client.get([object_id]) + assert result == value - result = self.plasma_client.get(object_id) - assert result == value + result = self.plasma_client.get(object_id) + assert result == value - object_id = pa.plasma.ObjectID.from_random() - [result] = self.plasma_client.get([object_id], timeout_ms=0) - assert result == pa.plasma.ObjectNotAvailable + object_id = pa.plasma.ObjectID.from_random() + [result] = self.plasma_client.get([object_id], timeout_ms=0) + assert result == pa.plasma.ObjectNotAvailable def test_store_arrow_objects(self): data = np.random.randn(10, 4) From e60ea73e09c03b0b3d74b98da0bef5c24dfa19d1 Mon Sep 17 00:00:00 2001 From: Philipp Moritz 
Date: Mon, 28 Aug 2017 00:21:34 -0700 Subject: [PATCH 13/14] get_buffer -> get_buffers and update example --- python/examples/plasma/sorting/sort_df.py | 2 +- python/pyarrow/plasma.pyx | 4 ++-- python/pyarrow/tests/test_plasma.py | 18 +++++++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/python/examples/plasma/sorting/sort_df.py b/python/examples/plasma/sorting/sort_df.py index 03cfd13c6d7..0181ed729be 100644 --- a/python/examples/plasma/sorting/sort_df.py +++ b/python/examples/plasma/sorting/sort_df.py @@ -81,7 +81,7 @@ def put_df(df): def get_dfs(object_ids): """Retrieve dataframes from the object store given their object IDs.""" - buffers = client.get(object_ids) + buffers = client.get_buffers(object_ids) return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas() for buf in buffers] diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index d940db0735d..aebef1b8812 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -303,7 +303,7 @@ cdef class PlasmaClient: metadata.size(), &data)) return self._make_mutable_plasma_buffer(object_id, data, data_size) - def get_buffer(self, object_ids, timeout_ms=-1): + def get_buffers(self, object_ids, timeout_ms=-1): """ Returns data buffer from the PlasmaStore based on object ID. 
@@ -419,7 +419,7 @@ cdef class PlasmaClient: """ if isinstance(object_ids, collections.Sequence): results = [] - buffers = self.get_buffer(object_ids, timeout_ms) + buffers = self.get_buffers(object_ids, timeout_ms) for i in range(len(object_ids)): # buffers[i] is None if this object was not available within the # timeout diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 847a45fd8a0..d729c1ef2d2 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -82,8 +82,8 @@ def create_object(client, data_size, metadata_size, seal=True): def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None): import pyarrow.plasma as plasma - client1_buff = client1.get_buffer([object_id])[0] - client2_buff = client2.get_buffer([object_id])[0] + client1_buff = client1.get_buffers([object_id])[0] + client2_buff = client2.get_buffers([object_id])[0] client1_metadata = client1.get_metadata([object_id])[0] client2_metadata = client2.get_metadata([object_id])[0] assert len(client1_buff) == len(client2_buff) @@ -187,7 +187,7 @@ def test_create(self): # Seal the object. self.plasma_client.seal(object_id) # Get the object. - memory_buffer = np.frombuffer(self.plasma_client.get_buffer([object_id])[0], + memory_buffer = np.frombuffer(self.plasma_client.get_buffers([object_id])[0], dtype="uint8") for i in range(length): assert memory_buffer[i] == i % 256 @@ -209,7 +209,7 @@ def test_create_with_metadata(self): self.plasma_client.seal(object_id) # Get the object. memory_buffer = np.frombuffer( - self.plasma_client.get_buffer([object_id])[0], dtype="uint8") + self.plasma_client.get_buffers([object_id])[0], dtype="uint8") for i in range(length): assert memory_buffer[i] == i % 256 # Get the metadata. @@ -241,7 +241,7 @@ def test_get(self): # Test timing out of get with various timeouts. 
for timeout in [0, 10, 100, 1000]: object_ids = [random_object_id() for _ in range(num_object_ids)] - results = self.plasma_client.get_buffer(object_ids, timeout_ms=timeout) + results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout) assert results == num_object_ids * [None] data_buffers = [] @@ -256,7 +256,7 @@ def test_get(self): # Test timing out from some but not all get calls with various # timeouts. for timeout in [0, 10, 100, 1000]: - data_results = self.plasma_client.get_buffer(object_ids, + data_results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout) # metadata_results = self.plasma_client.get_metadata( # object_ids, timeout_ms=timeout) @@ -297,7 +297,7 @@ def test_store_arrow_objects(self): pa.write_tensor(tensor, stream) self.plasma_client.seal(object_id) # Read the arrow object. - [tensor] = self.plasma_client.get_buffer([object_id]) + [tensor] = self.plasma_client.get_buffers([object_id]) reader = pa.BufferReader(tensor) array = pa.read_tensor(reader).to_numpy() # Assert that they are equal. @@ -326,7 +326,7 @@ def test_store_pandas_dataframe(self): self.plasma_client.seal(object_id) # Read the DataFrame. - [data] = self.plasma_client.get_buffer([object_id]) + [data] = self.plasma_client.get_buffers([object_id]) reader = pa.RecordBatchStreamReader(pa.BufferReader(data)) result = reader.get_next_batch().to_pandas() @@ -564,7 +564,7 @@ def test_illegal_functionality(self): # with pytest.raises(Exception): # illegal_assignment() # Get the object. - memory_buffer = self.plasma_client.get_buffer([object_id])[0] + memory_buffer = self.plasma_client.get_buffers([object_id])[0] # Make sure the object is read only. 
def illegal_assignment(): From bd24e014869ff4e7955ae51a15fa4343a5180bda Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 28 Aug 2017 00:47:31 -0700 Subject: [PATCH 14/14] add documentation --- python/doc/source/plasma.rst | 65 ++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst index e4665d187e1..9a5a74bfaf1 100644 --- a/python/doc/source/plasma.rst +++ b/python/doc/source/plasma.rst @@ -98,9 +98,46 @@ follows: def random_object_id(): return plasma.ObjectID(np.random.bytes(20)) +Putting and Getting Python Objects +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Creating an Object -^^^^^^^^^^^^^^^^^^ +Plasma supports two APIs for creating and accessing objects: a high-level +API that allows storing and retrieving Python objects and a low-level +API that allows creating, writing and sealing buffers and operating on +the binary data directly. In this section we describe the high-level API. + +This is how you can put and get a Python object: + +.. code-block:: python + + # Create a Python object. + object_id = client.put("hello, world") + + # Get the object. + client.get(object_id) + +This works with all Python objects supported by the Arrow Python object +serialization. + +You can also get multiple objects at the same time (which can be more +efficient since it avoids IPC round trips): + +.. code-block:: python + + # Create multiple Python objects. + object_id1 = client.put(1) + object_id2 = client.put(2) + object_id3 = client.put(3) + + # Get the objects. + client.get([object_id1, object_id2, object_id3]) + +Furthermore, it is possible to provide a timeout for the get call. If the +object is not available within the timeout, the special object +``pyarrow.plasma.ObjectNotAvailable`` will be returned. + +Creating an Object Buffer +^^^^^^^^^^^^^^^^^^^^^^^^^ Objects are created in Plasma in two stages. First, they are **created**, which allocates a buffer for the object. 
At this point, the client can write to the @@ -111,7 +148,7 @@ give the object's maximum size in bytes. .. code-block:: python - # Create an object. + # Create an object buffer. object_id = plasma.ObjectID(20 * b"a") object_size = 1000 buffer = memoryview(client.create(object_id, object_size)) @@ -129,11 +166,11 @@ immutable, and making it available to other Plasma clients. client.seal(object_id) -Getting an Object -^^^^^^^^^^^^^^^^^ +Getting an Object Buffer +^^^^^^^^^^^^^^^^^^^^^^^^ After an object has been sealed, any client who knows the object ID can get -the object. +the object buffer. .. code-block:: python @@ -143,11 +180,11 @@ the object. # Get the object in the second client. This blocks until the object has been sealed. object_id2 = plasma.ObjectID(20 * b"a") - [buffer2] = client2.get([object_id]) + [buffer2] = client2.get_buffers([object_id2]) -If the object has not been sealed yet, then the call to client.get will block -until the object has been sealed by the client constructing the object. Using -the ``timeout_ms`` argument to get, you can specify a timeout for this (in +If the object has not been sealed yet, then the call to ``client.get_buffers`` will +block until the object has been sealed by the client constructing the object. +Using the ``timeout_ms`` argument to ``get_buffers``, you can specify a timeout for this (in milliseconds). After the timeout, the interpreter will yield control back. .. code-block:: shell @@ -223,7 +260,7 @@ To read the object, first retrieve it as a ``PlasmaBuffer`` using its object ID. .. code-block:: python # Get the arrow object by ObjectID. - [buf2] = client.get([object_id]) + [buf2] = client.get_buffers([object_id]) To convert the ``PlasmaBuffer`` back into an Arrow ``Tensor``, first create a pyarrow ``BufferReader`` object from it. 
You can then pass the ``BufferReader`` @@ -310,13 +347,13 @@ Since we store the Pandas DataFrame as a PyArrow ``RecordBatch`` object, to get the object back from the Plasma store, we follow similar steps to those specified in `Getting Arrow Objects from Plasma`_. -We first have to convert the ``PlasmaBuffer`` returned from ``client.get`` -into an Arrow ``BufferReader`` object. +We first have to convert the ``PlasmaBuffer`` returned from +``client.get_buffers`` into an Arrow ``BufferReader`` object. .. code-block:: python # Fetch the Plasma object - [data] = client.get([object_id]) # Get PlasmaBuffer from ObjectID + [data] = client.get_buffers([object_id]) # Get PlasmaBuffer from ObjectID buffer = pa.BufferReader(data) From the ``BufferReader``, we can create a specific ``RecordBatchStreamReader``