diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 46900fc7129..3b851f92c17 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -244,7 +244,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
   using PrimitiveBuilder<T>::Reserve;
 
   /// Append a single scalar and increase the size if necessary.
-  Status Append(value_type val) {
+  Status Append(const value_type val) {
     RETURN_NOT_OK(ArrayBuilder::Reserve(1));
     UnsafeAppend(val);
     return Status::OK();
@@ -255,7 +255,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
   ///
   /// This method does not capacity-check; make sure to call Reserve
   /// beforehand.
-  void UnsafeAppend(value_type val) {
+  void UnsafeAppend(const value_type val) {
     BitUtil::SetBit(null_bitmap_data_, length_);
     raw_data_[length_++] = val;
   }
@@ -371,7 +371,7 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
   using ArrayBuilder::Advance;
 
   /// Scalar append
-  Status Append(uint64_t val) {
+  Status Append(const uint64_t val) {
     RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
 
@@ -430,7 +430,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
   using ArrayBuilder::Advance;
 
   /// Scalar append
-  Status Append(int64_t val) {
+  Status Append(const int64_t val) {
     RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
 
@@ -511,7 +511,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
   std::shared_ptr<PoolBuffer> data() const { return data_; }
 
   /// Scalar append
-  Status Append(bool val) {
+  Status Append(const bool val) {
     RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
     if (val) {
@@ -523,7 +523,7 @@
     return Status::OK();
   }
 
-  Status Append(uint8_t val) { return Append(val != 0); }
+  Status Append(const uint8_t val) { return Append(val != 0); }
 
   /// Vector append
   ///
diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt
index 0fdf81e7aa9..f2807b930a3 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -43,6 +43,7 @@ set(ARROW_PYTHON_TEST_LINK_LIBS ${ARROW_PYTHON_MIN_TEST_LIBS})
 
 set(ARROW_PYTHON_SRCS
   arrow_to_pandas.cc
+  arrow_to_python.cc
   builtin_convert.cc
   common.cc
   config.cc
@@ -51,6 +52,7 @@ set(ARROW_PYTHON_SRCS
   io.cc
   numpy_convert.cc
   pandas_to_arrow.cc
+  python_to_arrow.cc
   pyarrow.cc
 )
 
@@ -83,6 +85,7 @@ endif()
 
 install(FILES
   api.h
   arrow_to_pandas.h
+  arrow_to_python.h
   builtin_convert.h
   common.h
   config.h
@@ -92,6 +95,7 @@ install(FILES
   numpy_convert.h
   numpy_interop.h
   pandas_to_arrow.h
+  python_to_arrow.h
   platform.h
   pyarrow.h
   type_traits.h
diff --git a/cpp/src/arrow/python/api.h b/cpp/src/arrow/python/api.h
index 7cb36ad636f..4ceb3f1a45d 100644
--- a/cpp/src/arrow/python/api.h
+++ b/cpp/src/arrow/python/api.h
@@ -19,11 +19,13 @@
 #define ARROW_PYTHON_API_H
 
 #include "arrow/python/arrow_to_pandas.h"
+#include "arrow/python/arrow_to_python.h"
 #include "arrow/python/builtin_convert.h"
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/io.h"
 #include "arrow/python/numpy_convert.h"
 #include "arrow/python/pandas_to_arrow.h"
+#include "arrow/python/python_to_arrow.h"
 
 #endif  // ARROW_PYTHON_API_H
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
new file mode 100644
index 00000000000..622ef829937
--- /dev/null
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/arrow_to_python.h"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/python/common.h"
+#include "arrow/python/helpers.h"
+#include "arrow/python/numpy_convert.h"
+#include "arrow/table.h"
+#include "arrow/util/logging.h"
+
+extern "C" {
+extern PyObject* pyarrow_serialize_callback;
+extern PyObject* pyarrow_deserialize_callback;
+}
+
+namespace arrow {
+namespace py {
+
+Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result);
+
+Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx,
+                        int64_t stop_idx, PyObject* base,
+                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        PyObject** out);
+
+Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx,
+                       int64_t stop_idx, PyObject* base,
+                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       PyObject** out);
+
+Status DeserializeDict(std::shared_ptr<Array> array, int64_t start_idx,
+                       int64_t stop_idx, PyObject* base,
+                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       PyObject** out) {
+  auto data = std::dynamic_pointer_cast<StructArray>(array);
+  ScopedRef keys, vals;
+  ScopedRef result(PyDict_New());
+  RETURN_NOT_OK(
+      DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, keys.ref()));
+  RETURN_NOT_OK(
+      DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, vals.ref()));
+  for (int64_t i = start_idx; i < stop_idx; ++i) {
+    // PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
+    // The latter two steal references whereas PyDict_SetItem does not. So we need
+    // to make sure the reference count is decremented by letting the ScopedRef
+    // go out of scope at the end.
+    PyDict_SetItem(result.get(), PyList_GET_ITEM(keys.get(), i - start_idx),
+                   PyList_GET_ITEM(vals.get(), i - start_idx));
+  }
+  static PyObject* py_type = PyUnicode_FromString("_pytype_");
+  if (PyDict_Contains(result.get(), py_type)) {
+    RETURN_NOT_OK(CallCustomCallback(pyarrow_deserialize_callback, result.get(), out));
+  } else {
+    *out = result.release();
+  }
+  return Status::OK();
+}
+
+Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base,
+                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        PyObject** out) {
+  DCHECK(array);
+  int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset);
+  RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
+  // Mark the array as immutable
+  ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
+  DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable";
+  Py_INCREF(Py_False);
+  int flag_set = PyObject_SetAttrString(flags.get(), "writeable", Py_False);
+  DCHECK(flag_set == 0) << "Could not mark Numpy array immutable";
+  return Status::OK();
+}
+
+Status GetValue(std::shared_ptr<Array> arr, int64_t index, int32_t type, PyObject* base,
+                const std::vector<std::shared_ptr<Tensor>>& tensors,
+                PyObject** result) {
+  switch (arr->type()->id()) {
+    case Type::BOOL:
+      *result =
+          PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
+      return Status::OK();
+    case Type::INT64:
+      *result =
+          PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index));
+      return Status::OK();
+    case Type::BINARY: {
+      int32_t nchars;
+      const uint8_t* str =
+          std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
+      *result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
+      return CheckPyError();
+    }
+    case Type::STRING: {
+      int32_t nchars;
+      const uint8_t* str =
+          std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
+      *result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
+      return CheckPyError();
+    }
+    case Type::FLOAT:
+      *result =
+          PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
+      return Status::OK();
+    case Type::DOUBLE:
+      *result =
+          PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
+      return Status::OK();
+    case Type::STRUCT: {
+      auto s = std::static_pointer_cast<StructArray>(arr);
+      auto l = std::static_pointer_cast<ListArray>(s->field(0));
+      if (s->type()->child(0)->name() == "list") {
+        return DeserializeList(l->values(), l->value_offset(index),
+                               l->value_offset(index + 1), base, tensors, result);
+      } else if (s->type()->child(0)->name() == "tuple") {
+        return DeserializeTuple(l->values(), l->value_offset(index),
+                                l->value_offset(index + 1), base, tensors, result);
+      } else if (s->type()->child(0)->name() == "dict") {
+        return DeserializeDict(l->values(), l->value_offset(index),
+                               l->value_offset(index + 1), base, tensors, result);
+      } else {
+        DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name();
+      }
+    }
+    // We use an Int32Builder here to distinguish the tensor indices from
+    // the Type::INT64 above (see tensor_indices_ in SequenceBuilder).
+    case Type::INT32: {
+      return DeserializeArray(arr, index, base, tensors, result);
+    }
+    default:
+      DCHECK(false) << "union tag " << type << " not recognized";
+  }
+  return Status::OK();
+}
+
+#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN)                          \
+  auto data = std::dynamic_pointer_cast<UnionArray>(array);                   \
+  int64_t size = array->length();                                             \
+  ScopedRef result(CREATE_FN(stop_idx - start_idx));                          \
+  auto types = std::make_shared<Int8Array>(size, data->type_ids());           \
+  auto offsets = std::make_shared<Int32Array>(size, data->value_offsets());   \
+  for (int64_t i = start_idx; i < stop_idx; ++i) {                            \
+    if (data->IsNull(i)) {                                                    \
+      Py_INCREF(Py_None);                                                     \
+      SET_ITEM_FN(result.get(), i - start_idx, Py_None);                      \
+    } else {                                                                  \
+      int64_t offset = offsets->Value(i);                                     \
+      int8_t type = types->Value(i);                                          \
+      std::shared_ptr<Array> arr = data->child(type);                         \
+      PyObject* value;                                                        \
+      RETURN_NOT_OK(GetValue(arr, offset, type, base, tensors, &value));      \
+      SET_ITEM_FN(result.get(), i - start_idx, value);                        \
+    }                                                                         \
+  }                                                                           \
+  *out = result.release();                                                    \
+  return Status::OK();
+
+Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx,
+                       int64_t stop_idx, PyObject* base,
+                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       PyObject** out) {
+  DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM)
+}
+
+Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx,
+                        int64_t stop_idx, PyObject* base,
+                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        PyObject** out) {
+  DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM)
+}
+
+Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
+                            SerializedPyObject* out) {
+  std::shared_ptr<ipc::RecordBatchStreamReader> reader;
+  int64_t offset;
+  int64_t bytes_read;
+  int32_t num_tensors;
+  // Read number of tensors
+  RETURN_NOT_OK(
+      src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
+  RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
+  RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch));
+  RETURN_NOT_OK(src->Tell(&offset));
+  offset += 4;  // Skip the end-of-stream message
+  for (int i = 0; i < num_tensors; ++i) {
+    std::shared_ptr<Tensor> tensor;
+    RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor));
+    out->tensors.push_back(tensor);
+    RETURN_NOT_OK(src->Tell(&offset));
+  }
+  return Status::OK();
+}
+
+Status DeserializeObject(const SerializedPyObject& obj, PyObject* base, PyObject** out) {
+  PyAcquireGIL lock;
+  return DeserializeList(obj.batch->column(0), 0, obj.batch->num_rows(), base,
+                         obj.tensors, out);
+}
+
+}  // namespace py
+}  // namespace arrow
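The framing that `WriteSerializedObject` produces and `ReadSerializedObject` consumes is: a raw native-endian int32 tensor count, an Arrow IPC stream holding the union-encoded sequence, the 4-byte end-of-stream marker, then the tensors. A minimal sketch of peeking at that layout from Python, using the `serialize` wrapper added later in this patch (the stream-reader calls are illustrative for the pyarrow API of this era, not part of the patch):

```python
import struct
import pyarrow as pa

buf = pa.serialize([1, u"hello", None]).to_buffer()
data = buf.to_pybytes()

# First 4 bytes: tensor count, written as a raw native-endian int32
num_tensors, = struct.unpack('i', data[:4])

# Then a record batch stream holding the union-encoded sequence
reader = pa.RecordBatchStreamReader(pa.BufferReader(data[4:]))
table = reader.read_all()
print(num_tensors, table.num_rows)  # 0 tensors, 3 sequence elements
```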
diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h
new file mode 100644
index 00000000000..559ce18c507
--- /dev/null
+++ b/cpp/src/arrow/python/arrow_to_python.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PYTHON_ARROW_TO_PYTHON_H
+#define ARROW_PYTHON_ARROW_TO_PYTHON_H
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/python/python_to_arrow.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class RecordBatch;
+class Tensor;
+
+namespace io {
+
+class RandomAccessFile;
+
+}  // namespace io
+
+namespace py {
+
+/// \brief Read serialized Python sequence from file interface using Arrow IPC
+/// \param[in] src a RandomAccessFile
+/// \param[out] out the reconstructed data
+/// \return Status
+ARROW_EXPORT
+Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
+                            SerializedPyObject* out);
+
+/// \brief Reconstruct Python object from Arrow-serialized representation
+/// \param[in] object
+/// \param[in] base a Python object holding the underlying data that any NumPy
+/// arrays will reference, to avoid premature deallocation
+/// \param[out] out the returned object
+/// \return Status
+/// This acquires the GIL
+ARROW_EXPORT
+Status DeserializeObject(const SerializedPyObject& object, PyObject* base,
+                         PyObject** out);
+
+}  // namespace py
+}  // namespace arrow
+
+#endif  // ARROW_PYTHON_ARROW_TO_PYTHON_H
diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h
index ec40d0eafa3..7f94f9554f1 100644
--- a/cpp/src/arrow/python/common.h
+++ b/cpp/src/arrow/python/common.h
@@ -91,6 +91,36 @@ class ARROW_EXPORT OwnedRef {
   PyObject* obj_;
 };
 
+// This is different from OwnedRef in that it assumes that
+// the GIL is held by the caller and doesn't decrement the
+// reference count when release is called.
+class ARROW_EXPORT ScopedRef {
+ public:
+  ScopedRef() : obj_(nullptr) {}
+
+  explicit ScopedRef(PyObject* obj) : obj_(obj) {}
+
+  ~ScopedRef() { Py_XDECREF(obj_); }
+
+  void reset(PyObject* obj) {
+    Py_XDECREF(obj_);
+    obj_ = obj;
+  }
+
+  PyObject* release() {
+    PyObject* result = obj_;
+    obj_ = nullptr;
+    return result;
+  }
+
+  PyObject* get() const { return obj_; }
+
+  PyObject** ref() { return &obj_; }
+
+ private:
+  PyObject* obj_;
+};
+
 struct ARROW_EXPORT PyObjectStringify {
   OwnedRef tmp_obj;
   const char* bytes;
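The `base` parameter documented above is what makes zero-copy deserialization safe on the Python side: ndarrays produced by `TensorToNdarray` reference the serialized buffer's memory, and `DeserializeArray` additionally marks them read-only. A short sketch of the resulting behavior through the Python wrappers defined later in this patch (a sketch, not part of the patch):

```python
import numpy as np
import pyarrow as pa

buf = pa.serialize([np.arange(5)]).to_buffer()

# pa.deserialize passes the buffer itself as `base`, so the reconstructed
# ndarray can point into buf's memory without a copy
[arr] = pa.deserialize(buf)
assert arr.flags.writeable is False  # set in DeserializeArray
```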
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
new file mode 100644
index 00000000000..47d8ef60c4b
--- /dev/null
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/python_to_arrow.h"
+#include "arrow/python/numpy_interop.h"
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <numpy/arrayobject.h>
+#include <numpy/arrayscalars.h>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/python/common.h"
+#include "arrow/python/helpers.h"
+#include "arrow/python/numpy_convert.h"
+#include "arrow/python/platform.h"
+#include "arrow/tensor.h"
+#include "arrow/util/logging.h"
+
+constexpr int32_t kMaxRecursionDepth = 100;
+
+extern "C" {
+PyObject* pyarrow_serialize_callback = NULL;
+PyObject* pyarrow_deserialize_callback = NULL;
+}
+
+namespace arrow {
+namespace py {
+
+/// A Sequence is a heterogeneous collection of elements. It can contain
+/// scalar Python types, lists, tuples, dictionaries and tensors.
+class SequenceBuilder {
+ public:
+  explicit SequenceBuilder(MemoryPool* pool = nullptr)
+      : pool_(pool),
+        types_(pool, ::arrow::int8()),
+        offsets_(pool, ::arrow::int32()),
+        nones_(pool),
+        bools_(pool, ::arrow::boolean()),
+        ints_(pool, ::arrow::int64()),
+        bytes_(pool, ::arrow::binary()),
+        strings_(pool),
+        floats_(pool, ::arrow::float32()),
+        doubles_(pool, ::arrow::float64()),
+        tensor_indices_(pool, ::arrow::int32()),
+        list_offsets_({0}),
+        tuple_offsets_({0}),
+        dict_offsets_({0}) {}
+
+  /// Append a None to the sequence
+  Status AppendNone() {
+    RETURN_NOT_OK(offsets_.Append(0));
+    RETURN_NOT_OK(types_.Append(0));
+    return nones_.AppendToBitmap(false);
+  }
+
+  Status Update(int64_t offset, int8_t* tag) {
+    if (*tag == -1) {
+      *tag = num_tags_++;
+    }
+    RETURN_NOT_OK(offsets_.Append(static_cast<int32_t>(offset)));
+    RETURN_NOT_OK(types_.Append(*tag));
+    return nones_.AppendToBitmap(true);
+  }
+
+  template <typename T, typename BuilderType>
+  Status AppendPrimitive(const T val, int8_t* tag, BuilderType* out) {
+    RETURN_NOT_OK(Update(out->length(), tag));
+    return out->Append(val);
+  }
+
+  /// Append a boolean to the sequence
+  Status AppendBool(const bool data) {
+    return AppendPrimitive(data, &bool_tag_, &bools_);
+  }
+
+  /// Append an int64_t to the sequence
+  Status AppendInt64(const int64_t data) {
+    return AppendPrimitive(data, &int_tag_, &ints_);
+  }
+
+  /// Append a uint64_t to the sequence
+  Status AppendUInt64(const uint64_t data) {
+    // TODO(wesm): Bounds check
+    return AppendPrimitive(static_cast<int64_t>(data), &int_tag_, &ints_);
+  }
+
+  /// Append a list of bytes to the sequence
+  Status AppendBytes(const uint8_t* data, int32_t length) {
+    RETURN_NOT_OK(Update(bytes_.length(), &bytes_tag_));
+    return bytes_.Append(data, length);
+  }
+
+  /// Append a string to the sequence
+  Status AppendString(const char* data, int32_t length) {
+    RETURN_NOT_OK(Update(strings_.length(), &string_tag_));
+    return strings_.Append(data, length);
+  }
+
+  /// Append a float to the sequence
+  Status AppendFloat(const float data) {
+    return AppendPrimitive(data, &float_tag_, &floats_);
+  }
+
+  /// Append a double to the sequence
+  Status AppendDouble(const double data) {
+    return AppendPrimitive(data, &double_tag_, &doubles_);
+  }
+
+  /// Append a tensor to the sequence
+  ///
+  /// \param tensor_index Index of the tensor in the object.
+  Status AppendTensor(const int32_t tensor_index) {
+    RETURN_NOT_OK(Update(tensor_indices_.length(), &tensor_tag_));
+    return tensor_indices_.Append(tensor_index);
+  }
+
+  /// Add a sublist to the sequence. The data contained in the sublist will be
+  /// specified in the "Finish" method.
+  ///
+  /// To construct the list [[11, 22], 33, [44, 55]] you would, for example, call
+  ///   AppendList(2);   // the first sublist has two elements
+  ///   AppendInt64(33);
+  ///   AppendList(2);   // the second sublist has two elements
+  /// and then pass an array holding the flattened sublist elements
+  /// [11, 22, 44, 55] as list_data to "Finish".
+  ///
+  /// \param size
+  ///    The size of the sublist
+  Status AppendList(Py_ssize_t size) {
+    RETURN_NOT_OK(Update(list_offsets_.size() - 1, &list_tag_));
+    list_offsets_.push_back(list_offsets_.back() + static_cast<int32_t>(size));
+    return Status::OK();
+  }
+
+  Status AppendTuple(Py_ssize_t size) {
+    RETURN_NOT_OK(Update(tuple_offsets_.size() - 1, &tuple_tag_));
+    tuple_offsets_.push_back(tuple_offsets_.back() + static_cast<int32_t>(size));
+    return Status::OK();
+  }
+
+  Status AppendDict(Py_ssize_t size) {
+    RETURN_NOT_OK(Update(dict_offsets_.size() - 1, &dict_tag_));
+    dict_offsets_.push_back(dict_offsets_.back() + static_cast<int32_t>(size));
+    return Status::OK();
+  }
+
+  template <typename BuilderType>
+  Status AddElement(const int8_t tag, BuilderType* out) {
+    if (tag != -1) {
+      fields_[tag] = ::arrow::field("", out->type());
+      RETURN_NOT_OK(out->Finish(&children_[tag]));
+      RETURN_NOT_OK(nones_.AppendToBitmap(true));
+      type_ids_.push_back(tag);
+    }
+    return Status::OK();
+  }
+
+  Status AddSubsequence(int8_t tag, const Array* data,
+                        const std::vector<int32_t>& offsets, const std::string& name) {
+    if (data != nullptr) {
+      DCHECK(data->length() == offsets.back());
+      std::shared_ptr<Array> offset_array;
+      Int32Builder builder(pool_, std::make_shared<Int32Type>());
+      RETURN_NOT_OK(builder.Append(offsets.data(), offsets.size()));
+      RETURN_NOT_OK(builder.Finish(&offset_array));
+      std::shared_ptr<Array> list_array;
+      RETURN_NOT_OK(ListArray::FromArrays(*offset_array, *data, pool_, &list_array));
+      auto field = ::arrow::field(name, list_array->type());
+      auto type = ::arrow::struct_({field});
+      fields_[tag] = ::arrow::field("", type);
+      children_[tag] = std::shared_ptr<StructArray>(
+          new StructArray(type, list_array->length(), {list_array}));
+      RETURN_NOT_OK(nones_.AppendToBitmap(true));
+      type_ids_.push_back(tag);
+    } else {
+      DCHECK_EQ(offsets.size(), 1);
+    }
+    return Status::OK();
+  }
+
+  /// Finish building the sequence and return the result.
+  /// Input arrays may be nullptr
+  Status Finish(const Array* list_data, const Array* tuple_data, const Array* dict_data,
+                std::shared_ptr<Array>* out) {
+    fields_.resize(num_tags_);
+    children_.resize(num_tags_);
+
+    RETURN_NOT_OK(AddElement(bool_tag_, &bools_));
+    RETURN_NOT_OK(AddElement(int_tag_, &ints_));
+    RETURN_NOT_OK(AddElement(string_tag_, &strings_));
+    RETURN_NOT_OK(AddElement(bytes_tag_, &bytes_));
+    RETURN_NOT_OK(AddElement(float_tag_, &floats_));
+    RETURN_NOT_OK(AddElement(double_tag_, &doubles_));
+    RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_));
+
+    RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list"));
+    RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple"));
+    RETURN_NOT_OK(AddSubsequence(dict_tag_, dict_data, dict_offsets_, "dict"));
+
+    auto type = ::arrow::union_(fields_, type_ids_, UnionMode::DENSE);
+    out->reset(new UnionArray(type, types_.length(), children_, types_.data(),
+                              offsets_.data(), nones_.null_bitmap(),
+                              nones_.null_count()));
+    return Status::OK();
+  }
+
+ private:
+  MemoryPool* pool_;
+
+  Int8Builder types_;
+  Int32Builder offsets_;
+
+  NullBuilder nones_;
+  BooleanBuilder bools_;
+  Int64Builder ints_;
+  BinaryBuilder bytes_;
+  StringBuilder strings_;
+  FloatBuilder floats_;
+  DoubleBuilder doubles_;
+
+  // We use an Int32Builder here to distinguish the tensor indices from
+  // the ints_ above (see the Type::INT32 case in GetValue in arrow_to_python.cc).
+  // TODO(pcm): Replace this by using the union tags to distinguish between
+  // these two cases.
+  Int32Builder tensor_indices_;
+
+  std::vector<int32_t> list_offsets_;
+  std::vector<int32_t> tuple_offsets_;
+  std::vector<int32_t> dict_offsets_;
+
+  // Tags for members of the sequence. If a tag is set to -1 it means the
+  // corresponding member is not used and will not be part of the metadata
+  // when we call SequenceBuilder::Finish. If a member with one of the tags
+  // is added, the associated variable gets a unique index starting from 0.
+  // This happens in the Update method above.
+  int8_t bool_tag_ = -1;
+  int8_t int_tag_ = -1;
+  int8_t string_tag_ = -1;
+  int8_t bytes_tag_ = -1;
+  int8_t float_tag_ = -1;
+  int8_t double_tag_ = -1;
+
+  int8_t tensor_tag_ = -1;
+  int8_t list_tag_ = -1;
+  int8_t tuple_tag_ = -1;
+  int8_t dict_tag_ = -1;
+
+  int8_t num_tags_ = 0;
+
+  // Members for the output union constructed in Finish
+  std::vector<std::shared_ptr<Field>> fields_;
+  std::vector<std::shared_ptr<Array>> children_;
+  std::vector<uint8_t> type_ids_;
+};
+
+/// Constructs dictionaries of key/value pairs. Sequences of
+/// keys and values are built separately using a pair of
+/// SequenceBuilders. The resulting Arrow representation
+/// can be obtained via the Finish method.
+class DictBuilder {
+ public:
+  explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}
+
+  /// Builder for the keys of the dictionary
+  SequenceBuilder& keys() { return keys_; }
+  /// Builder for the values of the dictionary
+  SequenceBuilder& vals() { return vals_; }
+
+  /// Construct an Arrow StructArray representing the dictionary.
+  /// Contains a field "keys" for the keys and "vals" for the values.
+
+  /// \param list_data
+  ///    List containing the data from nested lists in the value
+  ///    list of the dictionary
+  ///
+  /// \param dict_data
+  ///    List containing the data from nested dictionaries in the
+  ///    value list of the dictionary
+  Status Finish(const Array* key_tuple_data, const Array* key_dict_data,
+                const Array* val_list_data, const Array* val_tuple_data,
+                const Array* val_dict_data, std::shared_ptr<Array>* out) {
+    // Lists and dicts can't be keys of dicts in Python, which is why for
+    // the keys we do not need to collect sublists
+    std::shared_ptr<Array> keys, vals;
+    RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, &keys));
+    RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals));
+    auto keys_field = std::make_shared<Field>("keys", keys->type());
+    auto vals_field = std::make_shared<Field>("vals", vals->type());
+    auto type = std::make_shared<StructType>(
+        std::vector<std::shared_ptr<Field>>({keys_field, vals_field}));
+    std::vector<std::shared_ptr<Array>> field_arrays({keys, vals});
+    DCHECK(keys->length() == vals->length());
+    out->reset(new StructArray(type, keys->length(), field_arrays));
+    return Status::OK();
+  }
+
+ private:
+  SequenceBuilder keys_;
+  SequenceBuilder vals_;
+};
+
+Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result) {
+  *result = NULL;
+  if (!callback) {
+    std::stringstream ss;
+    ScopedRef repr(PyObject_Repr(elem));
+    RETURN_IF_PYERROR();
+    ScopedRef ascii(PyUnicode_AsASCIIString(repr.get()));
+    ss << "error while calling callback on " << PyBytes_AsString(ascii.get())
+       << ": handler not registered";
+    return Status::NotImplemented(ss.str());
+  } else {
+    ScopedRef arglist(Py_BuildValue("(O)", elem));
+    *result = PyObject_CallObject(callback, arglist.get());
+    RETURN_IF_PYERROR();
+  }
+  return Status::OK();
+}
+
+void set_serialization_callbacks(PyObject* serialize_callback,
+                                 PyObject* deserialize_callback) {
+  pyarrow_serialize_callback = serialize_callback;
+  pyarrow_deserialize_callback = deserialize_callback;
+}
+
+Status CallCustomSerializationCallback(PyObject* elem, PyObject** serialized_object) {
+  RETURN_NOT_OK(CallCustomCallback(pyarrow_serialize_callback, elem, serialized_object));
+  if (!PyDict_Check(*serialized_object)) {
+    return Status::TypeError("serialization callback must return a valid dictionary");
+  }
+  return Status::OK();
+}
+
+Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
+                     std::shared_ptr<Array>* out, std::vector<PyObject*>* tensors_out);
+
+Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
+                      std::vector<PyObject*>* subdicts,
+                      std::vector<PyObject*>* tensors_out);
+
+Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth,
+                          std::shared_ptr<Array>* out,
+                          std::vector<PyObject*>* tensors_out);
+
+Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
+  if (PyArray_IsScalar(obj, Bool)) {
+    return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
+  } else if (PyArray_IsScalar(obj, Float)) {
+    return builder->AppendFloat(reinterpret_cast<PyFloatScalarObject*>(obj)->obval);
+  } else if (PyArray_IsScalar(obj, Double)) {
+    return builder->AppendDouble(reinterpret_cast<PyDoubleScalarObject*>(obj)->obval);
+  }
+  int64_t value = 0;
+  if (PyArray_IsScalar(obj, Byte)) {
+    value = reinterpret_cast<PyByteScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UByte)) {
+    value = reinterpret_cast<PyUByteScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Short)) {
+    value = reinterpret_cast<PyShortScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UShort)) {
+    value = reinterpret_cast<PyUShortScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Int)) {
+    value = reinterpret_cast<PyIntScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UInt)) {
+    value = reinterpret_cast<PyUIntScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Long)) {
+    value = reinterpret_cast<PyLongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, ULong)) {
+    value = reinterpret_cast<PyULongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, LongLong)) {
+    value = reinterpret_cast<PyLongLongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Int64)) {
+    value = reinterpret_cast<PyInt64ScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, ULongLong)) {
+    value = reinterpret_cast<PyULongLongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UInt64)) {
+    value = reinterpret_cast<PyUInt64ScalarObject*>(obj)->obval;
+  } else {
+    DCHECK(false) << "scalar type not recognized";
+  }
+  return builder->AppendInt64(value);
+}
+
+Status Append(PyObject* elem, SequenceBuilder* builder,
+              std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples,
+              std::vector<PyObject*>* subdicts, std::vector<PyObject*>* tensors_out) {
+  // The bool case must precede the int case (PyInt_Check passes for bools)
+  if (PyBool_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
+  } else if (PyFloat_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
+  } else if (PyLong_Check(elem)) {
+    int overflow = 0;
+    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
+    if (!overflow) {
+      RETURN_NOT_OK(builder->AppendInt64(data));
+    } else {
+      // Attempt to serialize the object using the custom callback.
+      PyObject* serialized_object;
+      // The reference count of serialized_object will be decremented in SerializeDict
+      RETURN_NOT_OK(CallCustomSerializationCallback(elem, &serialized_object));
+      RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object)));
+      subdicts->push_back(serialized_object);
+    }
+#if PY_MAJOR_VERSION < 3
+  } else if (PyInt_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendInt64(static_cast<int64_t>(PyInt_AS_LONG(elem))));
+#endif
+  } else if (PyBytes_Check(elem)) {
+    auto data = reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem));
+    const int64_t size = static_cast<int64_t>(PyBytes_GET_SIZE(elem));
+    if (size > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Cannot write bytes over 2GB");
+    }
+    RETURN_NOT_OK(builder->AppendBytes(data, static_cast<int32_t>(size)));
+  } else if (PyUnicode_Check(elem)) {
+    Py_ssize_t size;
+#if PY_MAJOR_VERSION >= 3
+    char* data = PyUnicode_AsUTF8AndSize(elem, &size);
+#else
+    ScopedRef str(PyUnicode_AsUTF8String(elem));
+    char* data = PyString_AS_STRING(str.get());
+    size = PyString_GET_SIZE(str.get());
+#endif
+    if (size > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Cannot write bytes over 2GB");
+    }
+    RETURN_NOT_OK(builder->AppendString(data, static_cast<int32_t>(size)));
+  } else if (PyList_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendList(PyList_Size(elem)));
+    sublists->push_back(elem);
+  } else if (PyDict_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendDict(PyDict_Size(elem)));
+    subdicts->push_back(elem);
+  } else if (PyTuple_CheckExact(elem)) {
+    RETURN_NOT_OK(builder->AppendTuple(PyTuple_Size(elem)));
+    subtuples->push_back(elem);
+  } else if (PyArray_IsScalar(elem, Generic)) {
+    RETURN_NOT_OK(AppendScalar(elem, builder));
+  } else if (PyArray_Check(elem)) {
+    RETURN_NOT_OK(SerializeArray(reinterpret_cast<PyArrayObject*>(elem), builder,
+                                 subdicts, tensors_out));
+  } else if (elem == Py_None) {
+    RETURN_NOT_OK(builder->AppendNone());
+  } else {
+    // Attempt to serialize the object using the custom callback.
+    PyObject* serialized_object;
+    // The reference count of serialized_object will be decremented in SerializeDict
+    RETURN_NOT_OK(CallCustomSerializationCallback(elem, &serialized_object));
+    RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object)));
+    subdicts->push_back(serialized_object);
+  }
+  return Status::OK();
+}
+
+Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
+                      std::vector<PyObject*>* subdicts,
+                      std::vector<PyObject*>* tensors_out) {
+  int dtype = PyArray_TYPE(array);
+  switch (dtype) {
+    case NPY_BOOL:
+    case NPY_UINT8:
+    case NPY_INT8:
+    case NPY_UINT16:
+    case NPY_INT16:
+    case NPY_UINT32:
+    case NPY_INT32:
+    case NPY_UINT64:
+    case NPY_INT64:
+    case NPY_FLOAT:
+    case NPY_DOUBLE: {
+      RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(tensors_out->size())));
+      tensors_out->push_back(reinterpret_cast<PyObject*>(array));
+    } break;
+    default: {
+      PyObject* serialized_object;
+      // The reference count of serialized_object will be decremented in SerializeDict
+      RETURN_NOT_OK(CallCustomSerializationCallback(reinterpret_cast<PyObject*>(array),
+                                                    &serialized_object));
+      RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object)));
+      subdicts->push_back(serialized_object);
+    }
+  }
+  return Status::OK();
+}
+
+Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth,
+                          std::shared_ptr<Array>* out,
+                          std::vector<PyObject*>* tensors_out) {
+  DCHECK(out);
+  if (recursion_depth >= kMaxRecursionDepth) {
+    return Status::NotImplemented(
+        "This object exceeds the maximum recursion depth. It may contain itself "
+        "recursively.");
+  }
+  SequenceBuilder builder(nullptr);
+  std::vector<PyObject*> sublists, subtuples, subdicts;
+  for (const auto& sequence : sequences) {
+    ScopedRef iterator(PyObject_GetIter(sequence));
+    RETURN_IF_PYERROR();
+    ScopedRef item;
+    while (item.reset(PyIter_Next(iterator.get())), item.get()) {
+      RETURN_NOT_OK(
+          Append(item.get(), &builder, &sublists, &subtuples, &subdicts, tensors_out));
+    }
+  }
+  std::shared_ptr<Array> list;
+  if (sublists.size() > 0) {
+    RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list, tensors_out));
+  }
+  std::shared_ptr<Array> tuple;
+  if (subtuples.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeSequences(subtuples, recursion_depth + 1, &tuple, tensors_out));
+  }
+  std::shared_ptr<Array> dict;
+  if (subdicts.size() > 0) {
+    RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out));
+  }
+  return builder.Finish(list.get(), tuple.get(), dict.get(), out);
+}
+
+Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
+                     std::shared_ptr<Array>* out, std::vector<PyObject*>* tensors_out) {
+  DictBuilder result;
+  if (recursion_depth >= kMaxRecursionDepth) {
+    return Status::NotImplemented(
+        "This object exceeds the maximum recursion depth. It may contain itself "
+        "recursively.");
+  }
+  std::vector<PyObject*> key_tuples, key_dicts, val_lists, val_tuples, val_dicts, dummy;
+  for (const auto& dict : dicts) {
+    PyObject *key, *value;
+    Py_ssize_t pos = 0;
+    while (PyDict_Next(dict, &pos, &key, &value)) {
+      RETURN_NOT_OK(
+          Append(key, &result.keys(), &dummy, &key_tuples, &key_dicts, tensors_out));
+      DCHECK_EQ(dummy.size(), 0);
+      RETURN_NOT_OK(Append(value, &result.vals(), &val_lists, &val_tuples, &val_dicts,
+                           tensors_out));
+    }
+  }
+  std::shared_ptr<Array> key_tuples_arr;
+  if (key_tuples.size() > 0) {
+    RETURN_NOT_OK(SerializeSequences(key_tuples, recursion_depth + 1, &key_tuples_arr,
+                                     tensors_out));
+  }
+  std::shared_ptr<Array> key_dicts_arr;
+  if (key_dicts.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeDict(key_dicts, recursion_depth + 1, &key_dicts_arr, tensors_out));
+  }
+  std::shared_ptr<Array> val_list_arr;
+  if (val_lists.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr, tensors_out));
+  }
+  std::shared_ptr<Array> val_tuples_arr;
+  if (val_tuples.size() > 0) {
+    RETURN_NOT_OK(SerializeSequences(val_tuples, recursion_depth + 1, &val_tuples_arr,
+                                     tensors_out));
+  }
+  std::shared_ptr<Array> val_dict_arr;
+  if (val_dicts.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out));
+  }
+  RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(),
+                              val_list_arr.get(), val_tuples_arr.get(),
+                              val_dict_arr.get(), out));
+
+  // This block is used to decrement the reference counts of the results
+  // returned by the serialization callback, which is called in SerializeArray
+  // and in Append
+  static PyObject* py_type = PyUnicode_FromString("_pytype_");
+  for (const auto& dict : dicts) {
+    if (PyDict_Contains(dict, py_type)) {
+      // If the dictionary contains the key "_pytype_", then the user must
+      // have registered a callback.
+      if (pyarrow_serialize_callback == nullptr) {
+        return Status::Invalid("No serialization callback set");
+      }
+      Py_XDECREF(dict);
+    }
+  }
+
+  return Status::OK();
+}
+
+std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
+  auto field = std::make_shared<Field>("list", data->type());
+  auto schema = ::arrow::schema({field});
+  return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
+}
+
+Status SerializeObject(PyObject* sequence, SerializedPyObject* out) {
+  PyAcquireGIL lock;
+  std::vector<PyObject*> sequences = {sequence};
+  std::shared_ptr<Array> array;
+  std::vector<PyObject*> py_tensors;
+  RETURN_NOT_OK(SerializeSequences(sequences, 0, &array, &py_tensors));
+  out->batch = MakeBatch(array);
+  for (const auto& py_tensor : py_tensors) {
+    std::shared_ptr<Tensor> arrow_tensor;
+    RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), py_tensor, &arrow_tensor));
+    out->tensors.push_back(arrow_tensor);
+  }
+  return Status::OK();
+}
+
+Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) {
+  int32_t num_tensors = static_cast<int32_t>(obj.tensors.size());
+  std::shared_ptr<ipc::RecordBatchStreamWriter> writer;
+  int32_t metadata_length;
+  int64_t body_length;
+
+  RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_tensors), sizeof(int32_t)));
+  RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, obj.batch->schema(), &writer));
+  RETURN_NOT_OK(writer->WriteRecordBatch(*obj.batch));
+  RETURN_NOT_OK(writer->Close());
+
+  for (const auto& tensor : obj.tensors) {
+    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
+  }
+
+  return Status::OK();
+}
+
+}  // namespace py
+}  // namespace arrow
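End to end, `SerializeObject` plus `WriteSerializedObject` on the write side pair with `ReadSerializedObject` plus `DeserializeObject` on the read side. Through the Python wrappers added later in this patch, a roundtrip looks roughly like this (in-memory sink chosen purely for illustration):

```python
import pyarrow as pa

value = [1, 2.0, u"hello", {"x": (None, b"bytes")}]

sink = pa.BufferOutputStream()
pa.serialize_to(value, sink)        # SerializeObject + WriteSerializedObject

buf = sink.get_result()
restored = pa.deserialize_from(pa.BufferReader(buf), base=buf)
assert restored == value            # ReadSerializedObject + DeserializeObject
```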
diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h
new file mode 100644
index 00000000000..8ac03965a1c
--- /dev/null
+++ b/cpp/src/arrow/python/python_to_arrow.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PYTHON_PYTHON_TO_ARROW_H
+#define ARROW_PYTHON_PYTHON_TO_ARROW_H
+
+#include "arrow/python/platform.h"
+
+#include <memory>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class RecordBatch;
+class Tensor;
+
+namespace io {
+
+class OutputStream;
+
+}  // namespace io
+
+namespace py {
+
+struct ARROW_EXPORT SerializedPyObject {
+  std::shared_ptr<RecordBatch> batch;
+  std::vector<std::shared_ptr<Tensor>> tensors;
+};
+
+/// \brief Register callback functions to perform conversions to or from other
+/// Python representations en route to/from deserialization
+///
+/// \param[in] serialize_callback a Python callable
+/// \param[in] deserialize_callback a Python callable
+///
+/// Analogous to Python custom picklers / unpicklers
+ARROW_EXPORT
+void set_serialization_callbacks(PyObject* serialize_callback,
+                                 PyObject* deserialize_callback);
+
+/// \brief Serialize Python sequence as a RecordBatch plus a set of tensors
+/// \param[in] sequence a Python sequence object to serialize to Arrow data
+/// structures
+/// \param[out] out the serialized representation
+/// \return Status
+///
+/// Release GIL before calling
+ARROW_EXPORT
+Status SerializeObject(PyObject* sequence, SerializedPyObject* out);
+
+/// \brief Write serialized Python object to OutputStream
+/// \param[in] object a serialized Python object to write out
+/// \param[out] dst an OutputStream
+/// \return Status
+ARROW_EXPORT
+Status WriteSerializedObject(const SerializedPyObject& object, io::OutputStream* dst);
+
+}  // namespace py
+}  // namespace arrow
+
+#endif  // ARROW_PYTHON_PYTHON_TO_ARROW_H
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 1aaf89ce9a1..846af4c7f2e 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -181,8 +181,8 @@ File Systems
 
 .. _api.ipc:
 
-Interprocess Communication and Messaging
-----------------------------------------
+Interprocess Communication and Serialization
+--------------------------------------------
 
 .. autosummary::
    :toctree: generated/
@@ -201,6 +201,12 @@ Interprocess Communication and Messaging
    read_tensor
    write_tensor
    get_tensor_size
+   serialize
+   serialize_to
+   deserialize
+   deserialize_from
+   read_serialized
+   SerializedPyObject
 
 .. _api.memory_pool:
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 8d4a214ba26..6e71c93d1ad 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -87,6 +87,10 @@
                          ArrowNotImplementedError,
                          ArrowTypeError)
 
+# Serialization
+from pyarrow.lib import (deserialize_from, deserialize,
+                         serialize, serialize_to, read_serialized,
+                         SerializedPyObject)
 
 from pyarrow.filesystem import FileSystem, LocalFileSystem
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index 2252e85e6ef..df5e4faadd4 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -129,6 +129,13 @@ def tobytes(o):
 def frombytes(o):
     return o.decode('utf8')
 
+try:
+    import cloudpickle as pickle
+except ImportError:
+    try:
+        import cPickle as pickle
+    except ImportError:
+        import pickle
 
 def encode_file_path(path):
     import os
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index eed9640861f..c18d4edf76b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -774,6 +774,26 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
     cdef struct PandasOptions:
         c_bool strings_to_categorical
 
+cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
+
+    cdef cppclass CSerializedPyObject" arrow::py::SerializedPyObject":
+        shared_ptr[CRecordBatch] batch
+        vector[shared_ptr[CTensor]] tensors
+
+    CStatus SerializeObject(object sequence, CSerializedPyObject* out)
+
+    CStatus WriteSerializedObject(const CSerializedPyObject& obj,
+                                  OutputStream* dst)
+
+    CStatus DeserializeObject(const CSerializedPyObject& obj,
+                              PyObject* base, PyObject** out)
+
+    CStatus ReadSerializedObject(shared_ptr[RandomAccessFile] src,
+                                 CSerializedPyObject* out)
+
+    void set_serialization_callbacks(object serialize_callback,
+                                     object deserialize_callback)
+
 cdef extern from 'arrow/python/init.h':
     int arrow_init_numpy() except -1
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index eda8de73028..7fbbe110c5c 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -734,7 +734,7 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
 
     if isinstance(source, NativeFile):
         nf = source
-        if nf.is_readable:
+        if not nf.is_writeable:
             raise IOError('Native file is not writeable')
         nf.write_handle(writer)
diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx
index 789801b9f06..4ea327ef926 100644
--- a/python/pyarrow/lib.pyx
+++ b/python/pyarrow/lib.pyx
@@ -119,5 +119,8 @@ include "ipc.pxi"
 # Feather format
 include "feather.pxi"
 
+# Python serialization
+include "serialization.pxi"
+
 # Public API
 include "public-api.pxi"
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
new file mode 100644
index 00000000000..a6c955bef99
--- /dev/null
+++ b/python/pyarrow/serialization.pxi
@@ -0,0 +1,279 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cpython.ref cimport PyObject
+
+from pyarrow.compat import pickle
+
+
+def is_named_tuple(cls):
+    """Return True if cls is a namedtuple and False otherwise."""
+    b = cls.__bases__
+    if len(b) != 1 or b[0] != tuple:
+        return False
+    f = getattr(cls, "_fields", None)
+    if not isinstance(f, tuple):
+        return False
+    return all(type(n) == str for n in f)
+
+
+class SerializationException(Exception):
+    def __init__(self, message, example_object):
+        Exception.__init__(self, message)
+        self.example_object = example_object
+
+
+class DeserializationException(Exception):
+    def __init__(self, message, type_id):
+        Exception.__init__(self, message)
+        self.type_id = type_id
+
+
+# Types with special serialization handlers
+type_to_type_id = dict()
+whitelisted_types = dict()
+types_to_pickle = set()
+custom_serializers = dict()
+custom_deserializers = dict()
+
+
+def register_type(type, type_id, pickle=False,
+                  custom_serializer=None, custom_deserializer=None):
+    """Add type to the list of types we can serialize.
+
+    Parameters
+    ----------
+    type : type
+        The type that we can serialize.
+    type_id : bytes
+        A string of bytes used to identify the type.
+    pickle : bool
+        True if the serialization should be done with pickle.
+        False if it should be done efficiently with Arrow.
+    custom_serializer : callable
+        This argument is optional, but can be provided to
+        serialize objects of the class in a particular way.
+    custom_deserializer : callable
+        This argument is optional, but can be provided to
+        deserialize objects of the class in a particular way.
+    """
+    type_to_type_id[type] = type_id
+    whitelisted_types[type_id] = type
+    if pickle:
+        types_to_pickle.add(type_id)
+    if custom_serializer is not None:
+        custom_serializers[type_id] = custom_serializer
+        custom_deserializers[type_id] = custom_deserializer
+
+
+def _serialization_callback(obj):
+    if type(obj) not in type_to_type_id:
+        raise SerializationException("pyarrow does not know how to "
+                                     "serialize objects of type {}."
+                                     .format(type(obj)),
+                                     obj)
+    type_id = type_to_type_id[type(obj)]
+    if type_id in types_to_pickle:
+        serialized_obj = {"data": pickle.dumps(obj), "pickle": True}
+    elif type_id in custom_serializers:
+        serialized_obj = {"data": custom_serializers[type_id](obj)}
+    else:
+        if is_named_tuple(type(obj)):
+            serialized_obj = {}
+            serialized_obj["_pa_getnewargs_"] = obj.__getnewargs__()
+        elif hasattr(obj, "__dict__"):
+            serialized_obj = obj.__dict__
+        else:
+            raise SerializationException("We do not know how to serialize "
+                                         "the object '{}'".format(obj), obj)
+    return dict(serialized_obj, **{"_pytype_": type_id})
+
+
+def _deserialization_callback(serialized_obj):
+    type_id = serialized_obj["_pytype_"]
+
+    if "pickle" in serialized_obj:
+        # The object was pickled, so unpickle it.
+        obj = pickle.loads(serialized_obj["data"])
+    else:
+        assert type_id not in types_to_pickle
+        if type_id not in whitelisted_types:
+            raise DeserializationException("type id {} is not registered "
+                                           "for deserialization"
+                                           .format(type_id), type_id)
+        type = whitelisted_types[type_id]
+        if type_id in custom_deserializers:
+            obj = custom_deserializers[type_id](serialized_obj["data"])
+        else:
+            # In this case, serialized_obj should just be the __dict__ field.
+            if "_pa_getnewargs_" in serialized_obj:
+                obj = type.__new__(type, *serialized_obj["_pa_getnewargs_"])
+            else:
+                obj = type.__new__(type)
+                serialized_obj.pop("_pytype_")
+                obj.__dict__.update(serialized_obj)
+    return obj
+
+
+set_serialization_callbacks(_serialization_callback,
+                            _deserialization_callback)
+
+
+cdef class SerializedPyObject:
+    """
+    Arrow-serialized representation of Python object
+    """
+    cdef:
+        CSerializedPyObject data
+
+    cdef readonly:
+        object base
+
+    property total_bytes:
+
+        def __get__(self):
+            cdef CMockOutputStream mock_stream
+            with nogil:
+                check_status(WriteSerializedObject(self.data, &mock_stream))
+
+            return mock_stream.GetExtentBytesWritten()
+
+    def write_to(self, sink):
+        """
+        Write serialized object to a sink
+        """
+        cdef shared_ptr[OutputStream] stream
+        get_writer(sink, &stream)
+        self._write_to(stream.get())
+
+    cdef _write_to(self, OutputStream* stream):
+        with nogil:
+            check_status(WriteSerializedObject(self.data, stream))
+
+    def deserialize(self):
+        """
+        Convert back to Python object
+        """
+        cdef PyObject* result
+
+        with nogil:
+            check_status(DeserializeObject(self.data, <PyObject*> self.base,
+                                           &result))
+
+        # This is necessary to avoid a memory leak
+        return PyObject_to_object(result)
+
+    def to_buffer(self):
+        """
+        Write serialized data as Buffer
+        """
+        sink = BufferOutputStream()
+        self.write_to(sink)
+        return sink.get_result()
+
+
+def serialize(object value):
+    """EXPERIMENTAL: Serialize a Python sequence
+
+    Parameters
+    ----------
+    value: object
+        Python object for the sequence that is to be serialized.
+
+    Returns
+    -------
+    serialized : SerializedPyObject
+    """
+    cdef SerializedPyObject serialized = SerializedPyObject()
+    with nogil:
+        check_status(SerializeObject(value, &serialized.data))
+    return serialized
+
+
+def serialize_to(object value, sink):
+    """EXPERIMENTAL: Serialize a Python sequence to a file.
+
+    Parameters
+    ----------
+    value: object
+        Python object for the sequence that is to be serialized.
+    sink: NativeFile or file-like
+        File the sequence will be written to.
+    """
+    serialized = serialize(value)
+    serialized.write_to(sink)
+
+
+def read_serialized(source, base=None):
+    """EXPERIMENTAL: Read serialized Python sequence from file-like object
+
+    Parameters
+    ----------
+    source: NativeFile
+        File to read the sequence from.
+    base: object
+        This object will be the base object of all the numpy arrays
+        contained in the sequence.
+
+    Returns
+    -------
+    serialized : the serialized data
+    """
+    cdef shared_ptr[RandomAccessFile] stream
+    get_reader(source, &stream)
+
+    cdef SerializedPyObject serialized = SerializedPyObject()
+    serialized.base = base
+    with nogil:
+        check_status(ReadSerializedObject(stream, &serialized.data))
+
+    return serialized
+
+
+def deserialize_from(source, object base):
+    """EXPERIMENTAL: Deserialize a Python sequence from a file.
+
+    Parameters
+    ----------
+    source: NativeFile
+        File to read the sequence from.
+    base: object
+        This object will be the base object of all the numpy arrays
+        contained in the sequence.
+
+    Returns
+    -------
+    object
+        Python object for the deserialized sequence.
+    """
+    serialized = read_serialized(source, base=base)
+    return serialized.deserialize()
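`total_bytes` above sizes the output by running `WriteSerializedObject` against a `MockOutputStream`, so callers can size a destination before materializing anything. A small sketch (assuming `Buffer.size` for the comparison):

```python
import pyarrow as pa

serialized = pa.serialize([1, 2, 3, u"text"])
nbytes = serialized.total_bytes  # counted via MockOutputStream, no allocation

buf = serialized.to_buffer()
assert buf.size == nbytes
```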
+ """ + serialized = read_serialized(source, base=base) + return serialized.deserialize() + + +def deserialize(obj): + """ + EXPERIMENTAL: Deserialize Python object from Buffer or other Python object + supporting the buffer protocol + + Parameters + ---------- + obj : pyarrow.Buffer or Python object supporting buffer protocol + + Returns + ------- + deserialized : object + """ + source = BufferReader(obj) + return deserialize_from(source, obj) diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py new file mode 100644 index 00000000000..f6f98402ac0 --- /dev/null +++ b/python/pyarrow/tests/test_serialization.py @@ -0,0 +1,236 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import division + +import pytest + +from collections import namedtuple +import os +import string +import sys + +import pyarrow as pa +import numpy as np + + +def assert_equal(obj1, obj2): + module_numpy = (type(obj1).__module__ == np.__name__ or + type(obj2).__module__ == np.__name__) + if module_numpy: + empty_shape = ((hasattr(obj1, "shape") and obj1.shape == ()) or + (hasattr(obj2, "shape") and obj2.shape == ())) + if empty_shape: + # This is a special case because currently np.testing.assert_equal + # fails because we do not properly handle different numerical + # types. + assert obj1 == obj2, ("Objects {} and {} are " + "different.".format(obj1, obj2)) + else: + np.testing.assert_equal(obj1, obj2) + elif hasattr(obj1, "__dict__") and hasattr(obj2, "__dict__"): + special_keys = ["_pytype_"] + assert (set(list(obj1.__dict__.keys()) + special_keys) == + set(list(obj2.__dict__.keys()) + special_keys)), ("Objects {} " + "and {} are " + "different." + .format( + obj1, + obj2)) + for key in obj1.__dict__.keys(): + if key not in special_keys: + assert_equal(obj1.__dict__[key], obj2.__dict__[key]) + elif type(obj1) is dict or type(obj2) is dict: + assert_equal(obj1.keys(), obj2.keys()) + for key in obj1.keys(): + assert_equal(obj1[key], obj2[key]) + elif type(obj1) is list or type(obj2) is list: + assert len(obj1) == len(obj2), ("Objects {} and {} are lists with " + "different lengths." + .format(obj1, obj2)) + for i in range(len(obj1)): + assert_equal(obj1[i], obj2[i]) + elif type(obj1) is tuple or type(obj2) is tuple: + assert len(obj1) == len(obj2), ("Objects {} and {} are tuples with " + "different lengths." + .format(obj1, obj2)) + for i in range(len(obj1)): + assert_equal(obj1[i], obj2[i]) + elif (pa.lib.is_named_tuple(type(obj1)) or + pa.lib.is_named_tuple(type(obj2))): + assert len(obj1) == len(obj2), ("Objects {} and {} are named tuples " + "with different lengths." 
+ .format(obj1, obj2)) + for i in range(len(obj1)): + assert_equal(obj1[i], obj2[i]) + else: + assert obj1 == obj2, "Objects {} and {} are different.".format(obj1, + obj2) + + +def array_custom_serializer(obj): + return obj.tolist(), obj.dtype.str + + +def array_custom_deserializer(serialized_obj): + return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1])) + +pa.lib.register_type(np.ndarray, 20 * b"\x00", pickle=False, + custom_serializer=array_custom_serializer, + custom_deserializer=array_custom_deserializer) + +if sys.version_info >= (3, 0): + long_extras = [0, np.array([["hi", u"hi"], [1.3, 1]])] +else: + _LONG_ZERO, _LONG_ONE = long(0), long(1) # noqa: E501,F821 + long_extras = [_LONG_ZERO, np.array([["hi", u"hi"], + [1.3, _LONG_ONE]])] + +PRIMITIVE_OBJECTS = [ + 0, 0.0, 0.9, 1 << 62, 1 << 100, 1 << 999, + [1 << 100, [1 << 100]], "a", string.printable, "\u262F", + u"hello world", u"\xff\xfe\x9c\x001\x000\x00", None, True, + False, [], (), {}, np.int8(3), np.int32(4), np.int64(5), + np.uint8(3), np.uint32(4), np.uint64(5), np.float32(1.9), + np.float64(1.9), np.zeros([100, 100]), + np.random.normal(size=[100, 100]), np.array(["hi", 3]), + np.array(["hi", 3], dtype=object)] + long_extras + +COMPLEX_OBJECTS = [ + [[[[[[[[[[[[]]]]]]]]]]]], + {"obj{}".format(i): np.random.normal(size=[100, 100]) for i in range(10)}, + # {(): {(): {(): {(): {(): {(): {(): {(): {(): {(): { + # (): {(): {}}}}}}}}}}}}}, + ((((((((((),),),),),),),),),), + {"a": {"b": {"c": {"d": {}}}}}] + + +class Foo(object): + def __init__(self, value=0): + self.value = value + + def __hash__(self): + return hash(self.value) + + def __eq__(self, other): + return other.value == self.value + + +class Bar(object): + def __init__(self): + for i, val in enumerate(PRIMITIVE_OBJECTS + COMPLEX_OBJECTS): + setattr(self, "field{}".format(i), val) + + +class Baz(object): + def __init__(self): + self.foo = Foo() + self.bar = Bar() + + def method(self, arg): + pass + + +class Qux(object): + def __init__(self): + self.objs = [Foo(), Bar(), Baz()] + + +class SubQux(Qux): + def __init__(self): + Qux.__init__(self) + + +class CustomError(Exception): + pass + + +Point = namedtuple("Point", ["x", "y"]) +NamedTupleExample = namedtuple("Example", + "field1, field2, field3, field4, field5") + + +CUSTOM_OBJECTS = [Exception("Test object."), CustomError(), Point(11, y=22), + Foo(), Bar(), Baz(), Qux(), SubQux(), + NamedTupleExample(1, 1.0, "hi", np.zeros([3, 5]), [1, 2, 3])] + +pa.lib.register_type(Foo, 20 * b"\x01") +pa.lib.register_type(Bar, 20 * b"\x02") +pa.lib.register_type(Baz, 20 * b"\x03") +pa.lib.register_type(Qux, 20 * b"\x04") +pa.lib.register_type(SubQux, 20 * b"\x05") +pa.lib.register_type(Exception, 20 * b"\x06") +pa.lib.register_type(CustomError, 20 * b"\x07") +pa.lib.register_type(Point, 20 * b"\x08") +pa.lib.register_type(NamedTupleExample, 20 * b"\x09") + +# TODO(pcm): This is currently a workaround until arrow supports +# arbitrary precision integers. 
This is only called on long integers, +# see the associated case in the append method in python_to_arrow.cc +pa.lib.register_type(int, 20 * b"\x10", pickle=False, + custom_serializer=lambda obj: str(obj), + custom_deserializer=( + lambda serialized_obj: int(serialized_obj))) + + +if (sys.version_info < (3, 0)): + deserializer = ( + lambda serialized_obj: long(serialized_obj)) # noqa: E501,F821 + pa.lib.register_type(long, 20 * b"\x11", pickle=False, # noqa: E501,F821 + custom_serializer=lambda obj: str(obj), + custom_deserializer=deserializer) + + +def serialization_roundtrip(value, f): + f.seek(0) + pa.serialize_to(value, f) + f.seek(0) + result = pa.deserialize_from(f, None) + assert_equal(value, result) + + +@pytest.yield_fixture(scope='session') +def large_memory_map(tmpdir_factory): + path = (tmpdir_factory.mktemp('data') + .join('pyarrow-serialization-tmp-file').strpath) + + # Create a large memory mapped file + SIZE = 100 * 1024 * 1024 # 100 MB + with open(path, 'wb') as f: + f.write(np.random.randint(0, 256, size=SIZE) + .astype('u1') + .tobytes() + [:SIZE]) + return path + + +def test_primitive_serialization(large_memory_map): + with pa.memory_map(large_memory_map, mode="r+") as mmap: + for obj in PRIMITIVE_OBJECTS: + serialization_roundtrip([obj], mmap) + + +def test_complex_serialization(large_memory_map): + with pa.memory_map(large_memory_map, mode="r+") as mmap: + for obj in COMPLEX_OBJECTS: + serialization_roundtrip([obj], mmap) + + +def test_custom_serialization(large_memory_map): + with pa.memory_map(large_memory_map, mode="r+") as mmap: + for obj in CUSTOM_OBJECTS: + serialization_roundtrip([obj], mmap)
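A condensed usage sketch of the registration machinery the tests above exercise (a standalone illustration, not part of the patch; type ids are arbitrary byte strings, 20 bytes here to match the tests):

```python
from collections import namedtuple

import pyarrow as pa

Point = namedtuple("Point", ["x", "y"])

# The default callbacks reconstruct namedtuples via the _pa_getnewargs_ field
pa.lib.register_type(Point, 20 * b"\x08")

roundtripped = pa.deserialize(pa.serialize([Point(1, 2)]).to_buffer())
assert roundtripped == [Point(1, 2)]
```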