diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index a37b49c7c7a..75c86f53230 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1245,31 +1245,42 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr } int ndim = static_cast(sparse_tensor->shape()->size()); - for (int i = 0; i < ndim; ++i) { - auto dim = sparse_tensor->shape()->Get(i); + if (shape || dim_names) { + for (int i = 0; i < ndim; ++i) { + auto dim = sparse_tensor->shape()->Get(i); - shape->push_back(dim->size()); - auto fb_name = dim->name(); - if (fb_name == 0) { - dim_names->push_back(""); - } else { - dim_names->push_back(fb_name->str()); + if (shape) { + shape->push_back(dim->size()); + } + + if (dim_names) { + auto fb_name = dim->name(); + if (fb_name == 0) { + dim_names->push_back(""); + } else { + dim_names->push_back(fb_name->str()); + } + } } } - *non_zero_length = sparse_tensor->non_zero_length(); + if (non_zero_length) { + *non_zero_length = sparse_tensor->non_zero_length(); + } - switch (sparse_tensor->sparseIndex_type()) { - case flatbuf::SparseTensorIndex_SparseTensorIndexCOO: - *sparse_tensor_format_id = SparseTensorFormat::COO; - break; + if (sparse_tensor_format_id) { + switch (sparse_tensor->sparseIndex_type()) { + case flatbuf::SparseTensorIndex_SparseTensorIndexCOO: + *sparse_tensor_format_id = SparseTensorFormat::COO; + break; - case flatbuf::SparseTensorIndex_SparseMatrixIndexCSR: - *sparse_tensor_format_id = SparseTensorFormat::CSR; - break; + case flatbuf::SparseTensorIndex_SparseMatrixIndexCSR: + *sparse_tensor_format_id = SparseTensorFormat::CSR; + break; - default: - return Status::Invalid("Unrecognized sparse index type"); + default: + return Status::Invalid("Unrecognized sparse index type"); + } } auto type_data = sparse_tensor->type(); @@ -1277,7 +1288,11 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr return Status::IOError( "Type-pointer in custom metadata of flatbuffer-encoded SparseTensor is null."); } - return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type); + if (type) { + return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type); + } else { + return Status::OK(); + } } } // namespace internal diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 63807dcb8b1..de464c42ddf 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1188,8 +1188,8 @@ void TestSparseTensorRoundTrip::CheckSparseTensorRoundTrip( ASSERT_OK(mmap_->Seek(0)); - ASSERT_OK(WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length, - default_memory_pool())); + ASSERT_OK( + WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); const auto& sparse_index = checked_cast(*sparse_tensor.sparse_index()); @@ -1224,8 +1224,8 @@ void TestSparseTensorRoundTrip::CheckSparseTensorRoundTrip( ASSERT_OK(mmap_->Seek(0)); - ASSERT_OK(WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length, - default_memory_pool())); + ASSERT_OK( + WriteSparseTensor(sparse_tensor, mmap_.get(), &metadata_length, &body_length)); const auto& sparse_index = checked_cast(*sparse_tensor.sparse_index()); @@ -1285,8 +1285,10 @@ TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCOOIndexRowMajor) { 0, 2, 0, 0, 2, 2, 1, 0, 1, 1, 0, 3, 1, 1, 0, 1, 1, 2, 1, 2, 1, 1, 2, 3}; const int sizeof_index_value = sizeof(c_index_value_type); - auto si = this->MakeSparseCOOIndex( - {12, 3}, {sizeof_index_value * 3, sizeof_index_value}, coords_values); + std::shared_ptr si; + ASSERT_OK(SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, + {sizeof_index_value * 3, sizeof_index_value}, + Buffer::Wrap(coords_values), &si)); std::vector shape = {2, 3, 4}; std::vector dim_names = {"foo", "bar", "baz"}; @@ -1328,8 +1330,10 @@ TYPED_TEST_P(TestSparseTensorRoundTrip, WithSparseCOOIndexColumnMajor) { 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 2, 1, 3, 0, 2, 1, 3, 0, 2, 1, 3}; const int sizeof_index_value = sizeof(c_index_value_type); - auto si = this->MakeSparseCOOIndex( - {12, 3}, {sizeof_index_value, sizeof_index_value * 12}, coords_values); + std::shared_ptr si; + ASSERT_OK(SparseCOOIndex::Make(TypeTraits::type_singleton(), {12, 3}, + {sizeof_index_value, sizeof_index_value * 12}, + Buffer::Wrap(coords_values), &si)); std::vector shape = {2, 3, 4}; std::vector dim_names = {"foo", "bar", "baz"}; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 66ec3578397..31ba431c00e 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -933,32 +933,150 @@ Status MakeSparseTensorWithSparseCSRIndex( return Status::OK(); } -} // namespace - -Status ReadSparseTensor(const Buffer& metadata, io::RandomAccessFile* file, - std::shared_ptr* out) { - std::shared_ptr type; - std::vector shape; - std::vector dim_names; - int64_t non_zero_length; - SparseTensorFormat::type sparse_tensor_format_id; - +Status ReadSparseTensorMetadata(const Buffer& metadata, + std::shared_ptr* out_type, + std::vector* out_shape, + std::vector* out_dim_names, + int64_t* out_non_zero_length, + SparseTensorFormat::type* out_format_id, + const flatbuf::SparseTensor** out_fb_sparse_tensor, + const flatbuf::Buffer** out_buffer) { RETURN_NOT_OK(internal::GetSparseTensorMetadata( - metadata, &type, &shape, &dim_names, &non_zero_length, &sparse_tensor_format_id)); + metadata, out_type, out_shape, out_dim_names, out_non_zero_length, out_format_id)); const flatbuf::Message* message; RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message)); + auto sparse_tensor = message->header_as_SparseTensor(); if (sparse_tensor == nullptr) { return Status::IOError( "Header-type of flatbuffer-encoded Message is not SparseTensor."); } - const flatbuf::Buffer* buffer = sparse_tensor->data(); + *out_fb_sparse_tensor = sparse_tensor; + + auto buffer = sparse_tensor->data(); if (!BitUtil::IsMultipleOf8(buffer->offset())) { return Status::Invalid( "Buffer of sparse index data did not start on 8-byte aligned offset: ", buffer->offset()); } + *out_buffer = buffer; + + return Status::OK(); +} + +} // namespace + +namespace internal { + +namespace { + +Status GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id, + size_t* buffer_count) { + switch (format_id) { + case SparseTensorFormat::COO: + *buffer_count = 2; + break; + + case SparseTensorFormat::CSR: + *buffer_count = 3; + break; + + default: + return Status::Invalid("Unrecognized sparse tensor format"); + } + + return Status::OK(); +} + +Status CheckSparseTensorBodyBufferCount( + const IpcPayload& payload, SparseTensorFormat::type sparse_tensor_format_id) { + size_t expected_body_buffer_count; + + RETURN_NOT_OK(GetSparseTensorBodyBufferCount(sparse_tensor_format_id, + &expected_body_buffer_count)); + if (payload.body_buffers.size() != expected_body_buffer_count) { + return Status::Invalid("Invalid body buffer count for a sparse tensor"); + } + + return Status::OK(); +} + +} // namespace + +Status ReadSparseTensorBodyBufferCount(const Buffer& metadata, size_t* buffer_count) { + SparseTensorFormat::type format_id; + + RETURN_NOT_OK(internal::GetSparseTensorMetadata(metadata, nullptr, nullptr, nullptr, + nullptr, &format_id)); + return GetSparseTensorBodyBufferCount(format_id, buffer_count); +} + +Status ReadSparseTensorPayload(const IpcPayload& payload, + std::shared_ptr* out) { + std::shared_ptr type; + std::vector shape; + std::vector dim_names; + int64_t non_zero_length; + SparseTensorFormat::type sparse_tensor_format_id; + const flatbuf::SparseTensor* sparse_tensor; + const flatbuf::Buffer* buffer; + + RETURN_NOT_OK(ReadSparseTensorMetadata(*payload.metadata, &type, &shape, &dim_names, + &non_zero_length, &sparse_tensor_format_id, + &sparse_tensor, &buffer)); + + RETURN_NOT_OK(CheckSparseTensorBodyBufferCount(payload, sparse_tensor_format_id)); + + switch (sparse_tensor_format_id) { + case SparseTensorFormat::COO: { + std::shared_ptr sparse_index; + std::shared_ptr indices_type; + RETURN_NOT_OK(internal::GetSparseCOOIndexMetadata( + sparse_tensor->sparseIndex_as_SparseTensorIndexCOO(), &indices_type)); + RETURN_NOT_OK(SparseCOOIndex::Make(indices_type, shape, non_zero_length, + payload.body_buffers[0], &sparse_index)); + return MakeSparseTensorWithSparseCOOIndex(type, shape, dim_names, sparse_index, + non_zero_length, payload.body_buffers[1], + out); + } + + case SparseTensorFormat::CSR: { + std::shared_ptr sparse_index; + std::shared_ptr indptr_type; + std::shared_ptr indices_type; + RETURN_NOT_OK(internal::GetSparseCSRIndexMetadata( + sparse_tensor->sparseIndex_as_SparseMatrixIndexCSR(), &indptr_type, + &indices_type)); + ARROW_CHECK_EQ(indptr_type, indices_type); + RETURN_NOT_OK(SparseCSRIndex::Make(indices_type, shape, non_zero_length, + payload.body_buffers[0], payload.body_buffers[1], + &sparse_index)); + return MakeSparseTensorWithSparseCSRIndex(type, shape, dim_names, sparse_index, + non_zero_length, payload.body_buffers[2], + out); + } + + default: + return Status::Invalid("Unsupported sparse index format"); + } +} + +} // namespace internal + +Status ReadSparseTensor(const Buffer& metadata, io::RandomAccessFile* file, + std::shared_ptr* out) { + std::shared_ptr type; + std::vector shape; + std::vector dim_names; + int64_t non_zero_length; + SparseTensorFormat::type sparse_tensor_format_id; + const flatbuf::SparseTensor* sparse_tensor; + const flatbuf::Buffer* buffer; + + RETURN_NOT_OK(ReadSparseTensorMetadata(metadata, &type, &shape, &dim_names, + &non_zero_length, &sparse_tensor_format_id, + &sparse_tensor, &buffer)); std::shared_ptr data; RETURN_NOT_OK(file->ReadAt(buffer->offset(), buffer->length(), &data)); diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 1314a3791dc..33634d96d7b 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -26,7 +26,9 @@ #include "arrow/ipc/dictionary.h" #include "arrow/ipc/message.h" #include "arrow/ipc/options.h" +#include "arrow/ipc/writer.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/util/visibility.h" namespace arrow { @@ -286,6 +288,27 @@ Status ReadSparseTensor(io::InputStream* file, std::shared_ptr* ou ARROW_EXPORT Status ReadSparseTensor(const Message& message, std::shared_ptr* out); +namespace internal { + +// These internal APIs may change without warning or deprecation + +/// \brief EXPERIMENTAL: Read arrow::SparseTensorFormat::type from a metadata +/// \param[in] metadata a Buffer containing the sparse tensor metadata +/// \param[out] buffer_count the returned count of the body buffers +/// \return Status +ARROW_EXPORT +Status ReadSparseTensorBodyBufferCount(const Buffer& metadata, size_t* buffer_count); + +/// \brief EXPERIMENTAL: Read arrow::SparseTensor from an IpcPayload +/// \param[in] payload a IpcPayload contains a serialized SparseTensor +/// \param[out] out the returned SparseTensor +/// \return Status +ARROW_EXPORT +Status ReadSparseTensorPayload(const IpcPayload& payload, + std::shared_ptr* out); + +} // namespace internal + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 6cdf7841ae5..f5672deba6f 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -814,8 +814,7 @@ Status GetSparseTensorPayload(const SparseTensor& sparse_tensor, MemoryPool* poo } // namespace internal Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool) { + int32_t* metadata_length, int64_t* body_length) { internal::IpcPayload payload; internal::SparseTensorSerializer writer(0, &payload); RETURN_NOT_OK(writer.Assemble(sparse_tensor)); @@ -824,6 +823,18 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds return internal::WriteIpcPayload(payload, IpcOptions::Defaults(), dst, metadata_length); } +Status GetSparseTensorMessage(const SparseTensor& sparse_tensor, MemoryPool* pool, + std::unique_ptr* out) { + internal::IpcPayload payload; + RETURN_NOT_OK(internal::GetSparseTensorPayload(sparse_tensor, pool, &payload)); + + const std::shared_ptr metadata = payload.metadata; + const std::shared_ptr buffer = *payload.body_buffers.data(); + + out->reset(new Message(metadata, buffer)); + return Status::OK(); +} + Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { // emulates the behavior of Write without actually writing auto options = IpcOptions::Defaults(); @@ -1029,7 +1040,7 @@ class StreamBookKeeper { int64_t position_; }; -/// A IpcPayloadWriter implementation that writes to a IPC stream +/// A IpcPayloadWriter implementation that writes to an IPC stream /// (with an end-of-stream marker) class PayloadStreamWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index c673b0a1193..47153ea20ae 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -287,19 +287,35 @@ ARROW_EXPORT Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length); -// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous mesasge. The metadata, -// sparse index, and body are written assuming 64-byte alignment. It is the -// user's responsibility to ensure that the OutputStream has been aligned -// to a 64-byte multiple before writing the message. -// -// \param[in] tensor the SparseTensor to write -// \param[in] dst the OutputStream to write to -// \param[out] metadata_length the actual metadata length, including padding -// \param[out] body_length the actual message body length +/// \brief EXPERIMENTAL: Convert arrow::SparseTensor to a Message with minimal memory +/// allocation +/// +/// The message is written out as followed: +/// \code +/// +/// \endcode +/// +/// \param[in] sparse_tensor the SparseTensor to write +/// \param[in] pool MemoryPool to allocate space for metadata +/// \param[out] out the resulting Message +/// \return Status +ARROW_EXPORT +Status GetSparseTensorMessage(const SparseTensor& sparse_tensor, MemoryPool* pool, + std::unique_ptr* out); + +/// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message. The metadata, +/// sparse index, and body are written assuming 64-byte alignment. It is the +/// user's responsibility to ensure that the OutputStream has been aligned +/// to a 64-byte multiple before writing the message. +/// +/// \param[in] sparse_tensor the SparseTensor to write +/// \param[in] dst the OutputStream to write to +/// \param[out] metadata_length the actual metadata length, including padding +/// \param[out] body_length the actual message body length +/// \return Status ARROW_EXPORT Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool); + int32_t* metadata_length, int64_t* body_length); namespace internal { @@ -383,6 +399,15 @@ ARROW_EXPORT Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options, io::OutputStream* dst, int32_t* metadata_length); +/// \brief Compute IpcPayload for the given sparse tensor +/// \param[in] sparse_tensor the SparseTensor that is being serialized +/// \param[in,out] pool for any required temporary memory allocations +/// \param[out] out the returned IpcPayload +/// \return Status +ARROW_EXPORT +Status GetSparseTensorPayload(const SparseTensor& sparse_tensor, MemoryPool* pool, + IpcPayload* out); + } // namespace internal } // namespace ipc diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc index 52d82b51d65..f748d2f5806 100644 --- a/cpp/src/arrow/python/deserialize.cc +++ b/cpp/src/arrow/python/deserialize.cc @@ -184,6 +184,22 @@ Status GetValue(PyObject* context, const Array& arr, int64_t index, int8_t type, *result = wrap_tensor(blobs.tensors[ref]); return Status::OK(); } + case PythonType::SPARSECOOTENSOR: { + int32_t ref = checked_cast(arr).Value(index); + const std::shared_ptr& sparse_coo_tensor = + arrow::internal::checked_pointer_cast( + blobs.sparse_tensors[ref]); + *result = wrap_sparse_coo_tensor(sparse_coo_tensor); + return Status::OK(); + } + case PythonType::SPARSECSRMATRIX: { + int32_t ref = checked_cast(arr).Value(index); + const std::shared_ptr& sparse_csr_matrix = + arrow::internal::checked_pointer_cast( + blobs.sparse_tensors[ref]); + *result = wrap_sparse_csr_matrix(sparse_csr_matrix); + return Status::OK(); + } case PythonType::NDARRAY: { int32_t ref = checked_cast(arr).Value(index); return DeserializeArray(ref, base, blobs, result); @@ -287,12 +303,15 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx, Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) { int64_t bytes_read; int32_t num_tensors; + int32_t num_sparse_tensors; int32_t num_ndarrays; int32_t num_buffers; // Read number of tensors RETURN_NOT_OK( src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast(&num_tensors))); + RETURN_NOT_OK(src->Read(sizeof(int32_t), &bytes_read, + reinterpret_cast(&num_sparse_tensors))); RETURN_NOT_OK( src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast(&num_ndarrays))); RETURN_NOT_OK( @@ -317,6 +336,13 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) out->tensors.push_back(tensor); } + for (int i = 0; i < num_sparse_tensors; ++i) { + std::shared_ptr sparse_tensor; + RETURN_NOT_OK(ipc::ReadSparseTensor(src, &sparse_tensor)); + RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment)); + out->sparse_tensors.push_back(sparse_tensor); + } + for (int i = 0; i < num_ndarrays; ++i) { std::shared_ptr ndarray; RETURN_NOT_OK(ipc::ReadTensor(src, &ndarray)); @@ -347,19 +373,23 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj obj, out); } -Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_buffers, - PyObject* data, SerializedPyObject* out) { +Status GetSerializedFromComponents(int num_tensors, + const SparseTensorCounts& num_sparse_tensors, + int num_ndarrays, int num_buffers, PyObject* data, + SerializedPyObject* out) { PyAcquireGIL gil; const Py_ssize_t data_length = PyList_Size(data); RETURN_IF_PYERROR(); - const Py_ssize_t expected_data_length = - 1 + num_tensors * 2 + num_ndarrays * 2 + num_buffers; + const Py_ssize_t expected_data_length = 1 + num_tensors * 2 + + num_sparse_tensors.num_total_buffers() + + num_ndarrays * 2 + num_buffers; if (data_length != expected_data_length) { return Status::Invalid("Invalid number of buffers in data"); } auto GetBuffer = [&data](Py_ssize_t index, std::shared_ptr* out) { + ARROW_CHECK_LE(index, PyList_Size(data)); PyObject* py_buf = PyList_GET_ITEM(data, index); return unwrap_buffer(py_buf, out); }; @@ -392,6 +422,27 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu out->tensors.emplace_back(std::move(tensor)); } + // Zero-copy reconstruct sparse tensors + for (int i = 0, n = num_sparse_tensors.num_total_tensors(); i < n; ++i) { + ipc::internal::IpcPayload payload; + RETURN_NOT_OK(GetBuffer(buffer_index++, &payload.metadata)); + + size_t num_bodies; + RETURN_NOT_OK( + ipc::internal::ReadSparseTensorBodyBufferCount(*payload.metadata, &num_bodies)); + + payload.body_buffers.reserve(num_bodies); + for (size_t i = 0; i < num_bodies; ++i) { + std::shared_ptr body; + RETURN_NOT_OK(GetBuffer(buffer_index++, &body)); + payload.body_buffers.emplace_back(body); + } + + std::shared_ptr sparse_tensor; + RETURN_NOT_OK(ipc::internal::ReadSparseTensorPayload(payload, &sparse_tensor)); + out->sparse_tensors.emplace_back(std::move(sparse_tensor)); + } + // Zero-copy reconstruct tensors for numpy ndarrays for (int i = 0; i < num_ndarrays; ++i) { std::shared_ptr metadata; diff --git a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h index b9c4984a3b0..5a755130b4e 100644 --- a/cpp/src/arrow/python/deserialize.h +++ b/cpp/src/arrow/python/deserialize.h @@ -39,6 +39,14 @@ class RandomAccessFile; namespace py { +struct ARROW_PYTHON_EXPORT SparseTensorCounts { + int coo; + int csr; + + int num_total_tensors() const { return coo + csr; } + int num_total_buffers() const { return coo * 3 + csr * 4; } +}; + /// \brief Read serialized Python sequence from file interface using Arrow IPC /// \param[in] src a RandomAccessFile /// \param[out] out the reconstructed data @@ -50,15 +58,18 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out); /// SerializedPyObject::GetComponents. /// /// \param[in] num_tensors number of tensors in the object +/// \param[in] num_sparse_tensors number of sparse tensors in the object /// \param[in] num_ndarrays number of numpy Ndarrays in the object /// \param[in] num_buffers number of buffers in the object /// \param[in] data a list containing pyarrow.Buffer instances. Must be 1 + -/// num_tensors * 2 + num_buffers in length +/// num_tensors * 2 + num_coo_tensors * 3 + num_csr_tensors * 4 + num_buffers in length /// \param[out] out the reconstructed object /// \return Status ARROW_PYTHON_EXPORT -Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_buffers, - PyObject* data, SerializedPyObject* out); +Status GetSerializedFromComponents(int num_tensors, + const SparseTensorCounts& num_sparse_tensors, + int num_ndarrays, int num_buffers, PyObject* data, + SerializedPyObject* out); /// \brief Reconstruct Python object from Arrow-serialized representation /// \param[in] context Serialization context which contains custom serialization diff --git a/cpp/src/arrow/python/pyarrow.cc b/cpp/src/arrow/python/pyarrow.cc index 8a9a57d521a..c2eb85d8179 100644 --- a/cpp/src/arrow/python/pyarrow.cc +++ b/cpp/src/arrow/python/pyarrow.cc @@ -137,40 +137,40 @@ PyObject* wrap_tensor(const std::shared_ptr& tensor) { return ::pyarrow_wrap_tensor(tensor); } -bool is_sparse_csr_matrix(PyObject* sparse_tensor) { - return ::pyarrow_is_sparse_csr_matrix(sparse_tensor) != 0; +bool is_sparse_coo_tensor(PyObject* sparse_tensor) { + return ::pyarrow_is_sparse_coo_tensor(sparse_tensor) != 0; } -Status unwrap_sparse_csr_matrix(PyObject* sparse_tensor, - std::shared_ptr* out) { - *out = ::pyarrow_unwrap_sparse_csr_matrix(sparse_tensor); +Status unwrap_sparse_coo_tensor(PyObject* sparse_tensor, + std::shared_ptr* out) { + *out = ::pyarrow_unwrap_sparse_coo_tensor(sparse_tensor); if (*out) { return Status::OK(); } else { - return UnwrapError(sparse_tensor, "SparseCSRMatrix"); + return UnwrapError(sparse_tensor, "SparseCOOTensor"); } } -PyObject* wrap_sparse_csr_matrix(const std::shared_ptr& sparse_tensor) { - return ::pyarrow_wrap_sparse_csr_matrix(sparse_tensor); +PyObject* wrap_sparse_coo_tensor(const std::shared_ptr& sparse_tensor) { + return ::pyarrow_wrap_sparse_coo_tensor(sparse_tensor); } -bool is_sparse_coo_tensor(PyObject* sparse_tensor) { - return ::pyarrow_is_sparse_coo_tensor(sparse_tensor) != 0; +bool is_sparse_csr_matrix(PyObject* sparse_tensor) { + return ::pyarrow_is_sparse_csr_matrix(sparse_tensor) != 0; } -Status unwrap_sparse_coo_tensor(PyObject* sparse_tensor, - std::shared_ptr* out) { - *out = ::pyarrow_unwrap_sparse_coo_tensor(sparse_tensor); +Status unwrap_sparse_csr_matrix(PyObject* sparse_tensor, + std::shared_ptr* out) { + *out = ::pyarrow_unwrap_sparse_csr_matrix(sparse_tensor); if (*out) { return Status::OK(); } else { - return UnwrapError(sparse_tensor, "SparseCOOTensor"); + return UnwrapError(sparse_tensor, "SparseCSRMatrix"); } } -PyObject* wrap_sparse_coo_tensor(const std::shared_ptr& sparse_tensor) { - return ::pyarrow_wrap_sparse_coo_tensor(sparse_tensor); +PyObject* wrap_sparse_csr_matrix(const std::shared_ptr& sparse_tensor) { + return ::pyarrow_wrap_sparse_csr_matrix(sparse_tensor); } bool is_table(PyObject* table) { return ::pyarrow_is_table(table) != 0; } diff --git a/cpp/src/arrow/python/pyarrow.h b/cpp/src/arrow/python/pyarrow.h index 34ecbbe5651..4d9eac9d60c 100644 --- a/cpp/src/arrow/python/pyarrow.h +++ b/cpp/src/arrow/python/pyarrow.h @@ -73,13 +73,13 @@ ARROW_PYTHON_EXPORT bool is_tensor(PyObject* tensor); ARROW_PYTHON_EXPORT Status unwrap_tensor(PyObject* tensor, std::shared_ptr* out); ARROW_PYTHON_EXPORT PyObject* wrap_tensor(const std::shared_ptr& tensor); -ARROW_PYTHON_EXPORT bool is_sparse_tensor_coo(PyObject* sparse_tensor); +ARROW_PYTHON_EXPORT bool is_sparse_coo_tensor(PyObject* sparse_tensor); ARROW_PYTHON_EXPORT Status unwrap_sparse_coo_tensor(PyObject* sparse_tensor, std::shared_ptr* out); ARROW_PYTHON_EXPORT PyObject* wrap_sparse_coo_tensor( const std::shared_ptr& sparse_tensor); -ARROW_PYTHON_EXPORT bool is_sparse_tensor_csr(PyObject* sparse_tensor); +ARROW_PYTHON_EXPORT bool is_sparse_csr_matrix(PyObject* sparse_tensor); ARROW_PYTHON_EXPORT Status unwrap_sparse_csr_matrix(PyObject* sparse_tensor, std::shared_ptr* out); ARROW_PYTHON_EXPORT PyObject* wrap_sparse_csr_matrix( diff --git a/cpp/src/arrow/python/pyarrow_lib.h b/cpp/src/arrow/python/pyarrow_lib.h index a18e6a45afd..365956fcd70 100644 --- a/cpp/src/arrow/python/pyarrow_lib.h +++ b/cpp/src/arrow/python/pyarrow_lib.h @@ -47,8 +47,8 @@ __PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_resizable_buffer(std __PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_schema(std::shared_ptr< arrow::Schema> const &); __PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_table(std::shared_ptr< arrow::Table> const &); __PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_tensor(std::shared_ptr< arrow::Tensor> const &); -__PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_sparse_tensor_coo(std::shared_ptr< arrow::SparseTensorCOO> const &); -__PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_sparse_tensor_csr(std::shared_ptr< arrow::SparseTensorCSR> const &); +__PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_sparse_coo_tensor(std::shared_ptr< arrow::SparseCOOTensor> const &); +__PYX_EXTERN_C PyObject *__pyx_f_7pyarrow_3lib_pyarrow_wrap_sparse_csr_matrix(std::shared_ptr< arrow::SparseCSRMatrix> const &); __PYX_EXTERN_C std::shared_ptr< arrow::Array> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_array(PyObject *); __PYX_EXTERN_C std::shared_ptr< arrow::RecordBatch> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_batch(PyObject *); __PYX_EXTERN_C std::shared_ptr< arrow::Buffer> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_buffer(PyObject *); @@ -57,8 +57,8 @@ __PYX_EXTERN_C std::shared_ptr< arrow::Field> __pyx_f_7pyarrow_3lib_pyarrow_unw __PYX_EXTERN_C std::shared_ptr< arrow::Schema> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_schema(PyObject *); __PYX_EXTERN_C std::shared_ptr< arrow::Table> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_table(PyObject *); __PYX_EXTERN_C std::shared_ptr< arrow::Tensor> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_tensor(PyObject *); -__PYX_EXTERN_C std::shared_ptr< arrow::SparseTensorCOO> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_sparse_tensor_coo(PyObject *); -__PYX_EXTERN_C std::shared_ptr< arrow::SparseTensorCSR> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_sparse_tensor_csr(PyObject *); +__PYX_EXTERN_C std::shared_ptr< arrow::SparseCOOTensor> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_sparse_coo_tensor(PyObject *); +__PYX_EXTERN_C std::shared_ptr< arrow::SparseCSRMatrix> __pyx_f_7pyarrow_3lib_pyarrow_unwrap_sparse_csr_matrix(PyObject *); #endif /* !__PYX_HAVE_API__pyarrow__lib */ diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc index 571502db1fc..5aa02c3c85e 100644 --- a/cpp/src/arrow/python/serialize.cc +++ b/cpp/src/arrow/python/serialize.cc @@ -63,7 +63,7 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, int32_t recursion_depth, SerializedPyObject* blobs_out); // A Sequence is a heterogeneous collections of elements. It can contain -// scalar Python types, lists, tuples, dictionaries and tensors. +// scalar Python types, lists, tuples, dictionaries, tensors and sparse tensors. class SequenceBuilder { public: explicit SequenceBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT) @@ -156,6 +156,26 @@ class SequenceBuilder { return tensor_indices_->Append(tensor_index); } + // Appending a sparse coo tensor to the sequence + // + // \param sparse_coo_tensor_index Index of the sparse coo tensor in the object. + Status AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index) { + RETURN_NOT_OK(CreateAndUpdate(&sparse_coo_tensor_indices_, + PythonType::SPARSECOOTENSOR, + [this]() { return new Int32Builder(pool_); })); + return sparse_coo_tensor_indices_->Append(sparse_coo_tensor_index); + } + + // Appending a sparse csr matrix to the sequence + // + // \param sparse_csr_matrix_index Index of the sparse csr matrix in the object. + Status AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index) { + RETURN_NOT_OK(CreateAndUpdate(&sparse_csr_matrix_indices_, + PythonType::SPARSECSRMATRIX, + [this]() { return new Int32Builder(pool_); })); + return sparse_csr_matrix_indices_->Append(sparse_csr_matrix_index); + } + // Appending a numpy ndarray to the sequence // // \param tensor_index Index of the tensor in the object. @@ -250,6 +270,8 @@ class SequenceBuilder { std::shared_ptr sets_; std::shared_ptr tensor_indices_; + std::shared_ptr sparse_coo_tensor_indices_; + std::shared_ptr sparse_csr_matrix_indices_; std::shared_ptr ndarray_indices_; std::shared_ptr buffer_indices_; @@ -480,6 +502,18 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, std::shared_ptr tensor; RETURN_NOT_OK(unwrap_tensor(elem, &tensor)); blobs_out->tensors.push_back(tensor); + } else if (is_sparse_coo_tensor(elem)) { + RETURN_NOT_OK(builder->AppendSparseCOOTensor( + static_cast(blobs_out->sparse_tensors.size()))); + std::shared_ptr sparse_coo_tensor; + RETURN_NOT_OK(unwrap_sparse_coo_tensor(elem, &sparse_coo_tensor)); + blobs_out->sparse_tensors.push_back(sparse_coo_tensor); + } else if (is_sparse_csr_matrix(elem)) { + RETURN_NOT_OK(builder->AppendSparseCSRMatrix( + static_cast(blobs_out->sparse_tensors.size()))); + std::shared_ptr sparse_csr_matrix; + RETURN_NOT_OK(unwrap_sparse_csr_matrix(elem, &sparse_csr_matrix)); + blobs_out->sparse_tensors.push_back(sparse_csr_matrix); } else { // Attempt to serialize the object using the custom callback. PyObject* serialized_object; @@ -568,10 +602,13 @@ SerializedPyObject::SerializedPyObject() : ipc_options(ipc::IpcOptions::Defaults Status SerializedPyObject::WriteTo(io::OutputStream* dst) { int32_t num_tensors = static_cast(this->tensors.size()); + int32_t num_sparse_tensors = static_cast(this->sparse_tensors.size()); int32_t num_ndarrays = static_cast(this->ndarrays.size()); int32_t num_buffers = static_cast(this->buffers.size()); RETURN_NOT_OK( dst->Write(reinterpret_cast(&num_tensors), sizeof(int32_t))); + RETURN_NOT_OK( + dst->Write(reinterpret_cast(&num_sparse_tensors), sizeof(int32_t))); RETURN_NOT_OK( dst->Write(reinterpret_cast(&num_ndarrays), sizeof(int32_t))); RETURN_NOT_OK( @@ -591,6 +628,12 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) { RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment)); } + for (const auto& sparse_tensor : this->sparse_tensors) { + RETURN_NOT_OK( + ipc::WriteSparseTensor(*sparse_tensor, dst, &metadata_length, &body_length)); + RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment)); + } + for (const auto& tensor : this->ndarrays) { RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length)); RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment)); @@ -605,11 +648,41 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) { return Status::OK(); } +namespace { + +Status CountSparseTensors( + const std::vector>& sparse_tensors, PyObject** out) { + OwnedRef num_sparse_tensors(PyDict_New()); + size_t num_coo = 0; + size_t num_csr = 0; + + for (const auto& sparse_tensor : sparse_tensors) { + switch (sparse_tensor->format_id()) { + case SparseTensorFormat::COO: + ++num_coo; + break; + case SparseTensorFormat::CSR: + ++num_csr; + break; + } + } + + PyDict_SetItemString(num_sparse_tensors.obj(), "coo", PyLong_FromSize_t(num_coo)); + PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr)); + RETURN_IF_PYERROR(); + + *out = num_sparse_tensors.detach(); + return Status::OK(); +} + +} // namespace + Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) { PyAcquireGIL py_gil; OwnedRef result(PyDict_New()); PyObject* buffers = PyList_New(0); + PyObject* num_sparse_tensors; // TODO(wesm): Not sure how pedantic we need to be about checking the return // values of these functions. There are other places where we do not check @@ -617,6 +690,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out // quite esoteric PyDict_SetItemString(result.obj(), "num_tensors", PyLong_FromSize_t(this->tensors.size())); + RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors)); + PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors); PyDict_SetItemString(result.obj(), "num_ndarrays", PyLong_FromSize_t(this->ndarrays.size())); PyDict_SetItemString(result.obj(), "num_buffers", @@ -660,6 +735,17 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out RETURN_NOT_OK(PushBuffer(message->body())); } + // For each sparse tensor, get a metadata buffer and buffers containing index and data + for (const auto& sparse_tensor : this->sparse_tensors) { + ipc::internal::IpcPayload payload; + RETURN_NOT_OK( + ipc::internal::GetSparseTensorPayload(*sparse_tensor, memory_pool, &payload)); + RETURN_NOT_OK(PushBuffer(payload.metadata)); + for (const auto& body : payload.body_buffers) { + RETURN_NOT_OK(PushBuffer(body)); + } + } + // For each ndarray, get a metadata buffer and a buffer for the body for (const auto& ndarray : this->ndarrays) { std::unique_ptr message; diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h index 80f641667b2..e0f4a902f97 100644 --- a/cpp/src/arrow/python/serialize.h +++ b/cpp/src/arrow/python/serialize.h @@ -23,6 +23,7 @@ #include "arrow/ipc/options.h" #include "arrow/python/visibility.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" // Forward declaring PyObject, see @@ -51,6 +52,7 @@ namespace py { struct ARROW_PYTHON_EXPORT SerializedPyObject { std::shared_ptr batch; std::vector> tensors; + std::vector> sparse_tensors; std::vector> ndarrays; std::vector> buffers; ipc::IpcOptions ipc_options; @@ -66,13 +68,14 @@ struct ARROW_PYTHON_EXPORT SerializedPyObject { /// components as Buffer instances with minimal memory allocation /// /// { - /// 'num_tensors': N, + /// 'num_tensors': M, + /// 'num_sparse_tensors': N, /// 'num_buffers': K, /// 'data': [Buffer] /// } /// /// Each tensor is written as two buffers, one for the metadata and one for - /// the body. Therefore, the number of buffers in 'data' is 2 * N + K + 1, + /// the body. Therefore, the number of buffers in 'data' is 2 * M + 2 * N + K + 1, /// with the first buffer containing the serialized record batch containing /// the UnionArray that describes the whole object Status GetComponents(MemoryPool* pool, PyObject** out); @@ -100,6 +103,14 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject ARROW_PYTHON_EXPORT Status SerializeTensor(std::shared_ptr tensor, py::SerializedPyObject* out); +/// \brief Serialize an Arrow SparseCOOTensor as a SerializedPyObject. +/// \param[in] sparse_tensor SparseCOOTensor to be serialized +/// \param[out] out The serialized representation +/// \return Status +ARROW_PYTHON_EXPORT +Status SerializeSparseTensor(std::shared_ptr sparse_tensor, + py::SerializedPyObject* out); + /// \brief Write the Tensor metadata header to an OutputStream. /// \param[in] dtype DataType of the Tensor /// \param[in] shape The shape of the tensor @@ -129,6 +140,8 @@ struct PythonType { TENSOR, NDARRAY, BUFFER, + SPARSECOOTENSOR, + SPARSECSRMATRIX, NUM_PYTHON_TYPES }; }; diff --git a/cpp/src/arrow/sparse_tensor.cc b/cpp/src/arrow/sparse_tensor.cc index b80c478b0b7..ef7a986bf05 100644 --- a/cpp/src/arrow/sparse_tensor.cc +++ b/cpp/src/arrow/sparse_tensor.cc @@ -503,6 +503,28 @@ Status MakeTensorFromSparseTensor(MemoryPool* pool, const SparseTensor* sparse_t // ---------------------------------------------------------------------- // SparseCOOIndex +Status SparseCOOIndex::Make(std::shared_ptr indices_type, + const std::vector& indices_shape, + const std::vector& indices_strides, + std::shared_ptr indices_data, + std::shared_ptr* out) { + *out = std::make_shared(std::make_shared( + indices_type, indices_data, indices_shape, indices_strides)); + return Status::OK(); +} + +Status SparseCOOIndex::Make(std::shared_ptr indices_type, + const std::vector& shape, int64_t non_zero_length, + std::shared_ptr indices_data, + std::shared_ptr* out) { + auto ndim = static_cast(shape.size()); + const int64_t elsize = sizeof(indices_type.get()); + std::vector indices_shape({non_zero_length, ndim}); + std::vector indices_strides({elsize, elsize * non_zero_length}); + return SparseCOOIndex::Make(indices_type, indices_shape, indices_strides, indices_data, + out); +} + // Constructor with a contiguous NumericTensor SparseCOOIndex::SparseCOOIndex(const std::shared_ptr& coords) : SparseIndexBase(coords->shape()[0]), coords_(coords) { @@ -516,6 +538,29 @@ std::string SparseCOOIndex::ToString() const { return std::string("SparseCOOInde // ---------------------------------------------------------------------- // SparseCSRIndex +Status SparseCSRIndex::Make(const std::shared_ptr indices_type, + const std::vector& indptr_shape, + const std::vector& indices_shape, + std::shared_ptr indptr_data, + std::shared_ptr indices_data, + std::shared_ptr* out) { + *out = std::make_shared( + std::make_shared(indices_type, indptr_data, indptr_shape), + std::make_shared(indices_type, indices_data, indices_shape)); + return Status::OK(); +} + +Status SparseCSRIndex::Make(const std::shared_ptr indices_type, + const std::vector& shape, int64_t non_zero_length, + std::shared_ptr indptr_data, + std::shared_ptr indices_data, + std::shared_ptr* out) { + std::vector indptr_shape({shape[0] + 1}); + std::vector indices_shape({non_zero_length}); + return SparseCSRIndex::Make(indices_type, indptr_shape, indices_shape, indptr_data, + indices_data, out); +} + // Constructor with two index vectors SparseCSRIndex::SparseCSRIndex(const std::shared_ptr& indptr, const std::shared_ptr& indices) diff --git a/cpp/src/arrow/sparse_tensor.h b/cpp/src/arrow/sparse_tensor.h index dc5c9bffa45..259284a5b40 100644 --- a/cpp/src/arrow/sparse_tensor.h +++ b/cpp/src/arrow/sparse_tensor.h @@ -88,6 +88,19 @@ class ARROW_EXPORT SparseCOOIndex : public internal::SparseIndexBase indices_type, + const std::vector& indices_shape, + const std::vector& indices_strides, + std::shared_ptr indices_data, + std::shared_ptr* out); + + /// \brief Make SparseCOOIndex from sparse tensor's shape properties and data + static Status Make(const std::shared_ptr indices_type, + const std::vector& shape, int64_t non_zero_length, + std::shared_ptr indices_data, + std::shared_ptr* out); + /// \brief Construct SparseCOOIndex from column-major NumericTensor explicit SparseCOOIndex(const std::shared_ptr& coords); @@ -130,6 +143,21 @@ class ARROW_EXPORT SparseCSRIndex : public internal::SparseIndexBase indices_type, + const std::vector& indptr_shape, + const std::vector& indices_shape, + std::shared_ptr indptr_data, + std::shared_ptr indices_data, + std::shared_ptr* out); + + /// \brief Make SparseCSRIndex from sparse tensor's shape properties and data + static Status Make(const std::shared_ptr indices_type, + const std::vector& shape, int64_t non_zero_length, + std::shared_ptr indptr_data, + std::shared_ptr indices_data, + std::shared_ptr* out); + /// \brief Construct SparseCSRIndex from two index vectors explicit SparseCSRIndex(const std::shared_ptr& indptr, const std::shared_ptr& indices); diff --git a/docs/source/python/extending.rst b/docs/source/python/extending.rst index a8bc3b69fc2..b724ed33956 100644 --- a/docs/source/python/extending.rst +++ b/docs/source/python/extending.rst @@ -266,11 +266,11 @@ an exception) if the input is not of the right type. Unwrap the Arrow C++ :cpp:class:`Tensor` pointer from *obj*. -.. function:: pyarrow_unwrap_sparse_tensor_coo(obj) -> shared_ptr[CSparseTensorCOO] +.. function:: pyarrow_unwrap_sparse_coo_tensor(obj) -> shared_ptr[CSparseCOOTensor] Unwrap the Arrow C++ :cpp:class:`SparseCOOTensor` pointer from *obj*. -.. function:: pyarrow_unwrap_sparse_tensor_csr(obj) -> shared_ptr[CSparseTensorCSR] +.. function:: pyarrow_unwrap_sparse_csr_matrix(obj) -> shared_ptr[CSparseCSRMatrix] Unwrap the Arrow C++ :cpp:class:`SparseCSRMatrix` pointer from *obj*. @@ -313,11 +313,11 @@ pyarray object of the corresponding type. An exception is raised on error. Wrap the Arrow C++ *tensor* in a Python :class:`pyarrow.Tensor` instance. -.. function:: pyarrow_wrap_sparse_tensor_coo(sp_array: const shared_ptr[CSparseTensorCOO]& sparse_tensor) -> object +.. function:: pyarrow_wrap_sparse_coo_tensor(sp_array: const shared_ptr[CSparseCOOTensor]& sparse_tensor) -> object Wrap the Arrow C++ *COO sparse tensor* in a Python :class:`pyarrow.SparseCOOTensor` instance. -.. function:: pyarrow_wrap_sparse_tensor_csr(sp_array: const shared_ptr[CSparseTensorCSR]& sparse_tensor) -> object +.. function:: pyarrow_wrap_sparse_csr_matrix(sp_array: const shared_ptr[CSparseCSRMatrix]& sparse_tensor) -> object Wrap the Arrow C++ *CSR sparse tensor* in a Python :class:`pyarrow.SparseCSRMatrix` instance. diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 06ec558230b..0055918db89 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1573,10 +1573,20 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: CStatus ReadSerializedObject(CRandomAccessFile* src, CSerializedPyObject* out) - CStatus GetSerializedFromComponents(int num_tensors, int num_ndarrays, - int num_buffers, - object buffers, - CSerializedPyObject* out) + cdef cppclass SparseTensorCounts: + SparseTensorCounts() + int coo + int csr + int num_total_tensors() const + int num_total_buffers() const + + CStatus GetSerializedFromComponents( + int num_tensors, + const SparseTensorCounts& num_sparse_tensors, + int num_ndarrays, + int num_buffers, + object buffers, + CSerializedPyObject* out) cdef extern from "arrow/python/api.h" namespace "arrow::py::internal" nogil: diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index cf343b0ba32..dff2a2904e0 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -301,10 +301,16 @@ cdef class SerializedPyObject: int num_ndarrays = components['num_ndarrays'] int num_buffers = components['num_buffers'] list buffers = components['data'] + SparseTensorCounts num_sparse_tensors = SparseTensorCounts() SerializedPyObject result = SerializedPyObject() + num_sparse_tensors.coo = components['num_sparse_tensors']['coo'] + num_sparse_tensors.csr = components['num_sparse_tensors']['csr'] + with nogil: - check_status(GetSerializedFromComponents(num_tensors, num_ndarrays, + check_status(GetSerializedFromComponents(num_tensors, + num_sparse_tensors, + num_ndarrays, num_buffers, buffers, &result.data)) diff --git a/python/pyarrow/serialization.py b/python/pyarrow/serialization.py index f4b8e967410..ea6deab2269 100644 --- a/python/pyarrow/serialization.py +++ b/python/pyarrow/serialization.py @@ -228,16 +228,19 @@ def register_torch_serialization_handlers(serialization_context): def _serialize_torch_tensor(obj): if obj.is_sparse: - # TODO(pcm): Once ARROW-4453 is resolved, return sparse - # tensor representation here - return (obj._indices().detach().numpy(), - obj._values().detach().numpy(), list(obj.shape)) + return pa.SparseCOOTensor.from_numpy( + obj._values().detach().numpy(), + obj._indices().detach().numpy().T, + shape=list(obj.shape)) else: return obj.detach().numpy() def _deserialize_torch_tensor(data): - if isinstance(data, tuple): - return torch.sparse_coo_tensor(data[0], data[1], data[2]) + if isinstance(data, pa.SparseCOOTensor): + return torch.sparse_coo_tensor( + indices=data.to_numpy()[1].T, + values=data.to_numpy()[0][:, 0], + size=data.shape) else: return torch.from_numpy(data) @@ -299,6 +302,54 @@ def _deserialize_counter(data): custom_deserializer=_deserialize_counter) +# ---------------------------------------------------------------------- +# Set up serialization for scipy sparse matrices. Primitive types are handled +# efficiently with Arrow's SparseTensor facilities, see numpy_convert.cc) + +def _register_scipy_handlers(serialization_context): + try: + from scipy.sparse import csr_matrix, coo_matrix, isspmatrix_coo, \ + isspmatrix_csr, isspmatrix + + def _serialize_scipy_sparse(obj): + if isspmatrix_coo(obj): + return 'coo', pa.SparseCOOTensor.from_scipy(obj) + + elif isspmatrix_csr(obj): + return 'csr', pa.SparseCSRMatrix.from_scipy(obj) + + elif isspmatrix(obj): + return 'csr', pa.SparseCOOTensor.from_scipy(obj.to_coo()) + + else: + raise NotImplementedError( + "Serialization of {} is not supported.".format(obj[0])) + + def _deserialize_scipy_sparse(data): + if data[0] == 'coo': + return data[1].to_scipy() + + elif data[0] == 'csr': + return data[1].to_scipy() + + else: + return data[1].to_scipy() + + serialization_context.register_type( + coo_matrix, 'scipy.sparse.coo.coo_matrix', + custom_serializer=_serialize_scipy_sparse, + custom_deserializer=_deserialize_scipy_sparse) + + serialization_context.register_type( + csr_matrix, 'scipy.sparse.csr.csr_matrix', + custom_serializer=_serialize_scipy_sparse, + custom_deserializer=_deserialize_scipy_sparse) + + except ImportError: + # no scipy + pass + + def register_default_serialization_handlers(serialization_context): # ---------------------------------------------------------------------- @@ -351,6 +402,7 @@ def register_default_serialization_handlers(serialization_context): _register_collections_serialization_handlers(serialization_context) _register_custom_pandas_handlers(serialization_context) + _register_scipy_handlers(serialization_context) def default_serialization_context(): diff --git a/python/pyarrow/tensor.pxi b/python/pyarrow/tensor.pxi index 4b93676ee07..8661c1edd13 100644 --- a/python/pyarrow/tensor.pxi +++ b/python/pyarrow/tensor.pxi @@ -179,6 +179,34 @@ shape: {0.shape}""".format(self) c_dim_names, &csparse_tensor)) return pyarrow_wrap_sparse_coo_tensor(csparse_tensor) + @staticmethod + def from_scipy(obj, dim_names=None): + """ + Convert scipy.sparse.coo_matrix to arrow::SparseCOOTensor + """ + import scipy.sparse + if not isinstance(obj, scipy.sparse.coo_matrix): + raise TypeError( + "Expected scipy.sparse.coo_matrix, got {}".format(type(obj))) + + cdef shared_ptr[CSparseCOOTensor] csparse_tensor + cdef vector[int64_t] c_shape + cdef vector[c_string] c_dim_names + + for x in obj.shape: + c_shape.push_back(x) + if dim_names is not None: + for x in dim_names: + c_dim_names.push_back(tobytes(x)) + + coords = np.vstack([obj.row, obj.col]).T + coords = np.require(coords, dtype='i8', requirements='F') + + check_status(NdarraysToSparseCOOTensor(c_default_memory_pool(), + obj.data.view(), coords, c_shape, c_dim_names, + &csparse_tensor)) + return pyarrow_wrap_sparse_coo_tensor(csparse_tensor) + @staticmethod def from_tensor(obj): """ @@ -203,6 +231,22 @@ shape: {0.shape}""".format(self) &out_data, &out_coords)) return PyObject_to_object(out_data), PyObject_to_object(out_coords) + def to_scipy(self): + """ + Convert arrow::SparseCOOTensor to scipy.sparse.coo_matrix + """ + from scipy.sparse import coo_matrix + cdef PyObject* out_data + cdef PyObject* out_coords + + check_status(SparseCOOTensorToNdarray(self.sp_sparse_tensor, self, + &out_data, &out_coords)) + data = PyObject_to_object(out_data) + coords = PyObject_to_object(out_coords) + result = coo_matrix((data[:, 0], (coords[:, 0], coords[:, 1])), + shape=self.shape) + return result + def to_tensor(self): """ Convert arrow::SparseCOOTensor to arrow::Tensor @@ -247,7 +291,7 @@ shape: {0.shape}""".format(self) @property def dim_names(self): - return [frombytes(x) for x in tuple(self.stp.dim_names())] + return tuple(frombytes(x) for x in tuple(self.stp.dim_names())) @property def non_zero_length(self): @@ -311,6 +355,35 @@ shape: {0.shape}""".format(self) c_dim_names, &csparse_tensor)) return pyarrow_wrap_sparse_csr_matrix(csparse_tensor) + @staticmethod + def from_scipy(obj, dim_names=None): + """ + Convert scipy.sparse.csr_matrix to arrow::SparseCSRMatrix + """ + import scipy.sparse + if not isinstance(obj, scipy.sparse.csr_matrix): + raise TypeError( + "Expected scipy.sparse.csr_matrix, got {}".format(type(obj))) + + cdef shared_ptr[CSparseCSRMatrix] csparse_tensor + cdef vector[int64_t] c_shape + cdef vector[c_string] c_dim_names + + for x in obj.shape: + c_shape.push_back(x) + if dim_names is not None: + for x in dim_names: + c_dim_names.push_back(tobytes(x)) + + # Enforce precondition for CSparseCSRMatrix indices + indptr = np.require(obj.indptr, dtype='i8') + indices = np.require(obj.indices, dtype='i8') + + check_status(NdarraysToSparseCSRMatrix(c_default_memory_pool(), + obj.data, indptr, indices, c_shape, + c_dim_names, &csparse_tensor)) + return pyarrow_wrap_sparse_csr_matrix(csparse_tensor) + @staticmethod def from_tensor(obj): """ @@ -338,6 +411,24 @@ shape: {0.shape}""".format(self) return (PyObject_to_object(out_data), PyObject_to_object(out_indptr), PyObject_to_object(out_indices)) + def to_scipy(self): + """ + Convert arrow::SparseCSRMatrix to scipy.sparse.csr_matrix + """ + from scipy.sparse import csr_matrix + cdef PyObject* out_data + cdef PyObject* out_indptr + cdef PyObject* out_indices + + check_status(SparseCSRMatrixToNdarray(self.sp_sparse_tensor, self, + &out_data, &out_indptr, &out_indices)) + + data = PyObject_to_object(out_data) + indptr = PyObject_to_object(out_indptr) + indices = PyObject_to_object(out_indices) + result = csr_matrix((data[:, 0], indices, indptr), shape=self.shape) + return result + def to_tensor(self): """ Convert arrow::SparseCSRMatrix to arrow::Tensor @@ -382,7 +473,7 @@ shape: {0.shape}""".format(self) @property def dim_names(self): - return [frombytes(x) for x in tuple(self.stp.dim_names())] + return tuple(frombytes(x) for x in tuple(self.stp.dim_names())) @property def non_zero_length(self): diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index 5ba54d3277b..53fccff5763 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -40,6 +40,12 @@ # failing (ARROW-2071) sys.modules['torch'] = None +try: + from scipy.sparse import csr_matrix, coo_matrix +except ImportError: + coo_matrix = None + csr_matrix = None + def assert_equal(obj1, obj2): if torch is not None and torch.is_tensor(obj1) and torch.is_tensor(obj2): @@ -111,6 +117,12 @@ def assert_equal(obj1, obj2): assert obj1.equals(obj2) elif isinstance(obj1, pa.Tensor) and isinstance(obj2, pa.Tensor): assert obj1.equals(obj2) + elif isinstance(obj1, pa.SparseCOOTensor) and \ + isinstance(obj2, pa.SparseCOOTensor): + assert obj1.equals(obj2) + elif isinstance(obj1, pa.SparseCSRMatrix) and \ + isinstance(obj2, pa.SparseCSRMatrix): + assert obj1.equals(obj2) elif isinstance(obj1, pa.RecordBatch) and isinstance(obj2, pa.RecordBatch): assert obj1.equals(obj2) elif isinstance(obj1, pa.Table) and isinstance(obj2, pa.Table): @@ -140,6 +152,11 @@ def assert_equal(obj1, obj2): ] +index_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8') +tensor_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8', + 'f2', 'f4', 'f8') + + if sys.version_info >= (3, 0): PRIMITIVE_OBJECTS += [0, np.array([["hi", u"hi"], [1.3, 1]])] else: @@ -520,6 +537,122 @@ def deserializer(data): assert np.alltrue(new_x.view(np.ndarray) == np.zeros(3)) +@pytest.mark.parametrize('tensor_type', tensor_types) +@pytest.mark.parametrize('index_type', index_types) +def test_sparse_coo_tensor_serialization(index_type, tensor_type): + tensor_dtype = np.dtype(tensor_type) + index_dtype = np.dtype(index_type) + data = np.array([[1, 2, 3, 4, 5, 6]]).T.astype(tensor_dtype) + coords = np.array([ + [0, 0, 2, 3, 1, 3], + [0, 2, 0, 4, 5, 5], + ]).T.astype(index_dtype) + shape = (4, 6) + dim_names = ('x', 'y') + + sparse_tensor = pa.SparseCOOTensor.from_numpy(data, coords, + shape, dim_names) + + context = pa.default_serialization_context() + serialized = pa.serialize(sparse_tensor, context=context).to_buffer() + result = pa.deserialize(serialized) + assert_equal(result, sparse_tensor) + + data_result, coords_result = result.to_numpy() + assert np.array_equal(data_result, data) + assert np.array_equal(coords_result, coords) + assert result.dim_names == dim_names + + +@pytest.mark.parametrize('tensor_type', tensor_types) +@pytest.mark.parametrize('index_type', index_types) +def test_sparse_coo_tensor_components_serialization(large_buffer, + index_type, tensor_type): + tensor_dtype = np.dtype(tensor_type) + index_dtype = np.dtype(index_type) + data = np.array([[1, 2, 3, 4, 5, 6]]).T.astype(tensor_dtype) + coords = np.array([ + [0, 0, 2, 3, 1, 3], + [0, 2, 0, 4, 5, 5], + ]).T.astype(index_dtype) + shape = (4, 6) + dim_names = ('x', 'y') + + sparse_tensor = pa.SparseCOOTensor.from_numpy(data, coords, + shape, dim_names) + serialization_roundtrip(sparse_tensor, large_buffer) + + +@pytest.mark.skipif(not coo_matrix, reason="requires scipy") +def test_scipy_sparse_coo_tensor_serialization(): + data = np.array([1, 2, 3, 4, 5, 6]) + row = np.array([0, 0, 2, 3, 1, 3]) + col = np.array([0, 2, 0, 4, 5, 5]) + shape = (4, 6) + + sparse_array = coo_matrix((data, (row, col)), shape=shape) + serialized = pa.serialize(sparse_array) + result = serialized.deserialize() + + assert np.array_equal(sparse_array.toarray(), result.toarray()) + + +@pytest.mark.parametrize('tensor_type', tensor_types) +@pytest.mark.parametrize('index_type', index_types) +def test_sparse_csr_matrix_serialization(index_type, tensor_type): + tensor_dtype = np.dtype(tensor_type) + index_dtype = np.dtype(index_type) + data = np.array([[8, 2, 5, 3, 4, 6]]).T.astype(tensor_dtype) + indptr = np.array([0, 2, 3, 4, 6]).astype(index_dtype) + indices = np.array([0, 2, 5, 0, 4, 5]).astype(index_dtype) + shape = (4, 6) + dim_names = ('x', 'y') + + sparse_tensor = pa.SparseCSRMatrix.from_numpy(data, indptr, indices, + shape, dim_names) + + context = pa.default_serialization_context() + serialized = pa.serialize(sparse_tensor, context=context).to_buffer() + result = pa.deserialize(serialized) + + data_result, indptr_result, indices_result = result.to_numpy() + assert np.array_equal(data_result, data) + assert np.array_equal(indptr_result, indptr) + assert np.array_equal(indices_result, indices) + assert result.dim_names == dim_names + + +@pytest.mark.parametrize('tensor_type', tensor_types) +@pytest.mark.parametrize('index_type', index_types) +def test_sparse_csr_matrix_components_serialization(large_buffer, + index_type, tensor_type): + tensor_dtype = np.dtype(tensor_type) + index_dtype = np.dtype(index_type) + data = np.array([8, 2, 5, 3, 4, 6]).astype(tensor_dtype) + indptr = np.array([0, 2, 3, 4, 6]).astype(index_dtype) + indices = np.array([0, 2, 5, 0, 4, 5]).astype(index_dtype) + shape = (4, 6) + dim_names = ('x', 'y') + + sparse_tensor = pa.SparseCSRMatrix.from_numpy(data, indptr, indices, + shape, dim_names) + serialization_roundtrip(sparse_tensor, large_buffer) + + +@pytest.mark.skipif(not csr_matrix, reason="requires scipy") +def test_scipy_sparse_csr_matrix_serialization(): + data = np.array([8, 2, 5, 3, 4, 6]) + indptr = np.array([0, 2, 3, 4, 6]) + indices = np.array([0, 2, 5, 0, 4, 5]) + shape = (4, 6) + + sparse_array = csr_matrix((data, indices, indptr), shape=shape) + serialized = pa.serialize(sparse_array) + result = serialized.deserialize() + + assert np.array_equal(sparse_array.toarray(), result.toarray()) + + @pytest.mark.filterwarnings( "ignore:the matrix subclass:PendingDeprecationWarning") def test_numpy_matrix_serialization(tmpdir): @@ -722,6 +855,7 @@ def test_serialize_to_components_invalid_cases(): components = { 'num_tensors': 0, + 'num_sparse_tensors': {'coo': 0, 'csr': 0}, 'num_ndarrays': 0, 'num_buffers': 1, 'data': [buf] @@ -732,6 +866,7 @@ def test_serialize_to_components_invalid_cases(): components = { 'num_tensors': 0, + 'num_sparse_tensors': {'coo': 0, 'csr': 0}, 'num_ndarrays': 1, 'num_buffers': 0, 'data': [buf, buf] diff --git a/python/pyarrow/tests/test_sparse_tensor.py b/python/pyarrow/tests/test_sparse_tensor.py index aaf0468f982..31c40bd94ed 100644 --- a/python/pyarrow/tests/test_sparse_tensor.py +++ b/python/pyarrow/tests/test_sparse_tensor.py @@ -21,6 +21,12 @@ import numpy as np import pyarrow as pa +try: + from scipy.sparse import csr_matrix, coo_matrix +except ImportError: + coo_matrix = None + csr_matrix = None + tensor_type_pairs = [ ('i1', pa.int8()), @@ -43,50 +49,56 @@ ]) def test_sparse_tensor_attrs(sparse_tensor_type): data = np.array([ - [0, 1, 0, 0, 1], - [0, 0, 0, 0, 0], - [0, 0, 0, 1, 0], - [0, 0, 0, 0, 0], - [0, 3, 0, 0, 0], + [8, 0, 2, 0, 0, 0], + [0, 0, 0, 0, 0, 5], + [3, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 4, 6], ]) - dim_names = ['x', 'y'] + dim_names = ('x', 'y') sparse_tensor = sparse_tensor_type.from_dense_numpy(data, dim_names) assert sparse_tensor.ndim == 2 - assert sparse_tensor.size == 25 + assert sparse_tensor.size == 24 assert sparse_tensor.shape == data.shape assert sparse_tensor.is_mutable assert sparse_tensor.dim_name(0) == dim_names[0] assert sparse_tensor.dim_names == dim_names - assert sparse_tensor.non_zero_length == 4 - - -def test_sparse_tensor_coo_base_object(): - data = np.array([[4], [9], [7], [5]]) - coords = np.array([[0, 0], [0, 2], [1, 1], [3, 3]]) - array = np.array([[4, 0, 9, 0], - [0, 7, 0, 0], - [0, 0, 0, 0], - [0, 0, 0, 5]]) + assert sparse_tensor.non_zero_length == 6 + + +def test_sparse_coo_tensor_base_object(): + expected_data = np.array([[8, 2, 5, 3, 4, 6]]).T + expected_coords = np.array([ + [0, 0, 1, 2, 3, 3], + [0, 2, 5, 0, 4, 5], + ]).T + array = np.array([ + [8, 0, 2, 0, 0, 0], + [0, 0, 0, 0, 0, 5], + [3, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 4, 6], + ]) sparse_tensor = pa.SparseCOOTensor.from_dense_numpy(array) n = sys.getrefcount(sparse_tensor) result_data, result_coords = sparse_tensor.to_numpy() assert sys.getrefcount(sparse_tensor) == n + 2 sparse_tensor = None - assert np.array_equal(data, result_data) - assert np.array_equal(coords, result_coords) + assert np.array_equal(expected_data, result_data) + assert np.array_equal(expected_coords, result_coords) assert result_coords.flags.f_contiguous # column-major -def test_sparse_tensor_csr_base_object(): - data = np.array([[1], [2], [3], [4], [5], [6]]) - indptr = np.array([0, 2, 3, 6]) - indices = np.array([0, 2, 2, 0, 1, 2]) - array = np.array([[1, 0, 2], - [0, 0, 3], - [4, 5, 6]]) - +def test_sparse_csr_matrix_base_object(): + data = np.array([[8, 2, 5, 3, 4, 6]]).T + indptr = np.array([0, 2, 3, 4, 6]) + indices = np.array([0, 2, 5, 0, 4, 5]) + array = np.array([ + [8, 0, 2, 0, 0, 0], + [0, 0, 0, 0, 0, 5], + [3, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 4, 6], + ]) sparse_tensor = pa.SparseCSRMatrix.from_dense_numpy(array) n = sys.getrefcount(sparse_tensor) result_data, result_indptr, result_indices = sparse_tensor.to_numpy() @@ -126,49 +138,57 @@ def ne(a, b): @pytest.mark.parametrize('dtype_str,arrow_type', tensor_type_pairs) -def test_sparse_tensor_coo_from_dense(dtype_str, arrow_type): +def test_sparse_coo_tensor_from_dense(dtype_str, arrow_type): dtype = np.dtype(dtype_str) - data = np.array([[4], [9], [7], [5]]).astype(dtype) - coords = np.array([[0, 0], [0, 2], [1, 1], [3, 3]]) - array = np.array([[4, 0, 9, 0], - [0, 7, 0, 0], - [0, 0, 0, 0], - [0, 0, 0, 5]]).astype(dtype) + expected_data = np.array([[8, 2, 5, 3, 4, 6]]).T.astype(dtype) + expected_coords = np.array([ + [0, 0, 1, 2, 3, 3], + [0, 2, 5, 0, 4, 5], + ]).T + array = np.array([ + [8, 0, 2, 0, 0, 0], + [0, 0, 0, 0, 0, 5], + [3, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 4, 6], + ]).astype(dtype) tensor = pa.Tensor.from_numpy(array) # Test from numpy array sparse_tensor = pa.SparseCOOTensor.from_dense_numpy(array) repr(sparse_tensor) - assert sparse_tensor.type == arrow_type result_data, result_coords = sparse_tensor.to_numpy() - assert np.array_equal(data, result_data) - assert np.array_equal(coords, result_coords) + assert sparse_tensor.type == arrow_type + assert np.array_equal(expected_data, result_data) + assert np.array_equal(expected_coords, result_coords) # Test from Tensor sparse_tensor = pa.SparseCOOTensor.from_tensor(tensor) repr(sparse_tensor) - assert sparse_tensor.type == arrow_type result_data, result_coords = sparse_tensor.to_numpy() - assert np.array_equal(data, result_data) - assert np.array_equal(coords, result_coords) + assert sparse_tensor.type == arrow_type + assert np.array_equal(expected_data, result_data) + assert np.array_equal(expected_coords, result_coords) @pytest.mark.parametrize('dtype_str,arrow_type', tensor_type_pairs) -def test_sparse_tensor_csr_from_dense(dtype_str, arrow_type): +def test_sparse_csr_matrix_from_dense(dtype_str, arrow_type): dtype = np.dtype(dtype_str) - dense_data = np.array([[1, 0, 2], - [0, 0, 3], - [4, 5, 6]]).astype(dtype) - - data = np.array([[1], [2], [3], [4], [5], [6]]) - indptr = np.array([0, 2, 3, 6]) - indices = np.array([0, 2, 2, 0, 1, 2]) - tensor = pa.Tensor.from_numpy(dense_data) + data = np.array([[8, 2, 5, 3, 4, 6]]).T.astype(dtype) + indptr = np.array([0, 2, 3, 4, 6]) + indices = np.array([0, 2, 5, 0, 4, 5]) + array = np.array([ + [8, 0, 2, 0, 0, 0], + [0, 0, 0, 0, 0, 5], + [3, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 4, 6], + ]).astype(dtype) + tensor = pa.Tensor.from_numpy(array) # Test from numpy array - sparse_tensor = pa.SparseCSRMatrix.from_dense_numpy(dense_data) + sparse_tensor = pa.SparseCSRMatrix.from_dense_numpy(array) repr(sparse_tensor) result_data, result_indptr, result_indices = sparse_tensor.to_numpy() + assert sparse_tensor.type == arrow_type assert np.array_equal(data, result_data) assert np.array_equal(indptr, result_indptr) assert np.array_equal(indices, result_indices) @@ -176,45 +196,48 @@ def test_sparse_tensor_csr_from_dense(dtype_str, arrow_type): # Test from Tensor sparse_tensor = pa.SparseCSRMatrix.from_tensor(tensor) repr(sparse_tensor) - assert sparse_tensor.type == arrow_type result_data, result_indptr, result_indices = sparse_tensor.to_numpy() + assert sparse_tensor.type == arrow_type assert np.array_equal(data, result_data) assert np.array_equal(indptr, result_indptr) assert np.array_equal(indices, result_indices) @pytest.mark.parametrize('dtype_str,arrow_type', tensor_type_pairs) -def test_sparse_tensor_coo_numpy_roundtrip(dtype_str, arrow_type): +def test_sparse_coo_tensor_numpy_roundtrip(dtype_str, arrow_type): dtype = np.dtype(dtype_str) - data = np.array([[4], [9], [7], [5]]).astype(dtype) - coords = np.array([[0, 0], [3, 3], [1, 1], [0, 2]]) - shape = (4, 4) - dim_names = ["x", "y"] + data = np.array([[1, 2, 3, 4, 5, 6]]).T.astype(dtype) + coords = np.array([ + [0, 0, 2, 3, 1, 3], + [0, 2, 0, 4, 5, 5], + ]).T + shape = (4, 6) + dim_names = ('x', 'y') sparse_tensor = pa.SparseCOOTensor.from_numpy(data, coords, shape, dim_names) repr(sparse_tensor) - assert sparse_tensor.type == arrow_type result_data, result_coords = sparse_tensor.to_numpy() + assert sparse_tensor.type == arrow_type assert np.array_equal(data, result_data) assert np.array_equal(coords, result_coords) assert sparse_tensor.dim_names == dim_names @pytest.mark.parametrize('dtype_str,arrow_type', tensor_type_pairs) -def test_sparse_tensor_csr_numpy_roundtrip(dtype_str, arrow_type): +def test_sparse_csr_matrix_numpy_roundtrip(dtype_str, arrow_type): dtype = np.dtype(dtype_str) - data = np.array([[1], [2], [3], [4], [5], [6]]).astype(dtype) - indptr = np.array([0, 2, 3, 6]) - indices = np.array([0, 2, 2, 0, 1, 2]) - shape = (3, 3) - dim_names = ["x", "y"] + data = np.array([[8, 2, 5, 3, 4, 6]]).T.astype(dtype) + indptr = np.array([0, 2, 3, 4, 6]) + indices = np.array([0, 2, 5, 0, 4, 5]) + shape = (4, 6) + dim_names = ('x', 'y') sparse_tensor = pa.SparseCSRMatrix.from_numpy(data, indptr, indices, shape, dim_names) repr(sparse_tensor) - assert sparse_tensor.type == arrow_type result_data, result_indptr, result_indices = sparse_tensor.to_numpy() + assert sparse_tensor.type == arrow_type assert np.array_equal(data, result_data) assert np.array_equal(indptr, result_indptr) assert np.array_equal(indices, result_indices) @@ -232,11 +255,73 @@ def test_dense_to_sparse_tensor(dtype_str, arrow_type, sparse_tensor_type): [0, 7, 0, 0], [0, 0, 0, 0], [0, 0, 0, 5]]).astype(dtype) + dim_names = ('x', 'y') - sparse_tensor = sparse_tensor_type.from_dense_numpy(array) + sparse_tensor = sparse_tensor_type.from_dense_numpy(array, dim_names) tensor = sparse_tensor.to_tensor() result_array = tensor.to_numpy() assert sparse_tensor.type == arrow_type assert tensor.type == arrow_type + assert sparse_tensor.dim_names == dim_names assert np.array_equal(array, result_array) + + +@pytest.mark.skipif(not coo_matrix, reason="requires scipy") +@pytest.mark.parametrize('dtype_str,arrow_type', tensor_type_pairs) +def test_sparse_coo_tensor_scipy_roundtrip(dtype_str, arrow_type): + dtype = np.dtype(dtype_str) + data = np.array([1, 2, 3, 4, 5, 6]).astype(dtype) + row = np.array([0, 0, 2, 3, 1, 3]) + col = np.array([0, 2, 0, 4, 5, 5]) + shape = (4, 6) + dim_names = ('x', 'y') + + sparse_array = coo_matrix((data, (row, col)), shape=shape) + sparse_tensor = pa.SparseCOOTensor.from_scipy(sparse_array, + dim_names=dim_names) + out_sparse_array = sparse_tensor.to_scipy() + + assert sparse_tensor.type == arrow_type + assert sparse_tensor.dim_names == dim_names + assert sparse_array.dtype == out_sparse_array.dtype + assert np.array_equal(sparse_array.data, out_sparse_array.data) + assert np.array_equal(sparse_array.row, out_sparse_array.row) + assert np.array_equal(sparse_array.col, out_sparse_array.col) + + if dtype_str == 'f2': + dense_array = \ + sparse_array.astype(np.float32).toarray().astype(np.float16) + else: + dense_array = sparse_array.toarray() + assert np.array_equal(dense_array, sparse_tensor.to_tensor().to_numpy()) + + +@pytest.mark.skipif(not csr_matrix, reason="requires scipy") +@pytest.mark.parametrize('dtype_str,arrow_type', tensor_type_pairs) +def test_sparse_csr_matrix_scipy_roundtrip(dtype_str, arrow_type): + dtype = np.dtype(dtype_str) + data = np.array([8, 2, 5, 3, 4, 6]).astype(dtype) + indptr = np.array([0, 2, 3, 4, 6]) + indices = np.array([0, 2, 5, 0, 4, 5]) + shape = (4, 6) + dim_names = ('x', 'y') + + sparse_array = csr_matrix((data, indices, indptr), shape=shape) + sparse_tensor = pa.SparseCSRMatrix.from_scipy(sparse_array, + dim_names=dim_names) + out_sparse_array = sparse_tensor.to_scipy() + + assert sparse_tensor.type == arrow_type + assert sparse_tensor.dim_names == dim_names + assert sparse_array.dtype == out_sparse_array.dtype + assert np.array_equal(sparse_array.data, out_sparse_array.data) + assert np.array_equal(sparse_array.indptr, out_sparse_array.indptr) + assert np.array_equal(sparse_array.indices, out_sparse_array.indices) + + if dtype_str == 'f2': + dense_array = \ + sparse_array.astype(np.float32).toarray().astype(np.float16) + else: + dense_array = sparse_array.toarray() + assert np.array_equal(dense_array, sparse_tensor.to_tensor().to_numpy())