From 598ef3354d5bd3aae4ff826958df446258b8a053 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 23 Nov 2017 10:24:27 -0500 Subject: [PATCH 1/9] Tweak Change-Id: I383cb91f1819c6d51d0320cfb7fdfbb0a29f0ff5 --- cpp/src/arrow/python/python_to_arrow.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 72cc5b6e1db..731096101f5 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -713,8 +713,10 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) { int32_t num_tensors = static_cast(obj.tensors.size()); int32_t num_buffers = static_cast(obj.buffers.size()); - RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_tensors), sizeof(int32_t))); - RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_buffers), sizeof(int32_t))); + RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_tensors), + sizeof(int32_t))); + RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_buffers), + sizeof(int32_t))); RETURN_NOT_OK(ipc::WriteRecordBatchStream({obj.batch}, dst)); int32_t metadata_length; @@ -725,7 +727,8 @@ Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* ds for (const auto& buffer : obj.buffers) { int64_t size = buffer->size(); - RETURN_NOT_OK(dst->Write(reinterpret_cast(&size), sizeof(int64_t))); + RETURN_NOT_OK(dst->Write(reinterpret_cast(&size), + sizeof(int64_t))); RETURN_NOT_OK(dst->Write(buffer->data(), size)); } From 337e1d2951533620dd715fae14ca291e883d0c7f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 23 Nov 2017 11:54:49 -0500 Subject: [PATCH 2/9] Draft SerializedPyObject::GetComponents Change-Id: I78923fa5adf24bf3ba92fffb90c4a9d4fdb3da6b --- cpp/src/arrow/ipc/message.cc | 30 ++++++++- cpp/src/arrow/ipc/message.h | 27 +++----- cpp/src/arrow/ipc/reader.cc | 6 +- cpp/src/arrow/ipc/writer.h | 11 ++++ cpp/src/arrow/python/arrow_to_pandas.cc | 2 +- cpp/src/arrow/python/arrow_to_python.h | 5 ++ cpp/src/arrow/python/python_to_arrow.cc | 84 +++++++++++++++++++++---- cpp/src/arrow/python/python_to_arrow.h | 28 ++++++--- python/pyarrow/includes/libarrow.pxd | 12 ++-- python/pyarrow/ipc.pxi | 4 +- python/pyarrow/serialization.pxi | 4 +- 11 files changed, 157 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 21d6a69a286..1835cefde09 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -236,11 +236,35 @@ Status ReadMessage(io::InputStream* file, std::unique_ptr* message) { // ---------------------------------------------------------------------- // Implement InputStream message reader -Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr* message) { - return ReadMessage(stream_, message); +/// \brief Implementation of MessageReader that reads from InputStream +class InputStreamMessageReader : public MessageReader { + public: + explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {} + + explicit InputStreamMessageReader(const std::shared_ptr& owned_stream) + : InputStreamMessageReader(owned_stream.get()) { + owned_stream_ = owned_stream; + } + + ~InputStreamMessageReader() {} + + Status ReadNextMessage(std::unique_ptr* message) { + return ReadMessage(stream_, message); + } + + private: + io::InputStream* stream_; + std::shared_ptr owned_stream_; +}; + +std::unique_ptr MessageReader::Open(io::InputStream* stream) { + return std::unique_ptr(new InputStreamMessageReader(stream)); } -InputStreamMessageReader::~InputStreamMessageReader() {} +std::unique_ptr MessageReader::Open( + const std::shared_ptr& owned_stream) { + return std::unique_ptr(new InputStreamMessageReader(owned_stream)); +} } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 495474e5051..159b39a81f9 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -144,6 +144,13 @@ class ARROW_EXPORT MessageReader { public: virtual ~MessageReader() = default; + /// \brief Create MessageReader that reads from InputStream + static std::unique_ptr Open(io::InputStream* stream); + + /// \brief Create MessageReader that reads from owned InputStream + static std::unique_ptr Open( + const std::shared_ptr& owned_stream); + /// \brief Read next Message from the interface /// /// \param[out] message an arrow::ipc::Message instance @@ -151,26 +158,6 @@ class ARROW_EXPORT MessageReader { virtual Status ReadNextMessage(std::unique_ptr* message) = 0; }; -/// \brief Implementation of MessageReader that reads from InputStream -/// \since 0.5.0 -class ARROW_EXPORT InputStreamMessageReader : public MessageReader { - public: - explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {} - - explicit InputStreamMessageReader(const std::shared_ptr& owned_stream) - : InputStreamMessageReader(owned_stream.get()) { - owned_stream_ = owned_stream; - } - - ~InputStreamMessageReader(); - - Status ReadNextMessage(std::unique_ptr* message) override; - - private: - io::InputStream* stream_; - std::shared_ptr owned_stream_; -}; - /// \brief Read encapulated RPC message from position in file /// /// Read a length-prefixed message flatbuffer starting at the indicated file diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 5960e81883d..4f86b2fed75 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -480,14 +480,12 @@ Status RecordBatchStreamReader::Open(std::unique_ptr message_read Status RecordBatchStreamReader::Open(io::InputStream* stream, std::shared_ptr* out) { - std::unique_ptr message_reader(new InputStreamMessageReader(stream)); - return Open(std::move(message_reader), out); + return Open(MessageReader::Open(stream), out); } Status RecordBatchStreamReader::Open(const std::shared_ptr& stream, std::shared_ptr* out) { - std::unique_ptr message_reader(new InputStreamMessageReader(stream)); - return Open(std::move(message_reader), out); + return Open(MessageReader::Open(stream), out); } std::shared_ptr RecordBatchStreamReader::schema() const { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index cedac45e712..11c01db8b29 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -239,6 +239,17 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); ARROW_EXPORT Status GetTensorSize(const Tensor& tensor, int64_t* size); +/// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory +/// allocation +/// +/// \param[in] tensor the Tensor to write +/// \param[in] pool MemoryPool to allocate space for metadata +/// \param[out] out the resulting Message +/// \return Status +ARROW_EXPORT +Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, + std::unique_ptr* out); + /// \brief EXPERIMENTAL: Write arrow::Tensor as a contiguous message /// /// \param[in] tensor the Tensor to write diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc index 8814fc190ab..14c6a162d88 100644 --- a/cpp/src/arrow/python/arrow_to_pandas.cc +++ b/cpp/src/arrow/python/arrow_to_pandas.cc @@ -480,7 +480,7 @@ inline Status ConvertStruct(PandasOptions options, const ChunkedArray& data, Py_INCREF(Py_None); field_value.reset(Py_None); } - // PyDict_SetItemString does not steal the value reference + // PyDict_SetItemString increments refernece count auto setitem_result = PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj()); RETURN_IF_PYERROR(); diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h index 7509f30eb4e..b68d1f90e0e 100644 --- a/cpp/src/arrow/python/arrow_to_python.h +++ b/cpp/src/arrow/python/arrow_to_python.h @@ -48,6 +48,11 @@ namespace py { ARROW_EXPORT Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out); +/// \brief Reconstruct SerializedPyObject from representation produced by +/// GetSerializedObjectComponents +ARROW_EXPORT +Status GetSerializedObjectFromComponents(PyObject* payload, SerializedPyObject* out); + /// \brief Reconstruct Python object from Arrow-serialized representation /// \param[in] context Serialization context which contains custom serialization /// and deserialization callbacks. Can be any Python object with a diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 731096101f5..a3d46bc625b 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -31,7 +31,9 @@ #include "arrow/array.h" #include "arrow/builder.h" #include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/ipc/writer.h" +#include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/tensor.h" #include "arrow/util/logging.h" @@ -710,30 +712,88 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject return Status::OK(); } -Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) { - int32_t num_tensors = static_cast(obj.tensors.size()); - int32_t num_buffers = static_cast(obj.buffers.size()); - RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_tensors), - sizeof(int32_t))); - RETURN_NOT_OK(dst->Write(reinterpret_cast(&num_buffers), - sizeof(int32_t))); - RETURN_NOT_OK(ipc::WriteRecordBatchStream({obj.batch}, dst)); +Status SerializedPyObject::WriteTo(io::OutputStream* dst) { + int32_t num_tensors = static_cast(this->tensors.size()); + int32_t num_buffers = static_cast(this->buffers.size()); + RETURN_NOT_OK( + dst->Write(reinterpret_cast(&num_tensors), sizeof(int32_t))); + RETURN_NOT_OK( + dst->Write(reinterpret_cast(&num_buffers), sizeof(int32_t))); + RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst)); int32_t metadata_length; int64_t body_length; - for (const auto& tensor : obj.tensors) { + for (const auto& tensor : this->tensors) { RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length)); } - for (const auto& buffer : obj.buffers) { + for (const auto& buffer : this->buffers) { int64_t size = buffer->size(); - RETURN_NOT_OK(dst->Write(reinterpret_cast(&size), - sizeof(int64_t))); + RETURN_NOT_OK(dst->Write(reinterpret_cast(&size), sizeof(int64_t))); RETURN_NOT_OK(dst->Write(buffer->data(), size)); } return Status::OK(); } +Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) { + PyAcquireGIL py_gil; + + ScopedRef result(PyDict_New()); + ScopedRef buffers(PyDict_New()); + RETURN_IF_PYERROR(); + + auto PushBuffer = [&buffers](const std::shared_ptr& buffer) { + PyObject* wrapped_buffer = wrap_buffer(buffer); + RETURN_IF_PYERROR(); + if (PyList_Append(buffers.get(), wrapped_buffer) < 0) { + RETURN_IF_PYERROR(); + } + Py_DECREF(wrapped_buffer); + return Status::OK(); + }; + + constexpr int64_t kInitialCapacity = 1024; + + // Write the record batch describing the object structure + std::shared_ptr stream; + std::shared_ptr buffer; + + py_gil.release(); + RETURN_NOT_OK(io::BufferOutputStream::Create(kInitialCapacity, memory_pool, &stream)); + RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, stream.get())); + RETURN_NOT_OK(stream->Finish(&buffer)); + py_gil.acquire(); + + RETURN_NOT_OK(PushBuffer(buffer)); + + // For each tensor, get a metadata buffer and a buffer for the body + for (const auto& tensor : this->tensors) { + std::unique_ptr message; + RETURN_NOT_OK(ipc::GetTensorMessage(*tensor, memory_pool, &message)); + RETURN_NOT_OK(PushBuffer(message->metadata())); + RETURN_NOT_OK(PushBuffer(message->body())); + } + + for (const auto& buf : this->buffers) { + RETURN_NOT_OK(PushBuffer(buf)); + } + + // TODO(wesm): Not sure how pedantic we need to be about checking the return + // values of these functions. There are other places where we do not check + // PyDict_SetItem/SetItemString return value, but these failures would be + // quite esoteric + PyDict_SetItemString(result.get(), "num_tensors", + PyLong_FromSize_t(this->tensors.size())); + PyDict_SetItemString(result.get(), "num_buffers", + PyLong_FromSize_t(this->buffers.size())); + PyDict_SetItemString(result.get(), "data", buffers.release()); + + RETURN_IF_PYERROR(); + + *out = result.release(); + return Status::OK(); +} + } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h index c5b6396145b..ce7aefa0e24 100644 --- a/cpp/src/arrow/python/python_to_arrow.h +++ b/cpp/src/arrow/python/python_to_arrow.h @@ -30,6 +30,7 @@ namespace arrow { +class MemoryPool; class RecordBatch; class Tensor; @@ -45,6 +46,26 @@ struct ARROW_EXPORT SerializedPyObject { std::shared_ptr batch; std::vector> tensors; std::vector> buffers; + + /// \brief Write serialized Python object to OutputStream + /// \param[in,out] dst an OutputStream + /// \return Status + Status WriteTo(io::OutputStream* dst); + + /// \brief Convert SerializedPyObject to a dict containing the message + /// components as Buffer instances with minimal memory allocation + /// + /// { + /// 'num_tensors': N, + /// 'num_buffers': K, + /// 'data': [Buffer] + /// } + /// + /// Each tensor is written as two buffers, one for the metadata and one for + /// the body. Therefore, the number of buffers in 'data' is 2 * N + K + 1, + /// with the first buffer containing the serialized record batch containing + /// the UnionArray that describes the whole object + Status GetComponents(MemoryPool* pool, PyObject** out); }; /// \brief Serialize Python sequence as a RecordBatch plus @@ -62,13 +83,6 @@ struct ARROW_EXPORT SerializedPyObject { ARROW_EXPORT Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out); -/// \brief Write serialized Python object to OutputStream -/// \param[in] object a serialized Python object to write out -/// \param[out] dst an OutputStream -/// \return Status -ARROW_EXPORT -Status WriteSerializedObject(const SerializedPyObject& object, io::OutputStream* dst); - } // namespace py } // namespace arrow diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 5d68607efa3..272a8451770 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -682,11 +682,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: c_string FormatMessageType(MessageType type) cdef cppclass CMessageReader" arrow::ipc::MessageReader": - CStatus ReadNextMessage(unique_ptr[CMessage]* out) + @staticmethod + unique_ptr[CMessageReader] Open(const shared_ptr[InputStream]& stream) - cdef cppclass CInputStreamMessageReader \ - " arrow::ipc::InputStreamMessageReader": - CInputStreamMessageReader(const shared_ptr[InputStream]& stream) + CStatus ReadNextMessage(unique_ptr[CMessage]* out) cdef cppclass CRecordBatchWriter" arrow::ipc::RecordBatchWriter": CStatus Close() @@ -908,11 +907,12 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil: shared_ptr[CRecordBatch] batch vector[shared_ptr[CTensor]] tensors + CStatus WriteTo(OutputStream* dst) + CStatus GetComponents(CMemoryPool* pool, OutputStream* dst) + CStatus SerializeObject(object context, object sequence, CSerializedPyObject* out) - CStatus WriteSerializedObject(const CSerializedPyObject& obj, - OutputStream* dst) CStatus DeserializeObject(object context, const CSerializedPyObject& obj, diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 27e91677509..b568cd46d7a 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -125,9 +125,11 @@ cdef class MessageReader: def open_stream(source): cdef MessageReader result = MessageReader() cdef shared_ptr[InputStream] in_stream + cdef unique_ptr[CMessageReader] reader get_input_stream(source, &in_stream) with nogil: - result.reader.reset(new CInputStreamMessageReader(in_stream)) + reader = CMessageReader.Open(in_stream) + result.reader.reset(reader.release()) return result diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index 6b7227797a8..7bb8f1b33c8 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -165,7 +165,7 @@ cdef class SerializedPyObject: def __get__(self): cdef CMockOutputStream mock_stream with nogil: - check_status(WriteSerializedObject(self.data, &mock_stream)) + check_status(self.data.WriteTo(&mock_stream)) return mock_stream.GetExtentBytesWritten() @@ -179,7 +179,7 @@ cdef class SerializedPyObject: cdef _write_to(self, OutputStream* stream): with nogil: - check_status(WriteSerializedObject(self.data, stream)) + check_status(self.data.WriteTo(stream)) def deserialize(self, SerializationContext context=None): """ From b1e31a349145fcedddfcb40df7cd90d62a8c668d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 23 Nov 2017 15:58:21 -0500 Subject: [PATCH 3/9] Draft GetTensorMessage Change-Id: I0a7346c65a39894cb14f7130cbe9ab125845cb84 --- cpp/src/arrow/ipc/writer.cc | 64 +++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 3c1db06159e..3aacabaefc4 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -560,9 +560,18 @@ Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offs pool, kMaxNestingDepth, true); } -static Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size, - const Tensor& tensor, uint8_t* scratch_space, - io::OutputStream* dst) { +namespace { + +Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length) { + std::shared_ptr metadata; + RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata)); + return internal::WriteMessage(*metadata, dst, metadata_length); +} + +Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size, + const Tensor& tensor, uint8_t* scratch_space, + io::OutputStream* dst) { if (dim_index == tensor.ndim() - 1) { const uint8_t* data_ptr = tensor.raw_data() + offset; const int64_t stride = tensor.strides()[dim_index]; @@ -580,16 +589,37 @@ static Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_siz return Status::OK(); } -Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length) { - RETURN_NOT_OK(AlignStreamPosition(dst)); - std::shared_ptr metadata; - RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata)); - return internal::WriteMessage(*metadata, dst, metadata_length); +Status GetContiguousTensor(const Tensor& tensor, MemoryPool* pool, + std::unique_ptr* out) { + const auto& type = static_cast(*tensor.type()); + const int elem_size = type.bit_width() / 8; + + // TODO(wesm): Do we care enough about this temporary allocation to pass in + // a MemoryPool to this function? + std::shared_ptr scratch_space; + RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), + tensor.shape()[tensor.ndim() - 1] * elem_size, + &scratch_space)); + + std::shared_ptr contiguous_data; + RETURN_NOT_OK( + AllocateResizableBuffer(pool, tensor.size() * elem_size, &contiguous_data)); + + io::BufferOutputStream stream(contiguous_data); + RETURN_NOT_OK(WriteStridedTensorData(0, 0, elem_size, tensor, + scratch_space->mutable_data(), &stream)); + + out->reset(new Tensor(tensor.type(), contiguous_data, tensor.shape())); + + return Status::OK(); } +} // namespace + Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { + RETURN_NOT_OK(AlignStreamPosition(dst)); + if (tensor.is_contiguous()) { RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length, body_length)); auto data = tensor.data(); @@ -619,6 +649,22 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat } } +Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, + std::unique_ptr* out) { + const Tensor* tensor_to_write = &tensor; + std::unique_ptr temp_tensor; + + if (!tensor.is_contiguous()) { + RETURN_NOT_OK(GetContiguousTensor(tensor, pool, &temp_tensor)); + tensor_to_write = temp_tensor.get(); + } + + std::shared_ptr metadata; + RETURN_NOT_OK(internal::WriteTensorMessage(*tensor_to_write, 0, &metadata)); + out->reset(new Message(metadata, tensor_to_write->data())); + return Status::OK(); +} + Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { From 58174dde469a6d3076038b3c2135798fc30a5f4b Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 25 Nov 2017 12:09:39 -0500 Subject: [PATCH 4/9] More progress, stubs for reconstruction Change-Id: I2cf430456b12a5c9830c9caa7824fcbcc6e167ef --- cpp/src/arrow/python/arrow_to_python.cc | 4 +++ cpp/src/arrow/python/arrow_to_python.h | 3 +- cpp/src/arrow/python/python_to_arrow.cc | 28 ++++++++--------- python/pyarrow/includes/libarrow.pxd | 6 +++- python/pyarrow/serialization.pxi | 40 +++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index 9686050b967..8ad05a4e688 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -286,5 +286,9 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj obj, out); } +Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* buffers, + SerializedPyObject* out) { +} + } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h index b68d1f90e0e..ec4da637b70 100644 --- a/cpp/src/arrow/python/arrow_to_python.h +++ b/cpp/src/arrow/python/arrow_to_python.h @@ -51,7 +51,8 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out); /// \brief Reconstruct SerializedPyObject from representation produced by /// GetSerializedObjectComponents ARROW_EXPORT -Status GetSerializedObjectFromComponents(PyObject* payload, SerializedPyObject* out); +Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* buffers, + SerializedPyObject* out); /// \brief Reconstruct Python object from Arrow-serialized representation /// \param[in] context Serialization context which contains custom serialization diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index a3d46bc625b..5a948eefd65 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -740,13 +740,25 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out PyAcquireGIL py_gil; ScopedRef result(PyDict_New()); - ScopedRef buffers(PyDict_New()); + PyObject* buffers = PyList_New(0); + + // TODO(wesm): Not sure how pedantic we need to be about checking the return + // values of these functions. There are other places where we do not check + // PyDict_SetItem/SetItemString return value, but these failures would be + // quite esoteric + PyDict_SetItemString(result.get(), "num_tensors", + PyLong_FromSize_t(this->tensors.size())); + PyDict_SetItemString(result.get(), "num_buffers", + PyLong_FromSize_t(this->buffers.size())); + PyDict_SetItemString(result.get(), "data", buffers); RETURN_IF_PYERROR(); + Py_DECREF(buffers); + auto PushBuffer = [&buffers](const std::shared_ptr& buffer) { PyObject* wrapped_buffer = wrap_buffer(buffer); RETURN_IF_PYERROR(); - if (PyList_Append(buffers.get(), wrapped_buffer) < 0) { + if (PyList_Append(buffers, wrapped_buffer) < 0) { RETURN_IF_PYERROR(); } Py_DECREF(wrapped_buffer); @@ -779,18 +791,6 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out RETURN_NOT_OK(PushBuffer(buf)); } - // TODO(wesm): Not sure how pedantic we need to be about checking the return - // values of these functions. There are other places where we do not check - // PyDict_SetItem/SetItemString return value, but these failures would be - // quite esoteric - PyDict_SetItemString(result.get(), "num_tensors", - PyLong_FromSize_t(this->tensors.size())); - PyDict_SetItemString(result.get(), "num_buffers", - PyLong_FromSize_t(this->buffers.size())); - PyDict_SetItemString(result.get(), "data", buffers.release()); - - RETURN_IF_PYERROR(); - *out = result.release(); return Status::OK(); } diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 272a8451770..3a9dd1d4a84 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -908,7 +908,7 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil: vector[shared_ptr[CTensor]] tensors CStatus WriteTo(OutputStream* dst) - CStatus GetComponents(CMemoryPool* pool, OutputStream* dst) + CStatus GetComponents(CMemoryPool* pool, PyObject** dst) CStatus SerializeObject(object context, object sequence, CSerializedPyObject* out) @@ -921,6 +921,10 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil: CStatus ReadSerializedObject(RandomAccessFile* src, CSerializedPyObject* out) + CStatus GetSerializedFromComponents(int num_tensors, int num_buffers, + object buffers, + CSerializedPyObject* out) + cdef extern from 'arrow/python/init.h': int arrow_init_numpy() except -1 diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index 7bb8f1b33c8..9afb2a83ddb 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -209,6 +209,46 @@ cdef class SerializedPyObject: self.write_to(sink) return output + @staticmethod + def from_components(components): + """ + Reconstruct SerializedPyObject from output of + SerializedPyObject.to_components + """ + cdef: + int num_tensors = components['num_tensors'] + int num_buffers = components['num_buffers'] + list buffers = components['data'] + SerializedPyObject result = SerializedPyObject() + + with nogil: + check_status(GetSerializedFromComponents(num_tensors, num_buffers, + buffers, &result.data) + + return result + + def to_components(self, memory_pool=None): + """ + Return the decomposed dict representation of the serialized object + containing a collection of Buffer objects which maximize opportunities + for zero-copy + + Parameters + ---------- + memory_pool : MemoryPool default None + Pool to use for necessary allocations + + Returns + + """ + cdef PyObject* result + cdef CMemoryPool* c_pool = maybe_unbox_memory_pool(memory_pool) + + with nogil: + check_status(self.data.GetComponents(c_pool, &result)) + + return PyObject_to_object(result) + def serialize(object value, SerializationContext context=None): """EXPERIMENTAL: Serialize a Python sequence From 50d2fee59a01ae7bf17855f8559c21d632d575d4 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 26 Nov 2017 13:06:42 -0500 Subject: [PATCH 5/9] Finish componentwise serialization roundtrip Change-Id: I9811e853311cc42c0f9899e8c6601e0058e3c623 --- cpp/src/arrow/io/memory.cc | 3 ++ cpp/src/arrow/io/memory.h | 1 + cpp/src/arrow/ipc/reader.cc | 7 ++- cpp/src/arrow/ipc/reader.h | 10 +++- cpp/src/arrow/python/arrow_to_python.cc | 55 ++++++++++++++++++++-- cpp/src/arrow/python/arrow_to_python.h | 11 ++++- python/pyarrow/__init__.py | 1 + python/pyarrow/includes/libarrow.pxd | 1 - python/pyarrow/public-api.pxi | 2 +- python/pyarrow/serialization.pxi | 20 +++++++- python/pyarrow/tests/test_serialization.py | 33 +++++++++++++ 11 files changed, 133 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index d9c84b495d2..d52b6dc5d68 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -240,6 +240,9 @@ BufferReader::BufferReader(const std::shared_ptr& buffer) BufferReader::BufferReader(const uint8_t* data, int64_t size) : buffer_(nullptr), data_(data), size_(size), position_(0) {} +BufferReader::BufferReader(const Buffer& buffer) + : BufferReader(buffer.data(), buffer.size()) {} + Status BufferReader::Close() { // no-op return Status::OK(); diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 3aec91f7237..51471a25ae6 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -107,6 +107,7 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile { class ARROW_EXPORT BufferReader : public RandomAccessFile { public: explicit BufferReader(const std::shared_ptr& buffer); + explicit BufferReader(const Buffer& buffer); BufferReader(const uint8_t* data, int64_t size); Status Close() override; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 4f86b2fed75..ae0f8f39806 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -715,14 +715,17 @@ Status ReadTensor(int64_t offset, io::RandomAccessFile* file, std::unique_ptr message; RETURN_NOT_OK(ReadContiguousPayload(file, &message)); + return ReadTensor(*message, out); +} +Status ReadTensor(const Message& message, std::shared_ptr* out) { std::shared_ptr type; std::vector shape; std::vector strides; std::vector dim_names; - RETURN_NOT_OK(internal::GetTensorMetadata(*message->metadata(), &type, &shape, &strides, + RETURN_NOT_OK(internal::GetTensorMetadata(*message.metadata(), &type, &shape, &strides, &dim_names)); - *out = std::make_shared(type, message->body(), shape, strides, dim_names); + *out = std::make_shared(type, message.body(), shape, strides, dim_names); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 627f67e2517..019c9bc1f32 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -219,7 +219,7 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr& sc int max_recursion_depth, io::RandomAccessFile* file, std::shared_ptr* out); -/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file +/// \brief EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file /// /// \param[in] offset the file location of the start of the message /// \param[in] file the file where the batch is located @@ -229,6 +229,14 @@ ARROW_EXPORT Status ReadTensor(int64_t offset, io::RandomAccessFile* file, std::shared_ptr* out); +/// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message +/// +/// \param[in] message a Message containing the tensor metadata and body +/// \param[out] out the read tensor +/// \return Status +ARROW_EXPORT +Status ReadTensor(const Message& message, std::shared_ptr* out); + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index 8ad05a4e688..5a64e038379 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -29,15 +29,17 @@ #include "arrow/array.h" #include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/ipc/reader.h" +#include "arrow/table.h" +#include "arrow/util/logging.h" + #include "arrow/python/common.h" #include "arrow/python/helpers.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/pyarrow.h" #include "arrow/python/python_to_arrow.h" #include "arrow/python/util/datetime.h" -#include "arrow/table.h" -#include "arrow/util/logging.h" namespace arrow { namespace py { @@ -286,8 +288,55 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj obj, out); } -Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* buffers, +Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data, SerializedPyObject* out) { + const Py_ssize_t data_length = PyList_Size(data); + RETURN_IF_PYERROR(); + + const Py_ssize_t expected_data_length = 1 + num_tensors * 2 + num_buffers; + if (data_length != expected_data_length) { + return Status::Invalid("Invalid number of buffers in data"); + } + + auto GetBuffer = [&data](Py_ssize_t index, std::shared_ptr* out) { + PyObject* py_buf = PyList_GET_ITEM(data, index); + return unwrap_buffer(py_buf, out); + }; + + Py_ssize_t buffer_index = 0; + + // Read the union batch describing object structure + { + std::shared_ptr data_buffer; + RETURN_NOT_OK(GetBuffer(buffer_index++, &data_buffer)); + io::BufferReader buf_reader(data_buffer); + std::shared_ptr reader; + RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(&buf_reader, &reader)); + RETURN_NOT_OK(reader->ReadNext(&out->batch)); + } + + // Zero-copy reconstruct tensors + for (int i = 0; i < num_tensors; ++i) { + std::shared_ptr metadata; + std::shared_ptr body; + std::shared_ptr tensor; + RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata)); + RETURN_NOT_OK(GetBuffer(buffer_index++, &body)); + + ipc::Message message(metadata, body); + + RETURN_NOT_OK(ReadTensor(message, &tensor)); + out->tensors.emplace_back(std::move(tensor)); + } + + // Unwrap and append buffers + for (int i = 0; i < num_buffers; ++i) { + std::shared_ptr buffer; + RETURN_NOT_OK(GetBuffer(buffer_index++, &buffer)); + out->buffers.emplace_back(std::move(buffer)); + } + + return Status::OK(); } } // namespace py diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h index ec4da637b70..e63b3ee96b6 100644 --- a/cpp/src/arrow/python/arrow_to_python.h +++ b/cpp/src/arrow/python/arrow_to_python.h @@ -49,9 +49,16 @@ ARROW_EXPORT Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out); /// \brief Reconstruct SerializedPyObject from representation produced by -/// GetSerializedObjectComponents +/// GetSerializedObjectComponents. +/// +/// \param[in] num_tensors +/// \param[in] num_buffers +/// \param[in] data a list containing pyarrow.Buffer instances. Must be 1 + +/// num_tensors * 2 + num_buffers in length +/// \param[out] out the reconstructed object +/// \return Status ARROW_EXPORT -Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* buffers, +Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data, SerializedPyObject* out); /// \brief Reconstruct Python object from Arrow-serialized representation diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 0456a658f14..bd31b21c196 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -101,6 +101,7 @@ # Serialization from pyarrow.lib import (deserialize_from, deserialize, + deserialize_components, serialize, serialize_to, read_serialized, SerializedPyObject, SerializationContext, SerializationCallbackError, diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 3a9dd1d4a84..024d1475de5 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -913,7 +913,6 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil: CStatus SerializeObject(object context, object sequence, CSerializedPyObject* out) - CStatus DeserializeObject(object context, const CSerializedPyObject& obj, PyObject* base, PyObject** out) diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi index 9776f2ad76e..2fdb606a7d1 100644 --- a/python/pyarrow/public-api.pxi +++ b/python/pyarrow/public-api.pxi @@ -44,7 +44,7 @@ cdef public api object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf): cdef public api object pyarrow_wrap_resizable_buffer( - const shared_ptr[CResizableBuffer]& buf): + const shared_ptr[CResizableBuffer]& buf): cdef ResizableBuffer result = ResizableBuffer() result.init_rz(buf) return result diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index 9afb2a83ddb..c8bd6daec41 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -223,7 +223,7 @@ cdef class SerializedPyObject: with nogil: check_status(GetSerializedFromComponents(num_tensors, num_buffers, - buffers, &result.data) + buffers, &result.data)) return result @@ -341,6 +341,24 @@ def deserialize_from(source, object base, SerializationContext context=None): return serialized.deserialize(context) +def deserialize_components(components, SerializationContext context=None): + """ + Reconstruct Python object from output of SerializedPyObject.to_components + + Parameters + ---------- + components : dict + Output of SerializedPyObject.to_components + context : SerializationContext, default None + + Returns + ------- + object : the Python object that was originally serialized + """ + serialized = SerializedPyObject.from_components(components) + return serialized.deserialize(context) + + def deserialize(obj, SerializationContext context=None): """ EXPERIMENTAL: Deserialize Python object from Buffer or other Python object diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index b0c5bc49e6a..cda7a556597 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -216,6 +216,17 @@ def serialization_roundtrip(value, f): result = pa.deserialize_from(f, None, serialization_context) assert_equal(value, result) + _check_component_roundtrip(value) + + +def _check_component_roundtrip(value): + # Test to/from components + serialized = pa.serialize(value) + components = serialized.to_components() + from_comp = pa.SerializedPyObject.from_components(components) + recons = from_comp.deserialize() + assert_equal(value, recons) + @pytest.yield_fixture(scope='session') def large_memory_map(tmpdir_factory, size=100*1024*1024): @@ -482,3 +493,25 @@ def test_serialize_subclasses(): deserialized = serialized.deserialize() assert type(deserialized).__name__ == SerializableClass.__name__ assert deserialized.value == 3 + + +def test_serialize_to_components_invalid_cases(): + buf = pa.frombuffer(b'hello') + + components = { + 'num_tensors': 0, + 'num_buffers': 1, + 'data': [buf] + } + + with pytest.raises(pa.ArrowException): + pa.deserialize_components(components) + + components = { + 'num_tensors': 1, + 'num_buffers': 0, + 'data': [buf, buf] + } + + with pytest.raises(pa.ArrowException): + pa.deserialize_components(components) From fffc7bb6f3aae21ff320fb6f5c96fbbc2468983f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 26 Nov 2017 13:24:28 -0500 Subject: [PATCH 6/9] Typos, add deserialize_components to API Change-Id: Id3167b483bbd19f1899d146c5f926d62690d3402 --- cpp/src/arrow/python/arrow_to_pandas.cc | 2 +- python/doc/source/api.rst | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc index 14c6a162d88..096bbd55c6d 100644 --- a/cpp/src/arrow/python/arrow_to_pandas.cc +++ b/cpp/src/arrow/python/arrow_to_pandas.cc @@ -480,7 +480,7 @@ inline Status ConvertStruct(PandasOptions options, const ChunkedArray& data, Py_INCREF(Py_None); field_value.reset(Py_None); } - // PyDict_SetItemString increments refernece count + // PyDict_SetItemString increments reference count auto setitem_result = PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj()); RETURN_IF_PYERROR(); diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index bb2a0420b2a..636f41d67bc 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -245,6 +245,7 @@ Serialization and IPC serialize serialize_to deserialize + deserialize_components deserialize_from read_serialized SerializedPyObject From 1d2e0e277314d55b290682b502d5999ed5589b41 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 26 Nov 2017 13:31:48 -0500 Subject: [PATCH 7/9] Fix function documentation Change-Id: Ie2c9cce3b53fddc17bc4a42130efe95c6867617c --- cpp/src/arrow/python/arrow_to_python.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h index e63b3ee96b6..9440ffb32ab 100644 --- a/cpp/src/arrow/python/arrow_to_python.h +++ b/cpp/src/arrow/python/arrow_to_python.h @@ -49,7 +49,7 @@ ARROW_EXPORT Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out); /// \brief Reconstruct SerializedPyObject from representation produced by -/// GetSerializedObjectComponents. +/// SerializedPyObject::GetComponents. /// /// \param[in] num_tensors /// \param[in] num_buffers From e8c76d42e16cba8f028900407aa629e3627f97d3 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 26 Nov 2017 14:14:32 -0500 Subject: [PATCH 8/9] Acquire GIL in GetSerializedFromComponents Change-Id: I87081833d8beac518ad7cb832df624bc39e33185 --- cpp/src/arrow/python/arrow_to_python.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index 5a64e038379..ce539a597be 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -290,6 +290,7 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data, SerializedPyObject* out) { + PyAcquireGIL gil; const Py_ssize_t data_length = PyList_Size(data); RETURN_IF_PYERROR(); @@ -309,10 +310,12 @@ Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* d { std::shared_ptr data_buffer; RETURN_NOT_OK(GetBuffer(buffer_index++, &data_buffer)); + gil.release(); io::BufferReader buf_reader(data_buffer); std::shared_ptr reader; RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(&buf_reader, &reader)); RETURN_NOT_OK(reader->ReadNext(&out->batch)); + gil.acquire(); } // Zero-copy reconstruct tensors From 4ec5a89fdcf0eca3767275fa43fd892a90795e8a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 27 Nov 2017 11:56:29 -0500 Subject: [PATCH 9/9] Add missing decref on error Change-Id: Id21d531666d810c2c7a68d74ff37e85e8ac0a8e2 --- cpp/src/arrow/python/python_to_arrow.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 5a948eefd65..253e9d9a7da 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -759,6 +759,7 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out PyObject* wrapped_buffer = wrap_buffer(buffer); RETURN_IF_PYERROR(); if (PyList_Append(buffers, wrapped_buffer) < 0) { + Py_DECREF(wrapped_buffer); RETURN_IF_PYERROR(); } Py_DECREF(wrapped_buffer);