diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index f2a81124728..91bdce294c2 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -83,6 +83,7 @@ set(ARROW_SRCS table.cc table_builder.cc tensor.cc + sparse_tensor.cc type.cc visitor.cc @@ -286,6 +287,7 @@ ADD_ARROW_TEST(type-test) ADD_ARROW_TEST(table-test) ADD_ARROW_TEST(table_builder-test) ADD_ARROW_TEST(tensor-test) +ADD_ARROW_TEST(sparse_tensor-test) ADD_ARROW_BENCHMARK(builder-benchmark) ADD_ARROW_BENCHMARK(column-benchmark) diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index efc8ad82faf..114752934c9 100644 --- a/cpp/src/arrow/compare.cc +++ b/cpp/src/arrow/compare.cc @@ -30,6 +30,7 @@ #include "arrow/array.h" #include "arrow/buffer.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" @@ -782,6 +783,98 @@ bool TensorEquals(const Tensor& left, const Tensor& right) { return are_equal; } +namespace { + +template +struct SparseTensorEqualsImpl { + static bool Compare(const SparseTensorImpl& left, + const SparseTensorImpl& right) { + // TODO(mrkn): should we support the equality among different formats? + return false; + } +}; + +template +struct SparseTensorEqualsImpl { + static bool Compare(const SparseTensorImpl& left, + const SparseTensorImpl& right) { + DCHECK(left.type()->id() == right.type()->id()); + DCHECK(left.shape() == right.shape()); + DCHECK(left.non_zero_length() == right.non_zero_length()); + + const auto& left_index = checked_cast(*left.sparse_index()); + const auto& right_index = checked_cast(*right.sparse_index()); + + if (!left_index.Equals(right_index)) { + return false; + } + + const auto& size_meta = dynamic_cast(*left.type()); + const int byte_width = size_meta.bit_width() / CHAR_BIT; + DCHECK_GT(byte_width, 0); + + const uint8_t* left_data = left.data()->data(); + const uint8_t* right_data = right.data()->data(); + + return memcmp(left_data, right_data, + static_cast(byte_width * left.non_zero_length())); + } +}; + +template +inline bool SparseTensorEqualsImplDispatch(const SparseTensorImpl& left, + const SparseTensor& right) { + switch (right.format_id()) { + case SparseTensorFormat::COO: { + const auto& right_coo = + checked_cast&>(right); + return SparseTensorEqualsImpl::Compare(left, + right_coo); + } + + case SparseTensorFormat::CSR: { + const auto& right_csr = + checked_cast&>(right); + return SparseTensorEqualsImpl::Compare(left, + right_csr); + } + + default: + return false; + } +} + +} // namespace + +bool SparseTensorEquals(const SparseTensor& left, const SparseTensor& right) { + if (&left == &right) { + return true; + } else if (left.type()->id() != right.type()->id()) { + return false; + } else if (left.size() == 0) { + return true; + } else if (left.shape() != right.shape()) { + return false; + } else if (left.non_zero_length() != right.non_zero_length()) { + return false; + } + + switch (left.format_id()) { + case SparseTensorFormat::COO: { + const auto& left_coo = checked_cast&>(left); + return SparseTensorEqualsImplDispatch(left_coo, right); + } + + case SparseTensorFormat::CSR: { + const auto& left_csr = checked_cast&>(left); + return SparseTensorEqualsImplDispatch(left_csr, right); + } + + default: + return false; + } +} + bool TypeEquals(const DataType& left, const DataType& right) { bool are_equal; // The arrays are the same object diff --git a/cpp/src/arrow/compare.h b/cpp/src/arrow/compare.h index 21e2fdc24f1..d49d7cc0fdb 100644 --- a/cpp/src/arrow/compare.h +++ 
b/cpp/src/arrow/compare.h @@ -29,12 +29,16 @@ namespace arrow { class Array; class DataType; class Tensor; +class SparseTensor; /// Returns true if the arrays are exactly equal bool ARROW_EXPORT ArrayEquals(const Array& left, const Array& right); bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right); +/// EXPERIMENTAL: Returns true if the given sparse tensors are exactly equal +bool ARROW_EXPORT SparseTensorEquals(const SparseTensor& left, const SparseTensor& right); + /// Returns true if the arrays are approximately equal. For non-floating point /// types, this is equivalent to ArrayEquals(left, right) bool ARROW_EXPORT ArrayApproxEquals(const Array& left, const Array& right); diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 8adf4a8b660..23709a46192 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -63,6 +63,8 @@ class Message::MessageImpl { return Message::RECORD_BATCH; case flatbuf::MessageHeader_Tensor: return Message::TENSOR; + case flatbuf::MessageHeader_SparseTensor: + return Message::SPARSE_TENSOR; default: return Message::NONE; } diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 092a19ff9a0..760012d1a68 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -70,7 +70,7 @@ constexpr int kMaxNestingDepth = 64; /// \brief An IPC message including metadata and body class ARROW_EXPORT Message { public: - enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR }; + enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR, SPARSE_TENSOR }; /// \brief Construct message, but do not validate /// diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 1d4c80c2946..da6711395f8 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -31,6 +31,7 @@ #include "arrow/ipc/Tensor_generated.h" // IWYU pragma: keep #include "arrow/ipc/message.h" #include "arrow/ipc/util.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" @@ -50,6 +51,7 @@ using DictionaryOffset = flatbuffers::Offset; using FieldOffset = flatbuffers::Offset; using KeyValueOffset = flatbuffers::Offset; using RecordBatchOffset = flatbuffers::Offset; +using SparseTensorOffset = flatbuffers::Offset; using Offset = flatbuffers::Offset; using FBString = flatbuffers::Offset; @@ -781,6 +783,106 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, body_length, out); } +Status MakeSparseTensorIndexCOO(FBB& fbb, const SparseCOOIndex& sparse_index, + const std::vector& buffers, + flatbuf::SparseTensorIndex* fb_sparse_index_type, + Offset* fb_sparse_index, size_t* num_buffers) { + *fb_sparse_index_type = flatbuf::SparseTensorIndex_SparseTensorIndexCOO; + const BufferMetadata& indices_metadata = buffers[0]; + flatbuf::Buffer indices(indices_metadata.offset, indices_metadata.length); + *fb_sparse_index = flatbuf::CreateSparseTensorIndexCOO(fbb, &indices).Union(); + *num_buffers = 1; + return Status::OK(); +} + +Status MakeSparseMatrixIndexCSR(FBB& fbb, const SparseCSRIndex& sparse_index, + const std::vector& buffers, + flatbuf::SparseTensorIndex* fb_sparse_index_type, + Offset* fb_sparse_index, size_t* num_buffers) { + *fb_sparse_index_type = flatbuf::SparseTensorIndex_SparseMatrixIndexCSR; + const BufferMetadata& indptr_metadata = buffers[0]; + const BufferMetadata& indices_metadata = buffers[1]; + flatbuf::Buffer 
indptr(indptr_metadata.offset, indptr_metadata.length); + flatbuf::Buffer indices(indices_metadata.offset, indices_metadata.length); + *fb_sparse_index = flatbuf::CreateSparseMatrixIndexCSR(fbb, &indptr, &indices).Union(); + *num_buffers = 2; + return Status::OK(); +} + +Status MakeSparseTensorIndex(FBB& fbb, const SparseIndex& sparse_index, + const std::vector& buffers, + flatbuf::SparseTensorIndex* fb_sparse_index_type, + Offset* fb_sparse_index, size_t* num_buffers) { + switch (sparse_index.format_id()) { + case SparseTensorFormat::COO: + RETURN_NOT_OK(MakeSparseTensorIndexCOO( + fbb, checked_cast(sparse_index), buffers, + fb_sparse_index_type, fb_sparse_index, num_buffers)); + break; + + case SparseTensorFormat::CSR: + RETURN_NOT_OK(MakeSparseMatrixIndexCSR( + fbb, checked_cast(sparse_index), buffers, + fb_sparse_index_type, fb_sparse_index, num_buffers)); + break; + + default: + std::stringstream ss; + ss << "Unsupporoted sparse tensor format:: " << sparse_index.ToString() + << std::endl; + return Status::NotImplemented(ss.str()); + } + + return Status::OK(); +} + +Status MakeSparseTensor(FBB& fbb, const SparseTensor& sparse_tensor, int64_t body_length, + const std::vector& buffers, + SparseTensorOffset* offset) { + flatbuf::Type fb_type_type; + Offset fb_type; + RETURN_NOT_OK( + TensorTypeToFlatbuffer(fbb, *sparse_tensor.type(), &fb_type_type, &fb_type)); + + using TensorDimOffset = flatbuffers::Offset; + std::vector dims; + for (int i = 0; i < sparse_tensor.ndim(); ++i) { + FBString name = fbb.CreateString(sparse_tensor.dim_name(i)); + dims.push_back(flatbuf::CreateTensorDim(fbb, sparse_tensor.shape()[i], name)); + } + + auto fb_shape = fbb.CreateVector(dims); + + flatbuf::SparseTensorIndex fb_sparse_index_type; + Offset fb_sparse_index; + size_t num_index_buffers = 0; + RETURN_NOT_OK(MakeSparseTensorIndex(fbb, *sparse_tensor.sparse_index(), buffers, + &fb_sparse_index_type, &fb_sparse_index, + &num_index_buffers)); + + const BufferMetadata& data_metadata = buffers[num_index_buffers]; + flatbuf::Buffer data(data_metadata.offset, data_metadata.length); + + const int64_t non_zero_length = sparse_tensor.non_zero_length(); + + *offset = + flatbuf::CreateSparseTensor(fbb, fb_type_type, fb_type, fb_shape, non_zero_length, + fb_sparse_index_type, fb_sparse_index, &data); + + return Status::OK(); +} + +Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length, + const std::vector& buffers, + std::shared_ptr* out) { + FBB fbb; + SparseTensorOffset fb_sparse_tensor; + RETURN_NOT_OK( + MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor)); + return WriteFBMessage(fbb, flatbuf::MessageHeader_SparseTensor, + fb_sparse_tensor.Union(), body_length, out); +} + Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length, const std::vector& nodes, const std::vector& buffers, @@ -933,6 +1035,52 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr* type return TypeFromFlatbuffer(tensor->type_type(), tensor->type(), {}, type); } +Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr* type, + std::vector* shape, + std::vector* dim_names, + int64_t* non_zero_length, + SparseTensorFormat::type* sparse_tensor_format_id) { + auto message = flatbuf::GetMessage(metadata.data()); + if (message->header_type() != flatbuf::MessageHeader_SparseTensor) { + return Status::IOError("Header of flatbuffer-encoded Message is not SparseTensor."); + } + if (message->header() == nullptr) { + return 
Status::IOError("Header-pointer of flatbuffer-encoded Message is null."); + } + + auto sparse_tensor = reinterpret_cast(message->header()); + int ndim = static_cast(sparse_tensor->shape()->size()); + + for (int i = 0; i < ndim; ++i) { + auto dim = sparse_tensor->shape()->Get(i); + + shape->push_back(dim->size()); + auto fb_name = dim->name(); + if (fb_name == 0) { + dim_names->push_back(""); + } else { + dim_names->push_back(fb_name->str()); + } + } + + *non_zero_length = sparse_tensor->non_zero_length(); + + switch (sparse_tensor->sparseIndex_type()) { + case flatbuf::SparseTensorIndex_SparseTensorIndexCOO: + *sparse_tensor_format_id = SparseTensorFormat::COO; + break; + + case flatbuf::SparseTensorIndex_SparseMatrixIndexCSR: + *sparse_tensor_format_id = SparseTensorFormat::CSR; + break; + + default: + return Status::Invalid("Unrecognized sparse index type"); + } + + return TypeFromFlatbuffer(sparse_tensor->type_type(), sparse_tensor->type(), {}, type); +} + // ---------------------------------------------------------------------- // Implement message writing diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 152ca1367ec..6562382b878 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -33,6 +33,7 @@ #include "arrow/ipc/dictionary.h" // IYWU pragma: keep #include "arrow/ipc/message.h" #include "arrow/memory_pool.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" namespace arrow { @@ -40,6 +41,7 @@ namespace arrow { class DataType; class Schema; class Tensor; +class SparseTensor; namespace flatbuf = org::apache::arrow::flatbuf; @@ -103,6 +105,12 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr* type std::vector* shape, std::vector* strides, std::vector* dim_names); +// EXPERIMENTAL: Extracting metadata of a sparse tensor from the message +Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr* type, + std::vector* shape, + std::vector* dim_names, int64_t* length, + SparseTensorFormat::type* sparse_tensor_format_id); + /// Write a serialized message metadata with a length-prefix and padding to an /// 8-byte offset. 
Does not make assumptions about whether the stream is /// aligned already @@ -137,6 +145,10 @@ Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, std::shared_ptr* out); +Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length, + const std::vector& buffers, + std::shared_ptr* out); + Status WriteFileFooter(const Schema& schema, const std::vector& dictionaries, const std::vector& record_batches, DictionaryMemo* dictionary_memo, io::OutputStream* out); diff --git a/cpp/src/arrow/ipc/read-write-test.cc b/cpp/src/arrow/ipc/read-write-test.cc index 3a723badf37..bc27386f34f 100644 --- a/cpp/src/arrow/ipc/read-write-test.cc +++ b/cpp/src/arrow/ipc/read-write-test.cc @@ -38,6 +38,7 @@ #include "arrow/ipc/writer.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/test-util.h" @@ -844,6 +845,117 @@ TEST_F(TestTensorRoundTrip, NonContiguous) { CheckTensorRoundTrip(tensor); } +class TestSparseTensorRoundTrip : public ::testing::Test, public IpcTestFixture { + public: + void SetUp() { pool_ = default_memory_pool(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } + + template + void CheckSparseTensorRoundTrip(const SparseTensorImpl& tensor) { + GTEST_FAIL(); + } +}; + +template <> +void TestSparseTensorRoundTrip::CheckSparseTensorRoundTrip( + const SparseTensorImpl& tensor) { + const auto& type = checked_cast(*tensor.type()); + const int elem_size = type.bit_width() / 8; + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK(WriteSparseTensor(tensor, mmap_.get(), &metadata_length, &body_length, + default_memory_pool())); + + const auto& sparse_index = checked_cast(*tensor.sparse_index()); + const int64_t indices_length = elem_size * sparse_index.indices()->size(); + const int64_t data_length = elem_size * tensor.non_zero_length(); + const int64_t expected_body_length = indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr result; + ASSERT_OK(ReadSparseTensor(mmap_.get(), &result)); + + const auto& resulted_sparse_index = + checked_cast(*result->sparse_index()); + ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(result->Equals(*result)); +} + +template <> +void TestSparseTensorRoundTrip::CheckSparseTensorRoundTrip( + const SparseTensorImpl& tensor) { + const auto& type = checked_cast(*tensor.type()); + const int elem_size = type.bit_width() / 8; + + int32_t metadata_length; + int64_t body_length; + + ASSERT_OK(mmap_->Seek(0)); + + ASSERT_OK(WriteSparseTensor(tensor, mmap_.get(), &metadata_length, &body_length, + default_memory_pool())); + + const auto& sparse_index = checked_cast(*tensor.sparse_index()); + const int64_t indptr_length = elem_size * sparse_index.indptr()->size(); + const int64_t indices_length = elem_size * sparse_index.indices()->size(); + const int64_t data_length = elem_size * tensor.non_zero_length(); + const int64_t expected_body_length = indptr_length + indices_length + data_length; + ASSERT_EQ(expected_body_length, body_length); + + ASSERT_OK(mmap_->Seek(0)); + + std::shared_ptr result; + ASSERT_OK(ReadSparseTensor(mmap_.get(), &result)); + + const auto& resulted_sparse_index = + 
checked_cast(*result->sparse_index()); + ASSERT_EQ(resulted_sparse_index.indptr()->data()->size(), indptr_length); + ASSERT_EQ(resulted_sparse_index.indices()->data()->size(), indices_length); + ASSERT_EQ(result->data()->size(), data_length); + ASSERT_TRUE(result->Equals(*result)); +} + +TEST_F(TestSparseTensorRoundTrip, WithSparseCOOIndex) { + std::string path = "test-write-sparse-coo-tensor"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_)); + + std::vector shape = {2, 3, 4}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor t(data, shape, {}, dim_names); + SparseTensorImpl st(t); + + CheckSparseTensorRoundTrip(st); +} + +TEST_F(TestSparseTensorRoundTrip, WithSparseCSRIndex) { + std::string path = "test-write-sparse-csr-matrix"; + constexpr int64_t kBufferSize = 1 << 20; + ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_)); + + std::vector shape = {4, 6}; + std::vector dim_names = {"foo", "bar", "baz"}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + + auto data = Buffer::Wrap(values); + NumericTensor t(data, shape, {}, dim_names); + SparseTensorImpl st(t); + + CheckSparseTensorRoundTrip(st); +} + TEST(TestRecordBatchStreamReader, MalformedInput) { const std::string empty_str = ""; const std::string garbage_str = "12345678"; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 59a322a6433..e856acafd71 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -38,6 +38,7 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" @@ -726,5 +727,123 @@ Status ReadTensor(const Message& message, std::shared_ptr* out) { return Status::OK(); } +namespace { + +Status ReadSparseCOOIndex(const flatbuf::SparseTensor* sparse_tensor, int64_t ndim, + int64_t non_zero_length, io::RandomAccessFile* file, + std::shared_ptr* out) { + auto* sparse_index = sparse_tensor->sparseIndex_as_SparseTensorIndexCOO(); + auto* indices_buffer = sparse_index->indicesBuffer(); + std::shared_ptr indices_data; + RETURN_NOT_OK( + file->ReadAt(indices_buffer->offset(), indices_buffer->length(), &indices_data)); + std::vector shape({non_zero_length, ndim}); + const int64_t elsize = sizeof(int64_t); + std::vector strides({elsize, elsize * non_zero_length}); + *out = std::make_shared( + std::make_shared(indices_data, shape, strides)); + return Status::OK(); +} + +Status ReadSparseCSRIndex(const flatbuf::SparseTensor* sparse_tensor, int64_t ndim, + int64_t non_zero_length, io::RandomAccessFile* file, + std::shared_ptr* out) { + auto* sparse_index = sparse_tensor->sparseIndex_as_SparseMatrixIndexCSR(); + + auto* indptr_buffer = sparse_index->indptrBuffer(); + std::shared_ptr indptr_data; + RETURN_NOT_OK( + file->ReadAt(indptr_buffer->offset(), indptr_buffer->length(), &indptr_data)); + + auto* indices_buffer = sparse_index->indicesBuffer(); + std::shared_ptr indices_data; + RETURN_NOT_OK( + file->ReadAt(indices_buffer->offset(), indices_buffer->length(), &indices_data)); + + std::vector indptr_shape({ndim + 1}); + std::vector indices_shape({non_zero_length}); + *out = std::make_shared( + std::make_shared(indptr_data, 
indptr_shape), + std::make_shared(indices_data, indices_shape)); + return Status::OK(); +} + +Status MakeSparseTensorWithSparseCOOIndex( + const std::shared_ptr& type, const std::vector& shape, + const std::vector& dim_names, + const std::shared_ptr& sparse_index, int64_t non_zero_length, + const std::shared_ptr& data, std::shared_ptr* out) { + *out = std::make_shared>(sparse_index, type, data, + shape, dim_names); + return Status::OK(); +} + +Status MakeSparseTensorWithSparseCSRIndex( + const std::shared_ptr& type, const std::vector& shape, + const std::vector& dim_names, + const std::shared_ptr& sparse_index, int64_t non_zero_length, + const std::shared_ptr& data, std::shared_ptr* out) { + *out = std::make_shared>(sparse_index, type, data, + shape, dim_names); + return Status::OK(); +} + +} // namespace + +Status ReadSparseTensor(const Buffer& metadata, io::RandomAccessFile* file, + std::shared_ptr* out) { + std::shared_ptr type; + std::vector shape; + std::vector dim_names; + int64_t non_zero_length; + SparseTensorFormat::type sparse_tensor_format_id; + + RETURN_NOT_OK(internal::GetSparseTensorMetadata( + metadata, &type, &shape, &dim_names, &non_zero_length, &sparse_tensor_format_id)); + + auto message = flatbuf::GetMessage(metadata.data()); + auto sparse_tensor = reinterpret_cast(message->header()); + const flatbuf::Buffer* buffer = sparse_tensor->data(); + DCHECK(BitUtil::IsMultipleOf8(buffer->offset())) + << "Buffer of sparse index data " + << "did not start on 8-byte aligned offset: " << buffer->offset(); + + std::shared_ptr data; + RETURN_NOT_OK(file->ReadAt(buffer->offset(), buffer->length(), &data)); + + std::shared_ptr sparse_index; + switch (sparse_tensor_format_id) { + case SparseTensorFormat::COO: + RETURN_NOT_OK(ReadSparseCOOIndex(sparse_tensor, shape.size(), non_zero_length, file, + &sparse_index)); + return MakeSparseTensorWithSparseCOOIndex( + type, shape, dim_names, std::dynamic_pointer_cast(sparse_index), + non_zero_length, data, out); + + case SparseTensorFormat::CSR: + RETURN_NOT_OK(ReadSparseCSRIndex(sparse_tensor, shape.size(), non_zero_length, file, + &sparse_index)); + return MakeSparseTensorWithSparseCSRIndex( + type, shape, dim_names, std::dynamic_pointer_cast(sparse_index), + non_zero_length, data, out); + + default: + return Status::Invalid("Unsupported sparse index format"); + } +} + +Status ReadSparseTensor(const Message& message, std::shared_ptr* out) { + io::BufferReader buffer_reader(message.body()); + return ReadSparseTensor(*message.metadata(), &buffer_reader, out); +} + +Status ReadSparseTensor(io::InputStream* file, std::shared_ptr* out) { + std::unique_ptr message; + RETURN_NOT_OK(ReadContiguousPayload(file, &message)); + DCHECK_EQ(message->type(), Message::SPARSE_TENSOR); + io::BufferReader buffer_reader(message->body()); + return ReadSparseTensor(*message->metadata(), &buffer_reader, out); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 942664d6f22..ebecea13ffb 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -33,6 +33,7 @@ class Buffer; class Schema; class Status; class Tensor; +class SparseTensor; namespace io { @@ -235,6 +236,22 @@ Status ReadTensor(io::InputStream* file, std::shared_ptr* out); ARROW_EXPORT Status ReadTensor(const Message& message, std::shared_ptr* out); +/// \brief EXPERIMETNAL: Read arrow::SparseTensor as encapsulated IPC message in file +/// +/// \param[in] file an InputStream pointed at the start of the message +/// 
\param[out] out the read sparse tensor +/// \return Status +ARROW_EXPORT +Status ReadSparseTensor(io::InputStream* file, std::shared_ptr* out); + +/// \brief EXPERIMENTAL: Read arrow::SparseTensor from IPC message +/// +/// \param[in] message a Message containing the tensor metadata and body +/// \param[out] out the read sparse tensor +/// \return Status +ARROW_EXPORT +Status ReadSparseTensor(const Message& message, std::shared_ptr* out); + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 6ce72e070e7..0bf68142c77 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "arrow/array.h" @@ -33,6 +34,7 @@ #include "arrow/ipc/util.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/sparse_tensor.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/tensor.h" @@ -671,6 +673,105 @@ Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, return Status::OK(); } +namespace internal { + +class SparseTensorSerializer { + public: + SparseTensorSerializer(int64_t buffer_start_offset, IpcPayload* out) + : out_(out), buffer_start_offset_(buffer_start_offset) {} + + ~SparseTensorSerializer() = default; + + Status VisitSparseIndex(const SparseIndex& sparse_index) { + switch (sparse_index.format_id()) { + case SparseTensorFormat::COO: + RETURN_NOT_OK( + VisitSparseCOOIndex(checked_cast(sparse_index))); + break; + + case SparseTensorFormat::CSR: + RETURN_NOT_OK( + VisitSparseCSRIndex(checked_cast(sparse_index))); + break; + + default: + std::stringstream ss; + ss << "Unable to convert type: " << sparse_index.ToString() << std::endl; + return Status::NotImplemented(ss.str()); + } + + return Status::OK(); + } + + Status SerializeMetadata(const SparseTensor& sparse_tensor) { + return WriteSparseTensorMessage(sparse_tensor, out_->body_length, buffer_meta_, + &out_->metadata); + } + + Status Assemble(const SparseTensor& sparse_tensor) { + if (buffer_meta_.size() > 0) { + buffer_meta_.clear(); + out_->body_buffers.clear(); + } + + RETURN_NOT_OK(VisitSparseIndex(*sparse_tensor.sparse_index())); + out_->body_buffers.emplace_back(sparse_tensor.data()); + + int64_t offset = buffer_start_offset_; + buffer_meta_.reserve(out_->body_buffers.size()); + + for (size_t i = 0; i < out_->body_buffers.size(); ++i) { + const Buffer* buffer = out_->body_buffers[i].get(); + int64_t size = buffer->size(); + int64_t padding = BitUtil::RoundUpToMultipleOf8(size) - size; + buffer_meta_.push_back({offset, size + padding}); + offset += size + padding; + } + + out_->body_length = offset - buffer_start_offset_; + DCHECK(BitUtil::IsMultipleOf8(out_->body_length)); + + return SerializeMetadata(sparse_tensor); + } + + private: + Status VisitSparseCOOIndex(const SparseCOOIndex& sparse_index) { + out_->body_buffers.emplace_back(sparse_index.indices()->data()); + return Status::OK(); + } + + Status VisitSparseCSRIndex(const SparseCSRIndex& sparse_index) { + out_->body_buffers.emplace_back(sparse_index.indptr()->data()); + out_->body_buffers.emplace_back(sparse_index.indices()->data()); + return Status::OK(); + } + + IpcPayload* out_; + + std::vector buffer_meta_; + + int64_t buffer_start_offset_; +}; + +Status GetSparseTensorPayload(const SparseTensor& sparse_tensor, MemoryPool* pool, + IpcPayload* out) { + SparseTensorSerializer writer(0, out); + return writer.Assemble(sparse_tensor); +} + +} // namespace internal + +Status 
WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool) { + internal::IpcPayload payload; + internal::SparseTensorSerializer writer(0, &payload); + RETURN_NOT_OK(writer.Assemble(sparse_tensor)); + + *body_length = payload.body_length; + return internal::WriteIpcPayload(payload, dst, metadata_length); +} + Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index a1c711146ef..5feb9e90cb0 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -36,6 +36,7 @@ class Schema; class Status; class Table; class Tensor; +class SparseTensor; namespace io { @@ -269,6 +270,20 @@ ARROW_EXPORT Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length); +// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message. The metadata, +// sparse index, and body are written assuming 64-byte alignment. It is the +// user's responsibility to ensure that the OutputStream has been aligned +// to a 64-byte multiple before writing the message. +// +// \param[in] sparse_tensor the SparseTensor to write +// \param[in] dst the OutputStream to write to +// \param[out] metadata_length the actual metadata length, including padding +// \param[out] body_length the actual message body length +ARROW_EXPORT +Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool); + namespace internal { // These internal APIs may change without warning or deprecation diff --git a/cpp/src/arrow/sparse_tensor-test.cc b/cpp/src/arrow/sparse_tensor-test.cc new file mode 100644 index 00000000000..d48f2d0229d --- /dev/null +++ b/cpp/src/arrow/sparse_tensor-test.cc @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Unit tests for DataType (and subclasses), Field, and Schema + +#include +#include +#include +#include + +#include + +#include + +#include "arrow/sparse_tensor.h" +#include "arrow/test-util.h" +#include "arrow/type.h" + +namespace arrow { + +static inline void CheckSparseIndexFormatType(SparseTensorFormat::type expected, + const SparseTensor& sparse_tensor) { + ASSERT_EQ(expected, sparse_tensor.format_id()); + ASSERT_EQ(expected, sparse_tensor.sparse_index()->format_id()); +} + +TEST(TestSparseCOOTensor, CreationEmptyTensor) { + std::vector shape = {2, 3, 4}; + SparseTensorImpl st1(int64(), shape); + + std::vector dim_names = {"foo", "bar", "baz"}; + SparseTensorImpl st2(int64(), shape, dim_names); + + ASSERT_EQ(0, st1.non_zero_length()); + ASSERT_EQ(0, st2.non_zero_length()); + + ASSERT_EQ(24, st1.size()); + ASSERT_EQ(24, st2.size()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); +} + +TEST(TestSparseCOOTensor, CreationFromNumericTensor) { + std::vector shape = {2, 3, 4}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + std::shared_ptr buffer = Buffer::Wrap(values); + std::vector dim_names = {"foo", "bar", "baz"}; + NumericTensor tensor1(buffer, shape); + NumericTensor tensor2(buffer, shape, {}, dim_names); + SparseTensorImpl st1(tensor1); + SparseTensorImpl st2(tensor2); + + CheckSparseIndexFormatType(SparseTensorFormat::COO, st1); + + ASSERT_EQ(12, st1.non_zero_length()); + ASSERT_TRUE(st1.is_mutable()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); + + const int64_t* ptr = reinterpret_cast(st1.raw_data()); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 1, ptr[i]); + } + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 11, ptr[i + 6]); + } + + const auto& si = internal::checked_cast(*st1.sparse_index()); + ASSERT_EQ(std::string("SparseCOOIndex"), si.ToString()); + + std::shared_ptr sidx = si.indices(); + ASSERT_EQ(std::vector({12, 3}), sidx->shape()); + ASSERT_TRUE(sidx->is_column_major()); + + // (0, 0, 0) -> 1 + ASSERT_EQ(0, sidx->Value({0, 0})); + ASSERT_EQ(0, sidx->Value({0, 1})); + ASSERT_EQ(0, sidx->Value({0, 2})); + + // (0, 0, 2) -> 2 + ASSERT_EQ(0, sidx->Value({1, 0})); + ASSERT_EQ(0, sidx->Value({1, 1})); + ASSERT_EQ(2, sidx->Value({1, 2})); + + // (0, 1, 1) -> 3 + ASSERT_EQ(0, sidx->Value({2, 0})); + ASSERT_EQ(1, sidx->Value({2, 1})); + ASSERT_EQ(1, sidx->Value({2, 2})); + + // (1, 2, 1) -> 15 + ASSERT_EQ(1, sidx->Value({10, 0})); + ASSERT_EQ(2, sidx->Value({10, 1})); + ASSERT_EQ(1, sidx->Value({10, 2})); + + // (1, 2, 3) -> 16 + ASSERT_EQ(1, sidx->Value({11, 0})); + ASSERT_EQ(2, sidx->Value({11, 1})); + ASSERT_EQ(3, sidx->Value({11, 2})); +} + +TEST(TestSparseCOOTensor, CreationFromTensor) { + std::vector shape = {2, 3, 4}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + std::shared_ptr buffer = Buffer::Wrap(values); + std::vector dim_names = {"foo", "bar", "baz"}; + Tensor tensor1(int64(), buffer, shape); + Tensor tensor2(int64(), buffer, shape, {}, dim_names); + SparseTensorImpl st1(tensor1); + SparseTensorImpl st2(tensor2); + + ASSERT_EQ(12, st1.non_zero_length()); + ASSERT_TRUE(st1.is_mutable()); + + ASSERT_EQ("foo", 
st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); + + const int64_t* ptr = reinterpret_cast(st1.raw_data()); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 1, ptr[i]); + } + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 11, ptr[i + 6]); + } + + const auto& si = internal::checked_cast(*st1.sparse_index()); + std::shared_ptr sidx = si.indices(); + ASSERT_EQ(std::vector({12, 3}), sidx->shape()); + ASSERT_TRUE(sidx->is_column_major()); + + // (0, 0, 0) -> 1 + ASSERT_EQ(0, sidx->Value({0, 0})); + ASSERT_EQ(0, sidx->Value({0, 1})); + ASSERT_EQ(0, sidx->Value({0, 2})); + + // (0, 0, 2) -> 2 + ASSERT_EQ(0, sidx->Value({1, 0})); + ASSERT_EQ(0, sidx->Value({1, 1})); + ASSERT_EQ(2, sidx->Value({1, 2})); + + // (0, 1, 1) -> 3 + ASSERT_EQ(0, sidx->Value({2, 0})); + ASSERT_EQ(1, sidx->Value({2, 1})); + ASSERT_EQ(1, sidx->Value({2, 2})); + + // (1, 2, 1) -> 15 + ASSERT_EQ(1, sidx->Value({10, 0})); + ASSERT_EQ(2, sidx->Value({10, 1})); + ASSERT_EQ(1, sidx->Value({10, 2})); + + // (1, 2, 3) -> 16 + ASSERT_EQ(1, sidx->Value({11, 0})); + ASSERT_EQ(2, sidx->Value({11, 1})); + ASSERT_EQ(3, sidx->Value({11, 2})); +} + +TEST(TestSparseCSRMatrix, CreationFromNumericTensor2D) { + std::vector shape = {6, 4}; + std::vector values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0, + 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16}; + std::shared_ptr buffer = Buffer::Wrap(values); + std::vector dim_names = {"foo", "bar", "baz"}; + NumericTensor tensor1(buffer, shape); + NumericTensor tensor2(buffer, shape, {}, dim_names); + + SparseTensorImpl st1(tensor1); + SparseTensorImpl st2(tensor2); + + CheckSparseIndexFormatType(SparseTensorFormat::CSR, st1); + + ASSERT_EQ(12, st1.non_zero_length()); + ASSERT_TRUE(st1.is_mutable()); + + ASSERT_EQ("foo", st2.dim_name(0)); + ASSERT_EQ("bar", st2.dim_name(1)); + ASSERT_EQ("baz", st2.dim_name(2)); + + ASSERT_EQ("", st1.dim_name(0)); + ASSERT_EQ("", st1.dim_name(1)); + ASSERT_EQ("", st1.dim_name(2)); + + const int64_t* ptr = reinterpret_cast(st1.raw_data()); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 1, ptr[i]); + } + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(i + 11, ptr[i + 6]); + } + + const auto& si = internal::checked_cast(*st1.sparse_index()); + + ASSERT_EQ(std::string("SparseCSRIndex"), si.ToString()); + ASSERT_EQ(1, si.indptr()->ndim()); + ASSERT_EQ(1, si.indices()->ndim()); + + const int64_t* indptr_begin = reinterpret_cast(si.indptr()->raw_data()); + std::vector indptr_values(indptr_begin, + indptr_begin + si.indptr()->shape()[0]); + + ASSERT_EQ(7, indptr_values.size()); + ASSERT_EQ(std::vector({0, 2, 4, 6, 8, 10, 12}), indptr_values); + + const int64_t* indices_begin = + reinterpret_cast(si.indices()->raw_data()); + std::vector indices_values(indices_begin, + indices_begin + si.indices()->shape()[0]); + + ASSERT_EQ(12, indices_values.size()); + ASSERT_EQ(std::vector({0, 2, 1, 3, 0, 2, 1, 3, 0, 2, 1, 3}), indices_values); +} + +} // namespace arrow diff --git a/cpp/src/arrow/sparse_tensor.cc b/cpp/src/arrow/sparse_tensor.cc new file mode 100644 index 00000000000..101500d3643 --- /dev/null +++ b/cpp/src/arrow/sparse_tensor.cc @@ -0,0 +1,452 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/sparse_tensor.h" + +#include +#include +#include + +#include "arrow/compare.h" +#include "arrow/util/logging.h" + +namespace arrow { + +namespace { + +// ---------------------------------------------------------------------- +// SparseTensorConverter + +template +class SparseTensorConverter { + public: + explicit SparseTensorConverter(const NumericTensor&) {} + + Status Convert() { return Status::Invalid("Unsupported sparse index"); } +}; + +// ---------------------------------------------------------------------- +// SparseTensorConverter for SparseCOOIndex + +template +struct SparseTensorConverterBase { + using NumericTensorType = NumericTensor; + using value_type = typename NumericTensorType::value_type; + + explicit SparseTensorConverterBase(const NumericTensorType& tensor) : tensor_(tensor) {} + + bool TensorIsTriviallyIterable() const { + return tensor_.ndim() <= 1 || tensor_.is_contiguous(); + } + + size_t CountNonZero() const { + if (tensor_.size() == 0) { + return 0; + } + + if (TensorIsTriviallyIterable()) { + const value_type* data = reinterpret_cast(tensor_.raw_data()); + return std::count_if(data, data + tensor_.size(), + [](value_type x) { return x != 0; }); + } + + const std::vector& shape = tensor_.shape(); + const int64_t ndim = tensor_.ndim(); + + size_t count = 0; + std::vector coord(ndim, 0); + for (int64_t n = tensor_.size(); n > 0; n--) { + if (tensor_.Value(coord) != 0) { + ++count; + } + + // increment index + ++coord[ndim - 1]; + if (n > 1 && coord[ndim - 1] == shape[ndim - 1]) { + int64_t d = ndim - 1; + while (d > 0 && coord[d] == shape[d]) { + coord[d] = 0; + ++coord[d - 1]; + --d; + } + } + } + return count; + } + + const NumericTensorType& tensor_; +}; + +template +class SparseTensorConverter + : private SparseTensorConverterBase { + public: + using BaseClass = SparseTensorConverterBase; + using NumericTensorType = typename BaseClass::NumericTensorType; + using value_type = typename BaseClass::value_type; + + explicit SparseTensorConverter(const NumericTensorType& tensor) : BaseClass(tensor) {} + + Status Convert() { + const int64_t ndim = tensor_.ndim(); + const int64_t nonzero_count = static_cast(CountNonZero()); + + std::shared_ptr indices_buffer; + RETURN_NOT_OK( + AllocateBuffer(sizeof(int64_t) * ndim * nonzero_count, &indices_buffer)); + int64_t* indices = reinterpret_cast(indices_buffer->mutable_data()); + + std::shared_ptr values_buffer; + RETURN_NOT_OK(AllocateBuffer(sizeof(value_type) * nonzero_count, &values_buffer)); + value_type* values = reinterpret_cast(values_buffer->mutable_data()); + + if (ndim <= 1) { + const value_type* data = reinterpret_cast(tensor_.raw_data()); + const int64_t count = ndim == 0 ? 
1 : tensor_.shape()[0]; + for (int64_t i = 0; i < count; ++i, ++data) { + if (*data != 0) { + *indices++ = i; + *values++ = *data; + } + } + } else { + const std::vector& shape = tensor_.shape(); + std::vector coord(ndim, 0); + + for (int64_t n = tensor_.size(); n > 0; n--) { + const value_type x = tensor_.Value(coord); + if (tensor_.Value(coord) != 0) { + *values++ = x; + + int64_t* indp = indices; + for (int64_t i = 0; i < ndim; ++i) { + *indp = coord[i]; + indp += nonzero_count; + } + indices++; + } + + // increment index + ++coord[ndim - 1]; + if (n > 1 && coord[ndim - 1] == shape[ndim - 1]) { + int64_t d = ndim - 1; + while (d > 0 && coord[d] == shape[d]) { + coord[d] = 0; + ++coord[d - 1]; + --d; + } + } + } + } + + // make results + const std::vector indices_shape = {nonzero_count, ndim}; + const int64_t indices_elsize = sizeof(int64_t); + const std::vector indices_strides = {indices_elsize, + indices_elsize * nonzero_count}; + sparse_index = + std::make_shared(std::make_shared( + indices_buffer, indices_shape, indices_strides)); + data = values_buffer; + + return Status::OK(); + } + + std::shared_ptr sparse_index; + std::shared_ptr data; + + private: + using SparseTensorConverterBase::tensor_; + using SparseTensorConverterBase::CountNonZero; +}; + +template +void MakeSparseTensorFromTensor(const Tensor& tensor, + std::shared_ptr* sparse_index, + std::shared_ptr* data) { + NumericTensor numeric_tensor(tensor.data(), tensor.shape(), tensor.strides()); + SparseTensorConverter converter(numeric_tensor); + DCHECK_OK(converter.Convert()); + *sparse_index = converter.sparse_index; + *data = converter.data; +} + +// ---------------------------------------------------------------------- +// SparseTensorConverter for SparseCSRIndex + +template +class SparseTensorConverter + : private SparseTensorConverterBase { + public: + using BaseClass = SparseTensorConverterBase; + using NumericTensorType = typename BaseClass::NumericTensorType; + using value_type = typename BaseClass::value_type; + + explicit SparseTensorConverter(const NumericTensorType& tensor) : BaseClass(tensor) {} + + Status Convert() { + const int64_t ndim = tensor_.ndim(); + if (ndim > 2) { + return Status::Invalid("Invalid tensor dimension"); + } + + const int64_t nr = tensor_.shape()[0]; + const int64_t nc = tensor_.shape()[1]; + const int64_t nonzero_count = static_cast(CountNonZero()); + + std::shared_ptr indptr_buffer; + std::shared_ptr indices_buffer; + + std::shared_ptr values_buffer; + RETURN_NOT_OK(AllocateBuffer(sizeof(value_type) * nonzero_count, &values_buffer)); + value_type* values = reinterpret_cast(values_buffer->mutable_data()); + + if (ndim <= 1) { + return Status::NotImplemented("TODO for ndim <= 1"); + } else { + RETURN_NOT_OK(AllocateBuffer(sizeof(int64_t) * (nr + 1), &indptr_buffer)); + int64_t* indptr = reinterpret_cast(indptr_buffer->mutable_data()); + + RETURN_NOT_OK(AllocateBuffer(sizeof(int64_t) * nonzero_count, &indices_buffer)); + int64_t* indices = reinterpret_cast(indices_buffer->mutable_data()); + + int64_t k = 0; + *indptr++ = 0; + for (int64_t i = 0; i < nr; ++i) { + for (int64_t j = 0; j < nc; ++j) { + const value_type x = tensor_.Value({i, j}); + if (x != 0) { + *values++ = x; + *indices++ = j; + k++; + } + } + *indptr++ = k; + } + } + + std::vector indptr_shape({nr + 1}); + std::shared_ptr indptr_tensor = + std::make_shared(indptr_buffer, indptr_shape); + + std::vector indices_shape({nonzero_count}); + std::shared_ptr indices_tensor = + std::make_shared(indices_buffer, indices_shape); + + 
sparse_index = std::make_shared(indptr_tensor, indices_tensor); + data = values_buffer; + + return Status::OK(); + } + + std::shared_ptr sparse_index; + std::shared_ptr data; + + private: + using BaseClass::tensor_; + using SparseTensorConverterBase::CountNonZero; +}; + +// ---------------------------------------------------------------------- +// Instantiate templates + +#define INSTANTIATE_SPARSE_TENSOR_CONVERTER(IndexType) \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter; \ + template class ARROW_TEMPLATE_EXPORT SparseTensorConverter + +INSTANTIATE_SPARSE_TENSOR_CONVERTER(SparseCOOIndex); +INSTANTIATE_SPARSE_TENSOR_CONVERTER(SparseCSRIndex); + +} // namespace + +// ---------------------------------------------------------------------- +// SparseCOOIndex + +// Constructor with a column-major NumericTensor +SparseCOOIndex::SparseCOOIndex(const std::shared_ptr& coords) + : SparseIndexBase(coords->shape()[0]), coords_(coords) { + DCHECK(coords_->is_column_major()); +} + +std::string SparseCOOIndex::ToString() const { return std::string("SparseCOOIndex"); } + +// ---------------------------------------------------------------------- +// SparseCSRIndex + +// Constructor with two index vectors +SparseCSRIndex::SparseCSRIndex(const std::shared_ptr& indptr, + const std::shared_ptr& indices) + : SparseIndexBase(indices->shape()[0]), indptr_(indptr), indices_(indices) { + DCHECK_EQ(1, indptr_->ndim()); + DCHECK_EQ(1, indices_->ndim()); +} + +std::string SparseCSRIndex::ToString() const { return std::string("SparseCSRIndex"); } + +// ---------------------------------------------------------------------- +// SparseTensor + +// Constructor with all attributes +SparseTensor::SparseTensor(const std::shared_ptr& type, + const std::shared_ptr& data, + const std::vector& shape, + const std::shared_ptr& sparse_index, + const std::vector& dim_names) + : type_(type), + data_(data), + shape_(shape), + sparse_index_(sparse_index), + dim_names_(dim_names) { + DCHECK(is_tensor_supported(type->id())); +} + +const std::string& SparseTensor::dim_name(int i) const { + static const std::string kEmpty = ""; + if (dim_names_.size() == 0) { + return kEmpty; + } else { + DCHECK_LT(i, static_cast(dim_names_.size())); + return dim_names_[i]; + } +} + +int64_t SparseTensor::size() const { + return std::accumulate(shape_.begin(), shape_.end(), 1LL, std::multiplies()); +} + +bool SparseTensor::Equals(const SparseTensor& other) const { + return SparseTensorEquals(*this, other); +} + +// ---------------------------------------------------------------------- +// SparseTensorImpl + +// Constructor with a dense tensor +template +SparseTensorImpl::SparseTensorImpl( + const std::shared_ptr& type, const std::vector& shape, + const std::vector& dim_names) + : SparseTensorImpl(nullptr, type, nullptr, shape, dim_names) {} + +// Constructor with a dense tensor +template +template +SparseTensorImpl::SparseTensorImpl(const NumericTensor& tensor) + : 
SparseTensorImpl(nullptr, tensor.type(), nullptr, tensor.shape(), + tensor.dim_names_) { + SparseTensorConverter converter(tensor); + DCHECK_OK(converter.Convert()); + sparse_index_ = converter.sparse_index; + data_ = converter.data; +} + +// Constructor with a dense tensor +template +SparseTensorImpl::SparseTensorImpl(const Tensor& tensor) + : SparseTensorImpl(nullptr, tensor.type(), nullptr, tensor.shape(), + tensor.dim_names_) { + switch (tensor.type()->id()) { + case Type::UINT8: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::INT8: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::UINT16: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::INT16: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::UINT32: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::INT32: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::UINT64: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::INT64: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::HALF_FLOAT: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::FLOAT: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + case Type::DOUBLE: + MakeSparseTensorFromTensor(tensor, &sparse_index_, + &data_); + return; + default: + break; + } +} + +// ---------------------------------------------------------------------- +// Instantiate templates + +#define INSTANTIATE_SPARSE_TENSOR(IndexType) \ + template class ARROW_TEMPLATE_EXPORT SparseTensorImpl; \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&); \ + template ARROW_EXPORT SparseTensorImpl::SparseTensorImpl( \ + const NumericTensor&) + +INSTANTIATE_SPARSE_TENSOR(SparseCOOIndex); +INSTANTIATE_SPARSE_TENSOR(SparseCSRIndex); + +} // namespace arrow diff --git a/cpp/src/arrow/sparse_tensor.h b/cpp/src/arrow/sparse_tensor.h new file mode 100644 index 00000000000..c7693d2ec95 --- /dev/null +++ b/cpp/src/arrow/sparse_tensor.h @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_SPARSE_TENSOR_H +#define ARROW_SPARSE_TENSOR_H + +#include +#include +#include + +#include "arrow/tensor.h" + +namespace arrow { + +// ---------------------------------------------------------------------- +// SparseIndex class + +/// \brief EXPERIMENTAL: Sparse tensor format enumeration +struct SparseTensorFormat { + enum type { COO, CSR }; +}; + +/// \brief EXPERIMENTAL: The base class for representing index of non-zero +/// values in sparse tensor +class ARROW_EXPORT SparseIndex { + public: + explicit SparseIndex(SparseTensorFormat::type format_id, int64_t non_zero_length) + : format_id_(format_id), non_zero_length_(non_zero_length) {} + + virtual ~SparseIndex() = default; + + SparseTensorFormat::type format_id() const { return format_id_; } + int64_t non_zero_length() const { return non_zero_length_; } + + virtual std::string ToString() const = 0; + + protected: + SparseTensorFormat::type format_id_; + int64_t non_zero_length_; +}; + +template +class SparseIndexBase : public SparseIndex { + public: + explicit SparseIndexBase(int64_t non_zero_length) + : SparseIndex(SparseIndexType::format_id, non_zero_length) {} +}; + +// ---------------------------------------------------------------------- +// SparseCOOIndex class + +/// \brief EXPERIMENTAL: The index data for COO sparse tensor +class ARROW_EXPORT SparseCOOIndex : public SparseIndexBase { + public: + using CoordsTensor = NumericTensor; + + static constexpr SparseTensorFormat::type format_id = SparseTensorFormat::COO; + + // Constructor with a column-major NumericTensor + explicit SparseCOOIndex(const std::shared_ptr& coords); + + const std::shared_ptr& indices() const { return coords_; } + + std::string ToString() const override; + + bool Equals(const SparseCOOIndex& other) const { + return indices()->Equals(*other.indices()); + } + + protected: + std::shared_ptr coords_; +}; + +// ---------------------------------------------------------------------- +// SparseCSRIndex class + +/// \brief EXPERIMENTAL: The index data for CSR sparse matrix +class ARROW_EXPORT SparseCSRIndex : public SparseIndexBase { + public: + using IndexTensor = NumericTensor; + + static constexpr SparseTensorFormat::type format_id = SparseTensorFormat::CSR; + + // Constructor with two index vectors + explicit SparseCSRIndex(const std::shared_ptr& indptr, + const std::shared_ptr& indices); + + const std::shared_ptr& indptr() const { return indptr_; } + const std::shared_ptr& indices() const { return indices_; } + + std::string ToString() const override; + + bool Equals(const SparseCSRIndex& other) const { + return indptr()->Equals(*other.indptr()) && indices()->Equals(*other.indices()); + } + + protected: + std::shared_ptr indptr_; + std::shared_ptr indices_; +}; + +// ---------------------------------------------------------------------- +// SparseTensor class + +/// \brief EXPERIMENTAL: The base class of sparse tensor container +class ARROW_EXPORT SparseTensor { + public: + virtual ~SparseTensor() = default; + + SparseTensorFormat::type format_id() const { return sparse_index_->format_id(); } + + std::shared_ptr type() const { 
return type_; } + std::shared_ptr data() const { return data_; } + + const uint8_t* raw_data() const { return data_->data(); } + uint8_t* raw_mutable_data() const { return data_->mutable_data(); } + + const std::vector& shape() const { return shape_; } + + const std::shared_ptr& sparse_index() const { return sparse_index_; } + + int ndim() const { return static_cast(shape_.size()); } + + const std::string& dim_name(int i) const; + + /// Total number of value cells in the sparse tensor + int64_t size() const; + + /// Return true if the underlying data buffer is mutable + bool is_mutable() const { return data_->is_mutable(); } + + /// Total number of non-zero cells in the sparse tensor + int64_t non_zero_length() const { + return sparse_index_ ? sparse_index_->non_zero_length() : 0; + } + + bool Equals(const SparseTensor& other) const; + + protected: + // Constructor with all attributes + SparseTensor(const std::shared_ptr& type, const std::shared_ptr& data, + const std::vector& shape, + const std::shared_ptr& sparse_index, + const std::vector& dim_names); + + std::shared_ptr type_; + std::shared_ptr data_; + std::vector shape_; + std::shared_ptr sparse_index_; + + /// These names are optional + std::vector dim_names_; +}; + +// ---------------------------------------------------------------------- +// SparseTensorImpl class + +/// \brief EXPERIMENTAL: Concrete sparse tensor implementation classes with sparse index +/// type +template +class ARROW_EXPORT SparseTensorImpl : public SparseTensor { + public: + virtual ~SparseTensorImpl() = default; + + // Constructor with all attributes + SparseTensorImpl(const std::shared_ptr& sparse_index, + const std::shared_ptr& type, + const std::shared_ptr& data, const std::vector& shape, + const std::vector& dim_names) + : SparseTensor(type, data, shape, sparse_index, dim_names) {} + + // Constructor for empty sparse tensor + SparseTensorImpl(const std::shared_ptr& type, + const std::vector& shape, + const std::vector& dim_names = {}); + + // Constructor with a dense numeric tensor + template + explicit SparseTensorImpl(const NumericTensor& tensor); + + // Constructor with a dense tensor + explicit SparseTensorImpl(const Tensor& tensor); + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(SparseTensorImpl); +}; + +/// \brief EXPERIMENTAL: Type alias for COO sparse tensor +using SparseTensorCOO = SparseTensorImpl; + +/// \brief EXPERIMENTAL: Type alias for CSR sparse matrix +using SparseTensorCSR = SparseTensorImpl; +using SparseMatrixCSR = SparseTensorImpl; + +} // namespace arrow + +#endif // ARROW_SPARSE_TENSOR_H diff --git a/cpp/src/arrow/tensor.h b/cpp/src/arrow/tensor.h index a9b5df81fa1..e81f0f0dff5 100644 --- a/cpp/src/arrow/tensor.h +++ b/cpp/src/arrow/tensor.h @@ -50,6 +50,9 @@ static inline bool is_tensor_supported(Type::type type_id) { return false; } +template +class SparseTensorImpl; + class ARROW_EXPORT Tensor { public: virtual ~Tensor() = default; @@ -110,6 +113,9 @@ class ARROW_EXPORT Tensor { /// These names are optional std::vector dim_names_; + template + friend class SparseTensorImpl; + private: ARROW_DISALLOW_COPY_AND_ASSIGN(Tensor); }; diff --git a/docs/source/format/IPC.rst b/docs/source/format/IPC.rst index 8cb74b87afc..62a1237436a 100644 --- a/docs/source/format/IPC.rst +++ b/docs/source/format/IPC.rst @@ -234,4 +234,28 @@ region) to be multiples of 64 bytes: :: +SparseTensor Message Format +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``SparseTensor`` message types provides another way to write a +multidimensional array of fixed-size values 
using Arrow's shared memory tools +in addition to ``Tensor``. ``SparseTensor`` is designed specifically for tensors +most of whose elements are zero. As with ``Tensor``, Arrow implementations in general are not +required to implement this data format. + +When writing a standalone encapsulated sparse tensor message, we use the format as +indicated above, but additionally align the starting offset of the metadata as +well as the starting offsets of the sparse index and the sparse tensor body +(if writing to a shared memory region) to be multiples of 64 bytes: :: + + <metadata size: int32> + <metadata> + <sparse index> + <sparse tensor body> + +The contents of the sparse tensor index depend on which sparse +format is used. + .. _Flatbuffer: https://github.com/google/flatbuffers diff --git a/format/Message.fbs b/format/Message.fbs index 830718139d8..e14fdca8f15 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -87,7 +87,7 @@ table DictionaryBatch { /// which may include experimental metadata types. For maximum compatibility, /// it is best to send data using RecordBatch union MessageHeader { - Schema, DictionaryBatch, RecordBatch, Tensor + Schema, DictionaryBatch, RecordBatch, Tensor, SparseTensor } table Message { @@ -96,4 +96,4 @@ table Message { bodyLength: long; } -root_type Message; \ No newline at end of file +root_type Message; diff --git a/format/Tensor.fbs b/format/Tensor.fbs index 18b614c3bde..e77b353a0f3 100644 --- a/format/Tensor.fbs +++ b/format/Tensor.fbs @@ -23,6 +23,9 @@ include "Schema.fbs"; namespace org.apache.arrow.flatbuf; +/// ---------------------------------------------------------------------- +/// Data structures for dense tensors + /// Shape data for a single axis in a tensor table TensorDim { /// Length of dimension @@ -48,3 +51,96 @@ table Tensor { } root_type Tensor; + +/// ---------------------------------------------------------------------- +/// EXPERIMENTAL: Data structures for sparse tensors + +/// Coordinate format of sparse tensor index. +table SparseTensorIndexCOO { + /// COO's index list is represented as an NxM matrix, + /// where N is the number of non-zero values, + /// and M is the number of dimensions of a sparse tensor. + /// indicesBuffer stores the location and size of this index matrix. + /// The type of index value is long, so the stride for the index matrix is unnecessary. + /// + /// For example, let X be a 2x3x4x5 tensor with the following 6 non-zero values: + /// + /// X[0, 1, 2, 0] := 1 + /// X[1, 1, 2, 3] := 2 + /// X[0, 2, 1, 0] := 3 + /// X[0, 1, 3, 0] := 4 + /// X[0, 1, 2, 1] := 5 + /// X[1, 2, 0, 4] := 6 + /// + /// In COO format, the index matrix of X is written below as its 4x6 transpose, + /// with the coordinates of one non-zero value per column: + /// + /// [[0, 0, 0, 0, 1, 1], + /// [1, 1, 1, 2, 1, 2], + /// [2, 2, 3, 1, 2, 0], + /// [0, 1, 0, 0, 3, 4]] + /// + /// Note that the indices are sorted in lexicographical order. + indicesBuffer: Buffer; +} + +/// Compressed Sparse Row format, which is matrix-specific. +table SparseMatrixIndexCSR { + /// indptrBuffer stores the location and size of the indptr array that + /// represents the range of the rows. + /// The i-th row spans from indptr[i] to indptr[i+1] in the data. + /// The length of this array is 1 + (the number of rows), and the type + /// of index value is long. + /// + /// For example, let X be the following 6x4 matrix: + /// + /// X := [[0, 1, 2, 0], + /// [0, 0, 3, 0], + /// [0, 4, 0, 5], + /// [0, 0, 0, 0], + /// [6, 0, 7, 8], + /// [0, 9, 0, 0]]. + /// + /// The array of non-zero values in X is: + /// + /// values(X) = [1, 2, 3, 4, 5, 6, 7, 8, 9]. 
+ /// + /// And the indptr of X is: + /// + /// indptr(X) = [0, 2, 3, 5, 5, 8, 9]. + indptrBuffer: Buffer; + + /// indicesBuffer stores the location and size of the array that + /// contains the column indices of the corresponding non-zero values. + /// The type of index value is long. + /// + /// For example, the indices of the above X are: + /// + /// indices(X) = [1, 2, 2, 1, 3, 0, 2, 3, 1]. + indicesBuffer: Buffer; +} + +union SparseTensorIndex { + SparseTensorIndexCOO, + SparseMatrixIndexCSR +} + +table SparseTensor { + /// The type of data contained in a value cell. + /// Currently only fixed-width value types are supported, + /// no strings or nested types. + type: Type; + + /// The dimensions of the tensor, optionally named. + shape: [TensorDim]; + + /// The number of non-zero values in a sparse tensor. + non_zero_length: long; + + /// Sparse tensor index + sparseIndex: SparseTensorIndex; + + /// The location and size of the tensor's data + data: Buffer; +} + +root_type SparseTensor;
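
Reviewer note: the following is a minimal usage sketch, not part of the patch, showing how the pieces introduced here fit together end to end: converting a dense tensor to COO form with SparseTensorImpl/SparseTensorCOO, writing it with WriteSparseTensor, and reading it back with ReadSparseTensor, mirroring the round-trip tests above. The in-memory stream plumbing (io::BufferOutputStream / io::BufferReader) and the function name SparseTensorRoundTripExample are assumptions for illustration only; the tests in this patch use a memory-mapped file instead.

// Sketch only: exercises the APIs added in this patch under the assumptions
// stated above.
#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/memory_pool.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
#include "arrow/tensor.h"
#include "arrow/type.h"

arrow::Status SparseTensorRoundTripExample() {
  using namespace arrow;

  // A dense 2x3x4 int64 tensor with 12 non-zero values (same data as the tests).
  std::vector<int64_t> values = {1, 0, 2, 0, 0, 3, 0, 4, 5, 0, 6, 0,
                                 0, 11, 0, 12, 13, 0, 14, 0, 0, 15, 0, 16};
  std::vector<int64_t> shape = {2, 3, 4};
  NumericTensor<Int64Type> dense(Buffer::Wrap(values), shape);

  // Convert the dense tensor to COO form; the constructor builds the
  // SparseCOOIndex and the compacted data buffer added by this patch.
  SparseTensorCOO sparse(dense);

  // Write the encapsulated SparseTensor IPC message to an in-memory stream
  // (assumed here in place of the memory-mapped file used by the tests).
  std::shared_ptr<io::BufferOutputStream> sink;
  RETURN_NOT_OK(io::BufferOutputStream::Create(1 << 16, default_memory_pool(), &sink));
  int32_t metadata_length = 0;
  int64_t body_length = 0;
  RETURN_NOT_OK(ipc::WriteSparseTensor(sparse, sink.get(), &metadata_length,
                                       &body_length, default_memory_pool()));

  // Read the message back and compare with the original via the new
  // SparseTensorEquals path.
  std::shared_ptr<Buffer> written;
  RETURN_NOT_OK(sink->Finish(&written));
  io::BufferReader source(written);
  std::shared_ptr<SparseTensor> roundtripped;
  RETURN_NOT_OK(ipc::ReadSparseTensor(&source, &roundtripped));
  if (!sparse.Equals(*roundtripped)) {
    return Status::Invalid("sparse tensor round trip mismatch");
  }
  return Status::OK();
}

The same flow works for CSR by constructing SparseTensorImpl<SparseCSRIndex> from a 2-D tensor; the writer then emits the indptr and indices buffers ahead of the data buffer, as described in the CSR index table above.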